rpc.rs

   1mod store;
   2
   3use super::{
   4    auth::process_auth_header,
   5    db::{ChannelId, MessageId, UserId},
   6    AppState,
   7};
   8use anyhow::anyhow;
   9use async_io::Timer;
  10use async_std::task;
  11use async_tungstenite::{tungstenite::protocol::Role, WebSocketStream};
  12use collections::{HashMap, HashSet};
  13use futures::{channel::mpsc, future::BoxFuture, FutureExt, SinkExt, StreamExt};
  14use parking_lot::{RwLock, RwLockReadGuard, RwLockWriteGuard};
  15use rpc::{
  16    proto::{self, AnyTypedEnvelope, EntityMessage, EnvelopedMessage, RequestMessage},
  17    Connection, ConnectionId, Peer, TypedEnvelope,
  18};
  19use sha1::{Digest as _, Sha1};
  20use std::{
  21    any::TypeId,
  22    future::Future,
  23    sync::Arc,
  24    time::{Duration, Instant},
  25};
  26use store::{Store, Worktree};
  27use surf::StatusCode;
  28use tide::log;
  29use tide::{
  30    http::headers::{HeaderName, CONNECTION, UPGRADE},
  31    Request, Response,
  32};
  33use time::OffsetDateTime;
  34
  35type MessageHandler = Box<
  36    dyn Send
  37        + Sync
  38        + Fn(Arc<Server>, Box<dyn AnyTypedEnvelope>) -> BoxFuture<'static, tide::Result<()>>,
  39>;
  40
  41pub struct Server {
  42    peer: Arc<Peer>,
  43    store: RwLock<Store>,
  44    app_state: Arc<AppState>,
  45    handlers: HashMap<TypeId, MessageHandler>,
  46    notifications: Option<mpsc::UnboundedSender<()>>,
  47}
  48
  49pub trait Executor: Send + Clone {
  50    type Timer: Send + Future;
  51    fn spawn_detached<F: 'static + Send + Future<Output = ()>>(&self, future: F);
  52    fn timer(&self, duration: Duration) -> Self::Timer;
  53}
  54
  55#[derive(Clone)]
  56pub struct RealExecutor;
  57
  58const MESSAGE_COUNT_PER_PAGE: usize = 100;
  59const MAX_MESSAGE_LEN: usize = 1024;
  60
  61impl Server {
  62    pub fn new(
  63        app_state: Arc<AppState>,
  64        peer: Arc<Peer>,
  65        notifications: Option<mpsc::UnboundedSender<()>>,
  66    ) -> Arc<Self> {
  67        let mut server = Self {
  68            peer,
  69            app_state,
  70            store: Default::default(),
  71            handlers: Default::default(),
  72            notifications,
  73        };
  74
  75        server
  76            .add_request_handler(Server::ping)
  77            .add_request_handler(Server::register_project)
  78            .add_message_handler(Server::unregister_project)
  79            .add_request_handler(Server::share_project)
  80            .add_message_handler(Server::unshare_project)
  81            .add_request_handler(Server::join_project)
  82            .add_message_handler(Server::leave_project)
  83            .add_request_handler(Server::register_worktree)
  84            .add_message_handler(Server::unregister_worktree)
  85            .add_request_handler(Server::update_worktree)
  86            .add_message_handler(Server::update_diagnostic_summary)
  87            .add_message_handler(Server::disk_based_diagnostics_updating)
  88            .add_message_handler(Server::disk_based_diagnostics_updated)
  89            .add_request_handler(Server::forward_project_request::<proto::GetDefinition>)
  90            .add_request_handler(Server::forward_project_request::<proto::GetReferences>)
  91            .add_request_handler(Server::forward_project_request::<proto::SearchProject>)
  92            .add_request_handler(Server::forward_project_request::<proto::GetDocumentHighlights>)
  93            .add_request_handler(Server::forward_project_request::<proto::GetProjectSymbols>)
  94            .add_request_handler(Server::forward_project_request::<proto::OpenBufferForSymbol>)
  95            .add_request_handler(Server::forward_project_request::<proto::OpenBuffer>)
  96            .add_request_handler(Server::forward_project_request::<proto::GetCompletions>)
  97            .add_request_handler(
  98                Server::forward_project_request::<proto::ApplyCompletionAdditionalEdits>,
  99            )
 100            .add_request_handler(Server::forward_project_request::<proto::GetCodeActions>)
 101            .add_request_handler(Server::forward_project_request::<proto::ApplyCodeAction>)
 102            .add_request_handler(Server::forward_project_request::<proto::PrepareRename>)
 103            .add_request_handler(Server::forward_project_request::<proto::PerformRename>)
 104            .add_request_handler(Server::forward_project_request::<proto::FormatBuffers>)
 105            .add_message_handler(Server::close_buffer)
 106            .add_request_handler(Server::update_buffer)
 107            .add_message_handler(Server::update_buffer_file)
 108            .add_message_handler(Server::buffer_reloaded)
 109            .add_message_handler(Server::buffer_saved)
 110            .add_request_handler(Server::save_buffer)
 111            .add_request_handler(Server::get_channels)
 112            .add_request_handler(Server::get_users)
 113            .add_request_handler(Server::join_channel)
 114            .add_message_handler(Server::leave_channel)
 115            .add_request_handler(Server::send_channel_message)
 116            .add_request_handler(Server::get_channel_messages);
 117
 118        Arc::new(server)
 119    }
 120
 121    fn add_message_handler<F, Fut, M>(&mut self, handler: F) -> &mut Self
 122    where
 123        F: 'static + Send + Sync + Fn(Arc<Self>, TypedEnvelope<M>) -> Fut,
 124        Fut: 'static + Send + Future<Output = tide::Result<()>>,
 125        M: EnvelopedMessage,
 126    {
 127        let prev_handler = self.handlers.insert(
 128            TypeId::of::<M>(),
 129            Box::new(move |server, envelope| {
 130                let envelope = envelope.into_any().downcast::<TypedEnvelope<M>>().unwrap();
 131                (handler)(server, *envelope).boxed()
 132            }),
 133        );
 134        if prev_handler.is_some() {
 135            panic!("registered a handler for the same message twice");
 136        }
 137        self
 138    }
 139
 140    fn add_request_handler<F, Fut, M>(&mut self, handler: F) -> &mut Self
 141    where
 142        F: 'static + Send + Sync + Fn(Arc<Self>, TypedEnvelope<M>) -> Fut,
 143        Fut: 'static + Send + Future<Output = tide::Result<M::Response>>,
 144        M: RequestMessage,
 145    {
 146        self.add_message_handler(move |server, envelope| {
 147            let receipt = envelope.receipt();
 148            let response = (handler)(server.clone(), envelope);
 149            async move {
 150                match response.await {
 151                    Ok(response) => {
 152                        server.peer.respond(receipt, response)?;
 153                        Ok(())
 154                    }
 155                    Err(error) => {
 156                        server.peer.respond_with_error(
 157                            receipt,
 158                            proto::Error {
 159                                message: error.to_string(),
 160                            },
 161                        )?;
 162                        Err(error)
 163                    }
 164                }
 165            }
 166        })
 167    }
 168
 169    pub fn handle_connection<E: Executor>(
 170        self: &Arc<Self>,
 171        connection: Connection,
 172        addr: String,
 173        user_id: UserId,
 174        mut send_connection_id: Option<mpsc::Sender<ConnectionId>>,
 175        executor: E,
 176    ) -> impl Future<Output = ()> {
 177        let mut this = self.clone();
 178        async move {
 179            let (connection_id, handle_io, mut incoming_rx) = this
 180                .peer
 181                .add_connection(connection, {
 182                    let executor = executor.clone();
 183                    move |duration| {
 184                        let timer = executor.timer(duration);
 185                        async move {
 186                            timer.await;
 187                        }
 188                    }
 189                })
 190                .await;
 191
 192            if let Some(send_connection_id) = send_connection_id.as_mut() {
 193                let _ = send_connection_id.send(connection_id).await;
 194            }
 195
 196            this.state_mut().add_connection(connection_id, user_id);
 197            if let Err(err) = this.update_contacts_for_users(&[user_id]) {
 198                log::error!("error updating contacts for {:?}: {}", user_id, err);
 199            }
 200
 201            let handle_io = handle_io.fuse();
 202            futures::pin_mut!(handle_io);
 203            loop {
 204                let next_message = incoming_rx.next().fuse();
 205                futures::pin_mut!(next_message);
 206                futures::select_biased! {
 207                    result = handle_io => {
 208                        if let Err(err) = result {
 209                            log::error!("error handling rpc connection {:?} - {:?}", addr, err);
 210                        }
 211                        break;
 212                    }
 213                    message = next_message => {
 214                        if let Some(message) = message {
 215                            let start_time = Instant::now();
 216                            let type_name = message.payload_type_name();
 217                            log::info!("rpc message received. connection:{}, type:{}", connection_id, type_name);
 218                            if let Some(handler) = this.handlers.get(&message.payload_type_id()) {
 219                                let notifications = this.notifications.clone();
 220                                let is_background = message.is_background();
 221                                let handle_message = (handler)(this.clone(), message);
 222                                let handle_message = async move {
 223                                    if let Err(err) = handle_message.await {
 224                                        log::error!("rpc message error. connection:{}, type:{}, error:{:?}", connection_id, type_name, err);
 225                                    } else {
 226                                        log::info!("rpc message handled. connection:{}, type:{}, duration:{:?}", connection_id, type_name, start_time.elapsed());
 227                                    }
 228                                    if let Some(mut notifications) = notifications {
 229                                        let _ = notifications.send(()).await;
 230                                    }
 231                                };
 232                                if is_background {
 233                                    executor.spawn_detached(handle_message);
 234                                } else {
 235                                    handle_message.await;
 236                                }
 237                            } else {
 238                                log::warn!("unhandled message: {}", type_name);
 239                            }
 240                        } else {
 241                            log::info!("rpc connection closed {:?}", addr);
 242                            break;
 243                        }
 244                    }
 245                }
 246            }
 247
 248            if let Err(err) = this.sign_out(connection_id).await {
 249                log::error!("error signing out connection {:?} - {:?}", addr, err);
 250            }
 251        }
 252    }
 253
 254    async fn sign_out(self: &mut Arc<Self>, connection_id: ConnectionId) -> tide::Result<()> {
 255        self.peer.disconnect(connection_id);
 256        let removed_connection = self.state_mut().remove_connection(connection_id)?;
 257
 258        for (project_id, project) in removed_connection.hosted_projects {
 259            if let Some(share) = project.share {
 260                broadcast(
 261                    connection_id,
 262                    share.guests.keys().copied().collect(),
 263                    |conn_id| {
 264                        self.peer
 265                            .send(conn_id, proto::UnshareProject { project_id })
 266                    },
 267                )?;
 268            }
 269        }
 270
 271        for (project_id, peer_ids) in removed_connection.guest_project_ids {
 272            broadcast(connection_id, peer_ids, |conn_id| {
 273                self.peer.send(
 274                    conn_id,
 275                    proto::RemoveProjectCollaborator {
 276                        project_id,
 277                        peer_id: connection_id.0,
 278                    },
 279                )
 280            })?;
 281        }
 282
 283        self.update_contacts_for_users(removed_connection.contact_ids.iter())?;
 284        Ok(())
 285    }
 286
 287    async fn ping(self: Arc<Server>, _: TypedEnvelope<proto::Ping>) -> tide::Result<proto::Ack> {
 288        Ok(proto::Ack {})
 289    }
 290
 291    async fn register_project(
 292        mut self: Arc<Server>,
 293        request: TypedEnvelope<proto::RegisterProject>,
 294    ) -> tide::Result<proto::RegisterProjectResponse> {
 295        let project_id = {
 296            let mut state = self.state_mut();
 297            let user_id = state.user_id_for_connection(request.sender_id)?;
 298            state.register_project(request.sender_id, user_id)
 299        };
 300        Ok(proto::RegisterProjectResponse { project_id })
 301    }
 302
 303    async fn unregister_project(
 304        mut self: Arc<Server>,
 305        request: TypedEnvelope<proto::UnregisterProject>,
 306    ) -> tide::Result<()> {
 307        let project = self
 308            .state_mut()
 309            .unregister_project(request.payload.project_id, request.sender_id)?;
 310        self.update_contacts_for_users(project.authorized_user_ids().iter())?;
 311        Ok(())
 312    }
 313
 314    async fn share_project(
 315        mut self: Arc<Server>,
 316        request: TypedEnvelope<proto::ShareProject>,
 317    ) -> tide::Result<proto::Ack> {
 318        self.state_mut()
 319            .share_project(request.payload.project_id, request.sender_id);
 320        Ok(proto::Ack {})
 321    }
 322
 323    async fn unshare_project(
 324        mut self: Arc<Server>,
 325        request: TypedEnvelope<proto::UnshareProject>,
 326    ) -> tide::Result<()> {
 327        let project_id = request.payload.project_id;
 328        let project = self
 329            .state_mut()
 330            .unshare_project(project_id, request.sender_id)?;
 331
 332        broadcast(request.sender_id, project.connection_ids, |conn_id| {
 333            self.peer
 334                .send(conn_id, proto::UnshareProject { project_id })
 335        })?;
 336        self.update_contacts_for_users(&project.authorized_user_ids)?;
 337        Ok(())
 338    }
 339
 340    async fn join_project(
 341        mut self: Arc<Server>,
 342        request: TypedEnvelope<proto::JoinProject>,
 343    ) -> tide::Result<proto::JoinProjectResponse> {
 344        let project_id = request.payload.project_id;
 345
 346        let user_id = self.state().user_id_for_connection(request.sender_id)?;
 347        let (response, connection_ids, contact_user_ids) = self
 348            .state_mut()
 349            .join_project(request.sender_id, user_id, project_id)
 350            .and_then(|joined| {
 351                let share = joined.project.share()?;
 352                let peer_count = share.guests.len();
 353                let mut collaborators = Vec::with_capacity(peer_count);
 354                collaborators.push(proto::Collaborator {
 355                    peer_id: joined.project.host_connection_id.0,
 356                    replica_id: 0,
 357                    user_id: joined.project.host_user_id.to_proto(),
 358                });
 359                let worktrees = share
 360                    .worktrees
 361                    .iter()
 362                    .filter_map(|(id, shared_worktree)| {
 363                        let worktree = joined.project.worktrees.get(&id)?;
 364                        Some(proto::Worktree {
 365                            id: *id,
 366                            root_name: worktree.root_name.clone(),
 367                            entries: shared_worktree.entries.values().cloned().collect(),
 368                            diagnostic_summaries: shared_worktree
 369                                .diagnostic_summaries
 370                                .values()
 371                                .cloned()
 372                                .collect(),
 373                            visible: worktree.visible,
 374                        })
 375                    })
 376                    .collect();
 377                for (peer_conn_id, (peer_replica_id, peer_user_id)) in &share.guests {
 378                    if *peer_conn_id != request.sender_id {
 379                        collaborators.push(proto::Collaborator {
 380                            peer_id: peer_conn_id.0,
 381                            replica_id: *peer_replica_id as u32,
 382                            user_id: peer_user_id.to_proto(),
 383                        });
 384                    }
 385                }
 386                let response = proto::JoinProjectResponse {
 387                    worktrees,
 388                    replica_id: joined.replica_id as u32,
 389                    collaborators,
 390                };
 391                let connection_ids = joined.project.connection_ids();
 392                let contact_user_ids = joined.project.authorized_user_ids();
 393                Ok((response, connection_ids, contact_user_ids))
 394            })?;
 395
 396        broadcast(request.sender_id, connection_ids, |conn_id| {
 397            self.peer.send(
 398                conn_id,
 399                proto::AddProjectCollaborator {
 400                    project_id,
 401                    collaborator: Some(proto::Collaborator {
 402                        peer_id: request.sender_id.0,
 403                        replica_id: response.replica_id,
 404                        user_id: user_id.to_proto(),
 405                    }),
 406                },
 407            )
 408        })?;
 409        self.update_contacts_for_users(&contact_user_ids)?;
 410        Ok(response)
 411    }
 412
 413    async fn leave_project(
 414        mut self: Arc<Server>,
 415        request: TypedEnvelope<proto::LeaveProject>,
 416    ) -> tide::Result<()> {
 417        let sender_id = request.sender_id;
 418        let project_id = request.payload.project_id;
 419        let worktree = self.state_mut().leave_project(sender_id, project_id)?;
 420
 421        broadcast(sender_id, worktree.connection_ids, |conn_id| {
 422            self.peer.send(
 423                conn_id,
 424                proto::RemoveProjectCollaborator {
 425                    project_id,
 426                    peer_id: sender_id.0,
 427                },
 428            )
 429        })?;
 430        self.update_contacts_for_users(&worktree.authorized_user_ids)?;
 431
 432        Ok(())
 433    }
 434
 435    async fn register_worktree(
 436        mut self: Arc<Server>,
 437        request: TypedEnvelope<proto::RegisterWorktree>,
 438    ) -> tide::Result<proto::Ack> {
 439        let host_user_id = self.state().user_id_for_connection(request.sender_id)?;
 440
 441        let mut contact_user_ids = HashSet::default();
 442        contact_user_ids.insert(host_user_id);
 443        for github_login in &request.payload.authorized_logins {
 444            let contact_user_id = self.app_state.db.create_user(github_login, false).await?;
 445            contact_user_ids.insert(contact_user_id);
 446        }
 447
 448        let contact_user_ids = contact_user_ids.into_iter().collect::<Vec<_>>();
 449        let guest_connection_ids;
 450        {
 451            let mut state = self.state_mut();
 452            guest_connection_ids = state
 453                .read_project(request.payload.project_id, request.sender_id)?
 454                .guest_connection_ids();
 455            state.register_worktree(
 456                request.payload.project_id,
 457                request.payload.worktree_id,
 458                request.sender_id,
 459                Worktree {
 460                    authorized_user_ids: contact_user_ids.clone(),
 461                    root_name: request.payload.root_name.clone(),
 462                    visible: request.payload.visible,
 463                },
 464            )?;
 465        }
 466        broadcast(request.sender_id, guest_connection_ids, |connection_id| {
 467            self.peer
 468                .forward_send(request.sender_id, connection_id, request.payload.clone())
 469        })?;
 470        self.update_contacts_for_users(&contact_user_ids)?;
 471        Ok(proto::Ack {})
 472    }
 473
 474    async fn unregister_worktree(
 475        mut self: Arc<Server>,
 476        request: TypedEnvelope<proto::UnregisterWorktree>,
 477    ) -> tide::Result<()> {
 478        let project_id = request.payload.project_id;
 479        let worktree_id = request.payload.worktree_id;
 480        let (worktree, guest_connection_ids) =
 481            self.state_mut()
 482                .unregister_worktree(project_id, worktree_id, request.sender_id)?;
 483        broadcast(request.sender_id, guest_connection_ids, |conn_id| {
 484            self.peer.send(
 485                conn_id,
 486                proto::UnregisterWorktree {
 487                    project_id,
 488                    worktree_id,
 489                },
 490            )
 491        })?;
 492        self.update_contacts_for_users(&worktree.authorized_user_ids)?;
 493        Ok(())
 494    }
 495
 496    async fn update_worktree(
 497        mut self: Arc<Server>,
 498        request: TypedEnvelope<proto::UpdateWorktree>,
 499    ) -> tide::Result<proto::Ack> {
 500        let connection_ids = self.state_mut().update_worktree(
 501            request.sender_id,
 502            request.payload.project_id,
 503            request.payload.worktree_id,
 504            &request.payload.removed_entries,
 505            &request.payload.updated_entries,
 506        )?;
 507
 508        broadcast(request.sender_id, connection_ids, |connection_id| {
 509            self.peer
 510                .forward_send(request.sender_id, connection_id, request.payload.clone())
 511        })?;
 512
 513        Ok(proto::Ack {})
 514    }
 515
 516    async fn update_diagnostic_summary(
 517        mut self: Arc<Server>,
 518        request: TypedEnvelope<proto::UpdateDiagnosticSummary>,
 519    ) -> tide::Result<()> {
 520        let summary = request
 521            .payload
 522            .summary
 523            .clone()
 524            .ok_or_else(|| anyhow!("invalid summary"))?;
 525        let receiver_ids = self.state_mut().update_diagnostic_summary(
 526            request.payload.project_id,
 527            request.payload.worktree_id,
 528            request.sender_id,
 529            summary,
 530        )?;
 531
 532        broadcast(request.sender_id, receiver_ids, |connection_id| {
 533            self.peer
 534                .forward_send(request.sender_id, connection_id, request.payload.clone())
 535        })?;
 536        Ok(())
 537    }
 538
 539    async fn disk_based_diagnostics_updating(
 540        self: Arc<Server>,
 541        request: TypedEnvelope<proto::DiskBasedDiagnosticsUpdating>,
 542    ) -> tide::Result<()> {
 543        let receiver_ids = self
 544            .state()
 545            .project_connection_ids(request.payload.project_id, request.sender_id)?;
 546        broadcast(request.sender_id, receiver_ids, |connection_id| {
 547            self.peer
 548                .forward_send(request.sender_id, connection_id, request.payload.clone())
 549        })?;
 550        Ok(())
 551    }
 552
 553    async fn disk_based_diagnostics_updated(
 554        self: Arc<Server>,
 555        request: TypedEnvelope<proto::DiskBasedDiagnosticsUpdated>,
 556    ) -> tide::Result<()> {
 557        let receiver_ids = self
 558            .state()
 559            .project_connection_ids(request.payload.project_id, request.sender_id)?;
 560        broadcast(request.sender_id, receiver_ids, |connection_id| {
 561            self.peer
 562                .forward_send(request.sender_id, connection_id, request.payload.clone())
 563        })?;
 564        Ok(())
 565    }
 566
 567    async fn forward_project_request<T>(
 568        self: Arc<Server>,
 569        request: TypedEnvelope<T>,
 570    ) -> tide::Result<T::Response>
 571    where
 572        T: EntityMessage + RequestMessage,
 573    {
 574        let host_connection_id = self
 575            .state()
 576            .read_project(request.payload.remote_entity_id(), request.sender_id)?
 577            .host_connection_id;
 578        Ok(self
 579            .peer
 580            .forward_request(request.sender_id, host_connection_id, request.payload)
 581            .await?)
 582    }
 583
 584    async fn close_buffer(
 585        self: Arc<Server>,
 586        request: TypedEnvelope<proto::CloseBuffer>,
 587    ) -> tide::Result<()> {
 588        let host_connection_id = self
 589            .state()
 590            .read_project(request.payload.project_id, request.sender_id)?
 591            .host_connection_id;
 592        self.peer
 593            .forward_send(request.sender_id, host_connection_id, request.payload)?;
 594        Ok(())
 595    }
 596
 597    async fn save_buffer(
 598        self: Arc<Server>,
 599        request: TypedEnvelope<proto::SaveBuffer>,
 600    ) -> tide::Result<proto::BufferSaved> {
 601        let host;
 602        let mut guests;
 603        {
 604            let state = self.state();
 605            let project = state.read_project(request.payload.project_id, request.sender_id)?;
 606            host = project.host_connection_id;
 607            guests = project.guest_connection_ids()
 608        }
 609
 610        let response = self
 611            .peer
 612            .forward_request(request.sender_id, host, request.payload.clone())
 613            .await?;
 614
 615        guests.retain(|guest_connection_id| *guest_connection_id != request.sender_id);
 616        broadcast(host, guests, |conn_id| {
 617            self.peer.forward_send(host, conn_id, response.clone())
 618        })?;
 619
 620        Ok(response)
 621    }
 622
 623    async fn update_buffer(
 624        self: Arc<Server>,
 625        request: TypedEnvelope<proto::UpdateBuffer>,
 626    ) -> tide::Result<proto::Ack> {
 627        let receiver_ids = self
 628            .state()
 629            .project_connection_ids(request.payload.project_id, request.sender_id)?;
 630        broadcast(request.sender_id, receiver_ids, |connection_id| {
 631            self.peer
 632                .forward_send(request.sender_id, connection_id, request.payload.clone())
 633        })?;
 634        Ok(proto::Ack {})
 635    }
 636
 637    async fn update_buffer_file(
 638        self: Arc<Server>,
 639        request: TypedEnvelope<proto::UpdateBufferFile>,
 640    ) -> tide::Result<()> {
 641        let receiver_ids = self
 642            .state()
 643            .project_connection_ids(request.payload.project_id, request.sender_id)?;
 644        broadcast(request.sender_id, receiver_ids, |connection_id| {
 645            self.peer
 646                .forward_send(request.sender_id, connection_id, request.payload.clone())
 647        })?;
 648        Ok(())
 649    }
 650
 651    async fn buffer_reloaded(
 652        self: Arc<Server>,
 653        request: TypedEnvelope<proto::BufferReloaded>,
 654    ) -> tide::Result<()> {
 655        let receiver_ids = self
 656            .state()
 657            .project_connection_ids(request.payload.project_id, request.sender_id)?;
 658        broadcast(request.sender_id, receiver_ids, |connection_id| {
 659            self.peer
 660                .forward_send(request.sender_id, connection_id, request.payload.clone())
 661        })?;
 662        Ok(())
 663    }
 664
 665    async fn buffer_saved(
 666        self: Arc<Server>,
 667        request: TypedEnvelope<proto::BufferSaved>,
 668    ) -> tide::Result<()> {
 669        let receiver_ids = self
 670            .state()
 671            .project_connection_ids(request.payload.project_id, request.sender_id)?;
 672        broadcast(request.sender_id, receiver_ids, |connection_id| {
 673            self.peer
 674                .forward_send(request.sender_id, connection_id, request.payload.clone())
 675        })?;
 676        Ok(())
 677    }
 678
 679    async fn get_channels(
 680        self: Arc<Server>,
 681        request: TypedEnvelope<proto::GetChannels>,
 682    ) -> tide::Result<proto::GetChannelsResponse> {
 683        let user_id = self.state().user_id_for_connection(request.sender_id)?;
 684        let channels = self.app_state.db.get_accessible_channels(user_id).await?;
 685        Ok(proto::GetChannelsResponse {
 686            channels: channels
 687                .into_iter()
 688                .map(|chan| proto::Channel {
 689                    id: chan.id.to_proto(),
 690                    name: chan.name,
 691                })
 692                .collect(),
 693        })
 694    }
 695
 696    async fn get_users(
 697        self: Arc<Server>,
 698        request: TypedEnvelope<proto::GetUsers>,
 699    ) -> tide::Result<proto::GetUsersResponse> {
 700        let user_ids = request
 701            .payload
 702            .user_ids
 703            .into_iter()
 704            .map(UserId::from_proto)
 705            .collect();
 706        let users = self
 707            .app_state
 708            .db
 709            .get_users_by_ids(user_ids)
 710            .await?
 711            .into_iter()
 712            .map(|user| proto::User {
 713                id: user.id.to_proto(),
 714                avatar_url: format!("https://github.com/{}.png?size=128", user.github_login),
 715                github_login: user.github_login,
 716            })
 717            .collect();
 718        Ok(proto::GetUsersResponse { users })
 719    }
 720
 721    fn update_contacts_for_users<'a>(
 722        self: &Arc<Server>,
 723        user_ids: impl IntoIterator<Item = &'a UserId>,
 724    ) -> anyhow::Result<()> {
 725        let mut result = Ok(());
 726        let state = self.state();
 727        for user_id in user_ids {
 728            let contacts = state.contacts_for_user(*user_id);
 729            for connection_id in state.connection_ids_for_user(*user_id) {
 730                if let Err(error) = self.peer.send(
 731                    connection_id,
 732                    proto::UpdateContacts {
 733                        contacts: contacts.clone(),
 734                    },
 735                ) {
 736                    result = Err(error);
 737                }
 738            }
 739        }
 740        result
 741    }
 742
 743    async fn join_channel(
 744        mut self: Arc<Self>,
 745        request: TypedEnvelope<proto::JoinChannel>,
 746    ) -> tide::Result<proto::JoinChannelResponse> {
 747        let user_id = self.state().user_id_for_connection(request.sender_id)?;
 748        let channel_id = ChannelId::from_proto(request.payload.channel_id);
 749        if !self
 750            .app_state
 751            .db
 752            .can_user_access_channel(user_id, channel_id)
 753            .await?
 754        {
 755            Err(anyhow!("access denied"))?;
 756        }
 757
 758        self.state_mut().join_channel(request.sender_id, channel_id);
 759        let messages = self
 760            .app_state
 761            .db
 762            .get_channel_messages(channel_id, MESSAGE_COUNT_PER_PAGE, None)
 763            .await?
 764            .into_iter()
 765            .map(|msg| proto::ChannelMessage {
 766                id: msg.id.to_proto(),
 767                body: msg.body,
 768                timestamp: msg.sent_at.unix_timestamp() as u64,
 769                sender_id: msg.sender_id.to_proto(),
 770                nonce: Some(msg.nonce.as_u128().into()),
 771            })
 772            .collect::<Vec<_>>();
 773        Ok(proto::JoinChannelResponse {
 774            done: messages.len() < MESSAGE_COUNT_PER_PAGE,
 775            messages,
 776        })
 777    }
 778
 779    async fn leave_channel(
 780        mut self: Arc<Self>,
 781        request: TypedEnvelope<proto::LeaveChannel>,
 782    ) -> tide::Result<()> {
 783        let user_id = self.state().user_id_for_connection(request.sender_id)?;
 784        let channel_id = ChannelId::from_proto(request.payload.channel_id);
 785        if !self
 786            .app_state
 787            .db
 788            .can_user_access_channel(user_id, channel_id)
 789            .await?
 790        {
 791            Err(anyhow!("access denied"))?;
 792        }
 793
 794        self.state_mut()
 795            .leave_channel(request.sender_id, channel_id);
 796
 797        Ok(())
 798    }
 799
 800    async fn send_channel_message(
 801        self: Arc<Self>,
 802        request: TypedEnvelope<proto::SendChannelMessage>,
 803    ) -> tide::Result<proto::SendChannelMessageResponse> {
 804        let channel_id = ChannelId::from_proto(request.payload.channel_id);
 805        let user_id;
 806        let connection_ids;
 807        {
 808            let state = self.state();
 809            user_id = state.user_id_for_connection(request.sender_id)?;
 810            connection_ids = state.channel_connection_ids(channel_id)?;
 811        }
 812
 813        // Validate the message body.
 814        let body = request.payload.body.trim().to_string();
 815        if body.len() > MAX_MESSAGE_LEN {
 816            return Err(anyhow!("message is too long"))?;
 817        }
 818        if body.is_empty() {
 819            return Err(anyhow!("message can't be blank"))?;
 820        }
 821
 822        let timestamp = OffsetDateTime::now_utc();
 823        let nonce = request
 824            .payload
 825            .nonce
 826            .ok_or_else(|| anyhow!("nonce can't be blank"))?;
 827
 828        let message_id = self
 829            .app_state
 830            .db
 831            .create_channel_message(channel_id, user_id, &body, timestamp, nonce.clone().into())
 832            .await?
 833            .to_proto();
 834        let message = proto::ChannelMessage {
 835            sender_id: user_id.to_proto(),
 836            id: message_id,
 837            body,
 838            timestamp: timestamp.unix_timestamp() as u64,
 839            nonce: Some(nonce),
 840        };
 841        broadcast(request.sender_id, connection_ids, |conn_id| {
 842            self.peer.send(
 843                conn_id,
 844                proto::ChannelMessageSent {
 845                    channel_id: channel_id.to_proto(),
 846                    message: Some(message.clone()),
 847                },
 848            )
 849        })?;
 850        Ok(proto::SendChannelMessageResponse {
 851            message: Some(message),
 852        })
 853    }
 854
 855    async fn get_channel_messages(
 856        self: Arc<Self>,
 857        request: TypedEnvelope<proto::GetChannelMessages>,
 858    ) -> tide::Result<proto::GetChannelMessagesResponse> {
 859        let user_id = self.state().user_id_for_connection(request.sender_id)?;
 860        let channel_id = ChannelId::from_proto(request.payload.channel_id);
 861        if !self
 862            .app_state
 863            .db
 864            .can_user_access_channel(user_id, channel_id)
 865            .await?
 866        {
 867            Err(anyhow!("access denied"))?;
 868        }
 869
 870        let messages = self
 871            .app_state
 872            .db
 873            .get_channel_messages(
 874                channel_id,
 875                MESSAGE_COUNT_PER_PAGE,
 876                Some(MessageId::from_proto(request.payload.before_message_id)),
 877            )
 878            .await?
 879            .into_iter()
 880            .map(|msg| proto::ChannelMessage {
 881                id: msg.id.to_proto(),
 882                body: msg.body,
 883                timestamp: msg.sent_at.unix_timestamp() as u64,
 884                sender_id: msg.sender_id.to_proto(),
 885                nonce: Some(msg.nonce.as_u128().into()),
 886            })
 887            .collect::<Vec<_>>();
 888
 889        Ok(proto::GetChannelMessagesResponse {
 890            done: messages.len() < MESSAGE_COUNT_PER_PAGE,
 891            messages,
 892        })
 893    }
 894
 895    fn state<'a>(self: &'a Arc<Self>) -> RwLockReadGuard<'a, Store> {
 896        self.store.read()
 897    }
 898
 899    fn state_mut<'a>(self: &'a mut Arc<Self>) -> RwLockWriteGuard<'a, Store> {
 900        self.store.write()
 901    }
 902}
 903
 904impl Executor for RealExecutor {
 905    type Timer = Timer;
 906
 907    fn spawn_detached<F: 'static + Send + Future<Output = ()>>(&self, future: F) {
 908        task::spawn(future);
 909    }
 910
 911    fn timer(&self, duration: Duration) -> Self::Timer {
 912        Timer::after(duration)
 913    }
 914}
 915
 916fn broadcast<F>(
 917    sender_id: ConnectionId,
 918    receiver_ids: Vec<ConnectionId>,
 919    mut f: F,
 920) -> anyhow::Result<()>
 921where
 922    F: FnMut(ConnectionId) -> anyhow::Result<()>,
 923{
 924    let mut result = Ok(());
 925    for receiver_id in receiver_ids {
 926        if receiver_id != sender_id {
 927            if let Err(error) = f(receiver_id) {
 928                if result.is_ok() {
 929                    result = Err(error);
 930                }
 931            }
 932        }
 933    }
 934    result
 935}
 936
 937pub fn add_routes(app: &mut tide::Server<Arc<AppState>>, rpc: &Arc<Peer>) {
 938    let server = Server::new(app.state().clone(), rpc.clone(), None);
 939    app.at("/rpc").get(move |request: Request<Arc<AppState>>| {
 940        let server = server.clone();
 941        async move {
 942            const WEBSOCKET_GUID: &str = "258EAFA5-E914-47DA-95CA-C5AB0DC85B11";
 943
 944            let connection_upgrade = header_contains_ignore_case(&request, CONNECTION, "upgrade");
 945            let upgrade_to_websocket = header_contains_ignore_case(&request, UPGRADE, "websocket");
 946            let upgrade_requested = connection_upgrade && upgrade_to_websocket;
 947            let client_protocol_version: Option<u32> = request
 948                .header("X-Zed-Protocol-Version")
 949                .and_then(|v| v.as_str().parse().ok());
 950
 951            if !upgrade_requested || client_protocol_version != Some(rpc::PROTOCOL_VERSION) {
 952                return Ok(Response::new(StatusCode::UpgradeRequired));
 953            }
 954
 955            let header = match request.header("Sec-Websocket-Key") {
 956                Some(h) => h.as_str(),
 957                None => return Err(anyhow!("expected sec-websocket-key"))?,
 958            };
 959
 960            let user_id = process_auth_header(&request).await?;
 961
 962            let mut response = Response::new(StatusCode::SwitchingProtocols);
 963            response.insert_header(UPGRADE, "websocket");
 964            response.insert_header(CONNECTION, "Upgrade");
 965            let hash = Sha1::new().chain(header).chain(WEBSOCKET_GUID).finalize();
 966            response.insert_header("Sec-Websocket-Accept", base64::encode(&hash[..]));
 967            response.insert_header("Sec-Websocket-Version", "13");
 968
 969            let http_res: &mut tide::http::Response = response.as_mut();
 970            let upgrade_receiver = http_res.recv_upgrade().await;
 971            let addr = request.remote().unwrap_or("unknown").to_string();
 972            task::spawn(async move {
 973                if let Some(stream) = upgrade_receiver.await {
 974                    server
 975                        .handle_connection(
 976                            Connection::new(
 977                                WebSocketStream::from_raw_socket(stream, Role::Server, None).await,
 978                            ),
 979                            addr,
 980                            user_id,
 981                            None,
 982                            RealExecutor,
 983                        )
 984                        .await;
 985                }
 986            });
 987
 988            Ok(response)
 989        }
 990    });
 991}
 992
 993fn header_contains_ignore_case<T>(
 994    request: &tide::Request<T>,
 995    header_name: HeaderName,
 996    value: &str,
 997) -> bool {
 998    request
 999        .header(header_name)
1000        .map(|h| {
1001            h.as_str()
1002                .split(',')
1003                .any(|s| s.trim().eq_ignore_ascii_case(value.trim()))
1004        })
1005        .unwrap_or(false)
1006}
1007
1008#[cfg(test)]
1009mod tests {
1010    use super::*;
1011    use crate::{
1012        auth,
1013        db::{tests::TestDb, UserId},
1014        github, AppState, Config,
1015    };
1016    use ::rpc::Peer;
1017    use client::{
1018        self, test::FakeHttpClient, Channel, ChannelDetails, ChannelList, Client, Credentials,
1019        EstablishConnectionError, UserStore,
1020    };
1021    use collections::BTreeMap;
1022    use editor::{
1023        self, ConfirmCodeAction, ConfirmCompletion, ConfirmRename, Editor, Input, MultiBuffer,
1024        Redo, Rename, ToOffset, ToggleCodeActions, Undo,
1025    };
1026    use gpui::{executor, ModelHandle, TestAppContext};
1027    use language::{
1028        tree_sitter_rust, AnchorRangeExt, Diagnostic, DiagnosticEntry, Language, LanguageConfig,
1029        LanguageRegistry, LanguageServerConfig, Point, ToLspPosition,
1030    };
1031    use lsp;
1032    use parking_lot::Mutex;
1033    use postage::{barrier, watch};
1034    use project::{
1035        fs::{FakeFs, Fs as _},
1036        search::SearchQuery,
1037        worktree::WorktreeHandle,
1038        DiagnosticSummary, Project, ProjectPath,
1039    };
1040    use rand::prelude::*;
1041    use rpc::PeerId;
1042    use serde_json::json;
1043    use sqlx::types::time::OffsetDateTime;
1044    use std::{
1045        cell::Cell,
1046        env,
1047        ops::Deref,
1048        path::{Path, PathBuf},
1049        rc::Rc,
1050        sync::{
1051            atomic::{AtomicBool, Ordering::SeqCst},
1052            Arc,
1053        },
1054        time::Duration,
1055    };
1056    use workspace::{Settings, Workspace, WorkspaceParams};
1057
1058    #[cfg(test)]
1059    #[ctor::ctor]
1060    fn init_logger() {
1061        if std::env::var("RUST_LOG").is_ok() {
1062            env_logger::init();
1063        }
1064    }
1065
1066    #[gpui::test(iterations = 10)]
1067    async fn test_share_project(cx_a: &mut TestAppContext, cx_b: &mut TestAppContext) {
1068        let (window_b, _) = cx_b.add_window(|_| EmptyView);
1069        let lang_registry = Arc::new(LanguageRegistry::test());
1070        let fs = FakeFs::new(cx_a.background());
1071        cx_a.foreground().forbid_parking();
1072
1073        // Connect to a server as 2 clients.
1074        let mut server = TestServer::start(cx_a.foreground(), cx_a.background()).await;
1075        let client_a = server.create_client(cx_a, "user_a").await;
1076        let client_b = server.create_client(cx_b, "user_b").await;
1077
1078        // Share a project as client A
1079        fs.insert_tree(
1080            "/a",
1081            json!({
1082                ".zed.toml": r#"collaborators = ["user_b"]"#,
1083                "a.txt": "a-contents",
1084                "b.txt": "b-contents",
1085            }),
1086        )
1087        .await;
1088        let project_a = cx_a.update(|cx| {
1089            Project::local(
1090                client_a.clone(),
1091                client_a.user_store.clone(),
1092                lang_registry.clone(),
1093                fs.clone(),
1094                cx,
1095            )
1096        });
1097        let (worktree_a, _) = project_a
1098            .update(cx_a, |p, cx| {
1099                p.find_or_create_local_worktree("/a", true, cx)
1100            })
1101            .await
1102            .unwrap();
1103        let worktree_id = worktree_a.read_with(cx_a, |tree, _| tree.id());
1104        worktree_a
1105            .read_with(cx_a, |tree, _| tree.as_local().unwrap().scan_complete())
1106            .await;
1107        let project_id = project_a.update(cx_a, |p, _| p.next_remote_id()).await;
1108        project_a.update(cx_a, |p, cx| p.share(cx)).await.unwrap();
1109
1110        // Join that project as client B
1111        let project_b = Project::remote(
1112            project_id,
1113            client_b.clone(),
1114            client_b.user_store.clone(),
1115            lang_registry.clone(),
1116            fs.clone(),
1117            &mut cx_b.to_async(),
1118        )
1119        .await
1120        .unwrap();
1121
1122        let replica_id_b = project_b.read_with(cx_b, |project, _| {
1123            assert_eq!(
1124                project
1125                    .collaborators()
1126                    .get(&client_a.peer_id)
1127                    .unwrap()
1128                    .user
1129                    .github_login,
1130                "user_a"
1131            );
1132            project.replica_id()
1133        });
1134        project_a
1135            .condition(&cx_a, |tree, _| {
1136                tree.collaborators()
1137                    .get(&client_b.peer_id)
1138                    .map_or(false, |collaborator| {
1139                        collaborator.replica_id == replica_id_b
1140                            && collaborator.user.github_login == "user_b"
1141                    })
1142            })
1143            .await;
1144
1145        // Open the same file as client B and client A.
1146        let buffer_b = project_b
1147            .update(cx_b, |p, cx| p.open_buffer((worktree_id, "b.txt"), cx))
1148            .await
1149            .unwrap();
1150        let buffer_b = cx_b.add_model(|cx| MultiBuffer::singleton(buffer_b, cx));
1151        buffer_b.read_with(cx_b, |buf, cx| {
1152            assert_eq!(buf.read(cx).text(), "b-contents")
1153        });
1154        project_a.read_with(cx_a, |project, cx| {
1155            assert!(project.has_open_buffer((worktree_id, "b.txt"), cx))
1156        });
1157        let buffer_a = project_a
1158            .update(cx_a, |p, cx| p.open_buffer((worktree_id, "b.txt"), cx))
1159            .await
1160            .unwrap();
1161
1162        let editor_b = cx_b.add_view(window_b, |cx| {
1163            Editor::for_buffer(
1164                buffer_b,
1165                None,
1166                watch::channel_with(Settings::test(cx)).1,
1167                cx,
1168            )
1169        });
1170
1171        // TODO
1172        // // Create a selection set as client B and see that selection set as client A.
1173        // buffer_a
1174        //     .condition(&cx_a, |buffer, _| buffer.selection_sets().count() == 1)
1175        //     .await;
1176
1177        // Edit the buffer as client B and see that edit as client A.
1178        editor_b.update(cx_b, |editor, cx| {
1179            editor.handle_input(&Input("ok, ".into()), cx)
1180        });
1181        buffer_a
1182            .condition(&cx_a, |buffer, _| buffer.text() == "ok, b-contents")
1183            .await;
1184
1185        // TODO
1186        // // Remove the selection set as client B, see those selections disappear as client A.
1187        cx_b.update(move |_| drop(editor_b));
1188        // buffer_a
1189        //     .condition(&cx_a, |buffer, _| buffer.selection_sets().count() == 0)
1190        //     .await;
1191
1192        // Dropping the client B's project removes client B from client A's collaborators.
1193        cx_b.update(move |_| drop(project_b));
1194        project_a
1195            .condition(&cx_a, |project, _| project.collaborators().is_empty())
1196            .await;
1197    }
1198
1199    #[gpui::test(iterations = 10)]
1200    async fn test_unshare_project(cx_a: &mut TestAppContext, cx_b: &mut TestAppContext) {
1201        let lang_registry = Arc::new(LanguageRegistry::test());
1202        let fs = FakeFs::new(cx_a.background());
1203        cx_a.foreground().forbid_parking();
1204
1205        // Connect to a server as 2 clients.
1206        let mut server = TestServer::start(cx_a.foreground(), cx_a.background()).await;
1207        let client_a = server.create_client(cx_a, "user_a").await;
1208        let client_b = server.create_client(cx_b, "user_b").await;
1209
1210        // Share a project as client A
1211        fs.insert_tree(
1212            "/a",
1213            json!({
1214                ".zed.toml": r#"collaborators = ["user_b"]"#,
1215                "a.txt": "a-contents",
1216                "b.txt": "b-contents",
1217            }),
1218        )
1219        .await;
1220        let project_a = cx_a.update(|cx| {
1221            Project::local(
1222                client_a.clone(),
1223                client_a.user_store.clone(),
1224                lang_registry.clone(),
1225                fs.clone(),
1226                cx,
1227            )
1228        });
1229        let (worktree_a, _) = project_a
1230            .update(cx_a, |p, cx| {
1231                p.find_or_create_local_worktree("/a", true, cx)
1232            })
1233            .await
1234            .unwrap();
1235        worktree_a
1236            .read_with(cx_a, |tree, _| tree.as_local().unwrap().scan_complete())
1237            .await;
1238        let project_id = project_a.update(cx_a, |p, _| p.next_remote_id()).await;
1239        let worktree_id = worktree_a.read_with(cx_a, |tree, _| tree.id());
1240        project_a.update(cx_a, |p, cx| p.share(cx)).await.unwrap();
1241        assert!(worktree_a.read_with(cx_a, |tree, _| tree.as_local().unwrap().is_shared()));
1242
1243        // Join that project as client B
1244        let project_b = Project::remote(
1245            project_id,
1246            client_b.clone(),
1247            client_b.user_store.clone(),
1248            lang_registry.clone(),
1249            fs.clone(),
1250            &mut cx_b.to_async(),
1251        )
1252        .await
1253        .unwrap();
1254        project_b
1255            .update(cx_b, |p, cx| p.open_buffer((worktree_id, "a.txt"), cx))
1256            .await
1257            .unwrap();
1258
1259        // Unshare the project as client A
1260        project_a
1261            .update(cx_a, |project, cx| project.unshare(cx))
1262            .await
1263            .unwrap();
1264        project_b
1265            .condition(cx_b, |project, _| project.is_read_only())
1266            .await;
1267        assert!(worktree_a.read_with(cx_a, |tree, _| !tree.as_local().unwrap().is_shared()));
1268        cx_b.update(|_| {
1269            drop(project_b);
1270        });
1271
1272        // Share the project again and ensure guests can still join.
1273        project_a
1274            .update(cx_a, |project, cx| project.share(cx))
1275            .await
1276            .unwrap();
1277        assert!(worktree_a.read_with(cx_a, |tree, _| tree.as_local().unwrap().is_shared()));
1278
1279        let project_b2 = Project::remote(
1280            project_id,
1281            client_b.clone(),
1282            client_b.user_store.clone(),
1283            lang_registry.clone(),
1284            fs.clone(),
1285            &mut cx_b.to_async(),
1286        )
1287        .await
1288        .unwrap();
1289        project_b2
1290            .update(cx_b, |p, cx| p.open_buffer((worktree_id, "a.txt"), cx))
1291            .await
1292            .unwrap();
1293    }
1294
1295    #[gpui::test(iterations = 10)]
1296    async fn test_propagate_saves_and_fs_changes(
1297        cx_a: &mut TestAppContext,
1298        cx_b: &mut TestAppContext,
1299        cx_c: &mut TestAppContext,
1300    ) {
1301        let lang_registry = Arc::new(LanguageRegistry::test());
1302        let fs = FakeFs::new(cx_a.background());
1303        cx_a.foreground().forbid_parking();
1304
1305        // Connect to a server as 3 clients.
1306        let mut server = TestServer::start(cx_a.foreground(), cx_a.background()).await;
1307        let client_a = server.create_client(cx_a, "user_a").await;
1308        let client_b = server.create_client(cx_b, "user_b").await;
1309        let client_c = server.create_client(cx_c, "user_c").await;
1310
1311        // Share a worktree as client A.
1312        fs.insert_tree(
1313            "/a",
1314            json!({
1315                ".zed.toml": r#"collaborators = ["user_b", "user_c"]"#,
1316                "file1": "",
1317                "file2": ""
1318            }),
1319        )
1320        .await;
1321        let project_a = cx_a.update(|cx| {
1322            Project::local(
1323                client_a.clone(),
1324                client_a.user_store.clone(),
1325                lang_registry.clone(),
1326                fs.clone(),
1327                cx,
1328            )
1329        });
1330        let (worktree_a, _) = project_a
1331            .update(cx_a, |p, cx| {
1332                p.find_or_create_local_worktree("/a", true, cx)
1333            })
1334            .await
1335            .unwrap();
1336        worktree_a
1337            .read_with(cx_a, |tree, _| tree.as_local().unwrap().scan_complete())
1338            .await;
1339        let project_id = project_a.update(cx_a, |p, _| p.next_remote_id()).await;
1340        let worktree_id = worktree_a.read_with(cx_a, |tree, _| tree.id());
1341        project_a.update(cx_a, |p, cx| p.share(cx)).await.unwrap();
1342
1343        // Join that worktree as clients B and C.
1344        let project_b = Project::remote(
1345            project_id,
1346            client_b.clone(),
1347            client_b.user_store.clone(),
1348            lang_registry.clone(),
1349            fs.clone(),
1350            &mut cx_b.to_async(),
1351        )
1352        .await
1353        .unwrap();
1354        let project_c = Project::remote(
1355            project_id,
1356            client_c.clone(),
1357            client_c.user_store.clone(),
1358            lang_registry.clone(),
1359            fs.clone(),
1360            &mut cx_c.to_async(),
1361        )
1362        .await
1363        .unwrap();
1364        let worktree_b = project_b.read_with(cx_b, |p, cx| p.worktrees(cx).next().unwrap());
1365        let worktree_c = project_c.read_with(cx_c, |p, cx| p.worktrees(cx).next().unwrap());
1366
1367        // Open and edit a buffer as both guests B and C.
1368        let buffer_b = project_b
1369            .update(cx_b, |p, cx| p.open_buffer((worktree_id, "file1"), cx))
1370            .await
1371            .unwrap();
1372        let buffer_c = project_c
1373            .update(cx_c, |p, cx| p.open_buffer((worktree_id, "file1"), cx))
1374            .await
1375            .unwrap();
1376        buffer_b.update(cx_b, |buf, cx| buf.edit([0..0], "i-am-b, ", cx));
1377        buffer_c.update(cx_c, |buf, cx| buf.edit([0..0], "i-am-c, ", cx));
1378
1379        // Open and edit that buffer as the host.
1380        let buffer_a = project_a
1381            .update(cx_a, |p, cx| p.open_buffer((worktree_id, "file1"), cx))
1382            .await
1383            .unwrap();
1384
1385        buffer_a
1386            .condition(cx_a, |buf, _| buf.text() == "i-am-c, i-am-b, ")
1387            .await;
1388        buffer_a.update(cx_a, |buf, cx| {
1389            buf.edit([buf.len()..buf.len()], "i-am-a", cx)
1390        });
1391
1392        // Wait for edits to propagate
1393        buffer_a
1394            .condition(cx_a, |buf, _| buf.text() == "i-am-c, i-am-b, i-am-a")
1395            .await;
1396        buffer_b
1397            .condition(cx_b, |buf, _| buf.text() == "i-am-c, i-am-b, i-am-a")
1398            .await;
1399        buffer_c
1400            .condition(cx_c, |buf, _| buf.text() == "i-am-c, i-am-b, i-am-a")
1401            .await;
1402
1403        // Edit the buffer as the host and concurrently save as guest B.
1404        let save_b = buffer_b.update(cx_b, |buf, cx| buf.save(cx));
1405        buffer_a.update(cx_a, |buf, cx| buf.edit([0..0], "hi-a, ", cx));
1406        save_b.await.unwrap();
1407        assert_eq!(
1408            fs.load("/a/file1".as_ref()).await.unwrap(),
1409            "hi-a, i-am-c, i-am-b, i-am-a"
1410        );
1411        buffer_a.read_with(cx_a, |buf, _| assert!(!buf.is_dirty()));
1412        buffer_b.read_with(cx_b, |buf, _| assert!(!buf.is_dirty()));
1413        buffer_c.condition(cx_c, |buf, _| !buf.is_dirty()).await;
1414
1415        worktree_a.flush_fs_events(cx_a).await;
1416
1417        // Make changes on host's file system, see those changes on guest worktrees.
1418        fs.rename(
1419            "/a/file1".as_ref(),
1420            "/a/file1-renamed".as_ref(),
1421            Default::default(),
1422        )
1423        .await
1424        .unwrap();
1425
1426        fs.rename("/a/file2".as_ref(), "/a/file3".as_ref(), Default::default())
1427            .await
1428            .unwrap();
1429        fs.insert_file(Path::new("/a/file4"), "4".into()).await;
1430
1431        worktree_a
1432            .condition(&cx_a, |tree, _| {
1433                tree.paths()
1434                    .map(|p| p.to_string_lossy())
1435                    .collect::<Vec<_>>()
1436                    == [".zed.toml", "file1-renamed", "file3", "file4"]
1437            })
1438            .await;
1439        worktree_b
1440            .condition(&cx_b, |tree, _| {
1441                tree.paths()
1442                    .map(|p| p.to_string_lossy())
1443                    .collect::<Vec<_>>()
1444                    == [".zed.toml", "file1-renamed", "file3", "file4"]
1445            })
1446            .await;
1447        worktree_c
1448            .condition(&cx_c, |tree, _| {
1449                tree.paths()
1450                    .map(|p| p.to_string_lossy())
1451                    .collect::<Vec<_>>()
1452                    == [".zed.toml", "file1-renamed", "file3", "file4"]
1453            })
1454            .await;
1455
1456        // Ensure buffer files are updated as well.
1457        buffer_a
1458            .condition(&cx_a, |buf, _| {
1459                buf.file().unwrap().path().to_str() == Some("file1-renamed")
1460            })
1461            .await;
1462        buffer_b
1463            .condition(&cx_b, |buf, _| {
1464                buf.file().unwrap().path().to_str() == Some("file1-renamed")
1465            })
1466            .await;
1467        buffer_c
1468            .condition(&cx_c, |buf, _| {
1469                buf.file().unwrap().path().to_str() == Some("file1-renamed")
1470            })
1471            .await;
1472    }
1473
1474    #[gpui::test(iterations = 10)]
1475    async fn test_buffer_conflict_after_save(cx_a: &mut TestAppContext, cx_b: &mut TestAppContext) {
1476        cx_a.foreground().forbid_parking();
1477        let lang_registry = Arc::new(LanguageRegistry::test());
1478        let fs = FakeFs::new(cx_a.background());
1479
1480        // Connect to a server as 2 clients.
1481        let mut server = TestServer::start(cx_a.foreground(), cx_a.background()).await;
1482        let client_a = server.create_client(cx_a, "user_a").await;
1483        let client_b = server.create_client(cx_b, "user_b").await;
1484
1485        // Share a project as client A
1486        fs.insert_tree(
1487            "/dir",
1488            json!({
1489                ".zed.toml": r#"collaborators = ["user_b", "user_c"]"#,
1490                "a.txt": "a-contents",
1491            }),
1492        )
1493        .await;
1494
1495        let project_a = cx_a.update(|cx| {
1496            Project::local(
1497                client_a.clone(),
1498                client_a.user_store.clone(),
1499                lang_registry.clone(),
1500                fs.clone(),
1501                cx,
1502            )
1503        });
1504        let (worktree_a, _) = project_a
1505            .update(cx_a, |p, cx| {
1506                p.find_or_create_local_worktree("/dir", true, cx)
1507            })
1508            .await
1509            .unwrap();
1510        worktree_a
1511            .read_with(cx_a, |tree, _| tree.as_local().unwrap().scan_complete())
1512            .await;
1513        let project_id = project_a.update(cx_a, |p, _| p.next_remote_id()).await;
1514        let worktree_id = worktree_a.read_with(cx_a, |tree, _| tree.id());
1515        project_a.update(cx_a, |p, cx| p.share(cx)).await.unwrap();
1516
1517        // Join that project as client B
1518        let project_b = Project::remote(
1519            project_id,
1520            client_b.clone(),
1521            client_b.user_store.clone(),
1522            lang_registry.clone(),
1523            fs.clone(),
1524            &mut cx_b.to_async(),
1525        )
1526        .await
1527        .unwrap();
1528
1529        // Open a buffer as client B
1530        let buffer_b = project_b
1531            .update(cx_b, |p, cx| p.open_buffer((worktree_id, "a.txt"), cx))
1532            .await
1533            .unwrap();
1534
1535        buffer_b.update(cx_b, |buf, cx| buf.edit([0..0], "world ", cx));
1536        buffer_b.read_with(cx_b, |buf, _| {
1537            assert!(buf.is_dirty());
1538            assert!(!buf.has_conflict());
1539        });
1540
1541        buffer_b.update(cx_b, |buf, cx| buf.save(cx)).await.unwrap();
1542        buffer_b
1543            .condition(&cx_b, |buffer_b, _| !buffer_b.is_dirty())
1544            .await;
1545        buffer_b.read_with(cx_b, |buf, _| {
1546            assert!(!buf.has_conflict());
1547        });
1548
1549        buffer_b.update(cx_b, |buf, cx| buf.edit([0..0], "hello ", cx));
1550        buffer_b.read_with(cx_b, |buf, _| {
1551            assert!(buf.is_dirty());
1552            assert!(!buf.has_conflict());
1553        });
1554    }
1555
1556    #[gpui::test(iterations = 10)]
1557    async fn test_buffer_reloading(cx_a: &mut TestAppContext, cx_b: &mut TestAppContext) {
1558        cx_a.foreground().forbid_parking();
1559        let lang_registry = Arc::new(LanguageRegistry::test());
1560        let fs = FakeFs::new(cx_a.background());
1561
1562        // Connect to a server as 2 clients.
1563        let mut server = TestServer::start(cx_a.foreground(), cx_a.background()).await;
1564        let client_a = server.create_client(cx_a, "user_a").await;
1565        let client_b = server.create_client(cx_b, "user_b").await;
1566
1567        // Share a project as client A
1568        fs.insert_tree(
1569            "/dir",
1570            json!({
1571                ".zed.toml": r#"collaborators = ["user_b", "user_c"]"#,
1572                "a.txt": "a-contents",
1573            }),
1574        )
1575        .await;
1576
1577        let project_a = cx_a.update(|cx| {
1578            Project::local(
1579                client_a.clone(),
1580                client_a.user_store.clone(),
1581                lang_registry.clone(),
1582                fs.clone(),
1583                cx,
1584            )
1585        });
1586        let (worktree_a, _) = project_a
1587            .update(cx_a, |p, cx| {
1588                p.find_or_create_local_worktree("/dir", true, cx)
1589            })
1590            .await
1591            .unwrap();
1592        worktree_a
1593            .read_with(cx_a, |tree, _| tree.as_local().unwrap().scan_complete())
1594            .await;
1595        let project_id = project_a.update(cx_a, |p, _| p.next_remote_id()).await;
1596        let worktree_id = worktree_a.read_with(cx_a, |tree, _| tree.id());
1597        project_a.update(cx_a, |p, cx| p.share(cx)).await.unwrap();
1598
1599        // Join that project as client B
1600        let project_b = Project::remote(
1601            project_id,
1602            client_b.clone(),
1603            client_b.user_store.clone(),
1604            lang_registry.clone(),
1605            fs.clone(),
1606            &mut cx_b.to_async(),
1607        )
1608        .await
1609        .unwrap();
1610        let _worktree_b = project_b.update(cx_b, |p, cx| p.worktrees(cx).next().unwrap());
1611
1612        // Open a buffer as client B
1613        let buffer_b = project_b
1614            .update(cx_b, |p, cx| p.open_buffer((worktree_id, "a.txt"), cx))
1615            .await
1616            .unwrap();
1617        buffer_b.read_with(cx_b, |buf, _| {
1618            assert!(!buf.is_dirty());
1619            assert!(!buf.has_conflict());
1620        });
1621
1622        fs.save(Path::new("/dir/a.txt"), &"new contents".into())
1623            .await
1624            .unwrap();
1625        buffer_b
1626            .condition(&cx_b, |buf, _| {
1627                buf.text() == "new contents" && !buf.is_dirty()
1628            })
1629            .await;
1630        buffer_b.read_with(cx_b, |buf, _| {
1631            assert!(!buf.has_conflict());
1632        });
1633    }
1634
1635    #[gpui::test(iterations = 10)]
1636    async fn test_editing_while_guest_opens_buffer(
1637        cx_a: &mut TestAppContext,
1638        cx_b: &mut TestAppContext,
1639    ) {
1640        cx_a.foreground().forbid_parking();
1641        let lang_registry = Arc::new(LanguageRegistry::test());
1642        let fs = FakeFs::new(cx_a.background());
1643
1644        // Connect to a server as 2 clients.
1645        let mut server = TestServer::start(cx_a.foreground(), cx_a.background()).await;
1646        let client_a = server.create_client(cx_a, "user_a").await;
1647        let client_b = server.create_client(cx_b, "user_b").await;
1648
1649        // Share a project as client A
1650        fs.insert_tree(
1651            "/dir",
1652            json!({
1653                ".zed.toml": r#"collaborators = ["user_b"]"#,
1654                "a.txt": "a-contents",
1655            }),
1656        )
1657        .await;
1658        let project_a = cx_a.update(|cx| {
1659            Project::local(
1660                client_a.clone(),
1661                client_a.user_store.clone(),
1662                lang_registry.clone(),
1663                fs.clone(),
1664                cx,
1665            )
1666        });
1667        let (worktree_a, _) = project_a
1668            .update(cx_a, |p, cx| {
1669                p.find_or_create_local_worktree("/dir", true, cx)
1670            })
1671            .await
1672            .unwrap();
1673        worktree_a
1674            .read_with(cx_a, |tree, _| tree.as_local().unwrap().scan_complete())
1675            .await;
1676        let project_id = project_a.update(cx_a, |p, _| p.next_remote_id()).await;
1677        let worktree_id = worktree_a.read_with(cx_a, |tree, _| tree.id());
1678        project_a.update(cx_a, |p, cx| p.share(cx)).await.unwrap();
1679
1680        // Join that project as client B
1681        let project_b = Project::remote(
1682            project_id,
1683            client_b.clone(),
1684            client_b.user_store.clone(),
1685            lang_registry.clone(),
1686            fs.clone(),
1687            &mut cx_b.to_async(),
1688        )
1689        .await
1690        .unwrap();
1691
1692        // Open a buffer as client A
1693        let buffer_a = project_a
1694            .update(cx_a, |p, cx| p.open_buffer((worktree_id, "a.txt"), cx))
1695            .await
1696            .unwrap();
1697
1698        // Start opening the same buffer as client B
1699        let buffer_b = cx_b
1700            .background()
1701            .spawn(project_b.update(cx_b, |p, cx| p.open_buffer((worktree_id, "a.txt"), cx)));
1702
1703        // Edit the buffer as client A while client B is still opening it.
1704        cx_b.background().simulate_random_delay().await;
1705        buffer_a.update(cx_a, |buf, cx| buf.edit([0..0], "X", cx));
1706        cx_b.background().simulate_random_delay().await;
1707        buffer_a.update(cx_a, |buf, cx| buf.edit([1..1], "Y", cx));
1708
1709        let text = buffer_a.read_with(cx_a, |buf, _| buf.text());
1710        let buffer_b = buffer_b.await.unwrap();
1711        buffer_b.condition(&cx_b, |buf, _| buf.text() == text).await;
1712    }
1713
1714    #[gpui::test(iterations = 10)]
1715    async fn test_leaving_worktree_while_opening_buffer(
1716        cx_a: &mut TestAppContext,
1717        cx_b: &mut TestAppContext,
1718    ) {
1719        cx_a.foreground().forbid_parking();
1720        let lang_registry = Arc::new(LanguageRegistry::test());
1721        let fs = FakeFs::new(cx_a.background());
1722
1723        // Connect to a server as 2 clients.
1724        let mut server = TestServer::start(cx_a.foreground(), cx_a.background()).await;
1725        let client_a = server.create_client(cx_a, "user_a").await;
1726        let client_b = server.create_client(cx_b, "user_b").await;
1727
1728        // Share a project as client A
1729        fs.insert_tree(
1730            "/dir",
1731            json!({
1732                ".zed.toml": r#"collaborators = ["user_b"]"#,
1733                "a.txt": "a-contents",
1734            }),
1735        )
1736        .await;
1737        let project_a = cx_a.update(|cx| {
1738            Project::local(
1739                client_a.clone(),
1740                client_a.user_store.clone(),
1741                lang_registry.clone(),
1742                fs.clone(),
1743                cx,
1744            )
1745        });
1746        let (worktree_a, _) = project_a
1747            .update(cx_a, |p, cx| {
1748                p.find_or_create_local_worktree("/dir", true, cx)
1749            })
1750            .await
1751            .unwrap();
1752        worktree_a
1753            .read_with(cx_a, |tree, _| tree.as_local().unwrap().scan_complete())
1754            .await;
1755        let project_id = project_a.update(cx_a, |p, _| p.next_remote_id()).await;
1756        let worktree_id = worktree_a.read_with(cx_a, |tree, _| tree.id());
1757        project_a.update(cx_a, |p, cx| p.share(cx)).await.unwrap();
1758
1759        // Join that project as client B
1760        let project_b = Project::remote(
1761            project_id,
1762            client_b.clone(),
1763            client_b.user_store.clone(),
1764            lang_registry.clone(),
1765            fs.clone(),
1766            &mut cx_b.to_async(),
1767        )
1768        .await
1769        .unwrap();
1770
1771        // See that a guest has joined as client A.
1772        project_a
1773            .condition(&cx_a, |p, _| p.collaborators().len() == 1)
1774            .await;
1775
1776        // Begin opening a buffer as client B, but leave the project before the open completes.
1777        let buffer_b = cx_b
1778            .background()
1779            .spawn(project_b.update(cx_b, |p, cx| p.open_buffer((worktree_id, "a.txt"), cx)));
1780        cx_b.update(|_| drop(project_b));
1781        drop(buffer_b);
1782
1783        // See that the guest has left.
1784        project_a
1785            .condition(&cx_a, |p, _| p.collaborators().len() == 0)
1786            .await;
1787    }
1788
1789    #[gpui::test(iterations = 10)]
1790    async fn test_leaving_project(cx_a: &mut TestAppContext, cx_b: &mut TestAppContext) {
1791        cx_a.foreground().forbid_parking();
1792        let lang_registry = Arc::new(LanguageRegistry::test());
1793        let fs = FakeFs::new(cx_a.background());
1794
1795        // Connect to a server as 2 clients.
1796        let mut server = TestServer::start(cx_a.foreground(), cx_a.background()).await;
1797        let client_a = server.create_client(cx_a, "user_a").await;
1798        let client_b = server.create_client(cx_b, "user_b").await;
1799
1800        // Share a project as client A
1801        fs.insert_tree(
1802            "/a",
1803            json!({
1804                ".zed.toml": r#"collaborators = ["user_b"]"#,
1805                "a.txt": "a-contents",
1806                "b.txt": "b-contents",
1807            }),
1808        )
1809        .await;
1810        let project_a = cx_a.update(|cx| {
1811            Project::local(
1812                client_a.clone(),
1813                client_a.user_store.clone(),
1814                lang_registry.clone(),
1815                fs.clone(),
1816                cx,
1817            )
1818        });
1819        let (worktree_a, _) = project_a
1820            .update(cx_a, |p, cx| {
1821                p.find_or_create_local_worktree("/a", true, cx)
1822            })
1823            .await
1824            .unwrap();
1825        worktree_a
1826            .read_with(cx_a, |tree, _| tree.as_local().unwrap().scan_complete())
1827            .await;
1828        let project_id = project_a
1829            .update(cx_a, |project, _| project.next_remote_id())
1830            .await;
1831        project_a
1832            .update(cx_a, |project, cx| project.share(cx))
1833            .await
1834            .unwrap();
1835
1836        // Join that project as client B
1837        let _project_b = Project::remote(
1838            project_id,
1839            client_b.clone(),
1840            client_b.user_store.clone(),
1841            lang_registry.clone(),
1842            fs.clone(),
1843            &mut cx_b.to_async(),
1844        )
1845        .await
1846        .unwrap();
1847
1848        // Client A sees that a guest has joined.
1849        project_a
1850            .condition(&cx_a, |p, _| p.collaborators().len() == 1)
1851            .await;
1852
1853        // Drop client B's connection and ensure client A observes client B leaving the project.
1854        client_b.disconnect(&cx_b.to_async()).unwrap();
1855        project_a
1856            .condition(&cx_a, |p, _| p.collaborators().len() == 0)
1857            .await;
1858
1859        // Rejoin the project as client B
1860        let _project_b = Project::remote(
1861            project_id,
1862            client_b.clone(),
1863            client_b.user_store.clone(),
1864            lang_registry.clone(),
1865            fs.clone(),
1866            &mut cx_b.to_async(),
1867        )
1868        .await
1869        .unwrap();
1870
1871        // Client A sees that a guest has re-joined.
1872        project_a
1873            .condition(&cx_a, |p, _| p.collaborators().len() == 1)
1874            .await;
1875
1876        // Simulate connection loss for client B and ensure client A observes client B leaving the project.
1877        server.disconnect_client(client_b.current_user_id(cx_b));
1878        cx_a.foreground().advance_clock(Duration::from_secs(3));
1879        project_a
1880            .condition(&cx_a, |p, _| p.collaborators().len() == 0)
1881            .await;
1882    }
1883
1884    #[gpui::test(iterations = 10)]
1885    async fn test_collaborating_with_diagnostics(
1886        cx_a: &mut TestAppContext,
1887        cx_b: &mut TestAppContext,
1888    ) {
1889        cx_a.foreground().forbid_parking();
1890        let mut lang_registry = Arc::new(LanguageRegistry::test());
1891        let fs = FakeFs::new(cx_a.background());
1892
1893        // Set up a fake language server.
1894        let (language_server_config, mut fake_language_servers) = LanguageServerConfig::fake();
1895        Arc::get_mut(&mut lang_registry)
1896            .unwrap()
1897            .add(Arc::new(Language::new(
1898                LanguageConfig {
1899                    name: "Rust".into(),
1900                    path_suffixes: vec!["rs".to_string()],
1901                    language_server: Some(language_server_config),
1902                    ..Default::default()
1903                },
1904                Some(tree_sitter_rust::language()),
1905            )));
1906
1907        // Connect to a server as 2 clients.
1908        let mut server = TestServer::start(cx_a.foreground(), cx_a.background()).await;
1909        let client_a = server.create_client(cx_a, "user_a").await;
1910        let client_b = server.create_client(cx_b, "user_b").await;
1911
1912        // Share a project as client A
1913        fs.insert_tree(
1914            "/a",
1915            json!({
1916                ".zed.toml": r#"collaborators = ["user_b"]"#,
1917                "a.rs": "let one = two",
1918                "other.rs": "",
1919            }),
1920        )
1921        .await;
1922        let project_a = cx_a.update(|cx| {
1923            Project::local(
1924                client_a.clone(),
1925                client_a.user_store.clone(),
1926                lang_registry.clone(),
1927                fs.clone(),
1928                cx,
1929            )
1930        });
1931        let (worktree_a, _) = project_a
1932            .update(cx_a, |p, cx| {
1933                p.find_or_create_local_worktree("/a", true, cx)
1934            })
1935            .await
1936            .unwrap();
1937        worktree_a
1938            .read_with(cx_a, |tree, _| tree.as_local().unwrap().scan_complete())
1939            .await;
1940        let project_id = project_a.update(cx_a, |p, _| p.next_remote_id()).await;
1941        let worktree_id = worktree_a.read_with(cx_a, |tree, _| tree.id());
1942        project_a.update(cx_a, |p, cx| p.share(cx)).await.unwrap();
1943
1944        // Cause the language server to start.
1945        let _ = cx_a
1946            .background()
1947            .spawn(project_a.update(cx_a, |project, cx| {
1948                project.open_buffer(
1949                    ProjectPath {
1950                        worktree_id,
1951                        path: Path::new("other.rs").into(),
1952                    },
1953                    cx,
1954                )
1955            }))
1956            .await
1957            .unwrap();
1958
1959        // Simulate a language server reporting errors for a file.
1960        let mut fake_language_server = fake_language_servers.next().await.unwrap();
1961        fake_language_server
1962            .notify::<lsp::notification::PublishDiagnostics>(lsp::PublishDiagnosticsParams {
1963                uri: lsp::Url::from_file_path("/a/a.rs").unwrap(),
1964                version: None,
1965                diagnostics: vec![lsp::Diagnostic {
1966                    severity: Some(lsp::DiagnosticSeverity::ERROR),
1967                    range: lsp::Range::new(lsp::Position::new(0, 4), lsp::Position::new(0, 7)),
1968                    message: "message 1".to_string(),
1969                    ..Default::default()
1970                }],
1971            })
1972            .await;
1973
1974        // Wait for server to see the diagnostics update.
1975        server
1976            .condition(|store| {
1977                let worktree = store
1978                    .project(project_id)
1979                    .unwrap()
1980                    .share
1981                    .as_ref()
1982                    .unwrap()
1983                    .worktrees
1984                    .get(&worktree_id.to_proto())
1985                    .unwrap();
1986
1987                !worktree.diagnostic_summaries.is_empty()
1988            })
1989            .await;
1990
1991        // Join the worktree as client B.
1992        let project_b = Project::remote(
1993            project_id,
1994            client_b.clone(),
1995            client_b.user_store.clone(),
1996            lang_registry.clone(),
1997            fs.clone(),
1998            &mut cx_b.to_async(),
1999        )
2000        .await
2001        .unwrap();
2002
2003        project_b.read_with(cx_b, |project, cx| {
2004            assert_eq!(
2005                project.diagnostic_summaries(cx).collect::<Vec<_>>(),
2006                &[(
2007                    ProjectPath {
2008                        worktree_id,
2009                        path: Arc::from(Path::new("a.rs")),
2010                    },
2011                    DiagnosticSummary {
2012                        error_count: 1,
2013                        warning_count: 0,
2014                        ..Default::default()
2015                    },
2016                )]
2017            )
2018        });
2019
2020        // Simulate a language server reporting more errors for a file.
2021        fake_language_server
2022            .notify::<lsp::notification::PublishDiagnostics>(lsp::PublishDiagnosticsParams {
2023                uri: lsp::Url::from_file_path("/a/a.rs").unwrap(),
2024                version: None,
2025                diagnostics: vec![
2026                    lsp::Diagnostic {
2027                        severity: Some(lsp::DiagnosticSeverity::ERROR),
2028                        range: lsp::Range::new(lsp::Position::new(0, 4), lsp::Position::new(0, 7)),
2029                        message: "message 1".to_string(),
2030                        ..Default::default()
2031                    },
2032                    lsp::Diagnostic {
2033                        severity: Some(lsp::DiagnosticSeverity::WARNING),
2034                        range: lsp::Range::new(
2035                            lsp::Position::new(0, 10),
2036                            lsp::Position::new(0, 13),
2037                        ),
2038                        message: "message 2".to_string(),
2039                        ..Default::default()
2040                    },
2041                ],
2042            })
2043            .await;
2044
2045        // Client b gets the updated summaries
2046        project_b
2047            .condition(&cx_b, |project, cx| {
2048                project.diagnostic_summaries(cx).collect::<Vec<_>>()
2049                    == &[(
2050                        ProjectPath {
2051                            worktree_id,
2052                            path: Arc::from(Path::new("a.rs")),
2053                        },
2054                        DiagnosticSummary {
2055                            error_count: 1,
2056                            warning_count: 1,
2057                            ..Default::default()
2058                        },
2059                    )]
2060            })
2061            .await;
2062
2063        // Open the file with the errors on client B. They should be present.
2064        let buffer_b = cx_b
2065            .background()
2066            .spawn(project_b.update(cx_b, |p, cx| p.open_buffer((worktree_id, "a.rs"), cx)))
2067            .await
2068            .unwrap();
2069
2070        buffer_b.read_with(cx_b, |buffer, _| {
2071            assert_eq!(
2072                buffer
2073                    .snapshot()
2074                    .diagnostics_in_range::<_, Point>(0..buffer.len())
2075                    .map(|entry| entry)
2076                    .collect::<Vec<_>>(),
2077                &[
2078                    DiagnosticEntry {
2079                        range: Point::new(0, 4)..Point::new(0, 7),
2080                        diagnostic: Diagnostic {
2081                            group_id: 0,
2082                            message: "message 1".to_string(),
2083                            severity: lsp::DiagnosticSeverity::ERROR,
2084                            is_primary: true,
2085                            ..Default::default()
2086                        }
2087                    },
2088                    DiagnosticEntry {
2089                        range: Point::new(0, 10)..Point::new(0, 13),
2090                        diagnostic: Diagnostic {
2091                            group_id: 1,
2092                            severity: lsp::DiagnosticSeverity::WARNING,
2093                            message: "message 2".to_string(),
2094                            is_primary: true,
2095                            ..Default::default()
2096                        }
2097                    }
2098                ]
2099            );
2100        });
2101    }
2102
2103    #[gpui::test(iterations = 10)]
2104    async fn test_collaborating_with_completion(
2105        cx_a: &mut TestAppContext,
2106        cx_b: &mut TestAppContext,
2107    ) {
2108        cx_a.foreground().forbid_parking();
2109        let mut lang_registry = Arc::new(LanguageRegistry::test());
2110        let fs = FakeFs::new(cx_a.background());
2111
2112        // Set up a fake language server.
2113        let (mut language_server_config, mut fake_language_servers) = LanguageServerConfig::fake();
2114        language_server_config.set_fake_capabilities(lsp::ServerCapabilities {
2115            completion_provider: Some(lsp::CompletionOptions {
2116                trigger_characters: Some(vec![".".to_string()]),
2117                ..Default::default()
2118            }),
2119            ..Default::default()
2120        });
2121        Arc::get_mut(&mut lang_registry)
2122            .unwrap()
2123            .add(Arc::new(Language::new(
2124                LanguageConfig {
2125                    name: "Rust".into(),
2126                    path_suffixes: vec!["rs".to_string()],
2127                    language_server: Some(language_server_config),
2128                    ..Default::default()
2129                },
2130                Some(tree_sitter_rust::language()),
2131            )));
2132
2133        // Connect to a server as 2 clients.
2134        let mut server = TestServer::start(cx_a.foreground(), cx_a.background()).await;
2135        let client_a = server.create_client(cx_a, "user_a").await;
2136        let client_b = server.create_client(cx_b, "user_b").await;
2137
2138        // Share a project as client A
2139        fs.insert_tree(
2140            "/a",
2141            json!({
2142                ".zed.toml": r#"collaborators = ["user_b"]"#,
2143                "main.rs": "fn main() { a }",
2144                "other.rs": "",
2145            }),
2146        )
2147        .await;
2148        let project_a = cx_a.update(|cx| {
2149            Project::local(
2150                client_a.clone(),
2151                client_a.user_store.clone(),
2152                lang_registry.clone(),
2153                fs.clone(),
2154                cx,
2155            )
2156        });
2157        let (worktree_a, _) = project_a
2158            .update(cx_a, |p, cx| {
2159                p.find_or_create_local_worktree("/a", true, cx)
2160            })
2161            .await
2162            .unwrap();
2163        worktree_a
2164            .read_with(cx_a, |tree, _| tree.as_local().unwrap().scan_complete())
2165            .await;
2166        let project_id = project_a.update(cx_a, |p, _| p.next_remote_id()).await;
2167        let worktree_id = worktree_a.read_with(cx_a, |tree, _| tree.id());
2168        project_a.update(cx_a, |p, cx| p.share(cx)).await.unwrap();
2169
2170        // Join the worktree as client B.
2171        let project_b = Project::remote(
2172            project_id,
2173            client_b.clone(),
2174            client_b.user_store.clone(),
2175            lang_registry.clone(),
2176            fs.clone(),
2177            &mut cx_b.to_async(),
2178        )
2179        .await
2180        .unwrap();
2181
2182        // Open a file in an editor as the guest.
2183        let buffer_b = project_b
2184            .update(cx_b, |p, cx| p.open_buffer((worktree_id, "main.rs"), cx))
2185            .await
2186            .unwrap();
2187        let (window_b, _) = cx_b.add_window(|_| EmptyView);
2188        let editor_b = cx_b.add_view(window_b, |cx| {
2189            Editor::for_buffer(
2190                cx.add_model(|cx| MultiBuffer::singleton(buffer_b.clone(), cx)),
2191                Some(project_b.clone()),
2192                watch::channel_with(Settings::test(cx)).1,
2193                cx,
2194            )
2195        });
2196
2197        let mut fake_language_server = fake_language_servers.next().await.unwrap();
2198        buffer_b
2199            .condition(&cx_b, |buffer, _| !buffer.completion_triggers().is_empty())
2200            .await;
2201
2202        // Type a completion trigger character as the guest.
2203        editor_b.update(cx_b, |editor, cx| {
2204            editor.select_ranges([13..13], None, cx);
2205            editor.handle_input(&Input(".".into()), cx);
2206            cx.focus(&editor_b);
2207        });
2208
2209        // Receive a completion request as the host's language server.
2210        // Return some completions from the host's language server.
2211        cx_a.foreground().start_waiting();
2212        fake_language_server
2213            .handle_request::<lsp::request::Completion, _>(|params, _| {
2214                assert_eq!(
2215                    params.text_document_position.text_document.uri,
2216                    lsp::Url::from_file_path("/a/main.rs").unwrap(),
2217                );
2218                assert_eq!(
2219                    params.text_document_position.position,
2220                    lsp::Position::new(0, 14),
2221                );
2222
2223                Some(lsp::CompletionResponse::Array(vec![
2224                    lsp::CompletionItem {
2225                        label: "first_method(…)".into(),
2226                        detail: Some("fn(&mut self, B) -> C".into()),
2227                        text_edit: Some(lsp::CompletionTextEdit::Edit(lsp::TextEdit {
2228                            new_text: "first_method($1)".to_string(),
2229                            range: lsp::Range::new(
2230                                lsp::Position::new(0, 14),
2231                                lsp::Position::new(0, 14),
2232                            ),
2233                        })),
2234                        insert_text_format: Some(lsp::InsertTextFormat::SNIPPET),
2235                        ..Default::default()
2236                    },
2237                    lsp::CompletionItem {
2238                        label: "second_method(…)".into(),
2239                        detail: Some("fn(&mut self, C) -> D<E>".into()),
2240                        text_edit: Some(lsp::CompletionTextEdit::Edit(lsp::TextEdit {
2241                            new_text: "second_method()".to_string(),
2242                            range: lsp::Range::new(
2243                                lsp::Position::new(0, 14),
2244                                lsp::Position::new(0, 14),
2245                            ),
2246                        })),
2247                        insert_text_format: Some(lsp::InsertTextFormat::SNIPPET),
2248                        ..Default::default()
2249                    },
2250                ]))
2251            })
2252            .next()
2253            .await
2254            .unwrap();
2255        cx_a.foreground().finish_waiting();
2256
2257        // Open the buffer on the host.
2258        let buffer_a = project_a
2259            .update(cx_a, |p, cx| p.open_buffer((worktree_id, "main.rs"), cx))
2260            .await
2261            .unwrap();
2262        buffer_a
2263            .condition(&cx_a, |buffer, _| buffer.text() == "fn main() { a. }")
2264            .await;
2265
2266        // Confirm a completion on the guest.
2267        editor_b
2268            .condition(&cx_b, |editor, _| editor.context_menu_visible())
2269            .await;
2270        editor_b.update(cx_b, |editor, cx| {
2271            editor.confirm_completion(&ConfirmCompletion(Some(0)), cx);
2272            assert_eq!(editor.text(cx), "fn main() { a.first_method() }");
2273        });
2274
2275        // Return a resolved completion from the host's language server.
2276        // The resolved completion has an additional text edit.
2277        fake_language_server.handle_request::<lsp::request::ResolveCompletionItem, _>(
2278            |params, _| {
2279                assert_eq!(params.label, "first_method(…)");
2280                lsp::CompletionItem {
2281                    label: "first_method(…)".into(),
2282                    detail: Some("fn(&mut self, B) -> C".into()),
2283                    text_edit: Some(lsp::CompletionTextEdit::Edit(lsp::TextEdit {
2284                        new_text: "first_method($1)".to_string(),
2285                        range: lsp::Range::new(
2286                            lsp::Position::new(0, 14),
2287                            lsp::Position::new(0, 14),
2288                        ),
2289                    })),
2290                    additional_text_edits: Some(vec![lsp::TextEdit {
2291                        new_text: "use d::SomeTrait;\n".to_string(),
2292                        range: lsp::Range::new(lsp::Position::new(0, 0), lsp::Position::new(0, 0)),
2293                    }]),
2294                    insert_text_format: Some(lsp::InsertTextFormat::SNIPPET),
2295                    ..Default::default()
2296                }
2297            },
2298        );
2299
2300        // The additional edit is applied.
2301        buffer_a
2302            .condition(&cx_a, |buffer, _| {
2303                buffer.text() == "use d::SomeTrait;\nfn main() { a.first_method() }"
2304            })
2305            .await;
2306        buffer_b
2307            .condition(&cx_b, |buffer, _| {
2308                buffer.text() == "use d::SomeTrait;\nfn main() { a.first_method() }"
2309            })
2310            .await;
2311    }
2312
2313    #[gpui::test(iterations = 10)]
2314    async fn test_formatting_buffer(cx_a: &mut TestAppContext, cx_b: &mut TestAppContext) {
2315        cx_a.foreground().forbid_parking();
2316        let mut lang_registry = Arc::new(LanguageRegistry::test());
2317        let fs = FakeFs::new(cx_a.background());
2318
2319        // Set up a fake language server.
2320        let (language_server_config, mut fake_language_servers) = LanguageServerConfig::fake();
2321        Arc::get_mut(&mut lang_registry)
2322            .unwrap()
2323            .add(Arc::new(Language::new(
2324                LanguageConfig {
2325                    name: "Rust".into(),
2326                    path_suffixes: vec!["rs".to_string()],
2327                    language_server: Some(language_server_config),
2328                    ..Default::default()
2329                },
2330                Some(tree_sitter_rust::language()),
2331            )));
2332
2333        // Connect to a server as 2 clients.
2334        let mut server = TestServer::start(cx_a.foreground(), cx_a.background()).await;
2335        let client_a = server.create_client(cx_a, "user_a").await;
2336        let client_b = server.create_client(cx_b, "user_b").await;
2337
2338        // Share a project as client A
2339        fs.insert_tree(
2340            "/a",
2341            json!({
2342                ".zed.toml": r#"collaborators = ["user_b"]"#,
2343                "a.rs": "let one = two",
2344            }),
2345        )
2346        .await;
2347        let project_a = cx_a.update(|cx| {
2348            Project::local(
2349                client_a.clone(),
2350                client_a.user_store.clone(),
2351                lang_registry.clone(),
2352                fs.clone(),
2353                cx,
2354            )
2355        });
2356        let (worktree_a, _) = project_a
2357            .update(cx_a, |p, cx| {
2358                p.find_or_create_local_worktree("/a", true, cx)
2359            })
2360            .await
2361            .unwrap();
2362        worktree_a
2363            .read_with(cx_a, |tree, _| tree.as_local().unwrap().scan_complete())
2364            .await;
2365        let project_id = project_a.update(cx_a, |p, _| p.next_remote_id()).await;
2366        let worktree_id = worktree_a.read_with(cx_a, |tree, _| tree.id());
2367        project_a.update(cx_a, |p, cx| p.share(cx)).await.unwrap();
2368
2369        // Join the worktree as client B.
2370        let project_b = Project::remote(
2371            project_id,
2372            client_b.clone(),
2373            client_b.user_store.clone(),
2374            lang_registry.clone(),
2375            fs.clone(),
2376            &mut cx_b.to_async(),
2377        )
2378        .await
2379        .unwrap();
2380
2381        let buffer_b = cx_b
2382            .background()
2383            .spawn(project_b.update(cx_b, |p, cx| p.open_buffer((worktree_id, "a.rs"), cx)))
2384            .await
2385            .unwrap();
2386
2387        let format = project_b.update(cx_b, |project, cx| {
2388            project.format(HashSet::from_iter([buffer_b.clone()]), true, cx)
2389        });
2390
2391        let mut fake_language_server = fake_language_servers.next().await.unwrap();
2392        fake_language_server.handle_request::<lsp::request::Formatting, _>(|_, _| {
2393            Some(vec![
2394                lsp::TextEdit {
2395                    range: lsp::Range::new(lsp::Position::new(0, 4), lsp::Position::new(0, 4)),
2396                    new_text: "h".to_string(),
2397                },
2398                lsp::TextEdit {
2399                    range: lsp::Range::new(lsp::Position::new(0, 7), lsp::Position::new(0, 7)),
2400                    new_text: "y".to_string(),
2401                },
2402            ])
2403        });
2404
2405        format.await.unwrap();
2406        assert_eq!(
2407            buffer_b.read_with(cx_b, |buffer, _| buffer.text()),
2408            "let honey = two"
2409        );
2410    }
2411
2412    #[gpui::test(iterations = 10)]
2413    async fn test_definition(cx_a: &mut TestAppContext, cx_b: &mut TestAppContext) {
2414        cx_a.foreground().forbid_parking();
2415        let mut lang_registry = Arc::new(LanguageRegistry::test());
2416        let fs = FakeFs::new(cx_a.background());
2417        fs.insert_tree(
2418            "/root-1",
2419            json!({
2420                ".zed.toml": r#"collaborators = ["user_b"]"#,
2421                "a.rs": "const ONE: usize = b::TWO + b::THREE;",
2422            }),
2423        )
2424        .await;
2425        fs.insert_tree(
2426            "/root-2",
2427            json!({
2428                "b.rs": "const TWO: usize = 2;\nconst THREE: usize = 3;",
2429            }),
2430        )
2431        .await;
2432
2433        // Set up a fake language server.
2434        let (language_server_config, mut fake_language_servers) = LanguageServerConfig::fake();
2435        Arc::get_mut(&mut lang_registry)
2436            .unwrap()
2437            .add(Arc::new(Language::new(
2438                LanguageConfig {
2439                    name: "Rust".into(),
2440                    path_suffixes: vec!["rs".to_string()],
2441                    language_server: Some(language_server_config),
2442                    ..Default::default()
2443                },
2444                Some(tree_sitter_rust::language()),
2445            )));
2446
2447        // Connect to a server as 2 clients.
2448        let mut server = TestServer::start(cx_a.foreground(), cx_a.background()).await;
2449        let client_a = server.create_client(cx_a, "user_a").await;
2450        let client_b = server.create_client(cx_b, "user_b").await;
2451
2452        // Share a project as client A
2453        let project_a = cx_a.update(|cx| {
2454            Project::local(
2455                client_a.clone(),
2456                client_a.user_store.clone(),
2457                lang_registry.clone(),
2458                fs.clone(),
2459                cx,
2460            )
2461        });
2462        let (worktree_a, _) = project_a
2463            .update(cx_a, |p, cx| {
2464                p.find_or_create_local_worktree("/root-1", true, cx)
2465            })
2466            .await
2467            .unwrap();
2468        worktree_a
2469            .read_with(cx_a, |tree, _| tree.as_local().unwrap().scan_complete())
2470            .await;
2471        let project_id = project_a.update(cx_a, |p, _| p.next_remote_id()).await;
2472        let worktree_id = worktree_a.read_with(cx_a, |tree, _| tree.id());
2473        project_a.update(cx_a, |p, cx| p.share(cx)).await.unwrap();
2474
2475        // Join the worktree as client B.
2476        let project_b = Project::remote(
2477            project_id,
2478            client_b.clone(),
2479            client_b.user_store.clone(),
2480            lang_registry.clone(),
2481            fs.clone(),
2482            &mut cx_b.to_async(),
2483        )
2484        .await
2485        .unwrap();
2486
2487        // Open the file on client B.
2488        let buffer_b = cx_b
2489            .background()
2490            .spawn(project_b.update(cx_b, |p, cx| p.open_buffer((worktree_id, "a.rs"), cx)))
2491            .await
2492            .unwrap();
2493
2494        // Request the definition of a symbol as the guest.
2495        let definitions_1 = project_b.update(cx_b, |p, cx| p.definition(&buffer_b, 23, cx));
2496
2497        let mut fake_language_server = fake_language_servers.next().await.unwrap();
2498        fake_language_server.handle_request::<lsp::request::GotoDefinition, _>(|_, _| {
2499            Some(lsp::GotoDefinitionResponse::Scalar(lsp::Location::new(
2500                lsp::Url::from_file_path("/root-2/b.rs").unwrap(),
2501                lsp::Range::new(lsp::Position::new(0, 6), lsp::Position::new(0, 9)),
2502            )))
2503        });
2504
2505        let definitions_1 = definitions_1.await.unwrap();
2506        cx_b.read(|cx| {
2507            assert_eq!(definitions_1.len(), 1);
2508            assert_eq!(project_b.read(cx).worktrees(cx).count(), 2);
2509            let target_buffer = definitions_1[0].buffer.read(cx);
2510            assert_eq!(
2511                target_buffer.text(),
2512                "const TWO: usize = 2;\nconst THREE: usize = 3;"
2513            );
2514            assert_eq!(
2515                definitions_1[0].range.to_point(target_buffer),
2516                Point::new(0, 6)..Point::new(0, 9)
2517            );
2518        });
2519
2520        // Try getting more definitions for the same buffer, ensuring the buffer gets reused from
2521        // the previous call to `definition`.
2522        let definitions_2 = project_b.update(cx_b, |p, cx| p.definition(&buffer_b, 33, cx));
2523        fake_language_server.handle_request::<lsp::request::GotoDefinition, _>(|_, _| {
2524            Some(lsp::GotoDefinitionResponse::Scalar(lsp::Location::new(
2525                lsp::Url::from_file_path("/root-2/b.rs").unwrap(),
2526                lsp::Range::new(lsp::Position::new(1, 6), lsp::Position::new(1, 11)),
2527            )))
2528        });
2529
2530        let definitions_2 = definitions_2.await.unwrap();
2531        cx_b.read(|cx| {
2532            assert_eq!(definitions_2.len(), 1);
2533            assert_eq!(project_b.read(cx).worktrees(cx).count(), 2);
2534            let target_buffer = definitions_2[0].buffer.read(cx);
2535            assert_eq!(
2536                target_buffer.text(),
2537                "const TWO: usize = 2;\nconst THREE: usize = 3;"
2538            );
2539            assert_eq!(
2540                definitions_2[0].range.to_point(target_buffer),
2541                Point::new(1, 6)..Point::new(1, 11)
2542            );
2543        });
2544        assert_eq!(definitions_1[0].buffer, definitions_2[0].buffer);
2545    }
2546
2547    #[gpui::test(iterations = 10)]
2548    async fn test_references(cx_a: &mut TestAppContext, cx_b: &mut TestAppContext) {
2549        cx_a.foreground().forbid_parking();
2550        let mut lang_registry = Arc::new(LanguageRegistry::test());
2551        let fs = FakeFs::new(cx_a.background());
2552        fs.insert_tree(
2553            "/root-1",
2554            json!({
2555                ".zed.toml": r#"collaborators = ["user_b"]"#,
2556                "one.rs": "const ONE: usize = 1;",
2557                "two.rs": "const TWO: usize = one::ONE + one::ONE;",
2558            }),
2559        )
2560        .await;
2561        fs.insert_tree(
2562            "/root-2",
2563            json!({
2564                "three.rs": "const THREE: usize = two::TWO + one::ONE;",
2565            }),
2566        )
2567        .await;
2568
2569        // Set up a fake language server.
2570        let (language_server_config, mut fake_language_servers) = LanguageServerConfig::fake();
2571        Arc::get_mut(&mut lang_registry)
2572            .unwrap()
2573            .add(Arc::new(Language::new(
2574                LanguageConfig {
2575                    name: "Rust".into(),
2576                    path_suffixes: vec!["rs".to_string()],
2577                    language_server: Some(language_server_config),
2578                    ..Default::default()
2579                },
2580                Some(tree_sitter_rust::language()),
2581            )));
2582
2583        // Connect to a server as 2 clients.
2584        let mut server = TestServer::start(cx_a.foreground(), cx_a.background()).await;
2585        let client_a = server.create_client(cx_a, "user_a").await;
2586        let client_b = server.create_client(cx_b, "user_b").await;
2587
2588        // Share a project as client A
2589        let project_a = cx_a.update(|cx| {
2590            Project::local(
2591                client_a.clone(),
2592                client_a.user_store.clone(),
2593                lang_registry.clone(),
2594                fs.clone(),
2595                cx,
2596            )
2597        });
2598        let (worktree_a, _) = project_a
2599            .update(cx_a, |p, cx| {
2600                p.find_or_create_local_worktree("/root-1", true, cx)
2601            })
2602            .await
2603            .unwrap();
2604        worktree_a
2605            .read_with(cx_a, |tree, _| tree.as_local().unwrap().scan_complete())
2606            .await;
2607        let project_id = project_a.update(cx_a, |p, _| p.next_remote_id()).await;
2608        let worktree_id = worktree_a.read_with(cx_a, |tree, _| tree.id());
2609        project_a.update(cx_a, |p, cx| p.share(cx)).await.unwrap();
2610
2611        // Join the worktree as client B.
2612        let project_b = Project::remote(
2613            project_id,
2614            client_b.clone(),
2615            client_b.user_store.clone(),
2616            lang_registry.clone(),
2617            fs.clone(),
2618            &mut cx_b.to_async(),
2619        )
2620        .await
2621        .unwrap();
2622
2623        // Open the file on client B.
2624        let buffer_b = cx_b
2625            .background()
2626            .spawn(project_b.update(cx_b, |p, cx| p.open_buffer((worktree_id, "one.rs"), cx)))
2627            .await
2628            .unwrap();
2629
2630        // Request references to a symbol as the guest.
2631        let references = project_b.update(cx_b, |p, cx| p.references(&buffer_b, 7, cx));
2632
2633        let mut fake_language_server = fake_language_servers.next().await.unwrap();
2634        fake_language_server.handle_request::<lsp::request::References, _>(|params, _| {
2635            assert_eq!(
2636                params.text_document_position.text_document.uri.as_str(),
2637                "file:///root-1/one.rs"
2638            );
2639            Some(vec![
2640                lsp::Location {
2641                    uri: lsp::Url::from_file_path("/root-1/two.rs").unwrap(),
2642                    range: lsp::Range::new(lsp::Position::new(0, 24), lsp::Position::new(0, 27)),
2643                },
2644                lsp::Location {
2645                    uri: lsp::Url::from_file_path("/root-1/two.rs").unwrap(),
2646                    range: lsp::Range::new(lsp::Position::new(0, 35), lsp::Position::new(0, 38)),
2647                },
2648                lsp::Location {
2649                    uri: lsp::Url::from_file_path("/root-2/three.rs").unwrap(),
2650                    range: lsp::Range::new(lsp::Position::new(0, 37), lsp::Position::new(0, 40)),
2651                },
2652            ])
2653        });
2654
2655        let references = references.await.unwrap();
2656        cx_b.read(|cx| {
2657            assert_eq!(references.len(), 3);
2658            assert_eq!(project_b.read(cx).worktrees(cx).count(), 2);
2659
2660            let two_buffer = references[0].buffer.read(cx);
2661            let three_buffer = references[2].buffer.read(cx);
2662            assert_eq!(
2663                two_buffer.file().unwrap().path().as_ref(),
2664                Path::new("two.rs")
2665            );
2666            assert_eq!(references[1].buffer, references[0].buffer);
2667            assert_eq!(
2668                three_buffer.file().unwrap().full_path(cx),
2669                Path::new("three.rs")
2670            );
2671
2672            assert_eq!(references[0].range.to_offset(&two_buffer), 24..27);
2673            assert_eq!(references[1].range.to_offset(&two_buffer), 35..38);
2674            assert_eq!(references[2].range.to_offset(&three_buffer), 37..40);
2675        });
2676    }
2677
2678    #[gpui::test(iterations = 10)]
2679    async fn test_project_search(cx_a: &mut TestAppContext, cx_b: &mut TestAppContext) {
2680        cx_a.foreground().forbid_parking();
2681        let lang_registry = Arc::new(LanguageRegistry::test());
2682        let fs = FakeFs::new(cx_a.background());
2683        fs.insert_tree(
2684            "/root-1",
2685            json!({
2686                ".zed.toml": r#"collaborators = ["user_b"]"#,
2687                "a": "hello world",
2688                "b": "goodnight moon",
2689                "c": "a world of goo",
2690                "d": "world champion of clown world",
2691            }),
2692        )
2693        .await;
2694        fs.insert_tree(
2695            "/root-2",
2696            json!({
2697                "e": "disney world is fun",
2698            }),
2699        )
2700        .await;
2701
2702        // Connect to a server as 2 clients.
2703        let mut server = TestServer::start(cx_a.foreground(), cx_a.background()).await;
2704        let client_a = server.create_client(cx_a, "user_a").await;
2705        let client_b = server.create_client(cx_b, "user_b").await;
2706
2707        // Share a project as client A
2708        let project_a = cx_a.update(|cx| {
2709            Project::local(
2710                client_a.clone(),
2711                client_a.user_store.clone(),
2712                lang_registry.clone(),
2713                fs.clone(),
2714                cx,
2715            )
2716        });
2717        let project_id = project_a.update(cx_a, |p, _| p.next_remote_id()).await;
2718
2719        let (worktree_1, _) = project_a
2720            .update(cx_a, |p, cx| {
2721                p.find_or_create_local_worktree("/root-1", true, cx)
2722            })
2723            .await
2724            .unwrap();
2725        worktree_1
2726            .read_with(cx_a, |tree, _| tree.as_local().unwrap().scan_complete())
2727            .await;
2728        let (worktree_2, _) = project_a
2729            .update(cx_a, |p, cx| {
2730                p.find_or_create_local_worktree("/root-2", true, cx)
2731            })
2732            .await
2733            .unwrap();
2734        worktree_2
2735            .read_with(cx_a, |tree, _| tree.as_local().unwrap().scan_complete())
2736            .await;
2737
2738        project_a.update(cx_a, |p, cx| p.share(cx)).await.unwrap();
2739
2740        // Join the worktree as client B.
2741        let project_b = Project::remote(
2742            project_id,
2743            client_b.clone(),
2744            client_b.user_store.clone(),
2745            lang_registry.clone(),
2746            fs.clone(),
2747            &mut cx_b.to_async(),
2748        )
2749        .await
2750        .unwrap();
2751
2752        let results = project_b
2753            .update(cx_b, |project, cx| {
2754                project.search(SearchQuery::text("world", false, false), cx)
2755            })
2756            .await
2757            .unwrap();
2758
2759        let mut ranges_by_path = results
2760            .into_iter()
2761            .map(|(buffer, ranges)| {
2762                buffer.read_with(cx_b, |buffer, cx| {
2763                    let path = buffer.file().unwrap().full_path(cx);
2764                    let offset_ranges = ranges
2765                        .into_iter()
2766                        .map(|range| range.to_offset(buffer))
2767                        .collect::<Vec<_>>();
2768                    (path, offset_ranges)
2769                })
2770            })
2771            .collect::<Vec<_>>();
2772        ranges_by_path.sort_by_key(|(path, _)| path.clone());
2773
2774        assert_eq!(
2775            ranges_by_path,
2776            &[
2777                (PathBuf::from("root-1/a"), vec![6..11]),
2778                (PathBuf::from("root-1/c"), vec![2..7]),
2779                (PathBuf::from("root-1/d"), vec![0..5, 24..29]),
2780                (PathBuf::from("root-2/e"), vec![7..12]),
2781            ]
2782        );
2783    }
2784
2785    #[gpui::test(iterations = 10)]
2786    async fn test_document_highlights(cx_a: &mut TestAppContext, cx_b: &mut TestAppContext) {
2787        cx_a.foreground().forbid_parking();
2788        let lang_registry = Arc::new(LanguageRegistry::test());
2789        let fs = FakeFs::new(cx_a.background());
2790        fs.insert_tree(
2791            "/root-1",
2792            json!({
2793                ".zed.toml": r#"collaborators = ["user_b"]"#,
2794                "main.rs": "fn double(number: i32) -> i32 { number + number }",
2795            }),
2796        )
2797        .await;
2798
2799        // Set up a fake language server.
2800        let (language_server_config, mut fake_language_servers) = LanguageServerConfig::fake();
2801        lang_registry.add(Arc::new(Language::new(
2802            LanguageConfig {
2803                name: "Rust".into(),
2804                path_suffixes: vec!["rs".to_string()],
2805                language_server: Some(language_server_config),
2806                ..Default::default()
2807            },
2808            Some(tree_sitter_rust::language()),
2809        )));
2810
2811        // Connect to a server as 2 clients.
2812        let mut server = TestServer::start(cx_a.foreground(), cx_a.background()).await;
2813        let client_a = server.create_client(cx_a, "user_a").await;
2814        let client_b = server.create_client(cx_b, "user_b").await;
2815
2816        // Share a project as client A
2817        let project_a = cx_a.update(|cx| {
2818            Project::local(
2819                client_a.clone(),
2820                client_a.user_store.clone(),
2821                lang_registry.clone(),
2822                fs.clone(),
2823                cx,
2824            )
2825        });
2826        let (worktree_a, _) = project_a
2827            .update(cx_a, |p, cx| {
2828                p.find_or_create_local_worktree("/root-1", true, cx)
2829            })
2830            .await
2831            .unwrap();
2832        worktree_a
2833            .read_with(cx_a, |tree, _| tree.as_local().unwrap().scan_complete())
2834            .await;
2835        let project_id = project_a.update(cx_a, |p, _| p.next_remote_id()).await;
2836        let worktree_id = worktree_a.read_with(cx_a, |tree, _| tree.id());
2837        project_a.update(cx_a, |p, cx| p.share(cx)).await.unwrap();
2838
2839        // Join the worktree as client B.
2840        let project_b = Project::remote(
2841            project_id,
2842            client_b.clone(),
2843            client_b.user_store.clone(),
2844            lang_registry.clone(),
2845            fs.clone(),
2846            &mut cx_b.to_async(),
2847        )
2848        .await
2849        .unwrap();
2850
2851        // Open the file on client B.
2852        let buffer_b = cx_b
2853            .background()
2854            .spawn(project_b.update(cx_b, |p, cx| p.open_buffer((worktree_id, "main.rs"), cx)))
2855            .await
2856            .unwrap();
2857
2858        // Request document highlights as the guest.
2859        let highlights = project_b.update(cx_b, |p, cx| p.document_highlights(&buffer_b, 34, cx));
2860
2861        let mut fake_language_server = fake_language_servers.next().await.unwrap();
2862        fake_language_server.handle_request::<lsp::request::DocumentHighlightRequest, _>(
2863            |params, _| {
2864                assert_eq!(
2865                    params
2866                        .text_document_position_params
2867                        .text_document
2868                        .uri
2869                        .as_str(),
2870                    "file:///root-1/main.rs"
2871                );
2872                assert_eq!(
2873                    params.text_document_position_params.position,
2874                    lsp::Position::new(0, 34)
2875                );
2876                Some(vec![
2877                    lsp::DocumentHighlight {
2878                        kind: Some(lsp::DocumentHighlightKind::WRITE),
2879                        range: lsp::Range::new(
2880                            lsp::Position::new(0, 10),
2881                            lsp::Position::new(0, 16),
2882                        ),
2883                    },
2884                    lsp::DocumentHighlight {
2885                        kind: Some(lsp::DocumentHighlightKind::READ),
2886                        range: lsp::Range::new(
2887                            lsp::Position::new(0, 32),
2888                            lsp::Position::new(0, 38),
2889                        ),
2890                    },
2891                    lsp::DocumentHighlight {
2892                        kind: Some(lsp::DocumentHighlightKind::READ),
2893                        range: lsp::Range::new(
2894                            lsp::Position::new(0, 41),
2895                            lsp::Position::new(0, 47),
2896                        ),
2897                    },
2898                ])
2899            },
2900        );
2901
2902        let highlights = highlights.await.unwrap();
2903        buffer_b.read_with(cx_b, |buffer, _| {
2904            let snapshot = buffer.snapshot();
2905
2906            let highlights = highlights
2907                .into_iter()
2908                .map(|highlight| (highlight.kind, highlight.range.to_offset(&snapshot)))
2909                .collect::<Vec<_>>();
2910            assert_eq!(
2911                highlights,
2912                &[
2913                    (lsp::DocumentHighlightKind::WRITE, 10..16),
2914                    (lsp::DocumentHighlightKind::READ, 32..38),
2915                    (lsp::DocumentHighlightKind::READ, 41..47)
2916                ]
2917            )
2918        });
2919    }
2920
2921    #[gpui::test(iterations = 10)]
2922    async fn test_project_symbols(cx_a: &mut TestAppContext, cx_b: &mut TestAppContext) {
2923        cx_a.foreground().forbid_parking();
2924        let mut lang_registry = Arc::new(LanguageRegistry::test());
2925        let fs = FakeFs::new(cx_a.background());
2926        fs.insert_tree(
2927            "/code",
2928            json!({
2929                "crate-1": {
2930                    ".zed.toml": r#"collaborators = ["user_b"]"#,
2931                    "one.rs": "const ONE: usize = 1;",
2932                },
2933                "crate-2": {
2934                    "two.rs": "const TWO: usize = 2; const THREE: usize = 3;",
2935                },
2936                "private": {
2937                    "passwords.txt": "the-password",
2938                }
2939            }),
2940        )
2941        .await;
2942
2943        // Set up a fake language server.
2944        let (language_server_config, mut fake_language_servers) = LanguageServerConfig::fake();
2945        Arc::get_mut(&mut lang_registry)
2946            .unwrap()
2947            .add(Arc::new(Language::new(
2948                LanguageConfig {
2949                    name: "Rust".into(),
2950                    path_suffixes: vec!["rs".to_string()],
2951                    language_server: Some(language_server_config),
2952                    ..Default::default()
2953                },
2954                Some(tree_sitter_rust::language()),
2955            )));
2956
2957        // Connect to a server as 2 clients.
2958        let mut server = TestServer::start(cx_a.foreground(), cx_a.background()).await;
2959        let client_a = server.create_client(cx_a, "user_a").await;
2960        let client_b = server.create_client(cx_b, "user_b").await;
2961
2962        // Share a project as client A
2963        let project_a = cx_a.update(|cx| {
2964            Project::local(
2965                client_a.clone(),
2966                client_a.user_store.clone(),
2967                lang_registry.clone(),
2968                fs.clone(),
2969                cx,
2970            )
2971        });
2972        let (worktree_a, _) = project_a
2973            .update(cx_a, |p, cx| {
2974                p.find_or_create_local_worktree("/code/crate-1", true, cx)
2975            })
2976            .await
2977            .unwrap();
2978        worktree_a
2979            .read_with(cx_a, |tree, _| tree.as_local().unwrap().scan_complete())
2980            .await;
2981        let project_id = project_a.update(cx_a, |p, _| p.next_remote_id()).await;
2982        let worktree_id = worktree_a.read_with(cx_a, |tree, _| tree.id());
2983        project_a.update(cx_a, |p, cx| p.share(cx)).await.unwrap();
2984
2985        // Join the worktree as client B.
2986        let project_b = Project::remote(
2987            project_id,
2988            client_b.clone(),
2989            client_b.user_store.clone(),
2990            lang_registry.clone(),
2991            fs.clone(),
2992            &mut cx_b.to_async(),
2993        )
2994        .await
2995        .unwrap();
2996
2997        // Cause the language server to start.
2998        let _buffer = cx_b
2999            .background()
3000            .spawn(project_b.update(cx_b, |p, cx| p.open_buffer((worktree_id, "one.rs"), cx)))
3001            .await
3002            .unwrap();
3003
3004        // Request the definition of a symbol as the guest.
3005        let symbols = project_b.update(cx_b, |p, cx| p.symbols("two", cx));
3006        let mut fake_language_server = fake_language_servers.next().await.unwrap();
3007        fake_language_server.handle_request::<lsp::request::WorkspaceSymbol, _>(|_, _| {
3008            #[allow(deprecated)]
3009            Some(vec![lsp::SymbolInformation {
3010                name: "TWO".into(),
3011                location: lsp::Location {
3012                    uri: lsp::Url::from_file_path("/code/crate-2/two.rs").unwrap(),
3013                    range: lsp::Range::new(lsp::Position::new(0, 6), lsp::Position::new(0, 9)),
3014                },
3015                kind: lsp::SymbolKind::CONSTANT,
3016                tags: None,
3017                container_name: None,
3018                deprecated: None,
3019            }])
3020        });
3021
3022        let symbols = symbols.await.unwrap();
3023        assert_eq!(symbols.len(), 1);
3024        assert_eq!(symbols[0].name, "TWO");
3025
3026        // Open one of the returned symbols.
3027        let buffer_b_2 = project_b
3028            .update(cx_b, |project, cx| {
3029                project.open_buffer_for_symbol(&symbols[0], cx)
3030            })
3031            .await
3032            .unwrap();
3033        buffer_b_2.read_with(cx_b, |buffer, _| {
3034            assert_eq!(
3035                buffer.file().unwrap().path().as_ref(),
3036                Path::new("../crate-2/two.rs")
3037            );
3038        });
3039
3040        // Attempt to craft a symbol and violate host's privacy by opening an arbitrary file.
3041        let mut fake_symbol = symbols[0].clone();
3042        fake_symbol.path = Path::new("/code/secrets").into();
3043        let error = project_b
3044            .update(cx_b, |project, cx| {
3045                project.open_buffer_for_symbol(&fake_symbol, cx)
3046            })
3047            .await
3048            .unwrap_err();
3049        assert!(error.to_string().contains("invalid symbol signature"));
3050    }
3051
3052    #[gpui::test(iterations = 10)]
3053    async fn test_open_buffer_while_getting_definition_pointing_to_it(
3054        cx_a: &mut TestAppContext,
3055        cx_b: &mut TestAppContext,
3056        mut rng: StdRng,
3057    ) {
3058        cx_a.foreground().forbid_parking();
3059        let mut lang_registry = Arc::new(LanguageRegistry::test());
3060        let fs = FakeFs::new(cx_a.background());
3061        fs.insert_tree(
3062            "/root",
3063            json!({
3064                ".zed.toml": r#"collaborators = ["user_b"]"#,
3065                "a.rs": "const ONE: usize = b::TWO;",
3066                "b.rs": "const TWO: usize = 2",
3067            }),
3068        )
3069        .await;
3070
3071        // Set up a fake language server.
3072        let (language_server_config, mut fake_language_servers) = LanguageServerConfig::fake();
3073
3074        Arc::get_mut(&mut lang_registry)
3075            .unwrap()
3076            .add(Arc::new(Language::new(
3077                LanguageConfig {
3078                    name: "Rust".into(),
3079                    path_suffixes: vec!["rs".to_string()],
3080                    language_server: Some(language_server_config),
3081                    ..Default::default()
3082                },
3083                Some(tree_sitter_rust::language()),
3084            )));
3085
3086        // Connect to a server as 2 clients.
3087        let mut server = TestServer::start(cx_a.foreground(), cx_a.background()).await;
3088        let client_a = server.create_client(cx_a, "user_a").await;
3089        let client_b = server.create_client(cx_b, "user_b").await;
3090
3091        // Share a project as client A
3092        let project_a = cx_a.update(|cx| {
3093            Project::local(
3094                client_a.clone(),
3095                client_a.user_store.clone(),
3096                lang_registry.clone(),
3097                fs.clone(),
3098                cx,
3099            )
3100        });
3101
3102        let (worktree_a, _) = project_a
3103            .update(cx_a, |p, cx| {
3104                p.find_or_create_local_worktree("/root", true, cx)
3105            })
3106            .await
3107            .unwrap();
3108        worktree_a
3109            .read_with(cx_a, |tree, _| tree.as_local().unwrap().scan_complete())
3110            .await;
3111        let project_id = project_a.update(cx_a, |p, _| p.next_remote_id()).await;
3112        let worktree_id = worktree_a.read_with(cx_a, |tree, _| tree.id());
3113        project_a.update(cx_a, |p, cx| p.share(cx)).await.unwrap();
3114
3115        // Join the worktree as client B.
3116        let project_b = Project::remote(
3117            project_id,
3118            client_b.clone(),
3119            client_b.user_store.clone(),
3120            lang_registry.clone(),
3121            fs.clone(),
3122            &mut cx_b.to_async(),
3123        )
3124        .await
3125        .unwrap();
3126
3127        let buffer_b1 = cx_b
3128            .background()
3129            .spawn(project_b.update(cx_b, |p, cx| p.open_buffer((worktree_id, "a.rs"), cx)))
3130            .await
3131            .unwrap();
3132
3133        let definitions;
3134        let buffer_b2;
3135        if rng.gen() {
3136            definitions = project_b.update(cx_b, |p, cx| p.definition(&buffer_b1, 23, cx));
3137            buffer_b2 = project_b.update(cx_b, |p, cx| p.open_buffer((worktree_id, "b.rs"), cx));
3138        } else {
3139            buffer_b2 = project_b.update(cx_b, |p, cx| p.open_buffer((worktree_id, "b.rs"), cx));
3140            definitions = project_b.update(cx_b, |p, cx| p.definition(&buffer_b1, 23, cx));
3141        }
3142
3143        let mut fake_language_server = fake_language_servers.next().await.unwrap();
3144        fake_language_server.handle_request::<lsp::request::GotoDefinition, _>(|_, _| {
3145            Some(lsp::GotoDefinitionResponse::Scalar(lsp::Location::new(
3146                lsp::Url::from_file_path("/root/b.rs").unwrap(),
3147                lsp::Range::new(lsp::Position::new(0, 6), lsp::Position::new(0, 9)),
3148            )))
3149        });
3150
3151        let buffer_b2 = buffer_b2.await.unwrap();
3152        let definitions = definitions.await.unwrap();
3153        assert_eq!(definitions.len(), 1);
3154        assert_eq!(definitions[0].buffer, buffer_b2);
3155    }
3156
3157    #[gpui::test(iterations = 10)]
3158    async fn test_collaborating_with_code_actions(
3159        cx_a: &mut TestAppContext,
3160        cx_b: &mut TestAppContext,
3161    ) {
3162        cx_a.foreground().forbid_parking();
3163        let mut lang_registry = Arc::new(LanguageRegistry::test());
3164        let fs = FakeFs::new(cx_a.background());
3165        let mut path_openers_b = Vec::new();
3166        cx_b.update(|cx| editor::init(cx, &mut path_openers_b));
3167
3168        // Set up a fake language server.
3169        let (language_server_config, mut fake_language_servers) = LanguageServerConfig::fake();
3170        Arc::get_mut(&mut lang_registry)
3171            .unwrap()
3172            .add(Arc::new(Language::new(
3173                LanguageConfig {
3174                    name: "Rust".into(),
3175                    path_suffixes: vec!["rs".to_string()],
3176                    language_server: Some(language_server_config),
3177                    ..Default::default()
3178                },
3179                Some(tree_sitter_rust::language()),
3180            )));
3181
3182        // Connect to a server as 2 clients.
3183        let mut server = TestServer::start(cx_a.foreground(), cx_a.background()).await;
3184        let client_a = server.create_client(cx_a, "user_a").await;
3185        let client_b = server.create_client(cx_b, "user_b").await;
3186
3187        // Share a project as client A
3188        fs.insert_tree(
3189            "/a",
3190            json!({
3191                ".zed.toml": r#"collaborators = ["user_b"]"#,
3192                "main.rs": "mod other;\nfn main() { let foo = other::foo(); }",
3193                "other.rs": "pub fn foo() -> usize { 4 }",
3194            }),
3195        )
3196        .await;
3197        let project_a = cx_a.update(|cx| {
3198            Project::local(
3199                client_a.clone(),
3200                client_a.user_store.clone(),
3201                lang_registry.clone(),
3202                fs.clone(),
3203                cx,
3204            )
3205        });
3206        let (worktree_a, _) = project_a
3207            .update(cx_a, |p, cx| {
3208                p.find_or_create_local_worktree("/a", true, cx)
3209            })
3210            .await
3211            .unwrap();
3212        worktree_a
3213            .read_with(cx_a, |tree, _| tree.as_local().unwrap().scan_complete())
3214            .await;
3215        let project_id = project_a.update(cx_a, |p, _| p.next_remote_id()).await;
3216        let worktree_id = worktree_a.read_with(cx_a, |tree, _| tree.id());
3217        project_a.update(cx_a, |p, cx| p.share(cx)).await.unwrap();
3218
3219        // Join the worktree as client B.
3220        let project_b = Project::remote(
3221            project_id,
3222            client_b.clone(),
3223            client_b.user_store.clone(),
3224            lang_registry.clone(),
3225            fs.clone(),
3226            &mut cx_b.to_async(),
3227        )
3228        .await
3229        .unwrap();
3230        let mut params = cx_b.update(WorkspaceParams::test);
3231        params.languages = lang_registry.clone();
3232        params.client = client_b.client.clone();
3233        params.user_store = client_b.user_store.clone();
3234        params.project = project_b;
3235        params.path_openers = path_openers_b.into();
3236
3237        let (_window_b, workspace_b) = cx_b.add_window(|cx| Workspace::new(&params, cx));
3238        let editor_b = workspace_b
3239            .update(cx_b, |workspace, cx| {
3240                workspace.open_path((worktree_id, "main.rs").into(), cx)
3241            })
3242            .await
3243            .unwrap()
3244            .downcast::<Editor>()
3245            .unwrap();
3246
3247        let mut fake_language_server = fake_language_servers.next().await.unwrap();
3248        fake_language_server
3249            .handle_request::<lsp::request::CodeActionRequest, _>(|params, _| {
3250                assert_eq!(
3251                    params.text_document.uri,
3252                    lsp::Url::from_file_path("/a/main.rs").unwrap(),
3253                );
3254                assert_eq!(params.range.start, lsp::Position::new(0, 0));
3255                assert_eq!(params.range.end, lsp::Position::new(0, 0));
3256                None
3257            })
3258            .next()
3259            .await;
3260
3261        // Move cursor to a location that contains code actions.
3262        editor_b.update(cx_b, |editor, cx| {
3263            editor.select_ranges([Point::new(1, 31)..Point::new(1, 31)], None, cx);
3264            cx.focus(&editor_b);
3265        });
3266
3267        fake_language_server
3268            .handle_request::<lsp::request::CodeActionRequest, _>(|params, _| {
3269                assert_eq!(
3270                    params.text_document.uri,
3271                    lsp::Url::from_file_path("/a/main.rs").unwrap(),
3272                );
3273                assert_eq!(params.range.start, lsp::Position::new(1, 31));
3274                assert_eq!(params.range.end, lsp::Position::new(1, 31));
3275
3276                Some(vec![lsp::CodeActionOrCommand::CodeAction(
3277                    lsp::CodeAction {
3278                        title: "Inline into all callers".to_string(),
3279                        edit: Some(lsp::WorkspaceEdit {
3280                            changes: Some(
3281                                [
3282                                    (
3283                                        lsp::Url::from_file_path("/a/main.rs").unwrap(),
3284                                        vec![lsp::TextEdit::new(
3285                                            lsp::Range::new(
3286                                                lsp::Position::new(1, 22),
3287                                                lsp::Position::new(1, 34),
3288                                            ),
3289                                            "4".to_string(),
3290                                        )],
3291                                    ),
3292                                    (
3293                                        lsp::Url::from_file_path("/a/other.rs").unwrap(),
3294                                        vec![lsp::TextEdit::new(
3295                                            lsp::Range::new(
3296                                                lsp::Position::new(0, 0),
3297                                                lsp::Position::new(0, 27),
3298                                            ),
3299                                            "".to_string(),
3300                                        )],
3301                                    ),
3302                                ]
3303                                .into_iter()
3304                                .collect(),
3305                            ),
3306                            ..Default::default()
3307                        }),
3308                        data: Some(json!({
3309                            "codeActionParams": {
3310                                "range": {
3311                                    "start": {"line": 1, "column": 31},
3312                                    "end": {"line": 1, "column": 31},
3313                                }
3314                            }
3315                        })),
3316                        ..Default::default()
3317                    },
3318                )])
3319            })
3320            .next()
3321            .await;
3322
3323        // Toggle code actions and wait for them to display.
3324        editor_b.update(cx_b, |editor, cx| {
3325            editor.toggle_code_actions(&ToggleCodeActions(false), cx);
3326        });
3327        editor_b
3328            .condition(&cx_b, |editor, _| editor.context_menu_visible())
3329            .await;
3330
3331        fake_language_server.remove_request_handler::<lsp::request::CodeActionRequest>();
3332
3333        // Confirming the code action will trigger a resolve request.
3334        let confirm_action = workspace_b
3335            .update(cx_b, |workspace, cx| {
3336                Editor::confirm_code_action(workspace, &ConfirmCodeAction(Some(0)), cx)
3337            })
3338            .unwrap();
3339        fake_language_server.handle_request::<lsp::request::CodeActionResolveRequest, _>(|_, _| {
3340            lsp::CodeAction {
3341                title: "Inline into all callers".to_string(),
3342                edit: Some(lsp::WorkspaceEdit {
3343                    changes: Some(
3344                        [
3345                            (
3346                                lsp::Url::from_file_path("/a/main.rs").unwrap(),
3347                                vec![lsp::TextEdit::new(
3348                                    lsp::Range::new(
3349                                        lsp::Position::new(1, 22),
3350                                        lsp::Position::new(1, 34),
3351                                    ),
3352                                    "4".to_string(),
3353                                )],
3354                            ),
3355                            (
3356                                lsp::Url::from_file_path("/a/other.rs").unwrap(),
3357                                vec![lsp::TextEdit::new(
3358                                    lsp::Range::new(
3359                                        lsp::Position::new(0, 0),
3360                                        lsp::Position::new(0, 27),
3361                                    ),
3362                                    "".to_string(),
3363                                )],
3364                            ),
3365                        ]
3366                        .into_iter()
3367                        .collect(),
3368                    ),
3369                    ..Default::default()
3370                }),
3371                ..Default::default()
3372            }
3373        });
3374
3375        // After the action is confirmed, an editor containing both modified files is opened.
3376        confirm_action.await.unwrap();
3377        let code_action_editor = workspace_b.read_with(cx_b, |workspace, cx| {
3378            workspace
3379                .active_item(cx)
3380                .unwrap()
3381                .downcast::<Editor>()
3382                .unwrap()
3383        });
3384        code_action_editor.update(cx_b, |editor, cx| {
3385            assert_eq!(editor.text(cx), "\nmod other;\nfn main() { let foo = 4; }");
3386            editor.undo(&Undo, cx);
3387            assert_eq!(
3388                editor.text(cx),
3389                "pub fn foo() -> usize { 4 }\nmod other;\nfn main() { let foo = other::foo(); }"
3390            );
3391            editor.redo(&Redo, cx);
3392            assert_eq!(editor.text(cx), "\nmod other;\nfn main() { let foo = 4; }");
3393        });
3394    }
3395
3396    #[gpui::test(iterations = 10)]
3397    async fn test_collaborating_with_renames(cx_a: &mut TestAppContext, cx_b: &mut TestAppContext) {
3398        cx_a.foreground().forbid_parking();
3399        let mut lang_registry = Arc::new(LanguageRegistry::test());
3400        let fs = FakeFs::new(cx_a.background());
3401        let mut path_openers_b = Vec::new();
3402        cx_b.update(|cx| editor::init(cx, &mut path_openers_b));
3403
3404        // Set up a fake language server.
3405        let (language_server_config, mut fake_language_servers) = LanguageServerConfig::fake();
3406        Arc::get_mut(&mut lang_registry)
3407            .unwrap()
3408            .add(Arc::new(Language::new(
3409                LanguageConfig {
3410                    name: "Rust".into(),
3411                    path_suffixes: vec!["rs".to_string()],
3412                    language_server: Some(language_server_config),
3413                    ..Default::default()
3414                },
3415                Some(tree_sitter_rust::language()),
3416            )));
3417
3418        // Connect to a server as 2 clients.
3419        let mut server = TestServer::start(cx_a.foreground(), cx_a.background()).await;
3420        let client_a = server.create_client(cx_a, "user_a").await;
3421        let client_b = server.create_client(cx_b, "user_b").await;
3422
3423        // Share a project as client A
3424        fs.insert_tree(
3425            "/dir",
3426            json!({
3427                ".zed.toml": r#"collaborators = ["user_b"]"#,
3428                "one.rs": "const ONE: usize = 1;",
3429                "two.rs": "const TWO: usize = one::ONE + one::ONE;"
3430            }),
3431        )
3432        .await;
3433        let project_a = cx_a.update(|cx| {
3434            Project::local(
3435                client_a.clone(),
3436                client_a.user_store.clone(),
3437                lang_registry.clone(),
3438                fs.clone(),
3439                cx,
3440            )
3441        });
3442        let (worktree_a, _) = project_a
3443            .update(cx_a, |p, cx| {
3444                p.find_or_create_local_worktree("/dir", true, cx)
3445            })
3446            .await
3447            .unwrap();
3448        worktree_a
3449            .read_with(cx_a, |tree, _| tree.as_local().unwrap().scan_complete())
3450            .await;
3451        let project_id = project_a.update(cx_a, |p, _| p.next_remote_id()).await;
3452        let worktree_id = worktree_a.read_with(cx_a, |tree, _| tree.id());
3453        project_a.update(cx_a, |p, cx| p.share(cx)).await.unwrap();
3454
3455        // Join the worktree as client B.
3456        let project_b = Project::remote(
3457            project_id,
3458            client_b.clone(),
3459            client_b.user_store.clone(),
3460            lang_registry.clone(),
3461            fs.clone(),
3462            &mut cx_b.to_async(),
3463        )
3464        .await
3465        .unwrap();
3466        let mut params = cx_b.update(WorkspaceParams::test);
3467        params.languages = lang_registry.clone();
3468        params.client = client_b.client.clone();
3469        params.user_store = client_b.user_store.clone();
3470        params.project = project_b;
3471        params.path_openers = path_openers_b.into();
3472
3473        let (_window_b, workspace_b) = cx_b.add_window(|cx| Workspace::new(&params, cx));
3474        let editor_b = workspace_b
3475            .update(cx_b, |workspace, cx| {
3476                workspace.open_path((worktree_id, "one.rs").into(), cx)
3477            })
3478            .await
3479            .unwrap()
3480            .downcast::<Editor>()
3481            .unwrap();
3482        let mut fake_language_server = fake_language_servers.next().await.unwrap();
3483
3484        // Move cursor to a location that can be renamed.
3485        let prepare_rename = editor_b.update(cx_b, |editor, cx| {
3486            editor.select_ranges([7..7], None, cx);
3487            editor.rename(&Rename, cx).unwrap()
3488        });
3489
3490        fake_language_server
3491            .handle_request::<lsp::request::PrepareRenameRequest, _>(|params, _| {
3492                assert_eq!(params.text_document.uri.as_str(), "file:///dir/one.rs");
3493                assert_eq!(params.position, lsp::Position::new(0, 7));
3494                Some(lsp::PrepareRenameResponse::Range(lsp::Range::new(
3495                    lsp::Position::new(0, 6),
3496                    lsp::Position::new(0, 9),
3497                )))
3498            })
3499            .next()
3500            .await
3501            .unwrap();
3502        prepare_rename.await.unwrap();
3503        editor_b.update(cx_b, |editor, cx| {
3504            let rename = editor.pending_rename().unwrap();
3505            let buffer = editor.buffer().read(cx).snapshot(cx);
3506            assert_eq!(
3507                rename.range.start.to_offset(&buffer)..rename.range.end.to_offset(&buffer),
3508                6..9
3509            );
3510            rename.editor.update(cx, |rename_editor, cx| {
3511                rename_editor.buffer().update(cx, |rename_buffer, cx| {
3512                    rename_buffer.edit([0..3], "THREE", cx);
3513                });
3514            });
3515        });
3516
3517        let confirm_rename = workspace_b.update(cx_b, |workspace, cx| {
3518            Editor::confirm_rename(workspace, &ConfirmRename, cx).unwrap()
3519        });
3520        fake_language_server
3521            .handle_request::<lsp::request::Rename, _>(|params, _| {
3522                assert_eq!(
3523                    params.text_document_position.text_document.uri.as_str(),
3524                    "file:///dir/one.rs"
3525                );
3526                assert_eq!(
3527                    params.text_document_position.position,
3528                    lsp::Position::new(0, 6)
3529                );
3530                assert_eq!(params.new_name, "THREE");
3531                Some(lsp::WorkspaceEdit {
3532                    changes: Some(
3533                        [
3534                            (
3535                                lsp::Url::from_file_path("/dir/one.rs").unwrap(),
3536                                vec![lsp::TextEdit::new(
3537                                    lsp::Range::new(
3538                                        lsp::Position::new(0, 6),
3539                                        lsp::Position::new(0, 9),
3540                                    ),
3541                                    "THREE".to_string(),
3542                                )],
3543                            ),
3544                            (
3545                                lsp::Url::from_file_path("/dir/two.rs").unwrap(),
3546                                vec![
3547                                    lsp::TextEdit::new(
3548                                        lsp::Range::new(
3549                                            lsp::Position::new(0, 24),
3550                                            lsp::Position::new(0, 27),
3551                                        ),
3552                                        "THREE".to_string(),
3553                                    ),
3554                                    lsp::TextEdit::new(
3555                                        lsp::Range::new(
3556                                            lsp::Position::new(0, 35),
3557                                            lsp::Position::new(0, 38),
3558                                        ),
3559                                        "THREE".to_string(),
3560                                    ),
3561                                ],
3562                            ),
3563                        ]
3564                        .into_iter()
3565                        .collect(),
3566                    ),
3567                    ..Default::default()
3568                })
3569            })
3570            .next()
3571            .await
3572            .unwrap();
3573        confirm_rename.await.unwrap();
3574
3575        let rename_editor = workspace_b.read_with(cx_b, |workspace, cx| {
3576            workspace
3577                .active_item(cx)
3578                .unwrap()
3579                .downcast::<Editor>()
3580                .unwrap()
3581        });
3582        rename_editor.update(cx_b, |editor, cx| {
3583            assert_eq!(
3584                editor.text(cx),
3585                "const TWO: usize = one::THREE + one::THREE;\nconst THREE: usize = 1;"
3586            );
3587            editor.undo(&Undo, cx);
3588            assert_eq!(
3589                editor.text(cx),
3590                "const TWO: usize = one::ONE + one::ONE;\nconst ONE: usize = 1;"
3591            );
3592            editor.redo(&Redo, cx);
3593            assert_eq!(
3594                editor.text(cx),
3595                "const TWO: usize = one::THREE + one::THREE;\nconst THREE: usize = 1;"
3596            );
3597        });
3598
3599        // Ensure temporary rename edits cannot be undone/redone.
3600        editor_b.update(cx_b, |editor, cx| {
3601            editor.undo(&Undo, cx);
3602            assert_eq!(editor.text(cx), "const ONE: usize = 1;");
3603            editor.undo(&Undo, cx);
3604            assert_eq!(editor.text(cx), "const ONE: usize = 1;");
3605            editor.redo(&Redo, cx);
3606            assert_eq!(editor.text(cx), "const THREE: usize = 1;");
3607        })
3608    }
3609
3610    #[gpui::test(iterations = 10)]
3611    async fn test_basic_chat(cx_a: &mut TestAppContext, cx_b: &mut TestAppContext) {
3612        cx_a.foreground().forbid_parking();
3613
3614        // Connect to a server as 2 clients.
3615        let mut server = TestServer::start(cx_a.foreground(), cx_a.background()).await;
3616        let client_a = server.create_client(cx_a, "user_a").await;
3617        let client_b = server.create_client(cx_b, "user_b").await;
3618
3619        // Create an org that includes these 2 users.
3620        let db = &server.app_state.db;
3621        let org_id = db.create_org("Test Org", "test-org").await.unwrap();
3622        db.add_org_member(org_id, client_a.current_user_id(&cx_a), false)
3623            .await
3624            .unwrap();
3625        db.add_org_member(org_id, client_b.current_user_id(&cx_b), false)
3626            .await
3627            .unwrap();
3628
3629        // Create a channel that includes all the users.
3630        let channel_id = db.create_org_channel(org_id, "test-channel").await.unwrap();
3631        db.add_channel_member(channel_id, client_a.current_user_id(&cx_a), false)
3632            .await
3633            .unwrap();
3634        db.add_channel_member(channel_id, client_b.current_user_id(&cx_b), false)
3635            .await
3636            .unwrap();
3637        db.create_channel_message(
3638            channel_id,
3639            client_b.current_user_id(&cx_b),
3640            "hello A, it's B.",
3641            OffsetDateTime::now_utc(),
3642            1,
3643        )
3644        .await
3645        .unwrap();
3646
3647        let channels_a = cx_a
3648            .add_model(|cx| ChannelList::new(client_a.user_store.clone(), client_a.clone(), cx));
3649        channels_a
3650            .condition(cx_a, |list, _| list.available_channels().is_some())
3651            .await;
3652        channels_a.read_with(cx_a, |list, _| {
3653            assert_eq!(
3654                list.available_channels().unwrap(),
3655                &[ChannelDetails {
3656                    id: channel_id.to_proto(),
3657                    name: "test-channel".to_string()
3658                }]
3659            )
3660        });
3661        let channel_a = channels_a.update(cx_a, |this, cx| {
3662            this.get_channel(channel_id.to_proto(), cx).unwrap()
3663        });
3664        channel_a.read_with(cx_a, |channel, _| assert!(channel.messages().is_empty()));
3665        channel_a
3666            .condition(&cx_a, |channel, _| {
3667                channel_messages(channel)
3668                    == [("user_b".to_string(), "hello A, it's B.".to_string(), false)]
3669            })
3670            .await;
3671
3672        let channels_b = cx_b
3673            .add_model(|cx| ChannelList::new(client_b.user_store.clone(), client_b.clone(), cx));
3674        channels_b
3675            .condition(cx_b, |list, _| list.available_channels().is_some())
3676            .await;
3677        channels_b.read_with(cx_b, |list, _| {
3678            assert_eq!(
3679                list.available_channels().unwrap(),
3680                &[ChannelDetails {
3681                    id: channel_id.to_proto(),
3682                    name: "test-channel".to_string()
3683                }]
3684            )
3685        });
3686
3687        let channel_b = channels_b.update(cx_b, |this, cx| {
3688            this.get_channel(channel_id.to_proto(), cx).unwrap()
3689        });
3690        channel_b.read_with(cx_b, |channel, _| assert!(channel.messages().is_empty()));
3691        channel_b
3692            .condition(&cx_b, |channel, _| {
3693                channel_messages(channel)
3694                    == [("user_b".to_string(), "hello A, it's B.".to_string(), false)]
3695            })
3696            .await;
3697
3698        channel_a
3699            .update(cx_a, |channel, cx| {
3700                channel
3701                    .send_message("oh, hi B.".to_string(), cx)
3702                    .unwrap()
3703                    .detach();
3704                let task = channel.send_message("sup".to_string(), cx).unwrap();
3705                assert_eq!(
3706                    channel_messages(channel),
3707                    &[
3708                        ("user_b".to_string(), "hello A, it's B.".to_string(), false),
3709                        ("user_a".to_string(), "oh, hi B.".to_string(), true),
3710                        ("user_a".to_string(), "sup".to_string(), true)
3711                    ]
3712                );
3713                task
3714            })
3715            .await
3716            .unwrap();
3717
3718        channel_b
3719            .condition(&cx_b, |channel, _| {
3720                channel_messages(channel)
3721                    == [
3722                        ("user_b".to_string(), "hello A, it's B.".to_string(), false),
3723                        ("user_a".to_string(), "oh, hi B.".to_string(), false),
3724                        ("user_a".to_string(), "sup".to_string(), false),
3725                    ]
3726            })
3727            .await;
3728
3729        assert_eq!(
3730            server
3731                .state()
3732                .await
3733                .channel(channel_id)
3734                .unwrap()
3735                .connection_ids
3736                .len(),
3737            2
3738        );
3739        cx_b.update(|_| drop(channel_b));
3740        server
3741            .condition(|state| state.channel(channel_id).unwrap().connection_ids.len() == 1)
3742            .await;
3743
3744        cx_a.update(|_| drop(channel_a));
3745        server
3746            .condition(|state| state.channel(channel_id).is_none())
3747            .await;
3748    }
3749
3750    #[gpui::test(iterations = 10)]
3751    async fn test_chat_message_validation(cx_a: &mut TestAppContext) {
3752        cx_a.foreground().forbid_parking();
3753
3754        let mut server = TestServer::start(cx_a.foreground(), cx_a.background()).await;
3755        let client_a = server.create_client(cx_a, "user_a").await;
3756
3757        let db = &server.app_state.db;
3758        let org_id = db.create_org("Test Org", "test-org").await.unwrap();
3759        let channel_id = db.create_org_channel(org_id, "test-channel").await.unwrap();
3760        db.add_org_member(org_id, client_a.current_user_id(&cx_a), false)
3761            .await
3762            .unwrap();
3763        db.add_channel_member(channel_id, client_a.current_user_id(&cx_a), false)
3764            .await
3765            .unwrap();
3766
3767        let channels_a = cx_a
3768            .add_model(|cx| ChannelList::new(client_a.user_store.clone(), client_a.clone(), cx));
3769        channels_a
3770            .condition(cx_a, |list, _| list.available_channels().is_some())
3771            .await;
3772        let channel_a = channels_a.update(cx_a, |this, cx| {
3773            this.get_channel(channel_id.to_proto(), cx).unwrap()
3774        });
3775
3776        // Messages aren't allowed to be too long.
3777        channel_a
3778            .update(cx_a, |channel, cx| {
3779                let long_body = "this is long.\n".repeat(1024);
3780                channel.send_message(long_body, cx).unwrap()
3781            })
3782            .await
3783            .unwrap_err();
3784
3785        // Messages aren't allowed to be blank.
3786        channel_a.update(cx_a, |channel, cx| {
3787            channel.send_message(String::new(), cx).unwrap_err()
3788        });
3789
3790        // Leading and trailing whitespace are trimmed.
3791        channel_a
3792            .update(cx_a, |channel, cx| {
3793                channel
3794                    .send_message("\n surrounded by whitespace  \n".to_string(), cx)
3795                    .unwrap()
3796            })
3797            .await
3798            .unwrap();
3799        assert_eq!(
3800            db.get_channel_messages(channel_id, 10, None)
3801                .await
3802                .unwrap()
3803                .iter()
3804                .map(|m| &m.body)
3805                .collect::<Vec<_>>(),
3806            &["surrounded by whitespace"]
3807        );
3808    }
3809
3810    #[gpui::test(iterations = 10)]
3811    async fn test_chat_reconnection(cx_a: &mut TestAppContext, cx_b: &mut TestAppContext) {
3812        cx_a.foreground().forbid_parking();
3813
3814        // Connect to a server as 2 clients.
3815        let mut server = TestServer::start(cx_a.foreground(), cx_a.background()).await;
3816        let client_a = server.create_client(cx_a, "user_a").await;
3817        let client_b = server.create_client(cx_b, "user_b").await;
3818        let mut status_b = client_b.status();
3819
3820        // Create an org that includes these 2 users.
3821        let db = &server.app_state.db;
3822        let org_id = db.create_org("Test Org", "test-org").await.unwrap();
3823        db.add_org_member(org_id, client_a.current_user_id(&cx_a), false)
3824            .await
3825            .unwrap();
3826        db.add_org_member(org_id, client_b.current_user_id(&cx_b), false)
3827            .await
3828            .unwrap();
3829
3830        // Create a channel that includes all the users.
3831        let channel_id = db.create_org_channel(org_id, "test-channel").await.unwrap();
3832        db.add_channel_member(channel_id, client_a.current_user_id(&cx_a), false)
3833            .await
3834            .unwrap();
3835        db.add_channel_member(channel_id, client_b.current_user_id(&cx_b), false)
3836            .await
3837            .unwrap();
3838        db.create_channel_message(
3839            channel_id,
3840            client_b.current_user_id(&cx_b),
3841            "hello A, it's B.",
3842            OffsetDateTime::now_utc(),
3843            2,
3844        )
3845        .await
3846        .unwrap();
3847
3848        let channels_a = cx_a
3849            .add_model(|cx| ChannelList::new(client_a.user_store.clone(), client_a.clone(), cx));
3850        channels_a
3851            .condition(cx_a, |list, _| list.available_channels().is_some())
3852            .await;
3853
3854        channels_a.read_with(cx_a, |list, _| {
3855            assert_eq!(
3856                list.available_channels().unwrap(),
3857                &[ChannelDetails {
3858                    id: channel_id.to_proto(),
3859                    name: "test-channel".to_string()
3860                }]
3861            )
3862        });
3863        let channel_a = channels_a.update(cx_a, |this, cx| {
3864            this.get_channel(channel_id.to_proto(), cx).unwrap()
3865        });
3866        channel_a.read_with(cx_a, |channel, _| assert!(channel.messages().is_empty()));
3867        channel_a
3868            .condition(&cx_a, |channel, _| {
3869                channel_messages(channel)
3870                    == [("user_b".to_string(), "hello A, it's B.".to_string(), false)]
3871            })
3872            .await;
3873
3874        let channels_b = cx_b
3875            .add_model(|cx| ChannelList::new(client_b.user_store.clone(), client_b.clone(), cx));
3876        channels_b
3877            .condition(cx_b, |list, _| list.available_channels().is_some())
3878            .await;
3879        channels_b.read_with(cx_b, |list, _| {
3880            assert_eq!(
3881                list.available_channels().unwrap(),
3882                &[ChannelDetails {
3883                    id: channel_id.to_proto(),
3884                    name: "test-channel".to_string()
3885                }]
3886            )
3887        });
3888
3889        let channel_b = channels_b.update(cx_b, |this, cx| {
3890            this.get_channel(channel_id.to_proto(), cx).unwrap()
3891        });
3892        channel_b.read_with(cx_b, |channel, _| assert!(channel.messages().is_empty()));
3893        channel_b
3894            .condition(&cx_b, |channel, _| {
3895                channel_messages(channel)
3896                    == [("user_b".to_string(), "hello A, it's B.".to_string(), false)]
3897            })
3898            .await;
3899
3900        // Disconnect client B, ensuring we can still access its cached channel data.
3901        server.forbid_connections();
3902        server.disconnect_client(client_b.current_user_id(&cx_b));
3903        cx_b.foreground().advance_clock(Duration::from_secs(3));
3904        while !matches!(
3905            status_b.next().await,
3906            Some(client::Status::ReconnectionError { .. })
3907        ) {}
3908
3909        channels_b.read_with(cx_b, |channels, _| {
3910            assert_eq!(
3911                channels.available_channels().unwrap(),
3912                [ChannelDetails {
3913                    id: channel_id.to_proto(),
3914                    name: "test-channel".to_string()
3915                }]
3916            )
3917        });
3918        channel_b.read_with(cx_b, |channel, _| {
3919            assert_eq!(
3920                channel_messages(channel),
3921                [("user_b".to_string(), "hello A, it's B.".to_string(), false)]
3922            )
3923        });
3924
3925        // Send a message from client B while it is disconnected.
3926        channel_b
3927            .update(cx_b, |channel, cx| {
3928                let task = channel
3929                    .send_message("can you see this?".to_string(), cx)
3930                    .unwrap();
3931                assert_eq!(
3932                    channel_messages(channel),
3933                    &[
3934                        ("user_b".to_string(), "hello A, it's B.".to_string(), false),
3935                        ("user_b".to_string(), "can you see this?".to_string(), true)
3936                    ]
3937                );
3938                task
3939            })
3940            .await
3941            .unwrap_err();
3942
3943        // Send a message from client A while B is disconnected.
3944        channel_a
3945            .update(cx_a, |channel, cx| {
3946                channel
3947                    .send_message("oh, hi B.".to_string(), cx)
3948                    .unwrap()
3949                    .detach();
3950                let task = channel.send_message("sup".to_string(), cx).unwrap();
3951                assert_eq!(
3952                    channel_messages(channel),
3953                    &[
3954                        ("user_b".to_string(), "hello A, it's B.".to_string(), false),
3955                        ("user_a".to_string(), "oh, hi B.".to_string(), true),
3956                        ("user_a".to_string(), "sup".to_string(), true)
3957                    ]
3958                );
3959                task
3960            })
3961            .await
3962            .unwrap();
3963
3964        // Give client B a chance to reconnect.
3965        server.allow_connections();
3966        cx_b.foreground().advance_clock(Duration::from_secs(10));
3967
3968        // Verify that B sees the new messages upon reconnection, as well as the message client B
3969        // sent while offline.
3970        channel_b
3971            .condition(&cx_b, |channel, _| {
3972                channel_messages(channel)
3973                    == [
3974                        ("user_b".to_string(), "hello A, it's B.".to_string(), false),
3975                        ("user_a".to_string(), "oh, hi B.".to_string(), false),
3976                        ("user_a".to_string(), "sup".to_string(), false),
3977                        ("user_b".to_string(), "can you see this?".to_string(), false),
3978                    ]
3979            })
3980            .await;
3981
3982        // Ensure client A and B can communicate normally after reconnection.
3983        channel_a
3984            .update(cx_a, |channel, cx| {
3985                channel.send_message("you online?".to_string(), cx).unwrap()
3986            })
3987            .await
3988            .unwrap();
3989        channel_b
3990            .condition(&cx_b, |channel, _| {
3991                channel_messages(channel)
3992                    == [
3993                        ("user_b".to_string(), "hello A, it's B.".to_string(), false),
3994                        ("user_a".to_string(), "oh, hi B.".to_string(), false),
3995                        ("user_a".to_string(), "sup".to_string(), false),
3996                        ("user_b".to_string(), "can you see this?".to_string(), false),
3997                        ("user_a".to_string(), "you online?".to_string(), false),
3998                    ]
3999            })
4000            .await;
4001
4002        channel_b
4003            .update(cx_b, |channel, cx| {
4004                channel.send_message("yep".to_string(), cx).unwrap()
4005            })
4006            .await
4007            .unwrap();
4008        channel_a
4009            .condition(&cx_a, |channel, _| {
4010                channel_messages(channel)
4011                    == [
4012                        ("user_b".to_string(), "hello A, it's B.".to_string(), false),
4013                        ("user_a".to_string(), "oh, hi B.".to_string(), false),
4014                        ("user_a".to_string(), "sup".to_string(), false),
4015                        ("user_b".to_string(), "can you see this?".to_string(), false),
4016                        ("user_a".to_string(), "you online?".to_string(), false),
4017                        ("user_b".to_string(), "yep".to_string(), false),
4018                    ]
4019            })
4020            .await;
4021    }
4022
4023    #[gpui::test(iterations = 10)]
4024    async fn test_contacts(
4025        cx_a: &mut TestAppContext,
4026        cx_b: &mut TestAppContext,
4027        cx_c: &mut TestAppContext,
4028    ) {
4029        cx_a.foreground().forbid_parking();
4030        let lang_registry = Arc::new(LanguageRegistry::test());
4031        let fs = FakeFs::new(cx_a.background());
4032
4033        // Connect to a server as 3 clients.
4034        let mut server = TestServer::start(cx_a.foreground(), cx_a.background()).await;
4035        let client_a = server.create_client(cx_a, "user_a").await;
4036        let client_b = server.create_client(cx_b, "user_b").await;
4037        let client_c = server.create_client(cx_c, "user_c").await;
4038
4039        // Share a worktree as client A.
4040        fs.insert_tree(
4041            "/a",
4042            json!({
4043                ".zed.toml": r#"collaborators = ["user_b", "user_c"]"#,
4044            }),
4045        )
4046        .await;
4047
4048        let project_a = cx_a.update(|cx| {
4049            Project::local(
4050                client_a.clone(),
4051                client_a.user_store.clone(),
4052                lang_registry.clone(),
4053                fs.clone(),
4054                cx,
4055            )
4056        });
4057        let (worktree_a, _) = project_a
4058            .update(cx_a, |p, cx| {
4059                p.find_or_create_local_worktree("/a", true, cx)
4060            })
4061            .await
4062            .unwrap();
4063        worktree_a
4064            .read_with(cx_a, |tree, _| tree.as_local().unwrap().scan_complete())
4065            .await;
4066
4067        client_a
4068            .user_store
4069            .condition(&cx_a, |user_store, _| {
4070                contacts(user_store) == vec![("user_a", vec![("a", vec![])])]
4071            })
4072            .await;
4073        client_b
4074            .user_store
4075            .condition(&cx_b, |user_store, _| {
4076                contacts(user_store) == vec![("user_a", vec![("a", vec![])])]
4077            })
4078            .await;
4079        client_c
4080            .user_store
4081            .condition(&cx_c, |user_store, _| {
4082                contacts(user_store) == vec![("user_a", vec![("a", vec![])])]
4083            })
4084            .await;
4085
4086        let project_id = project_a
4087            .update(cx_a, |project, _| project.next_remote_id())
4088            .await;
4089        project_a
4090            .update(cx_a, |project, cx| project.share(cx))
4091            .await
4092            .unwrap();
4093
4094        let _project_b = Project::remote(
4095            project_id,
4096            client_b.clone(),
4097            client_b.user_store.clone(),
4098            lang_registry.clone(),
4099            fs.clone(),
4100            &mut cx_b.to_async(),
4101        )
4102        .await
4103        .unwrap();
4104
4105        client_a
4106            .user_store
4107            .condition(&cx_a, |user_store, _| {
4108                contacts(user_store) == vec![("user_a", vec![("a", vec!["user_b"])])]
4109            })
4110            .await;
4111        client_b
4112            .user_store
4113            .condition(&cx_b, |user_store, _| {
4114                contacts(user_store) == vec![("user_a", vec![("a", vec!["user_b"])])]
4115            })
4116            .await;
4117        client_c
4118            .user_store
4119            .condition(&cx_c, |user_store, _| {
4120                contacts(user_store) == vec![("user_a", vec![("a", vec!["user_b"])])]
4121            })
4122            .await;
4123
4124        project_a
4125            .condition(&cx_a, |project, _| {
4126                project.collaborators().contains_key(&client_b.peer_id)
4127            })
4128            .await;
4129
4130        cx_a.update(move |_| drop(project_a));
4131        client_a
4132            .user_store
4133            .condition(&cx_a, |user_store, _| contacts(user_store) == vec![])
4134            .await;
4135        client_b
4136            .user_store
4137            .condition(&cx_b, |user_store, _| contacts(user_store) == vec![])
4138            .await;
4139        client_c
4140            .user_store
4141            .condition(&cx_c, |user_store, _| contacts(user_store) == vec![])
4142            .await;
4143
4144        fn contacts(user_store: &UserStore) -> Vec<(&str, Vec<(&str, Vec<&str>)>)> {
4145            user_store
4146                .contacts()
4147                .iter()
4148                .map(|contact| {
4149                    let worktrees = contact
4150                        .projects
4151                        .iter()
4152                        .map(|p| {
4153                            (
4154                                p.worktree_root_names[0].as_str(),
4155                                p.guests.iter().map(|p| p.github_login.as_str()).collect(),
4156                            )
4157                        })
4158                        .collect();
4159                    (contact.user.github_login.as_str(), worktrees)
4160                })
4161                .collect()
4162        }
4163    }
4164
4165    #[gpui::test(iterations = 100)]
4166    async fn test_random_collaboration(cx: &mut TestAppContext, rng: StdRng) {
4167        cx.foreground().forbid_parking();
4168        let max_peers = env::var("MAX_PEERS")
4169            .map(|i| i.parse().expect("invalid `MAX_PEERS` variable"))
4170            .unwrap_or(5);
4171        let max_operations = env::var("OPERATIONS")
4172            .map(|i| i.parse().expect("invalid `OPERATIONS` variable"))
4173            .unwrap_or(10);
4174
4175        let rng = Arc::new(Mutex::new(rng));
4176
4177        let guest_lang_registry = Arc::new(LanguageRegistry::test());
4178        let (language_server_config, _fake_language_servers) = LanguageServerConfig::fake();
4179
4180        let fs = FakeFs::new(cx.background());
4181        fs.insert_tree(
4182            "/_collab",
4183            json!({
4184                ".zed.toml": r#"collaborators = ["guest-1", "guest-2", "guest-3", "guest-4", "guest-5"]"#
4185            }),
4186        )
4187        .await;
4188
4189        let operations = Rc::new(Cell::new(0));
4190        let mut server = TestServer::start(cx.foreground(), cx.background()).await;
4191        let mut clients = Vec::new();
4192
4193        let mut next_entity_id = 100000;
4194        let mut host_cx = TestAppContext::new(
4195            cx.foreground_platform(),
4196            cx.platform(),
4197            cx.foreground(),
4198            cx.background(),
4199            cx.font_cache(),
4200            cx.leak_detector(),
4201            next_entity_id,
4202        );
4203        let host = server.create_client(&mut host_cx, "host").await;
4204        let host_project = host_cx.update(|cx| {
4205            Project::local(
4206                host.client.clone(),
4207                host.user_store.clone(),
4208                Arc::new(LanguageRegistry::test()),
4209                fs.clone(),
4210                cx,
4211            )
4212        });
4213        let host_project_id = host_project
4214            .update(&mut host_cx, |p, _| p.next_remote_id())
4215            .await;
4216
4217        let (collab_worktree, _) = host_project
4218            .update(&mut host_cx, |project, cx| {
4219                project.find_or_create_local_worktree("/_collab", true, cx)
4220            })
4221            .await
4222            .unwrap();
4223        collab_worktree
4224            .read_with(&host_cx, |tree, _| tree.as_local().unwrap().scan_complete())
4225            .await;
4226        host_project
4227            .update(&mut host_cx, |project, cx| project.share(cx))
4228            .await
4229            .unwrap();
4230
4231        clients.push(cx.foreground().spawn(host.simulate_host(
4232            host_project,
4233            language_server_config,
4234            operations.clone(),
4235            max_operations,
4236            rng.clone(),
4237            host_cx,
4238        )));
4239
4240        while operations.get() < max_operations {
4241            cx.background().simulate_random_delay().await;
4242            if clients.len() >= max_peers {
4243                break;
4244            } else if rng.lock().gen_bool(0.05) {
4245                operations.set(operations.get() + 1);
4246
4247                let guest_id = clients.len();
4248                log::info!("Adding guest {}", guest_id);
4249                next_entity_id += 100000;
4250                let mut guest_cx = TestAppContext::new(
4251                    cx.foreground_platform(),
4252                    cx.platform(),
4253                    cx.foreground(),
4254                    cx.background(),
4255                    cx.font_cache(),
4256                    cx.leak_detector(),
4257                    next_entity_id,
4258                );
4259                let guest = server
4260                    .create_client(&mut guest_cx, &format!("guest-{}", guest_id))
4261                    .await;
4262                let guest_project = Project::remote(
4263                    host_project_id,
4264                    guest.client.clone(),
4265                    guest.user_store.clone(),
4266                    guest_lang_registry.clone(),
4267                    FakeFs::new(cx.background()),
4268                    &mut guest_cx.to_async(),
4269                )
4270                .await
4271                .unwrap();
4272                clients.push(cx.foreground().spawn(guest.simulate_guest(
4273                    guest_id,
4274                    guest_project,
4275                    operations.clone(),
4276                    max_operations,
4277                    rng.clone(),
4278                    guest_cx,
4279                )));
4280
4281                log::info!("Guest {} added", guest_id);
4282            }
4283        }
4284
4285        let mut clients = futures::future::join_all(clients).await;
4286        cx.foreground().run_until_parked();
4287
4288        let (host_client, mut host_cx) = clients.remove(0);
4289        let host_project = host_client.project.as_ref().unwrap();
4290        let host_worktree_snapshots = host_project.read_with(&host_cx, |project, cx| {
4291            project
4292                .worktrees(cx)
4293                .map(|worktree| {
4294                    let snapshot = worktree.read(cx).snapshot();
4295                    (snapshot.id(), snapshot)
4296                })
4297                .collect::<BTreeMap<_, _>>()
4298        });
4299
4300        host_client
4301            .project
4302            .as_ref()
4303            .unwrap()
4304            .read_with(&host_cx, |project, cx| project.check_invariants(cx));
4305
4306        for (guest_client, mut guest_cx) in clients.into_iter() {
4307            let guest_id = guest_client.client.id();
4308            let worktree_snapshots =
4309                guest_client
4310                    .project
4311                    .as_ref()
4312                    .unwrap()
4313                    .read_with(&guest_cx, |project, cx| {
4314                        project
4315                            .worktrees(cx)
4316                            .map(|worktree| {
4317                                let worktree = worktree.read(cx);
4318                                (worktree.id(), worktree.snapshot())
4319                            })
4320                            .collect::<BTreeMap<_, _>>()
4321                    });
4322
4323            assert_eq!(
4324                worktree_snapshots.keys().collect::<Vec<_>>(),
4325                host_worktree_snapshots.keys().collect::<Vec<_>>(),
4326                "guest {} has different worktrees than the host",
4327                guest_id
4328            );
4329            for (id, host_snapshot) in &host_worktree_snapshots {
4330                let guest_snapshot = &worktree_snapshots[id];
4331                assert_eq!(
4332                    guest_snapshot.root_name(),
4333                    host_snapshot.root_name(),
4334                    "guest {} has different root name than the host for worktree {}",
4335                    guest_id,
4336                    id
4337                );
4338                assert_eq!(
4339                    guest_snapshot.entries(false).collect::<Vec<_>>(),
4340                    host_snapshot.entries(false).collect::<Vec<_>>(),
4341                    "guest {} has different snapshot than the host for worktree {}",
4342                    guest_id,
4343                    id
4344                );
4345            }
4346
4347            guest_client
4348                .project
4349                .as_ref()
4350                .unwrap()
4351                .read_with(&guest_cx, |project, cx| project.check_invariants(cx));
4352
4353            for guest_buffer in &guest_client.buffers {
4354                let buffer_id = guest_buffer.read_with(&guest_cx, |buffer, _| buffer.remote_id());
4355                let host_buffer = host_project.read_with(&host_cx, |project, cx| {
4356                    project.buffer_for_id(buffer_id, cx).expect(&format!(
4357                        "host does not have buffer for guest:{}, peer:{}, id:{}",
4358                        guest_id, guest_client.peer_id, buffer_id
4359                    ))
4360                });
4361                let path = host_buffer
4362                    .read_with(&host_cx, |buffer, cx| buffer.file().unwrap().full_path(cx));
4363
4364                assert_eq!(
4365                    guest_buffer.read_with(&guest_cx, |buffer, _| buffer.deferred_ops_len()),
4366                    0,
4367                    "guest {}, buffer {}, path {:?} has deferred operations",
4368                    guest_id,
4369                    buffer_id,
4370                    path,
4371                );
4372                assert_eq!(
4373                    guest_buffer.read_with(&guest_cx, |buffer, _| buffer.text()),
4374                    host_buffer.read_with(&host_cx, |buffer, _| buffer.text()),
4375                    "guest {}, buffer {}, path {:?}, differs from the host's buffer",
4376                    guest_id,
4377                    buffer_id,
4378                    path
4379                );
4380            }
4381
4382            guest_cx.update(|_| drop(guest_client));
4383        }
4384
4385        host_cx.update(|_| drop(host_client));
4386    }
4387
4388    struct TestServer {
4389        peer: Arc<Peer>,
4390        app_state: Arc<AppState>,
4391        server: Arc<Server>,
4392        foreground: Rc<executor::Foreground>,
4393        notifications: mpsc::UnboundedReceiver<()>,
4394        connection_killers: Arc<Mutex<HashMap<UserId, barrier::Sender>>>,
4395        forbid_connections: Arc<AtomicBool>,
4396        _test_db: TestDb,
4397    }
4398
4399    impl TestServer {
4400        async fn start(
4401            foreground: Rc<executor::Foreground>,
4402            background: Arc<executor::Background>,
4403        ) -> Self {
4404            let test_db = TestDb::fake(background);
4405            let app_state = Self::build_app_state(&test_db).await;
4406            let peer = Peer::new();
4407            let notifications = mpsc::unbounded();
4408            let server = Server::new(app_state.clone(), peer.clone(), Some(notifications.0));
4409            Self {
4410                peer,
4411                app_state,
4412                server,
4413                foreground,
4414                notifications: notifications.1,
4415                connection_killers: Default::default(),
4416                forbid_connections: Default::default(),
4417                _test_db: test_db,
4418            }
4419        }
4420
4421        async fn create_client(&mut self, cx: &mut TestAppContext, name: &str) -> TestClient {
4422            let http = FakeHttpClient::with_404_response();
4423            let user_id = self.app_state.db.create_user(name, false).await.unwrap();
4424            let client_name = name.to_string();
4425            let mut client = Client::new(http.clone());
4426            let server = self.server.clone();
4427            let connection_killers = self.connection_killers.clone();
4428            let forbid_connections = self.forbid_connections.clone();
4429            let (connection_id_tx, mut connection_id_rx) = mpsc::channel(16);
4430
4431            Arc::get_mut(&mut client)
4432                .unwrap()
4433                .override_authenticate(move |cx| {
4434                    cx.spawn(|_| async move {
4435                        let access_token = "the-token".to_string();
4436                        Ok(Credentials {
4437                            user_id: user_id.0 as u64,
4438                            access_token,
4439                        })
4440                    })
4441                })
4442                .override_establish_connection(move |credentials, cx| {
4443                    assert_eq!(credentials.user_id, user_id.0 as u64);
4444                    assert_eq!(credentials.access_token, "the-token");
4445
4446                    let server = server.clone();
4447                    let connection_killers = connection_killers.clone();
4448                    let forbid_connections = forbid_connections.clone();
4449                    let client_name = client_name.clone();
4450                    let connection_id_tx = connection_id_tx.clone();
4451                    cx.spawn(move |cx| async move {
4452                        if forbid_connections.load(SeqCst) {
4453                            Err(EstablishConnectionError::other(anyhow!(
4454                                "server is forbidding connections"
4455                            )))
4456                        } else {
4457                            let (client_conn, server_conn, kill_conn) =
4458                                Connection::in_memory(cx.background());
4459                            connection_killers.lock().insert(user_id, kill_conn);
4460                            cx.background()
4461                                .spawn(server.handle_connection(
4462                                    server_conn,
4463                                    client_name,
4464                                    user_id,
4465                                    Some(connection_id_tx),
4466                                    cx.background(),
4467                                ))
4468                                .detach();
4469                            Ok(client_conn)
4470                        }
4471                    })
4472                });
4473
4474            client
4475                .authenticate_and_connect(&cx.to_async())
4476                .await
4477                .unwrap();
4478
4479            Channel::init(&client);
4480            Project::init(&client);
4481
4482            let peer_id = PeerId(connection_id_rx.next().await.unwrap().0);
4483            let user_store = cx.add_model(|cx| UserStore::new(client.clone(), http, cx));
4484            let mut authed_user =
4485                user_store.read_with(cx, |user_store, _| user_store.watch_current_user());
4486            while authed_user.next().await.unwrap().is_none() {}
4487
4488            TestClient {
4489                client,
4490                peer_id,
4491                user_store,
4492                project: Default::default(),
4493                buffers: Default::default(),
4494            }
4495        }
4496
4497        fn disconnect_client(&self, user_id: UserId) {
4498            self.connection_killers.lock().remove(&user_id);
4499        }
4500
4501        fn forbid_connections(&self) {
4502            self.forbid_connections.store(true, SeqCst);
4503        }
4504
4505        fn allow_connections(&self) {
4506            self.forbid_connections.store(false, SeqCst);
4507        }
4508
4509        async fn build_app_state(test_db: &TestDb) -> Arc<AppState> {
4510            let mut config = Config::default();
4511            config.session_secret = "a".repeat(32);
4512            config.database_url = test_db.url.clone();
4513            let github_client = github::AppClient::test();
4514            Arc::new(AppState {
4515                db: test_db.db().clone(),
4516                handlebars: Default::default(),
4517                auth_client: auth::build_client("", ""),
4518                repo_client: github::RepoClient::test(&github_client),
4519                github_client,
4520                config,
4521            })
4522        }
4523
4524        async fn state<'a>(&'a self) -> RwLockReadGuard<'a, Store> {
4525            self.server.store.read()
4526        }
4527
4528        async fn condition<F>(&mut self, mut predicate: F)
4529        where
4530            F: FnMut(&Store) -> bool,
4531        {
4532            async_std::future::timeout(Duration::from_millis(500), async {
4533                while !(predicate)(&*self.server.store.read()) {
4534                    self.foreground.start_waiting();
4535                    self.notifications.next().await;
4536                    self.foreground.finish_waiting();
4537                }
4538            })
4539            .await
4540            .expect("condition timed out");
4541        }
4542    }
4543
4544    impl Drop for TestServer {
4545        fn drop(&mut self) {
4546            self.peer.reset();
4547        }
4548    }
4549
4550    struct TestClient {
4551        client: Arc<Client>,
4552        pub peer_id: PeerId,
4553        pub user_store: ModelHandle<UserStore>,
4554        project: Option<ModelHandle<Project>>,
4555        buffers: HashSet<ModelHandle<language::Buffer>>,
4556    }
4557
4558    impl Deref for TestClient {
4559        type Target = Arc<Client>;
4560
4561        fn deref(&self) -> &Self::Target {
4562            &self.client
4563        }
4564    }
4565
4566    impl TestClient {
4567        pub fn current_user_id(&self, cx: &TestAppContext) -> UserId {
4568            UserId::from_proto(
4569                self.user_store
4570                    .read_with(cx, |user_store, _| user_store.current_user().unwrap().id),
4571            )
4572        }
4573
4574        fn simulate_host(
4575            mut self,
4576            project: ModelHandle<Project>,
4577            mut language_server_config: LanguageServerConfig,
4578            operations: Rc<Cell<usize>>,
4579            max_operations: usize,
4580            rng: Arc<Mutex<StdRng>>,
4581            mut cx: TestAppContext,
4582        ) -> impl Future<Output = (Self, TestAppContext)> {
4583            let files: Arc<Mutex<Vec<PathBuf>>> = Default::default();
4584
4585            // Set up a fake language server.
4586            language_server_config.set_fake_initializer({
4587                let rng = rng.clone();
4588                let files = files.clone();
4589                let project = project.downgrade();
4590                move |fake_server| {
4591                    fake_server.handle_request::<lsp::request::Completion, _>(|_, _| {
4592                        Some(lsp::CompletionResponse::Array(vec![lsp::CompletionItem {
4593                            text_edit: Some(lsp::CompletionTextEdit::Edit(lsp::TextEdit {
4594                                range: lsp::Range::new(
4595                                    lsp::Position::new(0, 0),
4596                                    lsp::Position::new(0, 0),
4597                                ),
4598                                new_text: "the-new-text".to_string(),
4599                            })),
4600                            ..Default::default()
4601                        }]))
4602                    });
4603
4604                    fake_server.handle_request::<lsp::request::CodeActionRequest, _>(|_, _| {
4605                        Some(vec![lsp::CodeActionOrCommand::CodeAction(
4606                            lsp::CodeAction {
4607                                title: "the-code-action".to_string(),
4608                                ..Default::default()
4609                            },
4610                        )])
4611                    });
4612
4613                    fake_server.handle_request::<lsp::request::PrepareRenameRequest, _>(
4614                        |params, _| {
4615                            Some(lsp::PrepareRenameResponse::Range(lsp::Range::new(
4616                                params.position,
4617                                params.position,
4618                            )))
4619                        },
4620                    );
4621
4622                    fake_server.handle_request::<lsp::request::GotoDefinition, _>({
4623                        let files = files.clone();
4624                        let rng = rng.clone();
4625                        move |_, _| {
4626                            let files = files.lock();
4627                            let mut rng = rng.lock();
4628                            let count = rng.gen_range::<usize, _>(1..3);
4629                            let files = (0..count)
4630                                .map(|_| files.choose(&mut *rng).unwrap())
4631                                .collect::<Vec<_>>();
4632                            log::info!("LSP: Returning definitions in files {:?}", &files);
4633                            Some(lsp::GotoDefinitionResponse::Array(
4634                                files
4635                                    .into_iter()
4636                                    .map(|file| lsp::Location {
4637                                        uri: lsp::Url::from_file_path(file).unwrap(),
4638                                        range: Default::default(),
4639                                    })
4640                                    .collect(),
4641                            ))
4642                        }
4643                    });
4644
4645                    fake_server.handle_request::<lsp::request::DocumentHighlightRequest, _>({
4646                        let rng = rng.clone();
4647                        let project = project.clone();
4648                        move |params, mut cx| {
4649                            if let Some(project) = project.upgrade(&cx) {
4650                                project.update(&mut cx, |project, cx| {
4651                                    let path = params
4652                                        .text_document_position_params
4653                                        .text_document
4654                                        .uri
4655                                        .to_file_path()
4656                                        .unwrap();
4657                                    let (worktree, relative_path) =
4658                                        project.find_local_worktree(&path, cx)?;
4659                                    let project_path =
4660                                        ProjectPath::from((worktree.read(cx).id(), relative_path));
4661                                    let buffer =
4662                                        project.get_open_buffer(&project_path, cx)?.read(cx);
4663
4664                                    let mut highlights = Vec::new();
4665                                    let highlight_count = rng.lock().gen_range(1..=5);
4666                                    let mut prev_end = 0;
4667                                    for _ in 0..highlight_count {
4668                                        let range =
4669                                            buffer.random_byte_range(prev_end, &mut *rng.lock());
4670                                        let start = buffer
4671                                            .offset_to_point_utf16(range.start)
4672                                            .to_lsp_position();
4673                                        let end = buffer
4674                                            .offset_to_point_utf16(range.end)
4675                                            .to_lsp_position();
4676                                        highlights.push(lsp::DocumentHighlight {
4677                                            range: lsp::Range::new(start, end),
4678                                            kind: Some(lsp::DocumentHighlightKind::READ),
4679                                        });
4680                                        prev_end = range.end;
4681                                    }
4682                                    Some(highlights)
4683                                })
4684                            } else {
4685                                None
4686                            }
4687                        }
4688                    });
4689                }
4690            });
4691
4692            project.update(&mut cx, |project, _| {
4693                project.languages().add(Arc::new(Language::new(
4694                    LanguageConfig {
4695                        name: "Rust".into(),
4696                        path_suffixes: vec!["rs".to_string()],
4697                        language_server: Some(language_server_config),
4698                        ..Default::default()
4699                    },
4700                    None,
4701                )));
4702            });
4703
4704            async move {
4705                let fs = project.read_with(&cx, |project, _| project.fs().clone());
4706                while operations.get() < max_operations {
4707                    operations.set(operations.get() + 1);
4708
4709                    let distribution = rng.lock().gen_range::<usize, _>(0..100);
4710                    match distribution {
4711                        0..=20 if !files.lock().is_empty() => {
4712                            let path = files.lock().choose(&mut *rng.lock()).unwrap().clone();
4713                            let mut path = path.as_path();
4714                            while let Some(parent_path) = path.parent() {
4715                                path = parent_path;
4716                                if rng.lock().gen() {
4717                                    break;
4718                                }
4719                            }
4720
4721                            log::info!("Host: find/create local worktree {:?}", path);
4722                            let find_or_create_worktree = project.update(&mut cx, |project, cx| {
4723                                project.find_or_create_local_worktree(path, true, cx)
4724                            });
4725                            let find_or_create_worktree = async move {
4726                                find_or_create_worktree.await.unwrap();
4727                            };
4728                            if rng.lock().gen() {
4729                                cx.background().spawn(find_or_create_worktree).detach();
4730                            } else {
4731                                find_or_create_worktree.await;
4732                            }
4733                        }
4734                        10..=80 if !files.lock().is_empty() => {
4735                            let buffer = if self.buffers.is_empty() || rng.lock().gen() {
4736                                let file = files.lock().choose(&mut *rng.lock()).unwrap().clone();
4737                                let (worktree, path) = project
4738                                    .update(&mut cx, |project, cx| {
4739                                        project.find_or_create_local_worktree(
4740                                            file.clone(),
4741                                            true,
4742                                            cx,
4743                                        )
4744                                    })
4745                                    .await
4746                                    .unwrap();
4747                                let project_path =
4748                                    worktree.read_with(&cx, |worktree, _| (worktree.id(), path));
4749                                log::info!(
4750                                    "Host: opening path {:?}, worktree {}, relative_path {:?}",
4751                                    file,
4752                                    project_path.0,
4753                                    project_path.1
4754                                );
4755                                let buffer = project
4756                                    .update(&mut cx, |project, cx| {
4757                                        project.open_buffer(project_path, cx)
4758                                    })
4759                                    .await
4760                                    .unwrap();
4761                                self.buffers.insert(buffer.clone());
4762                                buffer
4763                            } else {
4764                                self.buffers
4765                                    .iter()
4766                                    .choose(&mut *rng.lock())
4767                                    .unwrap()
4768                                    .clone()
4769                            };
4770
4771                            if rng.lock().gen_bool(0.1) {
4772                                cx.update(|cx| {
4773                                    log::info!(
4774                                        "Host: dropping buffer {:?}",
4775                                        buffer.read(cx).file().unwrap().full_path(cx)
4776                                    );
4777                                    self.buffers.remove(&buffer);
4778                                    drop(buffer);
4779                                });
4780                            } else {
4781                                buffer.update(&mut cx, |buffer, cx| {
4782                                    log::info!(
4783                                        "Host: updating buffer {:?} ({})",
4784                                        buffer.file().unwrap().full_path(cx),
4785                                        buffer.remote_id()
4786                                    );
4787                                    buffer.randomly_edit(&mut *rng.lock(), 5, cx)
4788                                });
4789                            }
4790                        }
4791                        _ => loop {
4792                            let path_component_count = rng.lock().gen_range::<usize, _>(1..=5);
4793                            let mut path = PathBuf::new();
4794                            path.push("/");
4795                            for _ in 0..path_component_count {
4796                                let letter = rng.lock().gen_range(b'a'..=b'z');
4797                                path.push(std::str::from_utf8(&[letter]).unwrap());
4798                            }
4799                            path.set_extension("rs");
4800                            let parent_path = path.parent().unwrap();
4801
4802                            log::info!("Host: creating file {:?}", path,);
4803
4804                            if fs.create_dir(&parent_path).await.is_ok()
4805                                && fs.create_file(&path, Default::default()).await.is_ok()
4806                            {
4807                                files.lock().push(path);
4808                                break;
4809                            } else {
4810                                log::info!("Host: cannot create file");
4811                            }
4812                        },
4813                    }
4814
4815                    cx.background().simulate_random_delay().await;
4816                }
4817
4818                log::info!("Host done");
4819
4820                self.project = Some(project);
4821                (self, cx)
4822            }
4823        }
4824
4825        pub async fn simulate_guest(
4826            mut self,
4827            guest_id: usize,
4828            project: ModelHandle<Project>,
4829            operations: Rc<Cell<usize>>,
4830            max_operations: usize,
4831            rng: Arc<Mutex<StdRng>>,
4832            mut cx: TestAppContext,
4833        ) -> (Self, TestAppContext) {
4834            while operations.get() < max_operations {
4835                let buffer = if self.buffers.is_empty() || rng.lock().gen() {
4836                    let worktree = if let Some(worktree) = project.read_with(&cx, |project, cx| {
4837                        project
4838                            .worktrees(&cx)
4839                            .filter(|worktree| {
4840                                let worktree = worktree.read(cx);
4841                                worktree.is_visible()
4842                                    && worktree.entries(false).any(|e| e.is_file())
4843                            })
4844                            .choose(&mut *rng.lock())
4845                    }) {
4846                        worktree
4847                    } else {
4848                        cx.background().simulate_random_delay().await;
4849                        continue;
4850                    };
4851
4852                    operations.set(operations.get() + 1);
4853                    let (worktree_root_name, project_path) =
4854                        worktree.read_with(&cx, |worktree, _| {
4855                            let entry = worktree
4856                                .entries(false)
4857                                .filter(|e| e.is_file())
4858                                .choose(&mut *rng.lock())
4859                                .unwrap();
4860                            (
4861                                worktree.root_name().to_string(),
4862                                (worktree.id(), entry.path.clone()),
4863                            )
4864                        });
4865                    log::info!(
4866                        "Guest {}: opening path {:?} in worktree {} ({})",
4867                        guest_id,
4868                        project_path.1,
4869                        project_path.0,
4870                        worktree_root_name,
4871                    );
4872                    let buffer = project
4873                        .update(&mut cx, |project, cx| {
4874                            project.open_buffer(project_path.clone(), cx)
4875                        })
4876                        .await
4877                        .unwrap();
4878                    log::info!(
4879                        "Guest {}: opened path {:?} in worktree {} ({}) with buffer id {}",
4880                        guest_id,
4881                        project_path.1,
4882                        project_path.0,
4883                        worktree_root_name,
4884                        buffer.read_with(&cx, |buffer, _| buffer.remote_id())
4885                    );
4886                    self.buffers.insert(buffer.clone());
4887                    buffer
4888                } else {
4889                    operations.set(operations.get() + 1);
4890
4891                    self.buffers
4892                        .iter()
4893                        .choose(&mut *rng.lock())
4894                        .unwrap()
4895                        .clone()
4896                };
4897
4898                let choice = rng.lock().gen_range(0..100);
4899                match choice {
4900                    0..=9 => {
4901                        cx.update(|cx| {
4902                            log::info!(
4903                                "Guest {}: dropping buffer {:?}",
4904                                guest_id,
4905                                buffer.read(cx).file().unwrap().full_path(cx)
4906                            );
4907                            self.buffers.remove(&buffer);
4908                            drop(buffer);
4909                        });
4910                    }
4911                    10..=19 => {
4912                        let completions = project.update(&mut cx, |project, cx| {
4913                            log::info!(
4914                                "Guest {}: requesting completions for buffer {} ({:?})",
4915                                guest_id,
4916                                buffer.read(cx).remote_id(),
4917                                buffer.read(cx).file().unwrap().full_path(cx)
4918                            );
4919                            let offset = rng.lock().gen_range(0..=buffer.read(cx).len());
4920                            project.completions(&buffer, offset, cx)
4921                        });
4922                        let completions = cx.background().spawn(async move {
4923                            completions.await.expect("completions request failed");
4924                        });
4925                        if rng.lock().gen_bool(0.3) {
4926                            log::info!("Guest {}: detaching completions request", guest_id);
4927                            completions.detach();
4928                        } else {
4929                            completions.await;
4930                        }
4931                    }
4932                    20..=29 => {
4933                        let code_actions = project.update(&mut cx, |project, cx| {
4934                            log::info!(
4935                                "Guest {}: requesting code actions for buffer {} ({:?})",
4936                                guest_id,
4937                                buffer.read(cx).remote_id(),
4938                                buffer.read(cx).file().unwrap().full_path(cx)
4939                            );
4940                            let range = buffer.read(cx).random_byte_range(0, &mut *rng.lock());
4941                            project.code_actions(&buffer, range, cx)
4942                        });
4943                        let code_actions = cx.background().spawn(async move {
4944                            code_actions.await.expect("code actions request failed");
4945                        });
4946                        if rng.lock().gen_bool(0.3) {
4947                            log::info!("Guest {}: detaching code actions request", guest_id);
4948                            code_actions.detach();
4949                        } else {
4950                            code_actions.await;
4951                        }
4952                    }
4953                    30..=39 if buffer.read_with(&cx, |buffer, _| buffer.is_dirty()) => {
4954                        let (requested_version, save) = buffer.update(&mut cx, |buffer, cx| {
4955                            log::info!(
4956                                "Guest {}: saving buffer {} ({:?})",
4957                                guest_id,
4958                                buffer.remote_id(),
4959                                buffer.file().unwrap().full_path(cx)
4960                            );
4961                            (buffer.version(), buffer.save(cx))
4962                        });
4963                        let save = cx.background().spawn(async move {
4964                            let (saved_version, _) = save.await.expect("save request failed");
4965                            assert!(saved_version.observed_all(&requested_version));
4966                        });
4967                        if rng.lock().gen_bool(0.3) {
4968                            log::info!("Guest {}: detaching save request", guest_id);
4969                            save.detach();
4970                        } else {
4971                            save.await;
4972                        }
4973                    }
4974                    40..=44 => {
4975                        let prepare_rename = project.update(&mut cx, |project, cx| {
4976                            log::info!(
4977                                "Guest {}: preparing rename for buffer {} ({:?})",
4978                                guest_id,
4979                                buffer.read(cx).remote_id(),
4980                                buffer.read(cx).file().unwrap().full_path(cx)
4981                            );
4982                            let offset = rng.lock().gen_range(0..=buffer.read(cx).len());
4983                            project.prepare_rename(buffer, offset, cx)
4984                        });
4985                        let prepare_rename = cx.background().spawn(async move {
4986                            prepare_rename.await.expect("prepare rename request failed");
4987                        });
4988                        if rng.lock().gen_bool(0.3) {
4989                            log::info!("Guest {}: detaching prepare rename request", guest_id);
4990                            prepare_rename.detach();
4991                        } else {
4992                            prepare_rename.await;
4993                        }
4994                    }
4995                    45..=49 => {
4996                        let definitions = project.update(&mut cx, |project, cx| {
4997                            log::info!(
4998                                "Guest {}: requesting definitions for buffer {} ({:?})",
4999                                guest_id,
5000                                buffer.read(cx).remote_id(),
5001                                buffer.read(cx).file().unwrap().full_path(cx)
5002                            );
5003                            let offset = rng.lock().gen_range(0..=buffer.read(cx).len());
5004                            project.definition(&buffer, offset, cx)
5005                        });
5006                        let definitions = cx.background().spawn(async move {
5007                            definitions.await.expect("definitions request failed")
5008                        });
5009                        if rng.lock().gen_bool(0.3) {
5010                            log::info!("Guest {}: detaching definitions request", guest_id);
5011                            definitions.detach();
5012                        } else {
5013                            self.buffers
5014                                .extend(definitions.await.into_iter().map(|loc| loc.buffer));
5015                        }
5016                    }
5017                    50..=54 => {
5018                        let highlights = project.update(&mut cx, |project, cx| {
5019                            log::info!(
5020                                "Guest {}: requesting highlights for buffer {} ({:?})",
5021                                guest_id,
5022                                buffer.read(cx).remote_id(),
5023                                buffer.read(cx).file().unwrap().full_path(cx)
5024                            );
5025                            let offset = rng.lock().gen_range(0..=buffer.read(cx).len());
5026                            project.document_highlights(&buffer, offset, cx)
5027                        });
5028                        let highlights = cx.background().spawn(async move {
5029                            highlights.await.expect("highlights request failed");
5030                        });
5031                        if rng.lock().gen_bool(0.3) {
5032                            log::info!("Guest {}: detaching highlights request", guest_id);
5033                            highlights.detach();
5034                        } else {
5035                            highlights.await;
5036                        }
5037                    }
5038                    55..=59 => {
5039                        let search = project.update(&mut cx, |project, cx| {
5040                            let query = rng.lock().gen_range('a'..='z');
5041                            log::info!("Guest {}: project-wide search {:?}", guest_id, query);
5042                            project.search(SearchQuery::text(query, false, false), cx)
5043                        });
5044                        let search = cx
5045                            .background()
5046                            .spawn(async move { search.await.expect("search request failed") });
5047                        if rng.lock().gen_bool(0.3) {
5048                            log::info!("Guest {}: detaching search request", guest_id);
5049                            search.detach();
5050                        } else {
5051                            self.buffers.extend(search.await.into_keys());
5052                        }
5053                    }
5054                    _ => {
5055                        buffer.update(&mut cx, |buffer, cx| {
5056                            log::info!(
5057                                "Guest {}: updating buffer {} ({:?})",
5058                                guest_id,
5059                                buffer.remote_id(),
5060                                buffer.file().unwrap().full_path(cx)
5061                            );
5062                            buffer.randomly_edit(&mut *rng.lock(), 5, cx)
5063                        });
5064                    }
5065                }
5066                cx.background().simulate_random_delay().await;
5067            }
5068
5069            log::info!("Guest {} done", guest_id);
5070
5071            self.project = Some(project);
5072            (self, cx)
5073        }
5074    }
5075
5076    impl Drop for TestClient {
5077        fn drop(&mut self) {
5078            self.client.tear_down();
5079        }
5080    }
5081
5082    impl Executor for Arc<gpui::executor::Background> {
5083        type Timer = gpui::executor::Timer;
5084
5085        fn spawn_detached<F: 'static + Send + Future<Output = ()>>(&self, future: F) {
5086            self.spawn(future).detach();
5087        }
5088
5089        fn timer(&self, duration: Duration) -> Self::Timer {
5090            self.as_ref().timer(duration)
5091        }
5092    }
5093
5094    fn channel_messages(channel: &Channel) -> Vec<(String, String, bool)> {
5095        channel
5096            .messages()
5097            .cursor::<()>()
5098            .map(|m| {
5099                (
5100                    m.sender.github_login.clone(),
5101                    m.body.clone(),
5102                    m.is_pending(),
5103                )
5104            })
5105            .collect()
5106    }
5107
5108    struct EmptyView;
5109
5110    impl gpui::Entity for EmptyView {
5111        type Event = ();
5112    }
5113
5114    impl gpui::View for EmptyView {
5115        fn ui_name() -> &'static str {
5116            "empty view"
5117        }
5118
5119        fn render(&mut self, _: &mut gpui::RenderContext<Self>) -> gpui::ElementBox {
5120            gpui::Element::boxed(gpui::elements::Empty)
5121        }
5122    }
5123}