rpc.rs

   1mod store;
   2
   3use crate::{
   4    auth,
   5    db::{ChannelId, MessageId, UserId},
   6    AppState, Result,
   7};
   8use anyhow::anyhow;
   9use async_tungstenite::tungstenite::{
  10    protocol::CloseFrame as TungsteniteCloseFrame, Message as TungsteniteMessage,
  11};
  12use axum::{
  13    body::Body,
  14    extract::{
  15        ws::{CloseFrame as AxumCloseFrame, Message as AxumMessage},
  16        ConnectInfo, WebSocketUpgrade,
  17    },
  18    headers::{Header, HeaderName},
  19    http::StatusCode,
  20    middleware,
  21    response::{IntoResponse, Response},
  22    routing::get,
  23    Extension, Router, TypedHeader,
  24};
  25use collections::{HashMap, HashSet};
  26use futures::{channel::mpsc, future::BoxFuture, FutureExt, SinkExt, StreamExt, TryStreamExt};
  27use lazy_static::lazy_static;
  28use rpc::{
  29    proto::{self, AnyTypedEnvelope, EntityMessage, EnvelopedMessage, RequestMessage},
  30    Connection, ConnectionId, Peer, TypedEnvelope,
  31};
  32use std::{
  33    any::TypeId,
  34    future::Future,
  35    marker::PhantomData,
  36    net::SocketAddr,
  37    ops::{Deref, DerefMut},
  38    rc::Rc,
  39    sync::Arc,
  40    time::Duration,
  41};
  42use store::{Store, Worktree};
  43use time::OffsetDateTime;
  44use tokio::{
  45    sync::{RwLock, RwLockReadGuard, RwLockWriteGuard},
  46    time::Sleep,
  47};
  48use tower::ServiceBuilder;
  49use tracing::{info_span, instrument, Instrument};
  50
  51type MessageHandler =
  52    Box<dyn Send + Sync + Fn(Arc<Server>, Box<dyn AnyTypedEnvelope>) -> BoxFuture<'static, ()>>;
  53
  54pub struct Server {
  55    peer: Arc<Peer>,
  56    store: RwLock<Store>,
  57    app_state: Arc<AppState>,
  58    handlers: HashMap<TypeId, MessageHandler>,
  59    notifications: Option<mpsc::UnboundedSender<()>>,
  60}
  61
  62pub trait Executor: Send + Clone {
  63    type Sleep: Send + Future;
  64    fn spawn_detached<F: 'static + Send + Future<Output = ()>>(&self, future: F);
  65    fn sleep(&self, duration: Duration) -> Self::Sleep;
  66}
  67
  68#[derive(Clone)]
  69pub struct RealExecutor;
  70
  71const MESSAGE_COUNT_PER_PAGE: usize = 100;
  72const MAX_MESSAGE_LEN: usize = 1024;
  73
  74struct StoreReadGuard<'a> {
  75    guard: RwLockReadGuard<'a, Store>,
  76    _not_send: PhantomData<Rc<()>>,
  77}
  78
  79struct StoreWriteGuard<'a> {
  80    guard: RwLockWriteGuard<'a, Store>,
  81    _not_send: PhantomData<Rc<()>>,
  82}
  83
  84impl Server {
  85    pub fn new(
  86        app_state: Arc<AppState>,
  87        notifications: Option<mpsc::UnboundedSender<()>>,
  88    ) -> Arc<Self> {
  89        let mut server = Self {
  90            peer: Peer::new(),
  91            app_state,
  92            store: Default::default(),
  93            handlers: Default::default(),
  94            notifications,
  95        };
  96
  97        server
  98            .add_request_handler(Server::ping)
  99            .add_request_handler(Server::register_project)
 100            .add_message_handler(Server::unregister_project)
 101            .add_request_handler(Server::share_project)
 102            .add_message_handler(Server::unshare_project)
 103            .add_sync_request_handler(Server::join_project)
 104            .add_message_handler(Server::leave_project)
 105            .add_request_handler(Server::register_worktree)
 106            .add_message_handler(Server::unregister_worktree)
 107            .add_request_handler(Server::update_worktree)
 108            .add_message_handler(Server::start_language_server)
 109            .add_message_handler(Server::update_language_server)
 110            .add_message_handler(Server::update_diagnostic_summary)
 111            .add_request_handler(Server::forward_project_request::<proto::GetDefinition>)
 112            .add_request_handler(Server::forward_project_request::<proto::GetReferences>)
 113            .add_request_handler(Server::forward_project_request::<proto::SearchProject>)
 114            .add_request_handler(Server::forward_project_request::<proto::GetDocumentHighlights>)
 115            .add_request_handler(Server::forward_project_request::<proto::GetProjectSymbols>)
 116            .add_request_handler(Server::forward_project_request::<proto::OpenBufferForSymbol>)
 117            .add_request_handler(Server::forward_project_request::<proto::OpenBufferById>)
 118            .add_request_handler(Server::forward_project_request::<proto::OpenBufferByPath>)
 119            .add_request_handler(Server::forward_project_request::<proto::GetCompletions>)
 120            .add_request_handler(
 121                Server::forward_project_request::<proto::ApplyCompletionAdditionalEdits>,
 122            )
 123            .add_request_handler(Server::forward_project_request::<proto::GetCodeActions>)
 124            .add_request_handler(Server::forward_project_request::<proto::ApplyCodeAction>)
 125            .add_request_handler(Server::forward_project_request::<proto::PrepareRename>)
 126            .add_request_handler(Server::forward_project_request::<proto::PerformRename>)
 127            .add_request_handler(Server::forward_project_request::<proto::ReloadBuffers>)
 128            .add_request_handler(Server::forward_project_request::<proto::FormatBuffers>)
 129            .add_request_handler(Server::forward_project_request::<proto::CreateProjectEntry>)
 130            .add_request_handler(Server::update_buffer)
 131            .add_message_handler(Server::update_buffer_file)
 132            .add_message_handler(Server::buffer_reloaded)
 133            .add_message_handler(Server::buffer_saved)
 134            .add_request_handler(Server::save_buffer)
 135            .add_request_handler(Server::get_channels)
 136            .add_request_handler(Server::get_users)
 137            .add_request_handler(Server::join_channel)
 138            .add_message_handler(Server::leave_channel)
 139            .add_request_handler(Server::send_channel_message)
 140            .add_request_handler(Server::follow)
 141            .add_message_handler(Server::unfollow)
 142            .add_message_handler(Server::update_followers)
 143            .add_request_handler(Server::get_channel_messages);
 144
 145        Arc::new(server)
 146    }
 147
 148    fn add_message_handler<F, Fut, M>(&mut self, handler: F) -> &mut Self
 149    where
 150        F: 'static + Send + Sync + Fn(Arc<Self>, TypedEnvelope<M>) -> Fut,
 151        Fut: 'static + Send + Future<Output = Result<()>>,
 152        M: EnvelopedMessage,
 153    {
 154        let prev_handler = self.handlers.insert(
 155            TypeId::of::<M>(),
 156            Box::new(move |server, envelope| {
 157                let envelope = envelope.into_any().downcast::<TypedEnvelope<M>>().unwrap();
 158                let span = info_span!(
 159                    "handle message",
 160                    payload_type = envelope.payload_type_name(),
 161                    payload = serde_json::to_string_pretty(&envelope.payload)
 162                        .unwrap()
 163                        .as_str(),
 164                );
 165                let future = (handler)(server, *envelope);
 166                async move {
 167                    if let Err(error) = future.await {
 168                        tracing::error!(%error, "error handling message");
 169                    }
 170                }
 171                .instrument(span)
 172                .boxed()
 173            }),
 174        );
 175        if prev_handler.is_some() {
 176            panic!("registered a handler for the same message twice");
 177        }
 178        self
 179    }
 180
 181    fn add_request_handler<F, Fut, M>(&mut self, handler: F) -> &mut Self
 182    where
 183        F: 'static + Send + Sync + Fn(Arc<Self>, TypedEnvelope<M>) -> Fut,
 184        Fut: 'static + Send + Future<Output = Result<M::Response>>,
 185        M: RequestMessage,
 186    {
 187        self.add_message_handler(move |server, envelope| {
 188            let receipt = envelope.receipt();
 189            let response = (handler)(server.clone(), envelope);
 190            async move {
 191                match response.await {
 192                    Ok(response) => {
 193                        server.peer.respond(receipt, response)?;
 194                        Ok(())
 195                    }
 196                    Err(error) => {
 197                        server.peer.respond_with_error(
 198                            receipt,
 199                            proto::Error {
 200                                message: error.to_string(),
 201                            },
 202                        )?;
 203                        Err(error)
 204                    }
 205                }
 206            }
 207        })
 208    }
 209
 210    /// Handle a request while holding a lock to the store. This is useful when we're registering
 211    /// a connection but we want to respond on the connection before anybody else can send on it.
 212    fn add_sync_request_handler<F, M>(&mut self, handler: F) -> &mut Self
 213    where
 214        F: 'static
 215            + Send
 216            + Sync
 217            + Fn(Arc<Self>, &mut Store, TypedEnvelope<M>) -> Result<M::Response>,
 218        M: RequestMessage,
 219    {
 220        let handler = Arc::new(handler);
 221        self.add_message_handler(move |server, envelope| {
 222            let receipt = envelope.receipt();
 223            let handler = handler.clone();
 224            async move {
 225                let mut store = server.state_mut().await;
 226                let response = (handler)(server.clone(), &mut *store, envelope);
 227                match response {
 228                    Ok(response) => {
 229                        server.peer.respond(receipt, response)?;
 230                        Ok(())
 231                    }
 232                    Err(error) => {
 233                        server.peer.respond_with_error(
 234                            receipt,
 235                            proto::Error {
 236                                message: error.to_string(),
 237                            },
 238                        )?;
 239                        Err(error)
 240                    }
 241                }
 242            }
 243        })
 244    }
 245
 246    pub fn handle_connection<E: Executor>(
 247        self: &Arc<Self>,
 248        connection: Connection,
 249        address: String,
 250        user_id: UserId,
 251        mut send_connection_id: Option<mpsc::Sender<ConnectionId>>,
 252        executor: E,
 253    ) -> impl Future<Output = ()> {
 254        let mut this = self.clone();
 255        let span = info_span!("handle connection", %user_id, %address);
 256        async move {
 257            let (connection_id, handle_io, mut incoming_rx) = this
 258                .peer
 259                .add_connection(connection, {
 260                    let executor = executor.clone();
 261                    move |duration| {
 262                        let timer = executor.sleep(duration);
 263                        async move {
 264                            timer.await;
 265                        }
 266                    }
 267                })
 268                .await;
 269
 270            tracing::info!(%user_id, %connection_id, %address, "connection opened");
 271
 272            if let Some(send_connection_id) = send_connection_id.as_mut() {
 273                let _ = send_connection_id.send(connection_id).await;
 274            }
 275
 276            {
 277                let mut state = this.state_mut().await;
 278                state.add_connection(connection_id, user_id);
 279                this.update_contacts_for_users(&*state, &[user_id]);
 280            }
 281
 282            let handle_io = handle_io.fuse();
 283            futures::pin_mut!(handle_io);
 284            loop {
 285                let next_message = incoming_rx.next().fuse();
 286                futures::pin_mut!(next_message);
 287                futures::select_biased! {
 288                    result = handle_io => {
 289                        if let Err(error) = result {
 290                            tracing::error!(%error, "error handling I/O");
 291                        }
 292                        break;
 293                    }
 294                    message = next_message => {
 295                        if let Some(message) = message {
 296                            let type_name = message.payload_type_name();
 297                            let span = tracing::info_span!("receive message", %user_id, %connection_id, %address, type_name);
 298                            async {
 299                                if let Some(handler) = this.handlers.get(&message.payload_type_id()) {
 300                                    let notifications = this.notifications.clone();
 301                                    let is_background = message.is_background();
 302                                    let handle_message = (handler)(this.clone(), message);
 303                                    let handle_message = async move {
 304                                        handle_message.await;
 305                                        if let Some(mut notifications) = notifications {
 306                                            let _ = notifications.send(()).await;
 307                                        }
 308                                    };
 309                                    if is_background {
 310                                        executor.spawn_detached(handle_message);
 311                                    } else {
 312                                        handle_message.await;
 313                                    }
 314                                } else {
 315                                    tracing::error!("no message handler");
 316                                }
 317                            }.instrument(span).await;
 318                        } else {
 319                            tracing::info!(%user_id, %connection_id, %address, "connection closed");
 320                            break;
 321                        }
 322                    }
 323                }
 324            }
 325
 326            if let Err(error) = this.sign_out(connection_id).await {
 327                tracing::error!(%error, "error signing out");
 328            }
 329        }.instrument(span)
 330    }
 331
 332    async fn sign_out(self: &mut Arc<Self>, connection_id: ConnectionId) -> Result<()> {
 333        self.peer.disconnect(connection_id);
 334        let mut state = self.state_mut().await;
 335        let removed_connection = state.remove_connection(connection_id)?;
 336
 337        for (project_id, project) in removed_connection.hosted_projects {
 338            if let Some(share) = project.share {
 339                broadcast(
 340                    connection_id,
 341                    share.guests.keys().copied().collect(),
 342                    |conn_id| {
 343                        self.peer
 344                            .send(conn_id, proto::UnshareProject { project_id })
 345                    },
 346                );
 347            }
 348        }
 349
 350        for (project_id, peer_ids) in removed_connection.guest_project_ids {
 351            broadcast(connection_id, peer_ids, |conn_id| {
 352                self.peer.send(
 353                    conn_id,
 354                    proto::RemoveProjectCollaborator {
 355                        project_id,
 356                        peer_id: connection_id.0,
 357                    },
 358                )
 359            });
 360        }
 361
 362        self.update_contacts_for_users(&*state, removed_connection.contact_ids.iter());
 363        Ok(())
 364    }
 365
 366    async fn ping(self: Arc<Server>, _: TypedEnvelope<proto::Ping>) -> Result<proto::Ack> {
 367        Ok(proto::Ack {})
 368    }
 369
 370    async fn register_project(
 371        self: Arc<Server>,
 372        request: TypedEnvelope<proto::RegisterProject>,
 373    ) -> Result<proto::RegisterProjectResponse> {
 374        let project_id = {
 375            let mut state = self.state_mut().await;
 376            let user_id = state.user_id_for_connection(request.sender_id)?;
 377            state.register_project(request.sender_id, user_id)
 378        };
 379        Ok(proto::RegisterProjectResponse { project_id })
 380    }
 381
 382    async fn unregister_project(
 383        self: Arc<Server>,
 384        request: TypedEnvelope<proto::UnregisterProject>,
 385    ) -> Result<()> {
 386        let mut state = self.state_mut().await;
 387        let project = state.unregister_project(request.payload.project_id, request.sender_id)?;
 388        self.update_contacts_for_users(&*state, &project.authorized_user_ids());
 389        Ok(())
 390    }
 391
 392    async fn share_project(
 393        self: Arc<Server>,
 394        request: TypedEnvelope<proto::ShareProject>,
 395    ) -> Result<proto::Ack> {
 396        let mut state = self.state_mut().await;
 397        let project = state.share_project(request.payload.project_id, request.sender_id)?;
 398        self.update_contacts_for_users(&mut *state, &project.authorized_user_ids);
 399        Ok(proto::Ack {})
 400    }
 401
 402    async fn unshare_project(
 403        self: Arc<Server>,
 404        request: TypedEnvelope<proto::UnshareProject>,
 405    ) -> Result<()> {
 406        let project_id = request.payload.project_id;
 407        let mut state = self.state_mut().await;
 408        let project = state.unshare_project(project_id, request.sender_id)?;
 409        broadcast(request.sender_id, project.connection_ids, |conn_id| {
 410            self.peer
 411                .send(conn_id, proto::UnshareProject { project_id })
 412        });
 413        self.update_contacts_for_users(&mut *state, &project.authorized_user_ids);
 414        Ok(())
 415    }
 416
 417    fn join_project(
 418        self: Arc<Server>,
 419        state: &mut Store,
 420        request: TypedEnvelope<proto::JoinProject>,
 421    ) -> Result<proto::JoinProjectResponse> {
 422        let project_id = request.payload.project_id;
 423
 424        let user_id = state.user_id_for_connection(request.sender_id)?;
 425        let (response, connection_ids, contact_user_ids) = state
 426            .join_project(request.sender_id, user_id, project_id)
 427            .and_then(|joined| {
 428                let share = joined.project.share()?;
 429                let peer_count = share.guests.len();
 430                let mut collaborators = Vec::with_capacity(peer_count);
 431                collaborators.push(proto::Collaborator {
 432                    peer_id: joined.project.host_connection_id.0,
 433                    replica_id: 0,
 434                    user_id: joined.project.host_user_id.to_proto(),
 435                });
 436                let worktrees = share
 437                    .worktrees
 438                    .iter()
 439                    .filter_map(|(id, shared_worktree)| {
 440                        let worktree = joined.project.worktrees.get(&id)?;
 441                        Some(proto::Worktree {
 442                            id: *id,
 443                            root_name: worktree.root_name.clone(),
 444                            entries: shared_worktree.entries.values().cloned().collect(),
 445                            diagnostic_summaries: shared_worktree
 446                                .diagnostic_summaries
 447                                .values()
 448                                .cloned()
 449                                .collect(),
 450                            visible: worktree.visible,
 451                        })
 452                    })
 453                    .collect();
 454                for (peer_conn_id, (peer_replica_id, peer_user_id)) in &share.guests {
 455                    if *peer_conn_id != request.sender_id {
 456                        collaborators.push(proto::Collaborator {
 457                            peer_id: peer_conn_id.0,
 458                            replica_id: *peer_replica_id as u32,
 459                            user_id: peer_user_id.to_proto(),
 460                        });
 461                    }
 462                }
 463                let response = proto::JoinProjectResponse {
 464                    worktrees,
 465                    replica_id: joined.replica_id as u32,
 466                    collaborators,
 467                    language_servers: joined.project.language_servers.clone(),
 468                };
 469                let connection_ids = joined.project.connection_ids();
 470                let contact_user_ids = joined.project.authorized_user_ids();
 471                Ok((response, connection_ids, contact_user_ids))
 472            })?;
 473
 474        broadcast(request.sender_id, connection_ids, |conn_id| {
 475            self.peer.send(
 476                conn_id,
 477                proto::AddProjectCollaborator {
 478                    project_id,
 479                    collaborator: Some(proto::Collaborator {
 480                        peer_id: request.sender_id.0,
 481                        replica_id: response.replica_id,
 482                        user_id: user_id.to_proto(),
 483                    }),
 484                },
 485            )
 486        });
 487        self.update_contacts_for_users(state, &contact_user_ids);
 488        Ok(response)
 489    }
 490
 491    async fn leave_project(
 492        self: Arc<Server>,
 493        request: TypedEnvelope<proto::LeaveProject>,
 494    ) -> Result<()> {
 495        let sender_id = request.sender_id;
 496        let project_id = request.payload.project_id;
 497        let mut state = self.state_mut().await;
 498        let worktree = state.leave_project(sender_id, project_id)?;
 499        broadcast(sender_id, worktree.connection_ids, |conn_id| {
 500            self.peer.send(
 501                conn_id,
 502                proto::RemoveProjectCollaborator {
 503                    project_id,
 504                    peer_id: sender_id.0,
 505                },
 506            )
 507        });
 508        self.update_contacts_for_users(&*state, &worktree.authorized_user_ids);
 509        Ok(())
 510    }
 511
 512    async fn register_worktree(
 513        self: Arc<Server>,
 514        request: TypedEnvelope<proto::RegisterWorktree>,
 515    ) -> Result<proto::Ack> {
 516        let mut contact_user_ids = HashSet::default();
 517        for github_login in &request.payload.authorized_logins {
 518            let contact_user_id = self.app_state.db.create_user(github_login, false).await?;
 519            contact_user_ids.insert(contact_user_id);
 520        }
 521
 522        let mut state = self.state_mut().await;
 523        let host_user_id = state.user_id_for_connection(request.sender_id)?;
 524        contact_user_ids.insert(host_user_id);
 525
 526        let contact_user_ids = contact_user_ids.into_iter().collect::<Vec<_>>();
 527        let guest_connection_ids = state
 528            .read_project(request.payload.project_id, request.sender_id)?
 529            .guest_connection_ids();
 530        state.register_worktree(
 531            request.payload.project_id,
 532            request.payload.worktree_id,
 533            request.sender_id,
 534            Worktree {
 535                authorized_user_ids: contact_user_ids.clone(),
 536                root_name: request.payload.root_name.clone(),
 537                visible: request.payload.visible,
 538            },
 539        )?;
 540
 541        broadcast(request.sender_id, guest_connection_ids, |connection_id| {
 542            self.peer
 543                .forward_send(request.sender_id, connection_id, request.payload.clone())
 544        });
 545        self.update_contacts_for_users(&*state, &contact_user_ids);
 546        Ok(proto::Ack {})
 547    }
 548
 549    async fn unregister_worktree(
 550        self: Arc<Server>,
 551        request: TypedEnvelope<proto::UnregisterWorktree>,
 552    ) -> Result<()> {
 553        let project_id = request.payload.project_id;
 554        let worktree_id = request.payload.worktree_id;
 555        let mut state = self.state_mut().await;
 556        let (worktree, guest_connection_ids) =
 557            state.unregister_worktree(project_id, worktree_id, request.sender_id)?;
 558        broadcast(request.sender_id, guest_connection_ids, |conn_id| {
 559            self.peer.send(
 560                conn_id,
 561                proto::UnregisterWorktree {
 562                    project_id,
 563                    worktree_id,
 564                },
 565            )
 566        });
 567        self.update_contacts_for_users(&*state, &worktree.authorized_user_ids);
 568        Ok(())
 569    }
 570
 571    async fn update_worktree(
 572        self: Arc<Server>,
 573        request: TypedEnvelope<proto::UpdateWorktree>,
 574    ) -> Result<proto::Ack> {
 575        let connection_ids = self.state_mut().await.update_worktree(
 576            request.sender_id,
 577            request.payload.project_id,
 578            request.payload.worktree_id,
 579            &request.payload.removed_entries,
 580            &request.payload.updated_entries,
 581        )?;
 582
 583        broadcast(request.sender_id, connection_ids, |connection_id| {
 584            self.peer
 585                .forward_send(request.sender_id, connection_id, request.payload.clone())
 586        });
 587
 588        Ok(proto::Ack {})
 589    }
 590
 591    async fn update_diagnostic_summary(
 592        self: Arc<Server>,
 593        request: TypedEnvelope<proto::UpdateDiagnosticSummary>,
 594    ) -> Result<()> {
 595        let summary = request
 596            .payload
 597            .summary
 598            .clone()
 599            .ok_or_else(|| anyhow!("invalid summary"))?;
 600        let receiver_ids = self.state_mut().await.update_diagnostic_summary(
 601            request.payload.project_id,
 602            request.payload.worktree_id,
 603            request.sender_id,
 604            summary,
 605        )?;
 606
 607        broadcast(request.sender_id, receiver_ids, |connection_id| {
 608            self.peer
 609                .forward_send(request.sender_id, connection_id, request.payload.clone())
 610        });
 611        Ok(())
 612    }
 613
 614    async fn start_language_server(
 615        self: Arc<Server>,
 616        request: TypedEnvelope<proto::StartLanguageServer>,
 617    ) -> Result<()> {
 618        let receiver_ids = self.state_mut().await.start_language_server(
 619            request.payload.project_id,
 620            request.sender_id,
 621            request
 622                .payload
 623                .server
 624                .clone()
 625                .ok_or_else(|| anyhow!("invalid language server"))?,
 626        )?;
 627        broadcast(request.sender_id, receiver_ids, |connection_id| {
 628            self.peer
 629                .forward_send(request.sender_id, connection_id, request.payload.clone())
 630        });
 631        Ok(())
 632    }
 633
 634    async fn update_language_server(
 635        self: Arc<Server>,
 636        request: TypedEnvelope<proto::UpdateLanguageServer>,
 637    ) -> Result<()> {
 638        let receiver_ids = self
 639            .state()
 640            .await
 641            .project_connection_ids(request.payload.project_id, request.sender_id)?;
 642        broadcast(request.sender_id, receiver_ids, |connection_id| {
 643            self.peer
 644                .forward_send(request.sender_id, connection_id, request.payload.clone())
 645        });
 646        Ok(())
 647    }
 648
 649    async fn forward_project_request<T>(
 650        self: Arc<Server>,
 651        request: TypedEnvelope<T>,
 652    ) -> Result<T::Response>
 653    where
 654        T: EntityMessage + RequestMessage,
 655    {
 656        let host_connection_id = self
 657            .state()
 658            .await
 659            .read_project(request.payload.remote_entity_id(), request.sender_id)?
 660            .host_connection_id;
 661        Ok(self
 662            .peer
 663            .forward_request(request.sender_id, host_connection_id, request.payload)
 664            .await?)
 665    }
 666
 667    async fn save_buffer(
 668        self: Arc<Server>,
 669        request: TypedEnvelope<proto::SaveBuffer>,
 670    ) -> Result<proto::BufferSaved> {
 671        let host = self
 672            .state()
 673            .await
 674            .read_project(request.payload.project_id, request.sender_id)?
 675            .host_connection_id;
 676        let response = self
 677            .peer
 678            .forward_request(request.sender_id, host, request.payload.clone())
 679            .await?;
 680
 681        let mut guests = self
 682            .state()
 683            .await
 684            .read_project(request.payload.project_id, request.sender_id)?
 685            .connection_ids();
 686        guests.retain(|guest_connection_id| *guest_connection_id != request.sender_id);
 687        broadcast(host, guests, |conn_id| {
 688            self.peer.forward_send(host, conn_id, response.clone())
 689        });
 690
 691        Ok(response)
 692    }
 693
 694    async fn update_buffer(
 695        self: Arc<Server>,
 696        request: TypedEnvelope<proto::UpdateBuffer>,
 697    ) -> Result<proto::Ack> {
 698        let receiver_ids = self
 699            .state()
 700            .await
 701            .project_connection_ids(request.payload.project_id, request.sender_id)?;
 702        broadcast(request.sender_id, receiver_ids, |connection_id| {
 703            self.peer
 704                .forward_send(request.sender_id, connection_id, request.payload.clone())
 705        });
 706        Ok(proto::Ack {})
 707    }
 708
 709    async fn update_buffer_file(
 710        self: Arc<Server>,
 711        request: TypedEnvelope<proto::UpdateBufferFile>,
 712    ) -> Result<()> {
 713        let receiver_ids = self
 714            .state()
 715            .await
 716            .project_connection_ids(request.payload.project_id, request.sender_id)?;
 717        broadcast(request.sender_id, receiver_ids, |connection_id| {
 718            self.peer
 719                .forward_send(request.sender_id, connection_id, request.payload.clone())
 720        });
 721        Ok(())
 722    }
 723
 724    async fn buffer_reloaded(
 725        self: Arc<Server>,
 726        request: TypedEnvelope<proto::BufferReloaded>,
 727    ) -> Result<()> {
 728        let receiver_ids = self
 729            .state()
 730            .await
 731            .project_connection_ids(request.payload.project_id, request.sender_id)?;
 732        broadcast(request.sender_id, receiver_ids, |connection_id| {
 733            self.peer
 734                .forward_send(request.sender_id, connection_id, request.payload.clone())
 735        });
 736        Ok(())
 737    }
 738
 739    async fn buffer_saved(
 740        self: Arc<Server>,
 741        request: TypedEnvelope<proto::BufferSaved>,
 742    ) -> Result<()> {
 743        let receiver_ids = self
 744            .state()
 745            .await
 746            .project_connection_ids(request.payload.project_id, request.sender_id)?;
 747        broadcast(request.sender_id, receiver_ids, |connection_id| {
 748            self.peer
 749                .forward_send(request.sender_id, connection_id, request.payload.clone())
 750        });
 751        Ok(())
 752    }
 753
 754    async fn follow(
 755        self: Arc<Self>,
 756        request: TypedEnvelope<proto::Follow>,
 757    ) -> Result<proto::FollowResponse> {
 758        let leader_id = ConnectionId(request.payload.leader_id);
 759        let follower_id = request.sender_id;
 760        if !self
 761            .state()
 762            .await
 763            .project_connection_ids(request.payload.project_id, follower_id)?
 764            .contains(&leader_id)
 765        {
 766            Err(anyhow!("no such peer"))?;
 767        }
 768        let mut response = self
 769            .peer
 770            .forward_request(request.sender_id, leader_id, request.payload)
 771            .await?;
 772        response
 773            .views
 774            .retain(|view| view.leader_id != Some(follower_id.0));
 775        Ok(response)
 776    }
 777
 778    async fn unfollow(self: Arc<Self>, request: TypedEnvelope<proto::Unfollow>) -> Result<()> {
 779        let leader_id = ConnectionId(request.payload.leader_id);
 780        if !self
 781            .state()
 782            .await
 783            .project_connection_ids(request.payload.project_id, request.sender_id)?
 784            .contains(&leader_id)
 785        {
 786            Err(anyhow!("no such peer"))?;
 787        }
 788        self.peer
 789            .forward_send(request.sender_id, leader_id, request.payload)?;
 790        Ok(())
 791    }
 792
 793    async fn update_followers(
 794        self: Arc<Self>,
 795        request: TypedEnvelope<proto::UpdateFollowers>,
 796    ) -> Result<()> {
 797        let connection_ids = self
 798            .state()
 799            .await
 800            .project_connection_ids(request.payload.project_id, request.sender_id)?;
 801        let leader_id = request
 802            .payload
 803            .variant
 804            .as_ref()
 805            .and_then(|variant| match variant {
 806                proto::update_followers::Variant::CreateView(payload) => payload.leader_id,
 807                proto::update_followers::Variant::UpdateView(payload) => payload.leader_id,
 808                proto::update_followers::Variant::UpdateActiveView(payload) => payload.leader_id,
 809            });
 810        for follower_id in &request.payload.follower_ids {
 811            let follower_id = ConnectionId(*follower_id);
 812            if connection_ids.contains(&follower_id) && Some(follower_id.0) != leader_id {
 813                self.peer
 814                    .forward_send(request.sender_id, follower_id, request.payload.clone())?;
 815            }
 816        }
 817        Ok(())
 818    }
 819
 820    async fn get_channels(
 821        self: Arc<Server>,
 822        request: TypedEnvelope<proto::GetChannels>,
 823    ) -> Result<proto::GetChannelsResponse> {
 824        let user_id = self
 825            .state()
 826            .await
 827            .user_id_for_connection(request.sender_id)?;
 828        let channels = self.app_state.db.get_accessible_channels(user_id).await?;
 829        Ok(proto::GetChannelsResponse {
 830            channels: channels
 831                .into_iter()
 832                .map(|chan| proto::Channel {
 833                    id: chan.id.to_proto(),
 834                    name: chan.name,
 835                })
 836                .collect(),
 837        })
 838    }
 839
 840    async fn get_users(
 841        self: Arc<Server>,
 842        request: TypedEnvelope<proto::GetUsers>,
 843    ) -> Result<proto::GetUsersResponse> {
 844        let user_ids = request
 845            .payload
 846            .user_ids
 847            .into_iter()
 848            .map(UserId::from_proto)
 849            .collect();
 850        let users = self
 851            .app_state
 852            .db
 853            .get_users_by_ids(user_ids)
 854            .await?
 855            .into_iter()
 856            .map(|user| proto::User {
 857                id: user.id.to_proto(),
 858                avatar_url: format!("https://github.com/{}.png?size=128", user.github_login),
 859                github_login: user.github_login,
 860            })
 861            .collect();
 862        Ok(proto::GetUsersResponse { users })
 863    }
 864
 865    #[instrument(skip(self, state, user_ids))]
 866    fn update_contacts_for_users<'a>(
 867        self: &Arc<Self>,
 868        state: &Store,
 869        user_ids: impl IntoIterator<Item = &'a UserId>,
 870    ) {
 871        for user_id in user_ids {
 872            let contacts = state.contacts_for_user(*user_id);
 873            for connection_id in state.connection_ids_for_user(*user_id) {
 874                self.peer
 875                    .send(
 876                        connection_id,
 877                        proto::UpdateContacts {
 878                            contacts: contacts.clone(),
 879                        },
 880                    )
 881                    .trace_err();
 882            }
 883        }
 884    }
 885
 886    async fn join_channel(
 887        self: Arc<Self>,
 888        request: TypedEnvelope<proto::JoinChannel>,
 889    ) -> Result<proto::JoinChannelResponse> {
 890        let user_id = self
 891            .state()
 892            .await
 893            .user_id_for_connection(request.sender_id)?;
 894        let channel_id = ChannelId::from_proto(request.payload.channel_id);
 895        if !self
 896            .app_state
 897            .db
 898            .can_user_access_channel(user_id, channel_id)
 899            .await?
 900        {
 901            Err(anyhow!("access denied"))?;
 902        }
 903
 904        self.state_mut()
 905            .await
 906            .join_channel(request.sender_id, channel_id);
 907        let messages = self
 908            .app_state
 909            .db
 910            .get_channel_messages(channel_id, MESSAGE_COUNT_PER_PAGE, None)
 911            .await?
 912            .into_iter()
 913            .map(|msg| proto::ChannelMessage {
 914                id: msg.id.to_proto(),
 915                body: msg.body,
 916                timestamp: msg.sent_at.unix_timestamp() as u64,
 917                sender_id: msg.sender_id.to_proto(),
 918                nonce: Some(msg.nonce.as_u128().into()),
 919            })
 920            .collect::<Vec<_>>();
 921        Ok(proto::JoinChannelResponse {
 922            done: messages.len() < MESSAGE_COUNT_PER_PAGE,
 923            messages,
 924        })
 925    }
 926
 927    async fn leave_channel(
 928        self: Arc<Self>,
 929        request: TypedEnvelope<proto::LeaveChannel>,
 930    ) -> Result<()> {
 931        let user_id = self
 932            .state()
 933            .await
 934            .user_id_for_connection(request.sender_id)?;
 935        let channel_id = ChannelId::from_proto(request.payload.channel_id);
 936        if !self
 937            .app_state
 938            .db
 939            .can_user_access_channel(user_id, channel_id)
 940            .await?
 941        {
 942            Err(anyhow!("access denied"))?;
 943        }
 944
 945        self.state_mut()
 946            .await
 947            .leave_channel(request.sender_id, channel_id);
 948
 949        Ok(())
 950    }
 951
 952    async fn send_channel_message(
 953        self: Arc<Self>,
 954        request: TypedEnvelope<proto::SendChannelMessage>,
 955    ) -> Result<proto::SendChannelMessageResponse> {
 956        let channel_id = ChannelId::from_proto(request.payload.channel_id);
 957        let user_id;
 958        let connection_ids;
 959        {
 960            let state = self.state().await;
 961            user_id = state.user_id_for_connection(request.sender_id)?;
 962            connection_ids = state.channel_connection_ids(channel_id)?;
 963        }
 964
 965        // Validate the message body.
 966        let body = request.payload.body.trim().to_string();
 967        if body.len() > MAX_MESSAGE_LEN {
 968            return Err(anyhow!("message is too long"))?;
 969        }
 970        if body.is_empty() {
 971            return Err(anyhow!("message can't be blank"))?;
 972        }
 973
 974        let timestamp = OffsetDateTime::now_utc();
 975        let nonce = request
 976            .payload
 977            .nonce
 978            .ok_or_else(|| anyhow!("nonce can't be blank"))?;
 979
 980        let message_id = self
 981            .app_state
 982            .db
 983            .create_channel_message(channel_id, user_id, &body, timestamp, nonce.clone().into())
 984            .await?
 985            .to_proto();
 986        let message = proto::ChannelMessage {
 987            sender_id: user_id.to_proto(),
 988            id: message_id,
 989            body,
 990            timestamp: timestamp.unix_timestamp() as u64,
 991            nonce: Some(nonce),
 992        };
 993        broadcast(request.sender_id, connection_ids, |conn_id| {
 994            self.peer.send(
 995                conn_id,
 996                proto::ChannelMessageSent {
 997                    channel_id: channel_id.to_proto(),
 998                    message: Some(message.clone()),
 999                },
1000            )
1001        });
1002        Ok(proto::SendChannelMessageResponse {
1003            message: Some(message),
1004        })
1005    }
1006
1007    async fn get_channel_messages(
1008        self: Arc<Self>,
1009        request: TypedEnvelope<proto::GetChannelMessages>,
1010    ) -> Result<proto::GetChannelMessagesResponse> {
1011        let user_id = self
1012            .state()
1013            .await
1014            .user_id_for_connection(request.sender_id)?;
1015        let channel_id = ChannelId::from_proto(request.payload.channel_id);
1016        if !self
1017            .app_state
1018            .db
1019            .can_user_access_channel(user_id, channel_id)
1020            .await?
1021        {
1022            Err(anyhow!("access denied"))?;
1023        }
1024
1025        let messages = self
1026            .app_state
1027            .db
1028            .get_channel_messages(
1029                channel_id,
1030                MESSAGE_COUNT_PER_PAGE,
1031                Some(MessageId::from_proto(request.payload.before_message_id)),
1032            )
1033            .await?
1034            .into_iter()
1035            .map(|msg| proto::ChannelMessage {
1036                id: msg.id.to_proto(),
1037                body: msg.body,
1038                timestamp: msg.sent_at.unix_timestamp() as u64,
1039                sender_id: msg.sender_id.to_proto(),
1040                nonce: Some(msg.nonce.as_u128().into()),
1041            })
1042            .collect::<Vec<_>>();
1043
1044        Ok(proto::GetChannelMessagesResponse {
1045            done: messages.len() < MESSAGE_COUNT_PER_PAGE,
1046            messages,
1047        })
1048    }
1049
1050    async fn state<'a>(self: &'a Arc<Self>) -> StoreReadGuard<'a> {
1051        #[cfg(test)]
1052        tokio::task::yield_now().await;
1053        let guard = self.store.read().await;
1054        #[cfg(test)]
1055        tokio::task::yield_now().await;
1056        StoreReadGuard {
1057            guard,
1058            _not_send: PhantomData,
1059        }
1060    }
1061
1062    async fn state_mut<'a>(self: &'a Arc<Self>) -> StoreWriteGuard<'a> {
1063        #[cfg(test)]
1064        tokio::task::yield_now().await;
1065        let guard = self.store.write().await;
1066        #[cfg(test)]
1067        tokio::task::yield_now().await;
1068        StoreWriteGuard {
1069            guard,
1070            _not_send: PhantomData,
1071        }
1072    }
1073}
1074
1075impl<'a> Deref for StoreReadGuard<'a> {
1076    type Target = Store;
1077
1078    fn deref(&self) -> &Self::Target {
1079        &*self.guard
1080    }
1081}
1082
1083impl<'a> Deref for StoreWriteGuard<'a> {
1084    type Target = Store;
1085
1086    fn deref(&self) -> &Self::Target {
1087        &*self.guard
1088    }
1089}
1090
1091impl<'a> DerefMut for StoreWriteGuard<'a> {
1092    fn deref_mut(&mut self) -> &mut Self::Target {
1093        &mut *self.guard
1094    }
1095}
1096
1097impl<'a> Drop for StoreWriteGuard<'a> {
1098    fn drop(&mut self) {
1099        #[cfg(test)]
1100        self.check_invariants();
1101
1102        let metrics = self.metrics();
1103        tracing::info!(
1104            connections = metrics.connections,
1105            registered_projects = metrics.registered_projects,
1106            shared_projects = metrics.shared_projects,
1107            "metrics"
1108        );
1109    }
1110}
1111
1112impl Executor for RealExecutor {
1113    type Sleep = Sleep;
1114
1115    fn spawn_detached<F: 'static + Send + Future<Output = ()>>(&self, future: F) {
1116        tokio::task::spawn(future);
1117    }
1118
1119    fn sleep(&self, duration: Duration) -> Self::Sleep {
1120        tokio::time::sleep(duration)
1121    }
1122}
1123
1124#[instrument(skip(f))]
1125fn broadcast<F>(sender_id: ConnectionId, receiver_ids: Vec<ConnectionId>, mut f: F)
1126where
1127    F: FnMut(ConnectionId) -> anyhow::Result<()>,
1128{
1129    for receiver_id in receiver_ids {
1130        if receiver_id != sender_id {
1131            f(receiver_id).trace_err();
1132        }
1133    }
1134}
1135
1136lazy_static! {
1137    static ref ZED_PROTOCOL_VERSION: HeaderName = HeaderName::from_static("x-zed-protocol-version");
1138}
1139
1140pub struct ProtocolVersion(u32);
1141
1142impl Header for ProtocolVersion {
1143    fn name() -> &'static HeaderName {
1144        &ZED_PROTOCOL_VERSION
1145    }
1146
1147    fn decode<'i, I>(values: &mut I) -> Result<Self, axum::headers::Error>
1148    where
1149        Self: Sized,
1150        I: Iterator<Item = &'i axum::http::HeaderValue>,
1151    {
1152        let version = values
1153            .next()
1154            .ok_or_else(|| axum::headers::Error::invalid())?
1155            .to_str()
1156            .map_err(|_| axum::headers::Error::invalid())?
1157            .parse()
1158            .map_err(|_| axum::headers::Error::invalid())?;
1159        Ok(Self(version))
1160    }
1161
1162    fn encode<E: Extend<axum::http::HeaderValue>>(&self, values: &mut E) {
1163        values.extend([self.0.to_string().parse().unwrap()]);
1164    }
1165}
1166
1167pub fn routes(app_state: Arc<AppState>) -> Router<Body> {
1168    let server = Server::new(app_state.clone(), None);
1169    Router::new()
1170        .route("/rpc", get(handle_websocket_request))
1171        .layer(
1172            ServiceBuilder::new()
1173                .layer(Extension(app_state))
1174                .layer(middleware::from_fn(auth::validate_header))
1175                .layer(Extension(server)),
1176        )
1177}
1178
1179pub async fn handle_websocket_request(
1180    TypedHeader(ProtocolVersion(protocol_version)): TypedHeader<ProtocolVersion>,
1181    ConnectInfo(socket_address): ConnectInfo<SocketAddr>,
1182    Extension(server): Extension<Arc<Server>>,
1183    Extension(user_id): Extension<UserId>,
1184    ws: WebSocketUpgrade,
1185) -> Response {
1186    if protocol_version != rpc::PROTOCOL_VERSION {
1187        return (
1188            StatusCode::UPGRADE_REQUIRED,
1189            "client must be upgraded".to_string(),
1190        )
1191            .into_response();
1192    }
1193    let socket_address = socket_address.to_string();
1194    ws.on_upgrade(move |socket| {
1195        let socket = socket
1196            .map_ok(to_tungstenite_message)
1197            .err_into()
1198            .with(|message| async move { Ok(to_axum_message(message)) });
1199        let connection = Connection::new(Box::pin(socket));
1200        server.handle_connection(connection, socket_address, user_id, None, RealExecutor)
1201    })
1202}
1203
1204fn to_axum_message(message: TungsteniteMessage) -> AxumMessage {
1205    match message {
1206        TungsteniteMessage::Text(payload) => AxumMessage::Text(payload),
1207        TungsteniteMessage::Binary(payload) => AxumMessage::Binary(payload),
1208        TungsteniteMessage::Ping(payload) => AxumMessage::Ping(payload),
1209        TungsteniteMessage::Pong(payload) => AxumMessage::Pong(payload),
1210        TungsteniteMessage::Close(frame) => AxumMessage::Close(frame.map(|frame| AxumCloseFrame {
1211            code: frame.code.into(),
1212            reason: frame.reason,
1213        })),
1214    }
1215}
1216
1217fn to_tungstenite_message(message: AxumMessage) -> TungsteniteMessage {
1218    match message {
1219        AxumMessage::Text(payload) => TungsteniteMessage::Text(payload),
1220        AxumMessage::Binary(payload) => TungsteniteMessage::Binary(payload),
1221        AxumMessage::Ping(payload) => TungsteniteMessage::Ping(payload),
1222        AxumMessage::Pong(payload) => TungsteniteMessage::Pong(payload),
1223        AxumMessage::Close(frame) => {
1224            TungsteniteMessage::Close(frame.map(|frame| TungsteniteCloseFrame {
1225                code: frame.code.into(),
1226                reason: frame.reason,
1227            }))
1228        }
1229    }
1230}
1231
1232pub trait ResultExt {
1233    type Ok;
1234
1235    fn trace_err(self) -> Option<Self::Ok>;
1236}
1237
1238impl<T, E> ResultExt for Result<T, E>
1239where
1240    E: std::fmt::Debug,
1241{
1242    type Ok = T;
1243
1244    fn trace_err(self) -> Option<T> {
1245        match self {
1246            Ok(value) => Some(value),
1247            Err(error) => {
1248                tracing::error!("{:?}", error);
1249                None
1250            }
1251        }
1252    }
1253}
1254
1255#[cfg(test)]
1256mod tests {
1257    use super::*;
1258    use crate::{
1259        db::{tests::TestDb, UserId},
1260        AppState,
1261    };
1262    use ::rpc::Peer;
1263    use client::{
1264        self, test::FakeHttpClient, Channel, ChannelDetails, ChannelList, Client, Credentials,
1265        EstablishConnectionError, UserStore, RECEIVE_TIMEOUT,
1266    };
1267    use collections::BTreeMap;
1268    use editor::{
1269        self, ConfirmCodeAction, ConfirmCompletion, ConfirmRename, Editor, Input, Redo, Rename,
1270        ToOffset, ToggleCodeActions, Undo,
1271    };
1272    use gpui::{
1273        executor::{self, Deterministic},
1274        geometry::vector::vec2f,
1275        ModelHandle, TestAppContext, ViewHandle,
1276    };
1277    use language::{
1278        range_to_lsp, tree_sitter_rust, Diagnostic, DiagnosticEntry, FakeLspAdapter, Language,
1279        LanguageConfig, LanguageRegistry, OffsetRangeExt, Point, Rope,
1280    };
1281    use lsp::{self, FakeLanguageServer};
1282    use parking_lot::Mutex;
1283    use project::{
1284        fs::{FakeFs, Fs as _},
1285        search::SearchQuery,
1286        worktree::WorktreeHandle,
1287        DiagnosticSummary, Project, ProjectPath, WorktreeId,
1288    };
1289    use rand::prelude::*;
1290    use rpc::PeerId;
1291    use serde_json::json;
1292    use settings::Settings;
1293    use sqlx::types::time::OffsetDateTime;
1294    use std::{
1295        env,
1296        ops::Deref,
1297        path::{Path, PathBuf},
1298        rc::Rc,
1299        sync::{
1300            atomic::{AtomicBool, Ordering::SeqCst},
1301            Arc,
1302        },
1303        time::Duration,
1304    };
1305    use theme::ThemeRegistry;
1306    use workspace::{Item, SplitDirection, ToggleFollow, Workspace, WorkspaceParams};
1307
1308    #[cfg(test)]
1309    #[ctor::ctor]
1310    fn init_logger() {
1311        if std::env::var("RUST_LOG").is_ok() {
1312            env_logger::init();
1313        }
1314    }
1315
1316    #[gpui::test(iterations = 10)]
1317    async fn test_share_project(cx_a: &mut TestAppContext, cx_b: &mut TestAppContext) {
1318        let (window_b, _) = cx_b.add_window(|_| EmptyView);
1319        let lang_registry = Arc::new(LanguageRegistry::test());
1320        let fs = FakeFs::new(cx_a.background());
1321        cx_a.foreground().forbid_parking();
1322
1323        // Connect to a server as 2 clients.
1324        let mut server = TestServer::start(cx_a.foreground(), cx_a.background()).await;
1325        let client_a = server.create_client(cx_a, "user_a").await;
1326        let client_b = server.create_client(cx_b, "user_b").await;
1327
1328        // Share a project as client A
1329        fs.insert_tree(
1330            "/a",
1331            json!({
1332                ".zed.toml": r#"collaborators = ["user_b"]"#,
1333                "a.txt": "a-contents",
1334                "b.txt": "b-contents",
1335            }),
1336        )
1337        .await;
1338        let project_a = cx_a.update(|cx| {
1339            Project::local(
1340                client_a.clone(),
1341                client_a.user_store.clone(),
1342                lang_registry.clone(),
1343                fs.clone(),
1344                cx,
1345            )
1346        });
1347        let (worktree_a, _) = project_a
1348            .update(cx_a, |p, cx| {
1349                p.find_or_create_local_worktree("/a", true, cx)
1350            })
1351            .await
1352            .unwrap();
1353        let worktree_id = worktree_a.read_with(cx_a, |tree, _| tree.id());
1354        worktree_a
1355            .read_with(cx_a, |tree, _| tree.as_local().unwrap().scan_complete())
1356            .await;
1357        let project_id = project_a.update(cx_a, |p, _| p.next_remote_id()).await;
1358        project_a.update(cx_a, |p, cx| p.share(cx)).await.unwrap();
1359
1360        // Join that project as client B
1361        let project_b = Project::remote(
1362            project_id,
1363            client_b.clone(),
1364            client_b.user_store.clone(),
1365            lang_registry.clone(),
1366            fs.clone(),
1367            &mut cx_b.to_async(),
1368        )
1369        .await
1370        .unwrap();
1371
1372        let replica_id_b = project_b.read_with(cx_b, |project, _| {
1373            assert_eq!(
1374                project
1375                    .collaborators()
1376                    .get(&client_a.peer_id)
1377                    .unwrap()
1378                    .user
1379                    .github_login,
1380                "user_a"
1381            );
1382            project.replica_id()
1383        });
1384        project_a
1385            .condition(&cx_a, |tree, _| {
1386                tree.collaborators()
1387                    .get(&client_b.peer_id)
1388                    .map_or(false, |collaborator| {
1389                        collaborator.replica_id == replica_id_b
1390                            && collaborator.user.github_login == "user_b"
1391                    })
1392            })
1393            .await;
1394
1395        // Open the same file as client B and client A.
1396        let buffer_b = project_b
1397            .update(cx_b, |p, cx| p.open_buffer((worktree_id, "b.txt"), cx))
1398            .await
1399            .unwrap();
1400        buffer_b.read_with(cx_b, |buf, _| assert_eq!(buf.text(), "b-contents"));
1401        project_a.read_with(cx_a, |project, cx| {
1402            assert!(project.has_open_buffer((worktree_id, "b.txt"), cx))
1403        });
1404        let buffer_a = project_a
1405            .update(cx_a, |p, cx| p.open_buffer((worktree_id, "b.txt"), cx))
1406            .await
1407            .unwrap();
1408
1409        let editor_b = cx_b.add_view(window_b, |cx| Editor::for_buffer(buffer_b, None, cx));
1410
1411        // TODO
1412        // // Create a selection set as client B and see that selection set as client A.
1413        // buffer_a
1414        //     .condition(&cx_a, |buffer, _| buffer.selection_sets().count() == 1)
1415        //     .await;
1416
1417        // Edit the buffer as client B and see that edit as client A.
1418        editor_b.update(cx_b, |editor, cx| {
1419            editor.handle_input(&Input("ok, ".into()), cx)
1420        });
1421        buffer_a
1422            .condition(&cx_a, |buffer, _| buffer.text() == "ok, b-contents")
1423            .await;
1424
1425        // TODO
1426        // // Remove the selection set as client B, see those selections disappear as client A.
1427        cx_b.update(move |_| drop(editor_b));
1428        // buffer_a
1429        //     .condition(&cx_a, |buffer, _| buffer.selection_sets().count() == 0)
1430        //     .await;
1431
1432        // Dropping the client B's project removes client B from client A's collaborators.
1433        cx_b.update(move |_| drop(project_b));
1434        project_a
1435            .condition(&cx_a, |project, _| project.collaborators().is_empty())
1436            .await;
1437    }
1438
1439    #[gpui::test(iterations = 10)]
1440    async fn test_unshare_project(cx_a: &mut TestAppContext, cx_b: &mut TestAppContext) {
1441        let lang_registry = Arc::new(LanguageRegistry::test());
1442        let fs = FakeFs::new(cx_a.background());
1443        cx_a.foreground().forbid_parking();
1444
1445        // Connect to a server as 2 clients.
1446        let mut server = TestServer::start(cx_a.foreground(), cx_a.background()).await;
1447        let client_a = server.create_client(cx_a, "user_a").await;
1448        let client_b = server.create_client(cx_b, "user_b").await;
1449
1450        // Share a project as client A
1451        fs.insert_tree(
1452            "/a",
1453            json!({
1454                ".zed.toml": r#"collaborators = ["user_b"]"#,
1455                "a.txt": "a-contents",
1456                "b.txt": "b-contents",
1457            }),
1458        )
1459        .await;
1460        let project_a = cx_a.update(|cx| {
1461            Project::local(
1462                client_a.clone(),
1463                client_a.user_store.clone(),
1464                lang_registry.clone(),
1465                fs.clone(),
1466                cx,
1467            )
1468        });
1469        let (worktree_a, _) = project_a
1470            .update(cx_a, |p, cx| {
1471                p.find_or_create_local_worktree("/a", true, cx)
1472            })
1473            .await
1474            .unwrap();
1475        worktree_a
1476            .read_with(cx_a, |tree, _| tree.as_local().unwrap().scan_complete())
1477            .await;
1478        let project_id = project_a.update(cx_a, |p, _| p.next_remote_id()).await;
1479        let worktree_id = worktree_a.read_with(cx_a, |tree, _| tree.id());
1480        project_a.update(cx_a, |p, cx| p.share(cx)).await.unwrap();
1481        assert!(worktree_a.read_with(cx_a, |tree, _| tree.as_local().unwrap().is_shared()));
1482
1483        // Join that project as client B
1484        let project_b = Project::remote(
1485            project_id,
1486            client_b.clone(),
1487            client_b.user_store.clone(),
1488            lang_registry.clone(),
1489            fs.clone(),
1490            &mut cx_b.to_async(),
1491        )
1492        .await
1493        .unwrap();
1494        project_b
1495            .update(cx_b, |p, cx| p.open_buffer((worktree_id, "a.txt"), cx))
1496            .await
1497            .unwrap();
1498
1499        // Unshare the project as client A
1500        project_a.update(cx_a, |project, cx| project.unshare(cx));
1501        project_b
1502            .condition(cx_b, |project, _| project.is_read_only())
1503            .await;
1504        assert!(worktree_a.read_with(cx_a, |tree, _| !tree.as_local().unwrap().is_shared()));
1505        cx_b.update(|_| {
1506            drop(project_b);
1507        });
1508
1509        // Share the project again and ensure guests can still join.
1510        project_a
1511            .update(cx_a, |project, cx| project.share(cx))
1512            .await
1513            .unwrap();
1514        assert!(worktree_a.read_with(cx_a, |tree, _| tree.as_local().unwrap().is_shared()));
1515
1516        let project_b2 = Project::remote(
1517            project_id,
1518            client_b.clone(),
1519            client_b.user_store.clone(),
1520            lang_registry.clone(),
1521            fs.clone(),
1522            &mut cx_b.to_async(),
1523        )
1524        .await
1525        .unwrap();
1526        project_b2
1527            .update(cx_b, |p, cx| p.open_buffer((worktree_id, "a.txt"), cx))
1528            .await
1529            .unwrap();
1530    }
1531
1532    #[gpui::test(iterations = 10)]
1533    async fn test_host_disconnect(cx_a: &mut TestAppContext, cx_b: &mut TestAppContext) {
1534        let lang_registry = Arc::new(LanguageRegistry::test());
1535        let fs = FakeFs::new(cx_a.background());
1536        cx_a.foreground().forbid_parking();
1537
1538        // Connect to a server as 2 clients.
1539        let mut server = TestServer::start(cx_a.foreground(), cx_a.background()).await;
1540        let client_a = server.create_client(cx_a, "user_a").await;
1541        let client_b = server.create_client(cx_b, "user_b").await;
1542
1543        // Share a project as client A
1544        fs.insert_tree(
1545            "/a",
1546            json!({
1547                ".zed.toml": r#"collaborators = ["user_b"]"#,
1548                "a.txt": "a-contents",
1549                "b.txt": "b-contents",
1550            }),
1551        )
1552        .await;
1553        let project_a = cx_a.update(|cx| {
1554            Project::local(
1555                client_a.clone(),
1556                client_a.user_store.clone(),
1557                lang_registry.clone(),
1558                fs.clone(),
1559                cx,
1560            )
1561        });
1562        let (worktree_a, _) = project_a
1563            .update(cx_a, |p, cx| {
1564                p.find_or_create_local_worktree("/a", true, cx)
1565            })
1566            .await
1567            .unwrap();
1568        worktree_a
1569            .read_with(cx_a, |tree, _| tree.as_local().unwrap().scan_complete())
1570            .await;
1571        let project_id = project_a.update(cx_a, |p, _| p.next_remote_id()).await;
1572        let worktree_id = worktree_a.read_with(cx_a, |tree, _| tree.id());
1573        project_a.update(cx_a, |p, cx| p.share(cx)).await.unwrap();
1574        assert!(worktree_a.read_with(cx_a, |tree, _| tree.as_local().unwrap().is_shared()));
1575
1576        // Join that project as client B
1577        let project_b = Project::remote(
1578            project_id,
1579            client_b.clone(),
1580            client_b.user_store.clone(),
1581            lang_registry.clone(),
1582            fs.clone(),
1583            &mut cx_b.to_async(),
1584        )
1585        .await
1586        .unwrap();
1587        project_b
1588            .update(cx_b, |p, cx| p.open_buffer((worktree_id, "a.txt"), cx))
1589            .await
1590            .unwrap();
1591
1592        // Drop client A's connection. Collaborators should disappear and the project should not be shown as shared.
1593        server.disconnect_client(client_a.current_user_id(cx_a));
1594        cx_a.foreground().advance_clock(rpc::RECEIVE_TIMEOUT);
1595        project_a
1596            .condition(cx_a, |project, _| project.collaborators().is_empty())
1597            .await;
1598        project_a.read_with(cx_a, |project, _| assert!(!project.is_shared()));
1599        project_b
1600            .condition(cx_b, |project, _| project.is_read_only())
1601            .await;
1602        assert!(worktree_a.read_with(cx_a, |tree, _| !tree.as_local().unwrap().is_shared()));
1603        cx_b.update(|_| {
1604            drop(project_b);
1605        });
1606
1607        // Await reconnection
1608        let project_id = project_a.update(cx_a, |p, _| p.next_remote_id()).await;
1609
1610        // Share the project again and ensure guests can still join.
1611        project_a
1612            .update(cx_a, |project, cx| project.share(cx))
1613            .await
1614            .unwrap();
1615        assert!(worktree_a.read_with(cx_a, |tree, _| tree.as_local().unwrap().is_shared()));
1616
1617        let project_b2 = Project::remote(
1618            project_id,
1619            client_b.clone(),
1620            client_b.user_store.clone(),
1621            lang_registry.clone(),
1622            fs.clone(),
1623            &mut cx_b.to_async(),
1624        )
1625        .await
1626        .unwrap();
1627        project_b2
1628            .update(cx_b, |p, cx| p.open_buffer((worktree_id, "a.txt"), cx))
1629            .await
1630            .unwrap();
1631    }
1632
1633    #[gpui::test(iterations = 10)]
1634    async fn test_propagate_saves_and_fs_changes(
1635        cx_a: &mut TestAppContext,
1636        cx_b: &mut TestAppContext,
1637        cx_c: &mut TestAppContext,
1638    ) {
1639        let lang_registry = Arc::new(LanguageRegistry::test());
1640        let fs = FakeFs::new(cx_a.background());
1641        cx_a.foreground().forbid_parking();
1642
1643        // Connect to a server as 3 clients.
1644        let mut server = TestServer::start(cx_a.foreground(), cx_a.background()).await;
1645        let client_a = server.create_client(cx_a, "user_a").await;
1646        let client_b = server.create_client(cx_b, "user_b").await;
1647        let client_c = server.create_client(cx_c, "user_c").await;
1648
1649        // Share a worktree as client A.
1650        fs.insert_tree(
1651            "/a",
1652            json!({
1653                ".zed.toml": r#"collaborators = ["user_b", "user_c"]"#,
1654                "file1": "",
1655                "file2": ""
1656            }),
1657        )
1658        .await;
1659        let project_a = cx_a.update(|cx| {
1660            Project::local(
1661                client_a.clone(),
1662                client_a.user_store.clone(),
1663                lang_registry.clone(),
1664                fs.clone(),
1665                cx,
1666            )
1667        });
1668        let (worktree_a, _) = project_a
1669            .update(cx_a, |p, cx| {
1670                p.find_or_create_local_worktree("/a", true, cx)
1671            })
1672            .await
1673            .unwrap();
1674        worktree_a
1675            .read_with(cx_a, |tree, _| tree.as_local().unwrap().scan_complete())
1676            .await;
1677        let project_id = project_a.update(cx_a, |p, _| p.next_remote_id()).await;
1678        let worktree_id = worktree_a.read_with(cx_a, |tree, _| tree.id());
1679        project_a.update(cx_a, |p, cx| p.share(cx)).await.unwrap();
1680
1681        // Join that worktree as clients B and C.
1682        let project_b = Project::remote(
1683            project_id,
1684            client_b.clone(),
1685            client_b.user_store.clone(),
1686            lang_registry.clone(),
1687            fs.clone(),
1688            &mut cx_b.to_async(),
1689        )
1690        .await
1691        .unwrap();
1692        let project_c = Project::remote(
1693            project_id,
1694            client_c.clone(),
1695            client_c.user_store.clone(),
1696            lang_registry.clone(),
1697            fs.clone(),
1698            &mut cx_c.to_async(),
1699        )
1700        .await
1701        .unwrap();
1702        let worktree_b = project_b.read_with(cx_b, |p, cx| p.worktrees(cx).next().unwrap());
1703        let worktree_c = project_c.read_with(cx_c, |p, cx| p.worktrees(cx).next().unwrap());
1704
1705        // Open and edit a buffer as both guests B and C.
1706        let buffer_b = project_b
1707            .update(cx_b, |p, cx| p.open_buffer((worktree_id, "file1"), cx))
1708            .await
1709            .unwrap();
1710        let buffer_c = project_c
1711            .update(cx_c, |p, cx| p.open_buffer((worktree_id, "file1"), cx))
1712            .await
1713            .unwrap();
1714        buffer_b.update(cx_b, |buf, cx| buf.edit([(0..0, "i-am-b, ")], cx));
1715        buffer_c.update(cx_c, |buf, cx| buf.edit([(0..0, "i-am-c, ")], cx));
1716
1717        // Open and edit that buffer as the host.
1718        let buffer_a = project_a
1719            .update(cx_a, |p, cx| p.open_buffer((worktree_id, "file1"), cx))
1720            .await
1721            .unwrap();
1722
1723        buffer_a
1724            .condition(cx_a, |buf, _| buf.text() == "i-am-c, i-am-b, ")
1725            .await;
1726        buffer_a.update(cx_a, |buf, cx| {
1727            buf.edit([(buf.len()..buf.len(), "i-am-a")], cx)
1728        });
1729
1730        // Wait for edits to propagate
1731        buffer_a
1732            .condition(cx_a, |buf, _| buf.text() == "i-am-c, i-am-b, i-am-a")
1733            .await;
1734        buffer_b
1735            .condition(cx_b, |buf, _| buf.text() == "i-am-c, i-am-b, i-am-a")
1736            .await;
1737        buffer_c
1738            .condition(cx_c, |buf, _| buf.text() == "i-am-c, i-am-b, i-am-a")
1739            .await;
1740
1741        // Edit the buffer as the host and concurrently save as guest B.
1742        let save_b = buffer_b.update(cx_b, |buf, cx| buf.save(cx));
1743        buffer_a.update(cx_a, |buf, cx| buf.edit([(0..0, "hi-a, ")], cx));
1744        save_b.await.unwrap();
1745        assert_eq!(
1746            fs.load("/a/file1".as_ref()).await.unwrap(),
1747            "hi-a, i-am-c, i-am-b, i-am-a"
1748        );
1749        buffer_a.read_with(cx_a, |buf, _| assert!(!buf.is_dirty()));
1750        buffer_b.read_with(cx_b, |buf, _| assert!(!buf.is_dirty()));
1751        buffer_c.condition(cx_c, |buf, _| !buf.is_dirty()).await;
1752
1753        worktree_a.flush_fs_events(cx_a).await;
1754
1755        // Make changes on host's file system, see those changes on guest worktrees.
1756        fs.rename(
1757            "/a/file1".as_ref(),
1758            "/a/file1-renamed".as_ref(),
1759            Default::default(),
1760        )
1761        .await
1762        .unwrap();
1763
1764        fs.rename("/a/file2".as_ref(), "/a/file3".as_ref(), Default::default())
1765            .await
1766            .unwrap();
1767        fs.insert_file(Path::new("/a/file4"), "4".into()).await;
1768
1769        worktree_a
1770            .condition(&cx_a, |tree, _| {
1771                tree.paths()
1772                    .map(|p| p.to_string_lossy())
1773                    .collect::<Vec<_>>()
1774                    == [".zed.toml", "file1-renamed", "file3", "file4"]
1775            })
1776            .await;
1777        worktree_b
1778            .condition(&cx_b, |tree, _| {
1779                tree.paths()
1780                    .map(|p| p.to_string_lossy())
1781                    .collect::<Vec<_>>()
1782                    == [".zed.toml", "file1-renamed", "file3", "file4"]
1783            })
1784            .await;
1785        worktree_c
1786            .condition(&cx_c, |tree, _| {
1787                tree.paths()
1788                    .map(|p| p.to_string_lossy())
1789                    .collect::<Vec<_>>()
1790                    == [".zed.toml", "file1-renamed", "file3", "file4"]
1791            })
1792            .await;
1793
1794        // Ensure buffer files are updated as well.
1795        buffer_a
1796            .condition(&cx_a, |buf, _| {
1797                buf.file().unwrap().path().to_str() == Some("file1-renamed")
1798            })
1799            .await;
1800        buffer_b
1801            .condition(&cx_b, |buf, _| {
1802                buf.file().unwrap().path().to_str() == Some("file1-renamed")
1803            })
1804            .await;
1805        buffer_c
1806            .condition(&cx_c, |buf, _| {
1807                buf.file().unwrap().path().to_str() == Some("file1-renamed")
1808            })
1809            .await;
1810    }
1811
1812    #[gpui::test(iterations = 10)]
1813    async fn test_worktree_manipulation(
1814        executor: Arc<Deterministic>,
1815        cx_a: &mut TestAppContext,
1816        cx_b: &mut TestAppContext,
1817    ) {
1818        executor.forbid_parking();
1819        let fs = FakeFs::new(cx_a.background());
1820
1821        // Connect to a server as 2 clients.
1822        let mut server = TestServer::start(cx_a.foreground(), cx_a.background()).await;
1823        let mut client_a = server.create_client(cx_a, "user_a").await;
1824        let mut client_b = server.create_client(cx_b, "user_b").await;
1825
1826        // Share a project as client A
1827        fs.insert_tree(
1828            "/dir",
1829            json!({
1830                ".zed.toml": r#"collaborators = ["user_b"]"#,
1831                "a.txt": "a-contents",
1832                "b.txt": "b-contents",
1833            }),
1834        )
1835        .await;
1836
1837        let (project_a, worktree_id) = client_a.build_local_project(fs, "/dir", cx_a).await;
1838        let project_id = project_a.read_with(cx_a, |project, _| project.remote_id().unwrap());
1839        project_a
1840            .update(cx_a, |project, cx| project.share(cx))
1841            .await
1842            .unwrap();
1843
1844        let project_b = client_b.build_remote_project(project_id, cx_b).await;
1845
1846        let worktree_a =
1847            project_a.read_with(cx_a, |project, cx| project.worktrees(cx).next().unwrap());
1848        let worktree_b =
1849            project_b.read_with(cx_b, |project, cx| project.worktrees(cx).next().unwrap());
1850
1851        project_b
1852            .update(cx_b, |project, cx| {
1853                project.create_file((worktree_id, "c.txt"), cx).unwrap()
1854            })
1855            .await
1856            .unwrap();
1857
1858        executor.run_until_parked();
1859        worktree_a.read_with(cx_a, |worktree, _| {
1860            assert_eq!(
1861                worktree
1862                    .paths()
1863                    .map(|p| p.to_string_lossy())
1864                    .collect::<Vec<_>>(),
1865                [".zed.toml", "a.txt", "b.txt", "c.txt"]
1866            );
1867        });
1868        worktree_b.read_with(cx_b, |worktree, _| {
1869            assert_eq!(
1870                worktree
1871                    .paths()
1872                    .map(|p| p.to_string_lossy())
1873                    .collect::<Vec<_>>(),
1874                [".zed.toml", "a.txt", "b.txt", "c.txt"]
1875            );
1876        });
1877    }
1878
1879    #[gpui::test(iterations = 10)]
1880    async fn test_buffer_conflict_after_save(cx_a: &mut TestAppContext, cx_b: &mut TestAppContext) {
1881        cx_a.foreground().forbid_parking();
1882        let lang_registry = Arc::new(LanguageRegistry::test());
1883        let fs = FakeFs::new(cx_a.background());
1884
1885        // Connect to a server as 2 clients.
1886        let mut server = TestServer::start(cx_a.foreground(), cx_a.background()).await;
1887        let client_a = server.create_client(cx_a, "user_a").await;
1888        let client_b = server.create_client(cx_b, "user_b").await;
1889
1890        // Share a project as client A
1891        fs.insert_tree(
1892            "/dir",
1893            json!({
1894                ".zed.toml": r#"collaborators = ["user_b", "user_c"]"#,
1895                "a.txt": "a-contents",
1896            }),
1897        )
1898        .await;
1899
1900        let project_a = cx_a.update(|cx| {
1901            Project::local(
1902                client_a.clone(),
1903                client_a.user_store.clone(),
1904                lang_registry.clone(),
1905                fs.clone(),
1906                cx,
1907            )
1908        });
1909        let (worktree_a, _) = project_a
1910            .update(cx_a, |p, cx| {
1911                p.find_or_create_local_worktree("/dir", true, cx)
1912            })
1913            .await
1914            .unwrap();
1915        worktree_a
1916            .read_with(cx_a, |tree, _| tree.as_local().unwrap().scan_complete())
1917            .await;
1918        let project_id = project_a.update(cx_a, |p, _| p.next_remote_id()).await;
1919        let worktree_id = worktree_a.read_with(cx_a, |tree, _| tree.id());
1920        project_a.update(cx_a, |p, cx| p.share(cx)).await.unwrap();
1921
1922        // Join that project as client B
1923        let project_b = Project::remote(
1924            project_id,
1925            client_b.clone(),
1926            client_b.user_store.clone(),
1927            lang_registry.clone(),
1928            fs.clone(),
1929            &mut cx_b.to_async(),
1930        )
1931        .await
1932        .unwrap();
1933
1934        // Open a buffer as client B
1935        let buffer_b = project_b
1936            .update(cx_b, |p, cx| p.open_buffer((worktree_id, "a.txt"), cx))
1937            .await
1938            .unwrap();
1939
1940        buffer_b.update(cx_b, |buf, cx| buf.edit([(0..0, "world ")], cx));
1941        buffer_b.read_with(cx_b, |buf, _| {
1942            assert!(buf.is_dirty());
1943            assert!(!buf.has_conflict());
1944        });
1945
1946        buffer_b.update(cx_b, |buf, cx| buf.save(cx)).await.unwrap();
1947        buffer_b
1948            .condition(&cx_b, |buffer_b, _| !buffer_b.is_dirty())
1949            .await;
1950        buffer_b.read_with(cx_b, |buf, _| {
1951            assert!(!buf.has_conflict());
1952        });
1953
1954        buffer_b.update(cx_b, |buf, cx| buf.edit([(0..0, "hello ")], cx));
1955        buffer_b.read_with(cx_b, |buf, _| {
1956            assert!(buf.is_dirty());
1957            assert!(!buf.has_conflict());
1958        });
1959    }
1960
1961    #[gpui::test(iterations = 10)]
1962    async fn test_buffer_reloading(cx_a: &mut TestAppContext, cx_b: &mut TestAppContext) {
1963        cx_a.foreground().forbid_parking();
1964        let lang_registry = Arc::new(LanguageRegistry::test());
1965        let fs = FakeFs::new(cx_a.background());
1966
1967        // Connect to a server as 2 clients.
1968        let mut server = TestServer::start(cx_a.foreground(), cx_a.background()).await;
1969        let client_a = server.create_client(cx_a, "user_a").await;
1970        let client_b = server.create_client(cx_b, "user_b").await;
1971
1972        // Share a project as client A
1973        fs.insert_tree(
1974            "/dir",
1975            json!({
1976                ".zed.toml": r#"collaborators = ["user_b", "user_c"]"#,
1977                "a.txt": "a-contents",
1978            }),
1979        )
1980        .await;
1981
1982        let project_a = cx_a.update(|cx| {
1983            Project::local(
1984                client_a.clone(),
1985                client_a.user_store.clone(),
1986                lang_registry.clone(),
1987                fs.clone(),
1988                cx,
1989            )
1990        });
1991        let (worktree_a, _) = project_a
1992            .update(cx_a, |p, cx| {
1993                p.find_or_create_local_worktree("/dir", true, cx)
1994            })
1995            .await
1996            .unwrap();
1997        worktree_a
1998            .read_with(cx_a, |tree, _| tree.as_local().unwrap().scan_complete())
1999            .await;
2000        let project_id = project_a.update(cx_a, |p, _| p.next_remote_id()).await;
2001        let worktree_id = worktree_a.read_with(cx_a, |tree, _| tree.id());
2002        project_a.update(cx_a, |p, cx| p.share(cx)).await.unwrap();
2003
2004        // Join that project as client B
2005        let project_b = Project::remote(
2006            project_id,
2007            client_b.clone(),
2008            client_b.user_store.clone(),
2009            lang_registry.clone(),
2010            fs.clone(),
2011            &mut cx_b.to_async(),
2012        )
2013        .await
2014        .unwrap();
2015        let _worktree_b = project_b.update(cx_b, |p, cx| p.worktrees(cx).next().unwrap());
2016
2017        // Open a buffer as client B
2018        let buffer_b = project_b
2019            .update(cx_b, |p, cx| p.open_buffer((worktree_id, "a.txt"), cx))
2020            .await
2021            .unwrap();
2022        buffer_b.read_with(cx_b, |buf, _| {
2023            assert!(!buf.is_dirty());
2024            assert!(!buf.has_conflict());
2025        });
2026
2027        fs.save(Path::new("/dir/a.txt"), &"new contents".into())
2028            .await
2029            .unwrap();
2030        buffer_b
2031            .condition(&cx_b, |buf, _| {
2032                buf.text() == "new contents" && !buf.is_dirty()
2033            })
2034            .await;
2035        buffer_b.read_with(cx_b, |buf, _| {
2036            assert!(!buf.has_conflict());
2037        });
2038    }
2039
2040    #[gpui::test(iterations = 10)]
2041    async fn test_editing_while_guest_opens_buffer(
2042        cx_a: &mut TestAppContext,
2043        cx_b: &mut TestAppContext,
2044    ) {
2045        cx_a.foreground().forbid_parking();
2046        let lang_registry = Arc::new(LanguageRegistry::test());
2047        let fs = FakeFs::new(cx_a.background());
2048
2049        // Connect to a server as 2 clients.
2050        let mut server = TestServer::start(cx_a.foreground(), cx_a.background()).await;
2051        let client_a = server.create_client(cx_a, "user_a").await;
2052        let client_b = server.create_client(cx_b, "user_b").await;
2053
2054        // Share a project as client A
2055        fs.insert_tree(
2056            "/dir",
2057            json!({
2058                ".zed.toml": r#"collaborators = ["user_b"]"#,
2059                "a.txt": "a-contents",
2060            }),
2061        )
2062        .await;
2063        let project_a = cx_a.update(|cx| {
2064            Project::local(
2065                client_a.clone(),
2066                client_a.user_store.clone(),
2067                lang_registry.clone(),
2068                fs.clone(),
2069                cx,
2070            )
2071        });
2072        let (worktree_a, _) = project_a
2073            .update(cx_a, |p, cx| {
2074                p.find_or_create_local_worktree("/dir", true, cx)
2075            })
2076            .await
2077            .unwrap();
2078        worktree_a
2079            .read_with(cx_a, |tree, _| tree.as_local().unwrap().scan_complete())
2080            .await;
2081        let project_id = project_a.update(cx_a, |p, _| p.next_remote_id()).await;
2082        let worktree_id = worktree_a.read_with(cx_a, |tree, _| tree.id());
2083        project_a.update(cx_a, |p, cx| p.share(cx)).await.unwrap();
2084
2085        // Join that project as client B
2086        let project_b = Project::remote(
2087            project_id,
2088            client_b.clone(),
2089            client_b.user_store.clone(),
2090            lang_registry.clone(),
2091            fs.clone(),
2092            &mut cx_b.to_async(),
2093        )
2094        .await
2095        .unwrap();
2096
2097        // Open a buffer as client A
2098        let buffer_a = project_a
2099            .update(cx_a, |p, cx| p.open_buffer((worktree_id, "a.txt"), cx))
2100            .await
2101            .unwrap();
2102
2103        // Start opening the same buffer as client B
2104        let buffer_b = cx_b
2105            .background()
2106            .spawn(project_b.update(cx_b, |p, cx| p.open_buffer((worktree_id, "a.txt"), cx)));
2107
2108        // Edit the buffer as client A while client B is still opening it.
2109        cx_b.background().simulate_random_delay().await;
2110        buffer_a.update(cx_a, |buf, cx| buf.edit([(0..0, "X")], cx));
2111        cx_b.background().simulate_random_delay().await;
2112        buffer_a.update(cx_a, |buf, cx| buf.edit([(1..1, "Y")], cx));
2113
2114        let text = buffer_a.read_with(cx_a, |buf, _| buf.text());
2115        let buffer_b = buffer_b.await.unwrap();
2116        buffer_b.condition(&cx_b, |buf, _| buf.text() == text).await;
2117    }
2118
2119    #[gpui::test(iterations = 10)]
2120    async fn test_leaving_worktree_while_opening_buffer(
2121        cx_a: &mut TestAppContext,
2122        cx_b: &mut TestAppContext,
2123    ) {
2124        cx_a.foreground().forbid_parking();
2125        let lang_registry = Arc::new(LanguageRegistry::test());
2126        let fs = FakeFs::new(cx_a.background());
2127
2128        // Connect to a server as 2 clients.
2129        let mut server = TestServer::start(cx_a.foreground(), cx_a.background()).await;
2130        let client_a = server.create_client(cx_a, "user_a").await;
2131        let client_b = server.create_client(cx_b, "user_b").await;
2132
2133        // Share a project as client A
2134        fs.insert_tree(
2135            "/dir",
2136            json!({
2137                ".zed.toml": r#"collaborators = ["user_b"]"#,
2138                "a.txt": "a-contents",
2139            }),
2140        )
2141        .await;
2142        let project_a = cx_a.update(|cx| {
2143            Project::local(
2144                client_a.clone(),
2145                client_a.user_store.clone(),
2146                lang_registry.clone(),
2147                fs.clone(),
2148                cx,
2149            )
2150        });
2151        let (worktree_a, _) = project_a
2152            .update(cx_a, |p, cx| {
2153                p.find_or_create_local_worktree("/dir", true, cx)
2154            })
2155            .await
2156            .unwrap();
2157        worktree_a
2158            .read_with(cx_a, |tree, _| tree.as_local().unwrap().scan_complete())
2159            .await;
2160        let project_id = project_a.update(cx_a, |p, _| p.next_remote_id()).await;
2161        let worktree_id = worktree_a.read_with(cx_a, |tree, _| tree.id());
2162        project_a.update(cx_a, |p, cx| p.share(cx)).await.unwrap();
2163
2164        // Join that project as client B
2165        let project_b = Project::remote(
2166            project_id,
2167            client_b.clone(),
2168            client_b.user_store.clone(),
2169            lang_registry.clone(),
2170            fs.clone(),
2171            &mut cx_b.to_async(),
2172        )
2173        .await
2174        .unwrap();
2175
2176        // See that a guest has joined as client A.
2177        project_a
2178            .condition(&cx_a, |p, _| p.collaborators().len() == 1)
2179            .await;
2180
2181        // Begin opening a buffer as client B, but leave the project before the open completes.
2182        let buffer_b = cx_b
2183            .background()
2184            .spawn(project_b.update(cx_b, |p, cx| p.open_buffer((worktree_id, "a.txt"), cx)));
2185        cx_b.update(|_| drop(project_b));
2186        drop(buffer_b);
2187
2188        // See that the guest has left.
2189        project_a
2190            .condition(&cx_a, |p, _| p.collaborators().len() == 0)
2191            .await;
2192    }
2193
2194    #[gpui::test(iterations = 10)]
2195    async fn test_leaving_project(cx_a: &mut TestAppContext, cx_b: &mut TestAppContext) {
2196        cx_a.foreground().forbid_parking();
2197        let lang_registry = Arc::new(LanguageRegistry::test());
2198        let fs = FakeFs::new(cx_a.background());
2199
2200        // Connect to a server as 2 clients.
2201        let mut server = TestServer::start(cx_a.foreground(), cx_a.background()).await;
2202        let client_a = server.create_client(cx_a, "user_a").await;
2203        let client_b = server.create_client(cx_b, "user_b").await;
2204
2205        // Share a project as client A
2206        fs.insert_tree(
2207            "/a",
2208            json!({
2209                ".zed.toml": r#"collaborators = ["user_b"]"#,
2210                "a.txt": "a-contents",
2211                "b.txt": "b-contents",
2212            }),
2213        )
2214        .await;
2215        let project_a = cx_a.update(|cx| {
2216            Project::local(
2217                client_a.clone(),
2218                client_a.user_store.clone(),
2219                lang_registry.clone(),
2220                fs.clone(),
2221                cx,
2222            )
2223        });
2224        let (worktree_a, _) = project_a
2225            .update(cx_a, |p, cx| {
2226                p.find_or_create_local_worktree("/a", true, cx)
2227            })
2228            .await
2229            .unwrap();
2230        worktree_a
2231            .read_with(cx_a, |tree, _| tree.as_local().unwrap().scan_complete())
2232            .await;
2233        let project_id = project_a
2234            .update(cx_a, |project, _| project.next_remote_id())
2235            .await;
2236        project_a
2237            .update(cx_a, |project, cx| project.share(cx))
2238            .await
2239            .unwrap();
2240
2241        // Join that project as client B
2242        let _project_b = Project::remote(
2243            project_id,
2244            client_b.clone(),
2245            client_b.user_store.clone(),
2246            lang_registry.clone(),
2247            fs.clone(),
2248            &mut cx_b.to_async(),
2249        )
2250        .await
2251        .unwrap();
2252
2253        // Client A sees that a guest has joined.
2254        project_a
2255            .condition(cx_a, |p, _| p.collaborators().len() == 1)
2256            .await;
2257
2258        // Drop client B's connection and ensure client A observes client B leaving the project.
2259        client_b.disconnect(&cx_b.to_async()).unwrap();
2260        project_a
2261            .condition(cx_a, |p, _| p.collaborators().len() == 0)
2262            .await;
2263
2264        // Rejoin the project as client B
2265        let _project_b = Project::remote(
2266            project_id,
2267            client_b.clone(),
2268            client_b.user_store.clone(),
2269            lang_registry.clone(),
2270            fs.clone(),
2271            &mut cx_b.to_async(),
2272        )
2273        .await
2274        .unwrap();
2275
2276        // Client A sees that a guest has re-joined.
2277        project_a
2278            .condition(cx_a, |p, _| p.collaborators().len() == 1)
2279            .await;
2280
2281        // Simulate connection loss for client B and ensure client A observes client B leaving the project.
2282        client_b.wait_for_current_user(cx_b).await;
2283        server.disconnect_client(client_b.current_user_id(cx_b));
2284        cx_a.foreground().advance_clock(Duration::from_secs(3));
2285        project_a
2286            .condition(cx_a, |p, _| p.collaborators().len() == 0)
2287            .await;
2288    }
2289
2290    #[gpui::test(iterations = 10)]
2291    async fn test_collaborating_with_diagnostics(
2292        cx_a: &mut TestAppContext,
2293        cx_b: &mut TestAppContext,
2294    ) {
2295        cx_a.foreground().forbid_parking();
2296        let lang_registry = Arc::new(LanguageRegistry::test());
2297        let fs = FakeFs::new(cx_a.background());
2298
2299        // Set up a fake language server.
2300        let mut language = Language::new(
2301            LanguageConfig {
2302                name: "Rust".into(),
2303                path_suffixes: vec!["rs".to_string()],
2304                ..Default::default()
2305            },
2306            Some(tree_sitter_rust::language()),
2307        );
2308        let mut fake_language_servers = language.set_fake_lsp_adapter(Default::default());
2309        lang_registry.add(Arc::new(language));
2310
2311        // Connect to a server as 2 clients.
2312        let mut server = TestServer::start(cx_a.foreground(), cx_a.background()).await;
2313        let client_a = server.create_client(cx_a, "user_a").await;
2314        let client_b = server.create_client(cx_b, "user_b").await;
2315
2316        // Share a project as client A
2317        fs.insert_tree(
2318            "/a",
2319            json!({
2320                ".zed.toml": r#"collaborators = ["user_b"]"#,
2321                "a.rs": "let one = two",
2322                "other.rs": "",
2323            }),
2324        )
2325        .await;
2326        let project_a = cx_a.update(|cx| {
2327            Project::local(
2328                client_a.clone(),
2329                client_a.user_store.clone(),
2330                lang_registry.clone(),
2331                fs.clone(),
2332                cx,
2333            )
2334        });
2335        let (worktree_a, _) = project_a
2336            .update(cx_a, |p, cx| {
2337                p.find_or_create_local_worktree("/a", true, cx)
2338            })
2339            .await
2340            .unwrap();
2341        worktree_a
2342            .read_with(cx_a, |tree, _| tree.as_local().unwrap().scan_complete())
2343            .await;
2344        let project_id = project_a.update(cx_a, |p, _| p.next_remote_id()).await;
2345        let worktree_id = worktree_a.read_with(cx_a, |tree, _| tree.id());
2346        project_a.update(cx_a, |p, cx| p.share(cx)).await.unwrap();
2347
2348        // Cause the language server to start.
2349        let _ = cx_a
2350            .background()
2351            .spawn(project_a.update(cx_a, |project, cx| {
2352                project.open_buffer(
2353                    ProjectPath {
2354                        worktree_id,
2355                        path: Path::new("other.rs").into(),
2356                    },
2357                    cx,
2358                )
2359            }))
2360            .await
2361            .unwrap();
2362
2363        // Simulate a language server reporting errors for a file.
2364        let mut fake_language_server = fake_language_servers.next().await.unwrap();
2365        fake_language_server
2366            .receive_notification::<lsp::notification::DidOpenTextDocument>()
2367            .await;
2368        fake_language_server.notify::<lsp::notification::PublishDiagnostics>(
2369            lsp::PublishDiagnosticsParams {
2370                uri: lsp::Url::from_file_path("/a/a.rs").unwrap(),
2371                version: None,
2372                diagnostics: vec![lsp::Diagnostic {
2373                    severity: Some(lsp::DiagnosticSeverity::ERROR),
2374                    range: lsp::Range::new(lsp::Position::new(0, 4), lsp::Position::new(0, 7)),
2375                    message: "message 1".to_string(),
2376                    ..Default::default()
2377                }],
2378            },
2379        );
2380
2381        // Wait for server to see the diagnostics update.
2382        server
2383            .condition(|store| {
2384                let worktree = store
2385                    .project(project_id)
2386                    .unwrap()
2387                    .share
2388                    .as_ref()
2389                    .unwrap()
2390                    .worktrees
2391                    .get(&worktree_id.to_proto())
2392                    .unwrap();
2393
2394                !worktree.diagnostic_summaries.is_empty()
2395            })
2396            .await;
2397
2398        // Join the worktree as client B.
2399        let project_b = Project::remote(
2400            project_id,
2401            client_b.clone(),
2402            client_b.user_store.clone(),
2403            lang_registry.clone(),
2404            fs.clone(),
2405            &mut cx_b.to_async(),
2406        )
2407        .await
2408        .unwrap();
2409
2410        project_b.read_with(cx_b, |project, cx| {
2411            assert_eq!(
2412                project.diagnostic_summaries(cx).collect::<Vec<_>>(),
2413                &[(
2414                    ProjectPath {
2415                        worktree_id,
2416                        path: Arc::from(Path::new("a.rs")),
2417                    },
2418                    DiagnosticSummary {
2419                        error_count: 1,
2420                        warning_count: 0,
2421                        ..Default::default()
2422                    },
2423                )]
2424            )
2425        });
2426
2427        // Simulate a language server reporting more errors for a file.
2428        fake_language_server.notify::<lsp::notification::PublishDiagnostics>(
2429            lsp::PublishDiagnosticsParams {
2430                uri: lsp::Url::from_file_path("/a/a.rs").unwrap(),
2431                version: None,
2432                diagnostics: vec![
2433                    lsp::Diagnostic {
2434                        severity: Some(lsp::DiagnosticSeverity::ERROR),
2435                        range: lsp::Range::new(lsp::Position::new(0, 4), lsp::Position::new(0, 7)),
2436                        message: "message 1".to_string(),
2437                        ..Default::default()
2438                    },
2439                    lsp::Diagnostic {
2440                        severity: Some(lsp::DiagnosticSeverity::WARNING),
2441                        range: lsp::Range::new(
2442                            lsp::Position::new(0, 10),
2443                            lsp::Position::new(0, 13),
2444                        ),
2445                        message: "message 2".to_string(),
2446                        ..Default::default()
2447                    },
2448                ],
2449            },
2450        );
2451
2452        // Client b gets the updated summaries
2453        project_b
2454            .condition(&cx_b, |project, cx| {
2455                project.diagnostic_summaries(cx).collect::<Vec<_>>()
2456                    == &[(
2457                        ProjectPath {
2458                            worktree_id,
2459                            path: Arc::from(Path::new("a.rs")),
2460                        },
2461                        DiagnosticSummary {
2462                            error_count: 1,
2463                            warning_count: 1,
2464                            ..Default::default()
2465                        },
2466                    )]
2467            })
2468            .await;
2469
2470        // Open the file with the errors on client B. They should be present.
2471        let buffer_b = cx_b
2472            .background()
2473            .spawn(project_b.update(cx_b, |p, cx| p.open_buffer((worktree_id, "a.rs"), cx)))
2474            .await
2475            .unwrap();
2476
2477        buffer_b.read_with(cx_b, |buffer, _| {
2478            assert_eq!(
2479                buffer
2480                    .snapshot()
2481                    .diagnostics_in_range::<_, Point>(0..buffer.len(), false)
2482                    .map(|entry| entry)
2483                    .collect::<Vec<_>>(),
2484                &[
2485                    DiagnosticEntry {
2486                        range: Point::new(0, 4)..Point::new(0, 7),
2487                        diagnostic: Diagnostic {
2488                            group_id: 0,
2489                            message: "message 1".to_string(),
2490                            severity: lsp::DiagnosticSeverity::ERROR,
2491                            is_primary: true,
2492                            ..Default::default()
2493                        }
2494                    },
2495                    DiagnosticEntry {
2496                        range: Point::new(0, 10)..Point::new(0, 13),
2497                        diagnostic: Diagnostic {
2498                            group_id: 1,
2499                            severity: lsp::DiagnosticSeverity::WARNING,
2500                            message: "message 2".to_string(),
2501                            is_primary: true,
2502                            ..Default::default()
2503                        }
2504                    }
2505                ]
2506            );
2507        });
2508
2509        // Simulate a language server reporting no errors for a file.
2510        fake_language_server.notify::<lsp::notification::PublishDiagnostics>(
2511            lsp::PublishDiagnosticsParams {
2512                uri: lsp::Url::from_file_path("/a/a.rs").unwrap(),
2513                version: None,
2514                diagnostics: vec![],
2515            },
2516        );
2517        project_a
2518            .condition(cx_a, |project, cx| {
2519                project.diagnostic_summaries(cx).collect::<Vec<_>>() == &[]
2520            })
2521            .await;
2522        project_b
2523            .condition(cx_b, |project, cx| {
2524                project.diagnostic_summaries(cx).collect::<Vec<_>>() == &[]
2525            })
2526            .await;
2527    }
2528
2529    #[gpui::test(iterations = 10)]
2530    async fn test_collaborating_with_completion(
2531        cx_a: &mut TestAppContext,
2532        cx_b: &mut TestAppContext,
2533    ) {
2534        cx_a.foreground().forbid_parking();
2535        let lang_registry = Arc::new(LanguageRegistry::test());
2536        let fs = FakeFs::new(cx_a.background());
2537
2538        // Set up a fake language server.
2539        let mut language = Language::new(
2540            LanguageConfig {
2541                name: "Rust".into(),
2542                path_suffixes: vec!["rs".to_string()],
2543                ..Default::default()
2544            },
2545            Some(tree_sitter_rust::language()),
2546        );
2547        let mut fake_language_servers = language.set_fake_lsp_adapter(FakeLspAdapter {
2548            capabilities: lsp::ServerCapabilities {
2549                completion_provider: Some(lsp::CompletionOptions {
2550                    trigger_characters: Some(vec![".".to_string()]),
2551                    ..Default::default()
2552                }),
2553                ..Default::default()
2554            },
2555            ..Default::default()
2556        });
2557        lang_registry.add(Arc::new(language));
2558
2559        // Connect to a server as 2 clients.
2560        let mut server = TestServer::start(cx_a.foreground(), cx_a.background()).await;
2561        let client_a = server.create_client(cx_a, "user_a").await;
2562        let client_b = server.create_client(cx_b, "user_b").await;
2563
2564        // Share a project as client A
2565        fs.insert_tree(
2566            "/a",
2567            json!({
2568                ".zed.toml": r#"collaborators = ["user_b"]"#,
2569                "main.rs": "fn main() { a }",
2570                "other.rs": "",
2571            }),
2572        )
2573        .await;
2574        let project_a = cx_a.update(|cx| {
2575            Project::local(
2576                client_a.clone(),
2577                client_a.user_store.clone(),
2578                lang_registry.clone(),
2579                fs.clone(),
2580                cx,
2581            )
2582        });
2583        let (worktree_a, _) = project_a
2584            .update(cx_a, |p, cx| {
2585                p.find_or_create_local_worktree("/a", true, cx)
2586            })
2587            .await
2588            .unwrap();
2589        worktree_a
2590            .read_with(cx_a, |tree, _| tree.as_local().unwrap().scan_complete())
2591            .await;
2592        let project_id = project_a.update(cx_a, |p, _| p.next_remote_id()).await;
2593        let worktree_id = worktree_a.read_with(cx_a, |tree, _| tree.id());
2594        project_a.update(cx_a, |p, cx| p.share(cx)).await.unwrap();
2595
2596        // Join the worktree as client B.
2597        let project_b = Project::remote(
2598            project_id,
2599            client_b.clone(),
2600            client_b.user_store.clone(),
2601            lang_registry.clone(),
2602            fs.clone(),
2603            &mut cx_b.to_async(),
2604        )
2605        .await
2606        .unwrap();
2607
2608        // Open a file in an editor as the guest.
2609        let buffer_b = project_b
2610            .update(cx_b, |p, cx| p.open_buffer((worktree_id, "main.rs"), cx))
2611            .await
2612            .unwrap();
2613        let (window_b, _) = cx_b.add_window(|_| EmptyView);
2614        let editor_b = cx_b.add_view(window_b, |cx| {
2615            Editor::for_buffer(buffer_b.clone(), Some(project_b.clone()), cx)
2616        });
2617
2618        let fake_language_server = fake_language_servers.next().await.unwrap();
2619        buffer_b
2620            .condition(&cx_b, |buffer, _| !buffer.completion_triggers().is_empty())
2621            .await;
2622
2623        // Type a completion trigger character as the guest.
2624        editor_b.update(cx_b, |editor, cx| {
2625            editor.select_ranges([13..13], None, cx);
2626            editor.handle_input(&Input(".".into()), cx);
2627            cx.focus(&editor_b);
2628        });
2629
2630        // Receive a completion request as the host's language server.
2631        // Return some completions from the host's language server.
2632        cx_a.foreground().start_waiting();
2633        fake_language_server
2634            .handle_request::<lsp::request::Completion, _, _>(|params, _| async move {
2635                assert_eq!(
2636                    params.text_document_position.text_document.uri,
2637                    lsp::Url::from_file_path("/a/main.rs").unwrap(),
2638                );
2639                assert_eq!(
2640                    params.text_document_position.position,
2641                    lsp::Position::new(0, 14),
2642                );
2643
2644                Ok(Some(lsp::CompletionResponse::Array(vec![
2645                    lsp::CompletionItem {
2646                        label: "first_method(…)".into(),
2647                        detail: Some("fn(&mut self, B) -> C".into()),
2648                        text_edit: Some(lsp::CompletionTextEdit::Edit(lsp::TextEdit {
2649                            new_text: "first_method($1)".to_string(),
2650                            range: lsp::Range::new(
2651                                lsp::Position::new(0, 14),
2652                                lsp::Position::new(0, 14),
2653                            ),
2654                        })),
2655                        insert_text_format: Some(lsp::InsertTextFormat::SNIPPET),
2656                        ..Default::default()
2657                    },
2658                    lsp::CompletionItem {
2659                        label: "second_method(…)".into(),
2660                        detail: Some("fn(&mut self, C) -> D<E>".into()),
2661                        text_edit: Some(lsp::CompletionTextEdit::Edit(lsp::TextEdit {
2662                            new_text: "second_method()".to_string(),
2663                            range: lsp::Range::new(
2664                                lsp::Position::new(0, 14),
2665                                lsp::Position::new(0, 14),
2666                            ),
2667                        })),
2668                        insert_text_format: Some(lsp::InsertTextFormat::SNIPPET),
2669                        ..Default::default()
2670                    },
2671                ])))
2672            })
2673            .next()
2674            .await
2675            .unwrap();
2676        cx_a.foreground().finish_waiting();
2677
2678        // Open the buffer on the host.
2679        let buffer_a = project_a
2680            .update(cx_a, |p, cx| p.open_buffer((worktree_id, "main.rs"), cx))
2681            .await
2682            .unwrap();
2683        buffer_a
2684            .condition(&cx_a, |buffer, _| buffer.text() == "fn main() { a. }")
2685            .await;
2686
2687        // Confirm a completion on the guest.
2688        editor_b
2689            .condition(&cx_b, |editor, _| editor.context_menu_visible())
2690            .await;
2691        editor_b.update(cx_b, |editor, cx| {
2692            editor.confirm_completion(&ConfirmCompletion { item_ix: Some(0) }, cx);
2693            assert_eq!(editor.text(cx), "fn main() { a.first_method() }");
2694        });
2695
2696        // Return a resolved completion from the host's language server.
2697        // The resolved completion has an additional text edit.
2698        fake_language_server.handle_request::<lsp::request::ResolveCompletionItem, _, _>(
2699            |params, _| async move {
2700                assert_eq!(params.label, "first_method(…)");
2701                Ok(lsp::CompletionItem {
2702                    label: "first_method(…)".into(),
2703                    detail: Some("fn(&mut self, B) -> C".into()),
2704                    text_edit: Some(lsp::CompletionTextEdit::Edit(lsp::TextEdit {
2705                        new_text: "first_method($1)".to_string(),
2706                        range: lsp::Range::new(
2707                            lsp::Position::new(0, 14),
2708                            lsp::Position::new(0, 14),
2709                        ),
2710                    })),
2711                    additional_text_edits: Some(vec![lsp::TextEdit {
2712                        new_text: "use d::SomeTrait;\n".to_string(),
2713                        range: lsp::Range::new(lsp::Position::new(0, 0), lsp::Position::new(0, 0)),
2714                    }]),
2715                    insert_text_format: Some(lsp::InsertTextFormat::SNIPPET),
2716                    ..Default::default()
2717                })
2718            },
2719        );
2720
2721        // The additional edit is applied.
2722        buffer_a
2723            .condition(&cx_a, |buffer, _| {
2724                buffer.text() == "use d::SomeTrait;\nfn main() { a.first_method() }"
2725            })
2726            .await;
2727        buffer_b
2728            .condition(&cx_b, |buffer, _| {
2729                buffer.text() == "use d::SomeTrait;\nfn main() { a.first_method() }"
2730            })
2731            .await;
2732    }
2733
2734    #[gpui::test(iterations = 10)]
2735    async fn test_reloading_buffer_manually(cx_a: &mut TestAppContext, cx_b: &mut TestAppContext) {
2736        cx_a.foreground().forbid_parking();
2737        let lang_registry = Arc::new(LanguageRegistry::test());
2738        let fs = FakeFs::new(cx_a.background());
2739
2740        // Connect to a server as 2 clients.
2741        let mut server = TestServer::start(cx_a.foreground(), cx_a.background()).await;
2742        let client_a = server.create_client(cx_a, "user_a").await;
2743        let client_b = server.create_client(cx_b, "user_b").await;
2744
2745        // Share a project as client A
2746        fs.insert_tree(
2747            "/a",
2748            json!({
2749                ".zed.toml": r#"collaborators = ["user_b"]"#,
2750                "a.rs": "let one = 1;",
2751            }),
2752        )
2753        .await;
2754        let project_a = cx_a.update(|cx| {
2755            Project::local(
2756                client_a.clone(),
2757                client_a.user_store.clone(),
2758                lang_registry.clone(),
2759                fs.clone(),
2760                cx,
2761            )
2762        });
2763        let (worktree_a, _) = project_a
2764            .update(cx_a, |p, cx| {
2765                p.find_or_create_local_worktree("/a", true, cx)
2766            })
2767            .await
2768            .unwrap();
2769        worktree_a
2770            .read_with(cx_a, |tree, _| tree.as_local().unwrap().scan_complete())
2771            .await;
2772        let project_id = project_a.update(cx_a, |p, _| p.next_remote_id()).await;
2773        let worktree_id = worktree_a.read_with(cx_a, |tree, _| tree.id());
2774        project_a.update(cx_a, |p, cx| p.share(cx)).await.unwrap();
2775        let buffer_a = project_a
2776            .update(cx_a, |p, cx| p.open_buffer((worktree_id, "a.rs"), cx))
2777            .await
2778            .unwrap();
2779
2780        // Join the worktree as client B.
2781        let project_b = Project::remote(
2782            project_id,
2783            client_b.clone(),
2784            client_b.user_store.clone(),
2785            lang_registry.clone(),
2786            fs.clone(),
2787            &mut cx_b.to_async(),
2788        )
2789        .await
2790        .unwrap();
2791
2792        let buffer_b = cx_b
2793            .background()
2794            .spawn(project_b.update(cx_b, |p, cx| p.open_buffer((worktree_id, "a.rs"), cx)))
2795            .await
2796            .unwrap();
2797        buffer_b.update(cx_b, |buffer, cx| {
2798            buffer.edit([(4..7, "six")], cx);
2799            buffer.edit([(10..11, "6")], cx);
2800            assert_eq!(buffer.text(), "let six = 6;");
2801            assert!(buffer.is_dirty());
2802            assert!(!buffer.has_conflict());
2803        });
2804        buffer_a
2805            .condition(cx_a, |buffer, _| buffer.text() == "let six = 6;")
2806            .await;
2807
2808        fs.save(Path::new("/a/a.rs"), &Rope::from("let seven = 7;"))
2809            .await
2810            .unwrap();
2811        buffer_a
2812            .condition(cx_a, |buffer, _| buffer.has_conflict())
2813            .await;
2814        buffer_b
2815            .condition(cx_b, |buffer, _| buffer.has_conflict())
2816            .await;
2817
2818        project_b
2819            .update(cx_b, |project, cx| {
2820                project.reload_buffers(HashSet::from_iter([buffer_b.clone()]), true, cx)
2821            })
2822            .await
2823            .unwrap();
2824        buffer_a.read_with(cx_a, |buffer, _| {
2825            assert_eq!(buffer.text(), "let seven = 7;");
2826            assert!(!buffer.is_dirty());
2827            assert!(!buffer.has_conflict());
2828        });
2829        buffer_b.read_with(cx_b, |buffer, _| {
2830            assert_eq!(buffer.text(), "let seven = 7;");
2831            assert!(!buffer.is_dirty());
2832            assert!(!buffer.has_conflict());
2833        });
2834
2835        buffer_a.update(cx_a, |buffer, cx| {
2836            // Undoing on the host is a no-op when the reload was initiated by the guest.
2837            buffer.undo(cx);
2838            assert_eq!(buffer.text(), "let seven = 7;");
2839            assert!(!buffer.is_dirty());
2840            assert!(!buffer.has_conflict());
2841        });
2842        buffer_b.update(cx_b, |buffer, cx| {
2843            // Undoing on the guest rolls back the buffer to before it was reloaded but the conflict gets cleared.
2844            buffer.undo(cx);
2845            assert_eq!(buffer.text(), "let six = 6;");
2846            assert!(buffer.is_dirty());
2847            assert!(!buffer.has_conflict());
2848        });
2849    }
2850
2851    #[gpui::test(iterations = 10)]
2852    async fn test_formatting_buffer(cx_a: &mut TestAppContext, cx_b: &mut TestAppContext) {
2853        cx_a.foreground().forbid_parking();
2854        let lang_registry = Arc::new(LanguageRegistry::test());
2855        let fs = FakeFs::new(cx_a.background());
2856
2857        // Set up a fake language server.
2858        let mut language = Language::new(
2859            LanguageConfig {
2860                name: "Rust".into(),
2861                path_suffixes: vec!["rs".to_string()],
2862                ..Default::default()
2863            },
2864            Some(tree_sitter_rust::language()),
2865        );
2866        let mut fake_language_servers = language.set_fake_lsp_adapter(Default::default());
2867        lang_registry.add(Arc::new(language));
2868
2869        // Connect to a server as 2 clients.
2870        let mut server = TestServer::start(cx_a.foreground(), cx_a.background()).await;
2871        let client_a = server.create_client(cx_a, "user_a").await;
2872        let client_b = server.create_client(cx_b, "user_b").await;
2873
2874        // Share a project as client A
2875        fs.insert_tree(
2876            "/a",
2877            json!({
2878                ".zed.toml": r#"collaborators = ["user_b"]"#,
2879                "a.rs": "let one = two",
2880            }),
2881        )
2882        .await;
2883        let project_a = cx_a.update(|cx| {
2884            Project::local(
2885                client_a.clone(),
2886                client_a.user_store.clone(),
2887                lang_registry.clone(),
2888                fs.clone(),
2889                cx,
2890            )
2891        });
2892        let (worktree_a, _) = project_a
2893            .update(cx_a, |p, cx| {
2894                p.find_or_create_local_worktree("/a", true, cx)
2895            })
2896            .await
2897            .unwrap();
2898        worktree_a
2899            .read_with(cx_a, |tree, _| tree.as_local().unwrap().scan_complete())
2900            .await;
2901        let project_id = project_a.update(cx_a, |p, _| p.next_remote_id()).await;
2902        let worktree_id = worktree_a.read_with(cx_a, |tree, _| tree.id());
2903        project_a.update(cx_a, |p, cx| p.share(cx)).await.unwrap();
2904
2905        // Join the worktree as client B.
2906        let project_b = Project::remote(
2907            project_id,
2908            client_b.clone(),
2909            client_b.user_store.clone(),
2910            lang_registry.clone(),
2911            fs.clone(),
2912            &mut cx_b.to_async(),
2913        )
2914        .await
2915        .unwrap();
2916
2917        let buffer_b = cx_b
2918            .background()
2919            .spawn(project_b.update(cx_b, |p, cx| p.open_buffer((worktree_id, "a.rs"), cx)))
2920            .await
2921            .unwrap();
2922
2923        let fake_language_server = fake_language_servers.next().await.unwrap();
2924        fake_language_server.handle_request::<lsp::request::Formatting, _, _>(|_, _| async move {
2925            Ok(Some(vec![
2926                lsp::TextEdit {
2927                    range: lsp::Range::new(lsp::Position::new(0, 4), lsp::Position::new(0, 4)),
2928                    new_text: "h".to_string(),
2929                },
2930                lsp::TextEdit {
2931                    range: lsp::Range::new(lsp::Position::new(0, 7), lsp::Position::new(0, 7)),
2932                    new_text: "y".to_string(),
2933                },
2934            ]))
2935        });
2936
2937        project_b
2938            .update(cx_b, |project, cx| {
2939                project.format(HashSet::from_iter([buffer_b.clone()]), true, cx)
2940            })
2941            .await
2942            .unwrap();
2943        assert_eq!(
2944            buffer_b.read_with(cx_b, |buffer, _| buffer.text()),
2945            "let honey = two"
2946        );
2947    }
2948
2949    #[gpui::test(iterations = 10)]
2950    async fn test_definition(cx_a: &mut TestAppContext, cx_b: &mut TestAppContext) {
2951        cx_a.foreground().forbid_parking();
2952        let lang_registry = Arc::new(LanguageRegistry::test());
2953        let fs = FakeFs::new(cx_a.background());
2954        fs.insert_tree(
2955            "/root-1",
2956            json!({
2957                ".zed.toml": r#"collaborators = ["user_b"]"#,
2958                "a.rs": "const ONE: usize = b::TWO + b::THREE;",
2959            }),
2960        )
2961        .await;
2962        fs.insert_tree(
2963            "/root-2",
2964            json!({
2965                "b.rs": "const TWO: usize = 2;\nconst THREE: usize = 3;",
2966            }),
2967        )
2968        .await;
2969
2970        // Set up a fake language server.
2971        let mut language = Language::new(
2972            LanguageConfig {
2973                name: "Rust".into(),
2974                path_suffixes: vec!["rs".to_string()],
2975                ..Default::default()
2976            },
2977            Some(tree_sitter_rust::language()),
2978        );
2979        let mut fake_language_servers = language.set_fake_lsp_adapter(Default::default());
2980        lang_registry.add(Arc::new(language));
2981
2982        // Connect to a server as 2 clients.
2983        let mut server = TestServer::start(cx_a.foreground(), cx_a.background()).await;
2984        let client_a = server.create_client(cx_a, "user_a").await;
2985        let client_b = server.create_client(cx_b, "user_b").await;
2986
2987        // Share a project as client A
2988        let project_a = cx_a.update(|cx| {
2989            Project::local(
2990                client_a.clone(),
2991                client_a.user_store.clone(),
2992                lang_registry.clone(),
2993                fs.clone(),
2994                cx,
2995            )
2996        });
2997        let (worktree_a, _) = project_a
2998            .update(cx_a, |p, cx| {
2999                p.find_or_create_local_worktree("/root-1", true, cx)
3000            })
3001            .await
3002            .unwrap();
3003        worktree_a
3004            .read_with(cx_a, |tree, _| tree.as_local().unwrap().scan_complete())
3005            .await;
3006        let project_id = project_a.update(cx_a, |p, _| p.next_remote_id()).await;
3007        let worktree_id = worktree_a.read_with(cx_a, |tree, _| tree.id());
3008        project_a.update(cx_a, |p, cx| p.share(cx)).await.unwrap();
3009
3010        // Join the worktree as client B.
3011        let project_b = Project::remote(
3012            project_id,
3013            client_b.clone(),
3014            client_b.user_store.clone(),
3015            lang_registry.clone(),
3016            fs.clone(),
3017            &mut cx_b.to_async(),
3018        )
3019        .await
3020        .unwrap();
3021
3022        // Open the file on client B.
3023        let buffer_b = cx_b
3024            .background()
3025            .spawn(project_b.update(cx_b, |p, cx| p.open_buffer((worktree_id, "a.rs"), cx)))
3026            .await
3027            .unwrap();
3028
3029        // Request the definition of a symbol as the guest.
3030        let fake_language_server = fake_language_servers.next().await.unwrap();
3031        fake_language_server.handle_request::<lsp::request::GotoDefinition, _, _>(
3032            |_, _| async move {
3033                Ok(Some(lsp::GotoDefinitionResponse::Scalar(
3034                    lsp::Location::new(
3035                        lsp::Url::from_file_path("/root-2/b.rs").unwrap(),
3036                        lsp::Range::new(lsp::Position::new(0, 6), lsp::Position::new(0, 9)),
3037                    ),
3038                )))
3039            },
3040        );
3041
3042        let definitions_1 = project_b
3043            .update(cx_b, |p, cx| p.definition(&buffer_b, 23, cx))
3044            .await
3045            .unwrap();
3046        cx_b.read(|cx| {
3047            assert_eq!(definitions_1.len(), 1);
3048            assert_eq!(project_b.read(cx).worktrees(cx).count(), 2);
3049            let target_buffer = definitions_1[0].buffer.read(cx);
3050            assert_eq!(
3051                target_buffer.text(),
3052                "const TWO: usize = 2;\nconst THREE: usize = 3;"
3053            );
3054            assert_eq!(
3055                definitions_1[0].range.to_point(target_buffer),
3056                Point::new(0, 6)..Point::new(0, 9)
3057            );
3058        });
3059
3060        // Try getting more definitions for the same buffer, ensuring the buffer gets reused from
3061        // the previous call to `definition`.
3062        fake_language_server.handle_request::<lsp::request::GotoDefinition, _, _>(
3063            |_, _| async move {
3064                Ok(Some(lsp::GotoDefinitionResponse::Scalar(
3065                    lsp::Location::new(
3066                        lsp::Url::from_file_path("/root-2/b.rs").unwrap(),
3067                        lsp::Range::new(lsp::Position::new(1, 6), lsp::Position::new(1, 11)),
3068                    ),
3069                )))
3070            },
3071        );
3072
3073        let definitions_2 = project_b
3074            .update(cx_b, |p, cx| p.definition(&buffer_b, 33, cx))
3075            .await
3076            .unwrap();
3077        cx_b.read(|cx| {
3078            assert_eq!(definitions_2.len(), 1);
3079            assert_eq!(project_b.read(cx).worktrees(cx).count(), 2);
3080            let target_buffer = definitions_2[0].buffer.read(cx);
3081            assert_eq!(
3082                target_buffer.text(),
3083                "const TWO: usize = 2;\nconst THREE: usize = 3;"
3084            );
3085            assert_eq!(
3086                definitions_2[0].range.to_point(target_buffer),
3087                Point::new(1, 6)..Point::new(1, 11)
3088            );
3089        });
3090        assert_eq!(definitions_1[0].buffer, definitions_2[0].buffer);
3091    }
3092
3093    #[gpui::test(iterations = 10)]
3094    async fn test_references(cx_a: &mut TestAppContext, cx_b: &mut TestAppContext) {
3095        cx_a.foreground().forbid_parking();
3096        let lang_registry = Arc::new(LanguageRegistry::test());
3097        let fs = FakeFs::new(cx_a.background());
3098        fs.insert_tree(
3099            "/root-1",
3100            json!({
3101                ".zed.toml": r#"collaborators = ["user_b"]"#,
3102                "one.rs": "const ONE: usize = 1;",
3103                "two.rs": "const TWO: usize = one::ONE + one::ONE;",
3104            }),
3105        )
3106        .await;
3107        fs.insert_tree(
3108            "/root-2",
3109            json!({
3110                "three.rs": "const THREE: usize = two::TWO + one::ONE;",
3111            }),
3112        )
3113        .await;
3114
3115        // Set up a fake language server.
3116        let mut language = Language::new(
3117            LanguageConfig {
3118                name: "Rust".into(),
3119                path_suffixes: vec!["rs".to_string()],
3120                ..Default::default()
3121            },
3122            Some(tree_sitter_rust::language()),
3123        );
3124        let mut fake_language_servers = language.set_fake_lsp_adapter(Default::default());
3125        lang_registry.add(Arc::new(language));
3126
3127        // Connect to a server as 2 clients.
3128        let mut server = TestServer::start(cx_a.foreground(), cx_a.background()).await;
3129        let client_a = server.create_client(cx_a, "user_a").await;
3130        let client_b = server.create_client(cx_b, "user_b").await;
3131
3132        // Share a project as client A
3133        let project_a = cx_a.update(|cx| {
3134            Project::local(
3135                client_a.clone(),
3136                client_a.user_store.clone(),
3137                lang_registry.clone(),
3138                fs.clone(),
3139                cx,
3140            )
3141        });
3142        let (worktree_a, _) = project_a
3143            .update(cx_a, |p, cx| {
3144                p.find_or_create_local_worktree("/root-1", true, cx)
3145            })
3146            .await
3147            .unwrap();
3148        worktree_a
3149            .read_with(cx_a, |tree, _| tree.as_local().unwrap().scan_complete())
3150            .await;
3151        let project_id = project_a.update(cx_a, |p, _| p.next_remote_id()).await;
3152        let worktree_id = worktree_a.read_with(cx_a, |tree, _| tree.id());
3153        project_a.update(cx_a, |p, cx| p.share(cx)).await.unwrap();
3154
3155        // Join the worktree as client B.
3156        let project_b = Project::remote(
3157            project_id,
3158            client_b.clone(),
3159            client_b.user_store.clone(),
3160            lang_registry.clone(),
3161            fs.clone(),
3162            &mut cx_b.to_async(),
3163        )
3164        .await
3165        .unwrap();
3166
3167        // Open the file on client B.
3168        let buffer_b = cx_b
3169            .background()
3170            .spawn(project_b.update(cx_b, |p, cx| p.open_buffer((worktree_id, "one.rs"), cx)))
3171            .await
3172            .unwrap();
3173
3174        // Request references to a symbol as the guest.
3175        let fake_language_server = fake_language_servers.next().await.unwrap();
3176        fake_language_server.handle_request::<lsp::request::References, _, _>(
3177            |params, _| async move {
3178                assert_eq!(
3179                    params.text_document_position.text_document.uri.as_str(),
3180                    "file:///root-1/one.rs"
3181                );
3182                Ok(Some(vec![
3183                    lsp::Location {
3184                        uri: lsp::Url::from_file_path("/root-1/two.rs").unwrap(),
3185                        range: lsp::Range::new(
3186                            lsp::Position::new(0, 24),
3187                            lsp::Position::new(0, 27),
3188                        ),
3189                    },
3190                    lsp::Location {
3191                        uri: lsp::Url::from_file_path("/root-1/two.rs").unwrap(),
3192                        range: lsp::Range::new(
3193                            lsp::Position::new(0, 35),
3194                            lsp::Position::new(0, 38),
3195                        ),
3196                    },
3197                    lsp::Location {
3198                        uri: lsp::Url::from_file_path("/root-2/three.rs").unwrap(),
3199                        range: lsp::Range::new(
3200                            lsp::Position::new(0, 37),
3201                            lsp::Position::new(0, 40),
3202                        ),
3203                    },
3204                ]))
3205            },
3206        );
3207
3208        let references = project_b
3209            .update(cx_b, |p, cx| p.references(&buffer_b, 7, cx))
3210            .await
3211            .unwrap();
3212        cx_b.read(|cx| {
3213            assert_eq!(references.len(), 3);
3214            assert_eq!(project_b.read(cx).worktrees(cx).count(), 2);
3215
3216            let two_buffer = references[0].buffer.read(cx);
3217            let three_buffer = references[2].buffer.read(cx);
3218            assert_eq!(
3219                two_buffer.file().unwrap().path().as_ref(),
3220                Path::new("two.rs")
3221            );
3222            assert_eq!(references[1].buffer, references[0].buffer);
3223            assert_eq!(
3224                three_buffer.file().unwrap().full_path(cx),
3225                Path::new("three.rs")
3226            );
3227
3228            assert_eq!(references[0].range.to_offset(&two_buffer), 24..27);
3229            assert_eq!(references[1].range.to_offset(&two_buffer), 35..38);
3230            assert_eq!(references[2].range.to_offset(&three_buffer), 37..40);
3231        });
3232    }
3233
3234    #[gpui::test(iterations = 10)]
3235    async fn test_project_search(cx_a: &mut TestAppContext, cx_b: &mut TestAppContext) {
3236        cx_a.foreground().forbid_parking();
3237        let lang_registry = Arc::new(LanguageRegistry::test());
3238        let fs = FakeFs::new(cx_a.background());
3239        fs.insert_tree(
3240            "/root-1",
3241            json!({
3242                ".zed.toml": r#"collaborators = ["user_b"]"#,
3243                "a": "hello world",
3244                "b": "goodnight moon",
3245                "c": "a world of goo",
3246                "d": "world champion of clown world",
3247            }),
3248        )
3249        .await;
3250        fs.insert_tree(
3251            "/root-2",
3252            json!({
3253                "e": "disney world is fun",
3254            }),
3255        )
3256        .await;
3257
3258        // Connect to a server as 2 clients.
3259        let mut server = TestServer::start(cx_a.foreground(), cx_a.background()).await;
3260        let client_a = server.create_client(cx_a, "user_a").await;
3261        let client_b = server.create_client(cx_b, "user_b").await;
3262
3263        // Share a project as client A
3264        let project_a = cx_a.update(|cx| {
3265            Project::local(
3266                client_a.clone(),
3267                client_a.user_store.clone(),
3268                lang_registry.clone(),
3269                fs.clone(),
3270                cx,
3271            )
3272        });
3273        let project_id = project_a.update(cx_a, |p, _| p.next_remote_id()).await;
3274
3275        let (worktree_1, _) = project_a
3276            .update(cx_a, |p, cx| {
3277                p.find_or_create_local_worktree("/root-1", true, cx)
3278            })
3279            .await
3280            .unwrap();
3281        worktree_1
3282            .read_with(cx_a, |tree, _| tree.as_local().unwrap().scan_complete())
3283            .await;
3284        let (worktree_2, _) = project_a
3285            .update(cx_a, |p, cx| {
3286                p.find_or_create_local_worktree("/root-2", true, cx)
3287            })
3288            .await
3289            .unwrap();
3290        worktree_2
3291            .read_with(cx_a, |tree, _| tree.as_local().unwrap().scan_complete())
3292            .await;
3293
3294        project_a.update(cx_a, |p, cx| p.share(cx)).await.unwrap();
3295
3296        // Join the worktree as client B.
3297        let project_b = Project::remote(
3298            project_id,
3299            client_b.clone(),
3300            client_b.user_store.clone(),
3301            lang_registry.clone(),
3302            fs.clone(),
3303            &mut cx_b.to_async(),
3304        )
3305        .await
3306        .unwrap();
3307
3308        let results = project_b
3309            .update(cx_b, |project, cx| {
3310                project.search(SearchQuery::text("world", false, false), cx)
3311            })
3312            .await
3313            .unwrap();
3314
3315        let mut ranges_by_path = results
3316            .into_iter()
3317            .map(|(buffer, ranges)| {
3318                buffer.read_with(cx_b, |buffer, cx| {
3319                    let path = buffer.file().unwrap().full_path(cx);
3320                    let offset_ranges = ranges
3321                        .into_iter()
3322                        .map(|range| range.to_offset(buffer))
3323                        .collect::<Vec<_>>();
3324                    (path, offset_ranges)
3325                })
3326            })
3327            .collect::<Vec<_>>();
3328        ranges_by_path.sort_by_key(|(path, _)| path.clone());
3329
3330        assert_eq!(
3331            ranges_by_path,
3332            &[
3333                (PathBuf::from("root-1/a"), vec![6..11]),
3334                (PathBuf::from("root-1/c"), vec![2..7]),
3335                (PathBuf::from("root-1/d"), vec![0..5, 24..29]),
3336                (PathBuf::from("root-2/e"), vec![7..12]),
3337            ]
3338        );
3339    }
3340
3341    #[gpui::test(iterations = 10)]
3342    async fn test_document_highlights(cx_a: &mut TestAppContext, cx_b: &mut TestAppContext) {
3343        cx_a.foreground().forbid_parking();
3344        let lang_registry = Arc::new(LanguageRegistry::test());
3345        let fs = FakeFs::new(cx_a.background());
3346        fs.insert_tree(
3347            "/root-1",
3348            json!({
3349                ".zed.toml": r#"collaborators = ["user_b"]"#,
3350                "main.rs": "fn double(number: i32) -> i32 { number + number }",
3351            }),
3352        )
3353        .await;
3354
3355        // Set up a fake language server.
3356        let mut language = Language::new(
3357            LanguageConfig {
3358                name: "Rust".into(),
3359                path_suffixes: vec!["rs".to_string()],
3360                ..Default::default()
3361            },
3362            Some(tree_sitter_rust::language()),
3363        );
3364        let mut fake_language_servers = language.set_fake_lsp_adapter(Default::default());
3365        lang_registry.add(Arc::new(language));
3366
3367        // Connect to a server as 2 clients.
3368        let mut server = TestServer::start(cx_a.foreground(), cx_a.background()).await;
3369        let client_a = server.create_client(cx_a, "user_a").await;
3370        let client_b = server.create_client(cx_b, "user_b").await;
3371
3372        // Share a project as client A
3373        let project_a = cx_a.update(|cx| {
3374            Project::local(
3375                client_a.clone(),
3376                client_a.user_store.clone(),
3377                lang_registry.clone(),
3378                fs.clone(),
3379                cx,
3380            )
3381        });
3382        let (worktree_a, _) = project_a
3383            .update(cx_a, |p, cx| {
3384                p.find_or_create_local_worktree("/root-1", true, cx)
3385            })
3386            .await
3387            .unwrap();
3388        worktree_a
3389            .read_with(cx_a, |tree, _| tree.as_local().unwrap().scan_complete())
3390            .await;
3391        let project_id = project_a.update(cx_a, |p, _| p.next_remote_id()).await;
3392        let worktree_id = worktree_a.read_with(cx_a, |tree, _| tree.id());
3393        project_a.update(cx_a, |p, cx| p.share(cx)).await.unwrap();
3394
3395        // Join the worktree as client B.
3396        let project_b = Project::remote(
3397            project_id,
3398            client_b.clone(),
3399            client_b.user_store.clone(),
3400            lang_registry.clone(),
3401            fs.clone(),
3402            &mut cx_b.to_async(),
3403        )
3404        .await
3405        .unwrap();
3406
3407        // Open the file on client B.
3408        let buffer_b = cx_b
3409            .background()
3410            .spawn(project_b.update(cx_b, |p, cx| p.open_buffer((worktree_id, "main.rs"), cx)))
3411            .await
3412            .unwrap();
3413
3414        // Request document highlights as the guest.
3415        let fake_language_server = fake_language_servers.next().await.unwrap();
3416        fake_language_server.handle_request::<lsp::request::DocumentHighlightRequest, _, _>(
3417            |params, _| async move {
3418                assert_eq!(
3419                    params
3420                        .text_document_position_params
3421                        .text_document
3422                        .uri
3423                        .as_str(),
3424                    "file:///root-1/main.rs"
3425                );
3426                assert_eq!(
3427                    params.text_document_position_params.position,
3428                    lsp::Position::new(0, 34)
3429                );
3430                Ok(Some(vec![
3431                    lsp::DocumentHighlight {
3432                        kind: Some(lsp::DocumentHighlightKind::WRITE),
3433                        range: lsp::Range::new(
3434                            lsp::Position::new(0, 10),
3435                            lsp::Position::new(0, 16),
3436                        ),
3437                    },
3438                    lsp::DocumentHighlight {
3439                        kind: Some(lsp::DocumentHighlightKind::READ),
3440                        range: lsp::Range::new(
3441                            lsp::Position::new(0, 32),
3442                            lsp::Position::new(0, 38),
3443                        ),
3444                    },
3445                    lsp::DocumentHighlight {
3446                        kind: Some(lsp::DocumentHighlightKind::READ),
3447                        range: lsp::Range::new(
3448                            lsp::Position::new(0, 41),
3449                            lsp::Position::new(0, 47),
3450                        ),
3451                    },
3452                ]))
3453            },
3454        );
3455
3456        let highlights = project_b
3457            .update(cx_b, |p, cx| p.document_highlights(&buffer_b, 34, cx))
3458            .await
3459            .unwrap();
3460        buffer_b.read_with(cx_b, |buffer, _| {
3461            let snapshot = buffer.snapshot();
3462
3463            let highlights = highlights
3464                .into_iter()
3465                .map(|highlight| (highlight.kind, highlight.range.to_offset(&snapshot)))
3466                .collect::<Vec<_>>();
3467            assert_eq!(
3468                highlights,
3469                &[
3470                    (lsp::DocumentHighlightKind::WRITE, 10..16),
3471                    (lsp::DocumentHighlightKind::READ, 32..38),
3472                    (lsp::DocumentHighlightKind::READ, 41..47)
3473                ]
3474            )
3475        });
3476    }
3477
3478    #[gpui::test(iterations = 10)]
3479    async fn test_project_symbols(cx_a: &mut TestAppContext, cx_b: &mut TestAppContext) {
3480        cx_a.foreground().forbid_parking();
3481        let lang_registry = Arc::new(LanguageRegistry::test());
3482        let fs = FakeFs::new(cx_a.background());
3483        fs.insert_tree(
3484            "/code",
3485            json!({
3486                "crate-1": {
3487                    ".zed.toml": r#"collaborators = ["user_b"]"#,
3488                    "one.rs": "const ONE: usize = 1;",
3489                },
3490                "crate-2": {
3491                    "two.rs": "const TWO: usize = 2; const THREE: usize = 3;",
3492                },
3493                "private": {
3494                    "passwords.txt": "the-password",
3495                }
3496            }),
3497        )
3498        .await;
3499
3500        // Set up a fake language server.
3501        let mut language = Language::new(
3502            LanguageConfig {
3503                name: "Rust".into(),
3504                path_suffixes: vec!["rs".to_string()],
3505                ..Default::default()
3506            },
3507            Some(tree_sitter_rust::language()),
3508        );
3509        let mut fake_language_servers = language.set_fake_lsp_adapter(Default::default());
3510        lang_registry.add(Arc::new(language));
3511
3512        // Connect to a server as 2 clients.
3513        let mut server = TestServer::start(cx_a.foreground(), cx_a.background()).await;
3514        let client_a = server.create_client(cx_a, "user_a").await;
3515        let client_b = server.create_client(cx_b, "user_b").await;
3516
3517        // Share a project as client A
3518        let project_a = cx_a.update(|cx| {
3519            Project::local(
3520                client_a.clone(),
3521                client_a.user_store.clone(),
3522                lang_registry.clone(),
3523                fs.clone(),
3524                cx,
3525            )
3526        });
3527        let (worktree_a, _) = project_a
3528            .update(cx_a, |p, cx| {
3529                p.find_or_create_local_worktree("/code/crate-1", true, cx)
3530            })
3531            .await
3532            .unwrap();
3533        worktree_a
3534            .read_with(cx_a, |tree, _| tree.as_local().unwrap().scan_complete())
3535            .await;
3536        let project_id = project_a.update(cx_a, |p, _| p.next_remote_id()).await;
3537        let worktree_id = worktree_a.read_with(cx_a, |tree, _| tree.id());
3538        project_a.update(cx_a, |p, cx| p.share(cx)).await.unwrap();
3539
3540        // Join the worktree as client B.
3541        let project_b = Project::remote(
3542            project_id,
3543            client_b.clone(),
3544            client_b.user_store.clone(),
3545            lang_registry.clone(),
3546            fs.clone(),
3547            &mut cx_b.to_async(),
3548        )
3549        .await
3550        .unwrap();
3551
3552        // Cause the language server to start.
3553        let _buffer = cx_b
3554            .background()
3555            .spawn(project_b.update(cx_b, |p, cx| p.open_buffer((worktree_id, "one.rs"), cx)))
3556            .await
3557            .unwrap();
3558
3559        let fake_language_server = fake_language_servers.next().await.unwrap();
3560        fake_language_server.handle_request::<lsp::request::WorkspaceSymbol, _, _>(
3561            |_, _| async move {
3562                #[allow(deprecated)]
3563                Ok(Some(vec![lsp::SymbolInformation {
3564                    name: "TWO".into(),
3565                    location: lsp::Location {
3566                        uri: lsp::Url::from_file_path("/code/crate-2/two.rs").unwrap(),
3567                        range: lsp::Range::new(lsp::Position::new(0, 6), lsp::Position::new(0, 9)),
3568                    },
3569                    kind: lsp::SymbolKind::CONSTANT,
3570                    tags: None,
3571                    container_name: None,
3572                    deprecated: None,
3573                }]))
3574            },
3575        );
3576
3577        // Request the definition of a symbol as the guest.
3578        let symbols = project_b
3579            .update(cx_b, |p, cx| p.symbols("two", cx))
3580            .await
3581            .unwrap();
3582        assert_eq!(symbols.len(), 1);
3583        assert_eq!(symbols[0].name, "TWO");
3584
3585        // Open one of the returned symbols.
3586        let buffer_b_2 = project_b
3587            .update(cx_b, |project, cx| {
3588                project.open_buffer_for_symbol(&symbols[0], cx)
3589            })
3590            .await
3591            .unwrap();
3592        buffer_b_2.read_with(cx_b, |buffer, _| {
3593            assert_eq!(
3594                buffer.file().unwrap().path().as_ref(),
3595                Path::new("../crate-2/two.rs")
3596            );
3597        });
3598
3599        // Attempt to craft a symbol and violate host's privacy by opening an arbitrary file.
3600        let mut fake_symbol = symbols[0].clone();
3601        fake_symbol.path = Path::new("/code/secrets").into();
3602        let error = project_b
3603            .update(cx_b, |project, cx| {
3604                project.open_buffer_for_symbol(&fake_symbol, cx)
3605            })
3606            .await
3607            .unwrap_err();
3608        assert!(error.to_string().contains("invalid symbol signature"));
3609    }
3610
3611    #[gpui::test(iterations = 10)]
3612    async fn test_open_buffer_while_getting_definition_pointing_to_it(
3613        cx_a: &mut TestAppContext,
3614        cx_b: &mut TestAppContext,
3615        mut rng: StdRng,
3616    ) {
3617        cx_a.foreground().forbid_parking();
3618        let lang_registry = Arc::new(LanguageRegistry::test());
3619        let fs = FakeFs::new(cx_a.background());
3620        fs.insert_tree(
3621            "/root",
3622            json!({
3623                ".zed.toml": r#"collaborators = ["user_b"]"#,
3624                "a.rs": "const ONE: usize = b::TWO;",
3625                "b.rs": "const TWO: usize = 2",
3626            }),
3627        )
3628        .await;
3629
3630        // Set up a fake language server.
3631        let mut language = Language::new(
3632            LanguageConfig {
3633                name: "Rust".into(),
3634                path_suffixes: vec!["rs".to_string()],
3635                ..Default::default()
3636            },
3637            Some(tree_sitter_rust::language()),
3638        );
3639        let mut fake_language_servers = language.set_fake_lsp_adapter(Default::default());
3640        lang_registry.add(Arc::new(language));
3641
3642        // Connect to a server as 2 clients.
3643        let mut server = TestServer::start(cx_a.foreground(), cx_a.background()).await;
3644        let client_a = server.create_client(cx_a, "user_a").await;
3645        let client_b = server.create_client(cx_b, "user_b").await;
3646
3647        // Share a project as client A
3648        let project_a = cx_a.update(|cx| {
3649            Project::local(
3650                client_a.clone(),
3651                client_a.user_store.clone(),
3652                lang_registry.clone(),
3653                fs.clone(),
3654                cx,
3655            )
3656        });
3657
3658        let (worktree_a, _) = project_a
3659            .update(cx_a, |p, cx| {
3660                p.find_or_create_local_worktree("/root", true, cx)
3661            })
3662            .await
3663            .unwrap();
3664        worktree_a
3665            .read_with(cx_a, |tree, _| tree.as_local().unwrap().scan_complete())
3666            .await;
3667        let project_id = project_a.update(cx_a, |p, _| p.next_remote_id()).await;
3668        let worktree_id = worktree_a.read_with(cx_a, |tree, _| tree.id());
3669        project_a.update(cx_a, |p, cx| p.share(cx)).await.unwrap();
3670
3671        // Join the worktree as client B.
3672        let project_b = Project::remote(
3673            project_id,
3674            client_b.clone(),
3675            client_b.user_store.clone(),
3676            lang_registry.clone(),
3677            fs.clone(),
3678            &mut cx_b.to_async(),
3679        )
3680        .await
3681        .unwrap();
3682
3683        let buffer_b1 = cx_b
3684            .background()
3685            .spawn(project_b.update(cx_b, |p, cx| p.open_buffer((worktree_id, "a.rs"), cx)))
3686            .await
3687            .unwrap();
3688
3689        let fake_language_server = fake_language_servers.next().await.unwrap();
3690        fake_language_server.handle_request::<lsp::request::GotoDefinition, _, _>(
3691            |_, _| async move {
3692                Ok(Some(lsp::GotoDefinitionResponse::Scalar(
3693                    lsp::Location::new(
3694                        lsp::Url::from_file_path("/root/b.rs").unwrap(),
3695                        lsp::Range::new(lsp::Position::new(0, 6), lsp::Position::new(0, 9)),
3696                    ),
3697                )))
3698            },
3699        );
3700
3701        let definitions;
3702        let buffer_b2;
3703        if rng.gen() {
3704            definitions = project_b.update(cx_b, |p, cx| p.definition(&buffer_b1, 23, cx));
3705            buffer_b2 = project_b.update(cx_b, |p, cx| p.open_buffer((worktree_id, "b.rs"), cx));
3706        } else {
3707            buffer_b2 = project_b.update(cx_b, |p, cx| p.open_buffer((worktree_id, "b.rs"), cx));
3708            definitions = project_b.update(cx_b, |p, cx| p.definition(&buffer_b1, 23, cx));
3709        }
3710
3711        let buffer_b2 = buffer_b2.await.unwrap();
3712        let definitions = definitions.await.unwrap();
3713        assert_eq!(definitions.len(), 1);
3714        assert_eq!(definitions[0].buffer, buffer_b2);
3715    }
3716
3717    #[gpui::test(iterations = 10)]
3718    async fn test_collaborating_with_code_actions(
3719        cx_a: &mut TestAppContext,
3720        cx_b: &mut TestAppContext,
3721    ) {
3722        cx_a.foreground().forbid_parking();
3723        let lang_registry = Arc::new(LanguageRegistry::test());
3724        let fs = FakeFs::new(cx_a.background());
3725        cx_b.update(|cx| editor::init(cx));
3726
3727        // Set up a fake language server.
3728        let mut language = Language::new(
3729            LanguageConfig {
3730                name: "Rust".into(),
3731                path_suffixes: vec!["rs".to_string()],
3732                ..Default::default()
3733            },
3734            Some(tree_sitter_rust::language()),
3735        );
3736        let mut fake_language_servers = language.set_fake_lsp_adapter(Default::default());
3737        lang_registry.add(Arc::new(language));
3738
3739        // Connect to a server as 2 clients.
3740        let mut server = TestServer::start(cx_a.foreground(), cx_a.background()).await;
3741        let client_a = server.create_client(cx_a, "user_a").await;
3742        let client_b = server.create_client(cx_b, "user_b").await;
3743
3744        // Share a project as client A
3745        fs.insert_tree(
3746            "/a",
3747            json!({
3748                ".zed.toml": r#"collaborators = ["user_b"]"#,
3749                "main.rs": "mod other;\nfn main() { let foo = other::foo(); }",
3750                "other.rs": "pub fn foo() -> usize { 4 }",
3751            }),
3752        )
3753        .await;
3754        let project_a = cx_a.update(|cx| {
3755            Project::local(
3756                client_a.clone(),
3757                client_a.user_store.clone(),
3758                lang_registry.clone(),
3759                fs.clone(),
3760                cx,
3761            )
3762        });
3763        let (worktree_a, _) = project_a
3764            .update(cx_a, |p, cx| {
3765                p.find_or_create_local_worktree("/a", true, cx)
3766            })
3767            .await
3768            .unwrap();
3769        worktree_a
3770            .read_with(cx_a, |tree, _| tree.as_local().unwrap().scan_complete())
3771            .await;
3772        let project_id = project_a.update(cx_a, |p, _| p.next_remote_id()).await;
3773        let worktree_id = worktree_a.read_with(cx_a, |tree, _| tree.id());
3774        project_a.update(cx_a, |p, cx| p.share(cx)).await.unwrap();
3775
3776        // Join the worktree as client B.
3777        let project_b = Project::remote(
3778            project_id,
3779            client_b.clone(),
3780            client_b.user_store.clone(),
3781            lang_registry.clone(),
3782            fs.clone(),
3783            &mut cx_b.to_async(),
3784        )
3785        .await
3786        .unwrap();
3787        let mut params = cx_b.update(WorkspaceParams::test);
3788        params.languages = lang_registry.clone();
3789        params.client = client_b.client.clone();
3790        params.user_store = client_b.user_store.clone();
3791        params.project = project_b;
3792
3793        let (_window_b, workspace_b) = cx_b.add_window(|cx| Workspace::new(&params, cx));
3794        let editor_b = workspace_b
3795            .update(cx_b, |workspace, cx| {
3796                workspace.open_path((worktree_id, "main.rs"), cx)
3797            })
3798            .await
3799            .unwrap()
3800            .downcast::<Editor>()
3801            .unwrap();
3802
3803        let mut fake_language_server = fake_language_servers.next().await.unwrap();
3804        fake_language_server
3805            .handle_request::<lsp::request::CodeActionRequest, _, _>(|params, _| async move {
3806                assert_eq!(
3807                    params.text_document.uri,
3808                    lsp::Url::from_file_path("/a/main.rs").unwrap(),
3809                );
3810                assert_eq!(params.range.start, lsp::Position::new(0, 0));
3811                assert_eq!(params.range.end, lsp::Position::new(0, 0));
3812                Ok(None)
3813            })
3814            .next()
3815            .await;
3816
3817        // Move cursor to a location that contains code actions.
3818        editor_b.update(cx_b, |editor, cx| {
3819            editor.select_ranges([Point::new(1, 31)..Point::new(1, 31)], None, cx);
3820            cx.focus(&editor_b);
3821        });
3822
3823        fake_language_server
3824            .handle_request::<lsp::request::CodeActionRequest, _, _>(|params, _| async move {
3825                assert_eq!(
3826                    params.text_document.uri,
3827                    lsp::Url::from_file_path("/a/main.rs").unwrap(),
3828                );
3829                assert_eq!(params.range.start, lsp::Position::new(1, 31));
3830                assert_eq!(params.range.end, lsp::Position::new(1, 31));
3831
3832                Ok(Some(vec![lsp::CodeActionOrCommand::CodeAction(
3833                    lsp::CodeAction {
3834                        title: "Inline into all callers".to_string(),
3835                        edit: Some(lsp::WorkspaceEdit {
3836                            changes: Some(
3837                                [
3838                                    (
3839                                        lsp::Url::from_file_path("/a/main.rs").unwrap(),
3840                                        vec![lsp::TextEdit::new(
3841                                            lsp::Range::new(
3842                                                lsp::Position::new(1, 22),
3843                                                lsp::Position::new(1, 34),
3844                                            ),
3845                                            "4".to_string(),
3846                                        )],
3847                                    ),
3848                                    (
3849                                        lsp::Url::from_file_path("/a/other.rs").unwrap(),
3850                                        vec![lsp::TextEdit::new(
3851                                            lsp::Range::new(
3852                                                lsp::Position::new(0, 0),
3853                                                lsp::Position::new(0, 27),
3854                                            ),
3855                                            "".to_string(),
3856                                        )],
3857                                    ),
3858                                ]
3859                                .into_iter()
3860                                .collect(),
3861                            ),
3862                            ..Default::default()
3863                        }),
3864                        data: Some(json!({
3865                            "codeActionParams": {
3866                                "range": {
3867                                    "start": {"line": 1, "column": 31},
3868                                    "end": {"line": 1, "column": 31},
3869                                }
3870                            }
3871                        })),
3872                        ..Default::default()
3873                    },
3874                )]))
3875            })
3876            .next()
3877            .await;
3878
3879        // Toggle code actions and wait for them to display.
3880        editor_b.update(cx_b, |editor, cx| {
3881            editor.toggle_code_actions(
3882                &ToggleCodeActions {
3883                    deployed_from_indicator: false,
3884                },
3885                cx,
3886            );
3887        });
3888        editor_b
3889            .condition(&cx_b, |editor, _| editor.context_menu_visible())
3890            .await;
3891
3892        fake_language_server.remove_request_handler::<lsp::request::CodeActionRequest>();
3893
3894        // Confirming the code action will trigger a resolve request.
3895        let confirm_action = workspace_b
3896            .update(cx_b, |workspace, cx| {
3897                Editor::confirm_code_action(workspace, &ConfirmCodeAction { item_ix: Some(0) }, cx)
3898            })
3899            .unwrap();
3900        fake_language_server.handle_request::<lsp::request::CodeActionResolveRequest, _, _>(
3901            |_, _| async move {
3902                Ok(lsp::CodeAction {
3903                    title: "Inline into all callers".to_string(),
3904                    edit: Some(lsp::WorkspaceEdit {
3905                        changes: Some(
3906                            [
3907                                (
3908                                    lsp::Url::from_file_path("/a/main.rs").unwrap(),
3909                                    vec![lsp::TextEdit::new(
3910                                        lsp::Range::new(
3911                                            lsp::Position::new(1, 22),
3912                                            lsp::Position::new(1, 34),
3913                                        ),
3914                                        "4".to_string(),
3915                                    )],
3916                                ),
3917                                (
3918                                    lsp::Url::from_file_path("/a/other.rs").unwrap(),
3919                                    vec![lsp::TextEdit::new(
3920                                        lsp::Range::new(
3921                                            lsp::Position::new(0, 0),
3922                                            lsp::Position::new(0, 27),
3923                                        ),
3924                                        "".to_string(),
3925                                    )],
3926                                ),
3927                            ]
3928                            .into_iter()
3929                            .collect(),
3930                        ),
3931                        ..Default::default()
3932                    }),
3933                    ..Default::default()
3934                })
3935            },
3936        );
3937
3938        // After the action is confirmed, an editor containing both modified files is opened.
3939        confirm_action.await.unwrap();
3940        let code_action_editor = workspace_b.read_with(cx_b, |workspace, cx| {
3941            workspace
3942                .active_item(cx)
3943                .unwrap()
3944                .downcast::<Editor>()
3945                .unwrap()
3946        });
3947        code_action_editor.update(cx_b, |editor, cx| {
3948            assert_eq!(editor.text(cx), "mod other;\nfn main() { let foo = 4; }\n");
3949            editor.undo(&Undo, cx);
3950            assert_eq!(
3951                editor.text(cx),
3952                "mod other;\nfn main() { let foo = other::foo(); }\npub fn foo() -> usize { 4 }"
3953            );
3954            editor.redo(&Redo, cx);
3955            assert_eq!(editor.text(cx), "mod other;\nfn main() { let foo = 4; }\n");
3956        });
3957    }
3958
3959    #[gpui::test(iterations = 10)]
3960    async fn test_collaborating_with_renames(cx_a: &mut TestAppContext, cx_b: &mut TestAppContext) {
3961        cx_a.foreground().forbid_parking();
3962        let lang_registry = Arc::new(LanguageRegistry::test());
3963        let fs = FakeFs::new(cx_a.background());
3964        cx_b.update(|cx| editor::init(cx));
3965
3966        // Set up a fake language server.
3967        let mut language = Language::new(
3968            LanguageConfig {
3969                name: "Rust".into(),
3970                path_suffixes: vec!["rs".to_string()],
3971                ..Default::default()
3972            },
3973            Some(tree_sitter_rust::language()),
3974        );
3975        let mut fake_language_servers = language.set_fake_lsp_adapter(FakeLspAdapter {
3976            capabilities: lsp::ServerCapabilities {
3977                rename_provider: Some(lsp::OneOf::Right(lsp::RenameOptions {
3978                    prepare_provider: Some(true),
3979                    work_done_progress_options: Default::default(),
3980                })),
3981                ..Default::default()
3982            },
3983            ..Default::default()
3984        });
3985        lang_registry.add(Arc::new(language));
3986
3987        // Connect to a server as 2 clients.
3988        let mut server = TestServer::start(cx_a.foreground(), cx_a.background()).await;
3989        let client_a = server.create_client(cx_a, "user_a").await;
3990        let client_b = server.create_client(cx_b, "user_b").await;
3991
3992        // Share a project as client A
3993        fs.insert_tree(
3994            "/dir",
3995            json!({
3996                ".zed.toml": r#"collaborators = ["user_b"]"#,
3997                "one.rs": "const ONE: usize = 1;",
3998                "two.rs": "const TWO: usize = one::ONE + one::ONE;"
3999            }),
4000        )
4001        .await;
4002        let project_a = cx_a.update(|cx| {
4003            Project::local(
4004                client_a.clone(),
4005                client_a.user_store.clone(),
4006                lang_registry.clone(),
4007                fs.clone(),
4008                cx,
4009            )
4010        });
4011        let (worktree_a, _) = project_a
4012            .update(cx_a, |p, cx| {
4013                p.find_or_create_local_worktree("/dir", true, cx)
4014            })
4015            .await
4016            .unwrap();
4017        worktree_a
4018            .read_with(cx_a, |tree, _| tree.as_local().unwrap().scan_complete())
4019            .await;
4020        let project_id = project_a.update(cx_a, |p, _| p.next_remote_id()).await;
4021        let worktree_id = worktree_a.read_with(cx_a, |tree, _| tree.id());
4022        project_a.update(cx_a, |p, cx| p.share(cx)).await.unwrap();
4023
4024        // Join the worktree as client B.
4025        let project_b = Project::remote(
4026            project_id,
4027            client_b.clone(),
4028            client_b.user_store.clone(),
4029            lang_registry.clone(),
4030            fs.clone(),
4031            &mut cx_b.to_async(),
4032        )
4033        .await
4034        .unwrap();
4035        let mut params = cx_b.update(WorkspaceParams::test);
4036        params.languages = lang_registry.clone();
4037        params.client = client_b.client.clone();
4038        params.user_store = client_b.user_store.clone();
4039        params.project = project_b;
4040
4041        let (_window_b, workspace_b) = cx_b.add_window(|cx| Workspace::new(&params, cx));
4042        let editor_b = workspace_b
4043            .update(cx_b, |workspace, cx| {
4044                workspace.open_path((worktree_id, "one.rs"), cx)
4045            })
4046            .await
4047            .unwrap()
4048            .downcast::<Editor>()
4049            .unwrap();
4050        let fake_language_server = fake_language_servers.next().await.unwrap();
4051
4052        // Move cursor to a location that can be renamed.
4053        let prepare_rename = editor_b.update(cx_b, |editor, cx| {
4054            editor.select_ranges([7..7], None, cx);
4055            editor.rename(&Rename, cx).unwrap()
4056        });
4057
4058        fake_language_server
4059            .handle_request::<lsp::request::PrepareRenameRequest, _, _>(|params, _| async move {
4060                assert_eq!(params.text_document.uri.as_str(), "file:///dir/one.rs");
4061                assert_eq!(params.position, lsp::Position::new(0, 7));
4062                Ok(Some(lsp::PrepareRenameResponse::Range(lsp::Range::new(
4063                    lsp::Position::new(0, 6),
4064                    lsp::Position::new(0, 9),
4065                ))))
4066            })
4067            .next()
4068            .await
4069            .unwrap();
4070        prepare_rename.await.unwrap();
4071        editor_b.update(cx_b, |editor, cx| {
4072            let rename = editor.pending_rename().unwrap();
4073            let buffer = editor.buffer().read(cx).snapshot(cx);
4074            assert_eq!(
4075                rename.range.start.to_offset(&buffer)..rename.range.end.to_offset(&buffer),
4076                6..9
4077            );
4078            rename.editor.update(cx, |rename_editor, cx| {
4079                rename_editor.buffer().update(cx, |rename_buffer, cx| {
4080                    rename_buffer.edit([(0..3, "THREE")], cx);
4081                });
4082            });
4083        });
4084
4085        let confirm_rename = workspace_b.update(cx_b, |workspace, cx| {
4086            Editor::confirm_rename(workspace, &ConfirmRename, cx).unwrap()
4087        });
4088        fake_language_server
4089            .handle_request::<lsp::request::Rename, _, _>(|params, _| async move {
4090                assert_eq!(
4091                    params.text_document_position.text_document.uri.as_str(),
4092                    "file:///dir/one.rs"
4093                );
4094                assert_eq!(
4095                    params.text_document_position.position,
4096                    lsp::Position::new(0, 6)
4097                );
4098                assert_eq!(params.new_name, "THREE");
4099                Ok(Some(lsp::WorkspaceEdit {
4100                    changes: Some(
4101                        [
4102                            (
4103                                lsp::Url::from_file_path("/dir/one.rs").unwrap(),
4104                                vec![lsp::TextEdit::new(
4105                                    lsp::Range::new(
4106                                        lsp::Position::new(0, 6),
4107                                        lsp::Position::new(0, 9),
4108                                    ),
4109                                    "THREE".to_string(),
4110                                )],
4111                            ),
4112                            (
4113                                lsp::Url::from_file_path("/dir/two.rs").unwrap(),
4114                                vec![
4115                                    lsp::TextEdit::new(
4116                                        lsp::Range::new(
4117                                            lsp::Position::new(0, 24),
4118                                            lsp::Position::new(0, 27),
4119                                        ),
4120                                        "THREE".to_string(),
4121                                    ),
4122                                    lsp::TextEdit::new(
4123                                        lsp::Range::new(
4124                                            lsp::Position::new(0, 35),
4125                                            lsp::Position::new(0, 38),
4126                                        ),
4127                                        "THREE".to_string(),
4128                                    ),
4129                                ],
4130                            ),
4131                        ]
4132                        .into_iter()
4133                        .collect(),
4134                    ),
4135                    ..Default::default()
4136                }))
4137            })
4138            .next()
4139            .await
4140            .unwrap();
4141        confirm_rename.await.unwrap();
4142
4143        let rename_editor = workspace_b.read_with(cx_b, |workspace, cx| {
4144            workspace
4145                .active_item(cx)
4146                .unwrap()
4147                .downcast::<Editor>()
4148                .unwrap()
4149        });
4150        rename_editor.update(cx_b, |editor, cx| {
4151            assert_eq!(
4152                editor.text(cx),
4153                "const THREE: usize = 1;\nconst TWO: usize = one::THREE + one::THREE;"
4154            );
4155            editor.undo(&Undo, cx);
4156            assert_eq!(
4157                editor.text(cx),
4158                "const ONE: usize = 1;\nconst TWO: usize = one::ONE + one::ONE;"
4159            );
4160            editor.redo(&Redo, cx);
4161            assert_eq!(
4162                editor.text(cx),
4163                "const THREE: usize = 1;\nconst TWO: usize = one::THREE + one::THREE;"
4164            );
4165        });
4166
4167        // Ensure temporary rename edits cannot be undone/redone.
4168        editor_b.update(cx_b, |editor, cx| {
4169            editor.undo(&Undo, cx);
4170            assert_eq!(editor.text(cx), "const ONE: usize = 1;");
4171            editor.undo(&Undo, cx);
4172            assert_eq!(editor.text(cx), "const ONE: usize = 1;");
4173            editor.redo(&Redo, cx);
4174            assert_eq!(editor.text(cx), "const THREE: usize = 1;");
4175        })
4176    }
4177
4178    #[gpui::test(iterations = 10)]
4179    async fn test_basic_chat(cx_a: &mut TestAppContext, cx_b: &mut TestAppContext) {
4180        cx_a.foreground().forbid_parking();
4181
4182        // Connect to a server as 2 clients.
4183        let mut server = TestServer::start(cx_a.foreground(), cx_a.background()).await;
4184        let client_a = server.create_client(cx_a, "user_a").await;
4185        let client_b = server.create_client(cx_b, "user_b").await;
4186
4187        // Create an org that includes these 2 users.
4188        let db = &server.app_state.db;
4189        let org_id = db.create_org("Test Org", "test-org").await.unwrap();
4190        db.add_org_member(org_id, client_a.current_user_id(&cx_a), false)
4191            .await
4192            .unwrap();
4193        db.add_org_member(org_id, client_b.current_user_id(&cx_b), false)
4194            .await
4195            .unwrap();
4196
4197        // Create a channel that includes all the users.
4198        let channel_id = db.create_org_channel(org_id, "test-channel").await.unwrap();
4199        db.add_channel_member(channel_id, client_a.current_user_id(&cx_a), false)
4200            .await
4201            .unwrap();
4202        db.add_channel_member(channel_id, client_b.current_user_id(&cx_b), false)
4203            .await
4204            .unwrap();
4205        db.create_channel_message(
4206            channel_id,
4207            client_b.current_user_id(&cx_b),
4208            "hello A, it's B.",
4209            OffsetDateTime::now_utc(),
4210            1,
4211        )
4212        .await
4213        .unwrap();
4214
4215        let channels_a = cx_a
4216            .add_model(|cx| ChannelList::new(client_a.user_store.clone(), client_a.clone(), cx));
4217        channels_a
4218            .condition(cx_a, |list, _| list.available_channels().is_some())
4219            .await;
4220        channels_a.read_with(cx_a, |list, _| {
4221            assert_eq!(
4222                list.available_channels().unwrap(),
4223                &[ChannelDetails {
4224                    id: channel_id.to_proto(),
4225                    name: "test-channel".to_string()
4226                }]
4227            )
4228        });
4229        let channel_a = channels_a.update(cx_a, |this, cx| {
4230            this.get_channel(channel_id.to_proto(), cx).unwrap()
4231        });
4232        channel_a.read_with(cx_a, |channel, _| assert!(channel.messages().is_empty()));
4233        channel_a
4234            .condition(&cx_a, |channel, _| {
4235                channel_messages(channel)
4236                    == [("user_b".to_string(), "hello A, it's B.".to_string(), false)]
4237            })
4238            .await;
4239
4240        let channels_b = cx_b
4241            .add_model(|cx| ChannelList::new(client_b.user_store.clone(), client_b.clone(), cx));
4242        channels_b
4243            .condition(cx_b, |list, _| list.available_channels().is_some())
4244            .await;
4245        channels_b.read_with(cx_b, |list, _| {
4246            assert_eq!(
4247                list.available_channels().unwrap(),
4248                &[ChannelDetails {
4249                    id: channel_id.to_proto(),
4250                    name: "test-channel".to_string()
4251                }]
4252            )
4253        });
4254
4255        let channel_b = channels_b.update(cx_b, |this, cx| {
4256            this.get_channel(channel_id.to_proto(), cx).unwrap()
4257        });
4258        channel_b.read_with(cx_b, |channel, _| assert!(channel.messages().is_empty()));
4259        channel_b
4260            .condition(&cx_b, |channel, _| {
4261                channel_messages(channel)
4262                    == [("user_b".to_string(), "hello A, it's B.".to_string(), false)]
4263            })
4264            .await;
4265
4266        channel_a
4267            .update(cx_a, |channel, cx| {
4268                channel
4269                    .send_message("oh, hi B.".to_string(), cx)
4270                    .unwrap()
4271                    .detach();
4272                let task = channel.send_message("sup".to_string(), cx).unwrap();
4273                assert_eq!(
4274                    channel_messages(channel),
4275                    &[
4276                        ("user_b".to_string(), "hello A, it's B.".to_string(), false),
4277                        ("user_a".to_string(), "oh, hi B.".to_string(), true),
4278                        ("user_a".to_string(), "sup".to_string(), true)
4279                    ]
4280                );
4281                task
4282            })
4283            .await
4284            .unwrap();
4285
4286        channel_b
4287            .condition(&cx_b, |channel, _| {
4288                channel_messages(channel)
4289                    == [
4290                        ("user_b".to_string(), "hello A, it's B.".to_string(), false),
4291                        ("user_a".to_string(), "oh, hi B.".to_string(), false),
4292                        ("user_a".to_string(), "sup".to_string(), false),
4293                    ]
4294            })
4295            .await;
4296
4297        assert_eq!(
4298            server
4299                .state()
4300                .await
4301                .channel(channel_id)
4302                .unwrap()
4303                .connection_ids
4304                .len(),
4305            2
4306        );
4307        cx_b.update(|_| drop(channel_b));
4308        server
4309            .condition(|state| state.channel(channel_id).unwrap().connection_ids.len() == 1)
4310            .await;
4311
4312        cx_a.update(|_| drop(channel_a));
4313        server
4314            .condition(|state| state.channel(channel_id).is_none())
4315            .await;
4316    }
4317
4318    #[gpui::test(iterations = 10)]
4319    async fn test_chat_message_validation(cx_a: &mut TestAppContext) {
4320        cx_a.foreground().forbid_parking();
4321
4322        let mut server = TestServer::start(cx_a.foreground(), cx_a.background()).await;
4323        let client_a = server.create_client(cx_a, "user_a").await;
4324
4325        let db = &server.app_state.db;
4326        let org_id = db.create_org("Test Org", "test-org").await.unwrap();
4327        let channel_id = db.create_org_channel(org_id, "test-channel").await.unwrap();
4328        db.add_org_member(org_id, client_a.current_user_id(&cx_a), false)
4329            .await
4330            .unwrap();
4331        db.add_channel_member(channel_id, client_a.current_user_id(&cx_a), false)
4332            .await
4333            .unwrap();
4334
4335        let channels_a = cx_a
4336            .add_model(|cx| ChannelList::new(client_a.user_store.clone(), client_a.clone(), cx));
4337        channels_a
4338            .condition(cx_a, |list, _| list.available_channels().is_some())
4339            .await;
4340        let channel_a = channels_a.update(cx_a, |this, cx| {
4341            this.get_channel(channel_id.to_proto(), cx).unwrap()
4342        });
4343
4344        // Messages aren't allowed to be too long.
4345        channel_a
4346            .update(cx_a, |channel, cx| {
4347                let long_body = "this is long.\n".repeat(1024);
4348                channel.send_message(long_body, cx).unwrap()
4349            })
4350            .await
4351            .unwrap_err();
4352
4353        // Messages aren't allowed to be blank.
4354        channel_a.update(cx_a, |channel, cx| {
4355            channel.send_message(String::new(), cx).unwrap_err()
4356        });
4357
4358        // Leading and trailing whitespace are trimmed.
4359        channel_a
4360            .update(cx_a, |channel, cx| {
4361                channel
4362                    .send_message("\n surrounded by whitespace  \n".to_string(), cx)
4363                    .unwrap()
4364            })
4365            .await
4366            .unwrap();
4367        assert_eq!(
4368            db.get_channel_messages(channel_id, 10, None)
4369                .await
4370                .unwrap()
4371                .iter()
4372                .map(|m| &m.body)
4373                .collect::<Vec<_>>(),
4374            &["surrounded by whitespace"]
4375        );
4376    }
4377
4378    #[gpui::test(iterations = 10)]
4379    async fn test_chat_reconnection(cx_a: &mut TestAppContext, cx_b: &mut TestAppContext) {
4380        cx_a.foreground().forbid_parking();
4381
4382        // Connect to a server as 2 clients.
4383        let mut server = TestServer::start(cx_a.foreground(), cx_a.background()).await;
4384        let client_a = server.create_client(cx_a, "user_a").await;
4385        let client_b = server.create_client(cx_b, "user_b").await;
4386        let mut status_b = client_b.status();
4387
4388        // Create an org that includes these 2 users.
4389        let db = &server.app_state.db;
4390        let org_id = db.create_org("Test Org", "test-org").await.unwrap();
4391        db.add_org_member(org_id, client_a.current_user_id(&cx_a), false)
4392            .await
4393            .unwrap();
4394        db.add_org_member(org_id, client_b.current_user_id(&cx_b), false)
4395            .await
4396            .unwrap();
4397
4398        // Create a channel that includes all the users.
4399        let channel_id = db.create_org_channel(org_id, "test-channel").await.unwrap();
4400        db.add_channel_member(channel_id, client_a.current_user_id(&cx_a), false)
4401            .await
4402            .unwrap();
4403        db.add_channel_member(channel_id, client_b.current_user_id(&cx_b), false)
4404            .await
4405            .unwrap();
4406        db.create_channel_message(
4407            channel_id,
4408            client_b.current_user_id(&cx_b),
4409            "hello A, it's B.",
4410            OffsetDateTime::now_utc(),
4411            2,
4412        )
4413        .await
4414        .unwrap();
4415
4416        let channels_a = cx_a
4417            .add_model(|cx| ChannelList::new(client_a.user_store.clone(), client_a.clone(), cx));
4418        channels_a
4419            .condition(cx_a, |list, _| list.available_channels().is_some())
4420            .await;
4421
4422        channels_a.read_with(cx_a, |list, _| {
4423            assert_eq!(
4424                list.available_channels().unwrap(),
4425                &[ChannelDetails {
4426                    id: channel_id.to_proto(),
4427                    name: "test-channel".to_string()
4428                }]
4429            )
4430        });
4431        let channel_a = channels_a.update(cx_a, |this, cx| {
4432            this.get_channel(channel_id.to_proto(), cx).unwrap()
4433        });
4434        channel_a.read_with(cx_a, |channel, _| assert!(channel.messages().is_empty()));
4435        channel_a
4436            .condition(&cx_a, |channel, _| {
4437                channel_messages(channel)
4438                    == [("user_b".to_string(), "hello A, it's B.".to_string(), false)]
4439            })
4440            .await;
4441
4442        let channels_b = cx_b
4443            .add_model(|cx| ChannelList::new(client_b.user_store.clone(), client_b.clone(), cx));
4444        channels_b
4445            .condition(cx_b, |list, _| list.available_channels().is_some())
4446            .await;
4447        channels_b.read_with(cx_b, |list, _| {
4448            assert_eq!(
4449                list.available_channels().unwrap(),
4450                &[ChannelDetails {
4451                    id: channel_id.to_proto(),
4452                    name: "test-channel".to_string()
4453                }]
4454            )
4455        });
4456
4457        let channel_b = channels_b.update(cx_b, |this, cx| {
4458            this.get_channel(channel_id.to_proto(), cx).unwrap()
4459        });
4460        channel_b.read_with(cx_b, |channel, _| assert!(channel.messages().is_empty()));
4461        channel_b
4462            .condition(&cx_b, |channel, _| {
4463                channel_messages(channel)
4464                    == [("user_b".to_string(), "hello A, it's B.".to_string(), false)]
4465            })
4466            .await;
4467
4468        // Disconnect client B, ensuring we can still access its cached channel data.
4469        server.forbid_connections();
4470        server.disconnect_client(client_b.current_user_id(&cx_b));
4471        cx_b.foreground().advance_clock(Duration::from_secs(3));
4472        while !matches!(
4473            status_b.next().await,
4474            Some(client::Status::ReconnectionError { .. })
4475        ) {}
4476
4477        channels_b.read_with(cx_b, |channels, _| {
4478            assert_eq!(
4479                channels.available_channels().unwrap(),
4480                [ChannelDetails {
4481                    id: channel_id.to_proto(),
4482                    name: "test-channel".to_string()
4483                }]
4484            )
4485        });
4486        channel_b.read_with(cx_b, |channel, _| {
4487            assert_eq!(
4488                channel_messages(channel),
4489                [("user_b".to_string(), "hello A, it's B.".to_string(), false)]
4490            )
4491        });
4492
4493        // Send a message from client B while it is disconnected.
4494        channel_b
4495            .update(cx_b, |channel, cx| {
4496                let task = channel
4497                    .send_message("can you see this?".to_string(), cx)
4498                    .unwrap();
4499                assert_eq!(
4500                    channel_messages(channel),
4501                    &[
4502                        ("user_b".to_string(), "hello A, it's B.".to_string(), false),
4503                        ("user_b".to_string(), "can you see this?".to_string(), true)
4504                    ]
4505                );
4506                task
4507            })
4508            .await
4509            .unwrap_err();
4510
4511        // Send a message from client A while B is disconnected.
4512        channel_a
4513            .update(cx_a, |channel, cx| {
4514                channel
4515                    .send_message("oh, hi B.".to_string(), cx)
4516                    .unwrap()
4517                    .detach();
4518                let task = channel.send_message("sup".to_string(), cx).unwrap();
4519                assert_eq!(
4520                    channel_messages(channel),
4521                    &[
4522                        ("user_b".to_string(), "hello A, it's B.".to_string(), false),
4523                        ("user_a".to_string(), "oh, hi B.".to_string(), true),
4524                        ("user_a".to_string(), "sup".to_string(), true)
4525                    ]
4526                );
4527                task
4528            })
4529            .await
4530            .unwrap();
4531
4532        // Give client B a chance to reconnect.
4533        server.allow_connections();
4534        cx_b.foreground().advance_clock(Duration::from_secs(10));
4535
4536        // Verify that B sees the new messages upon reconnection, as well as the message client B
4537        // sent while offline.
4538        channel_b
4539            .condition(&cx_b, |channel, _| {
4540                channel_messages(channel)
4541                    == [
4542                        ("user_b".to_string(), "hello A, it's B.".to_string(), false),
4543                        ("user_a".to_string(), "oh, hi B.".to_string(), false),
4544                        ("user_a".to_string(), "sup".to_string(), false),
4545                        ("user_b".to_string(), "can you see this?".to_string(), false),
4546                    ]
4547            })
4548            .await;
4549
4550        // Ensure client A and B can communicate normally after reconnection.
4551        channel_a
4552            .update(cx_a, |channel, cx| {
4553                channel.send_message("you online?".to_string(), cx).unwrap()
4554            })
4555            .await
4556            .unwrap();
4557        channel_b
4558            .condition(&cx_b, |channel, _| {
4559                channel_messages(channel)
4560                    == [
4561                        ("user_b".to_string(), "hello A, it's B.".to_string(), false),
4562                        ("user_a".to_string(), "oh, hi B.".to_string(), false),
4563                        ("user_a".to_string(), "sup".to_string(), false),
4564                        ("user_b".to_string(), "can you see this?".to_string(), false),
4565                        ("user_a".to_string(), "you online?".to_string(), false),
4566                    ]
4567            })
4568            .await;
4569
4570        channel_b
4571            .update(cx_b, |channel, cx| {
4572                channel.send_message("yep".to_string(), cx).unwrap()
4573            })
4574            .await
4575            .unwrap();
4576        channel_a
4577            .condition(&cx_a, |channel, _| {
4578                channel_messages(channel)
4579                    == [
4580                        ("user_b".to_string(), "hello A, it's B.".to_string(), false),
4581                        ("user_a".to_string(), "oh, hi B.".to_string(), false),
4582                        ("user_a".to_string(), "sup".to_string(), false),
4583                        ("user_b".to_string(), "can you see this?".to_string(), false),
4584                        ("user_a".to_string(), "you online?".to_string(), false),
4585                        ("user_b".to_string(), "yep".to_string(), false),
4586                    ]
4587            })
4588            .await;
4589    }
4590
4591    #[gpui::test(iterations = 10)]
4592    async fn test_contacts(
4593        cx_a: &mut TestAppContext,
4594        cx_b: &mut TestAppContext,
4595        cx_c: &mut TestAppContext,
4596    ) {
4597        cx_a.foreground().forbid_parking();
4598        let lang_registry = Arc::new(LanguageRegistry::test());
4599        let fs = FakeFs::new(cx_a.background());
4600
4601        // Connect to a server as 3 clients.
4602        let mut server = TestServer::start(cx_a.foreground(), cx_a.background()).await;
4603        let client_a = server.create_client(cx_a, "user_a").await;
4604        let client_b = server.create_client(cx_b, "user_b").await;
4605        let client_c = server.create_client(cx_c, "user_c").await;
4606
4607        // Share a worktree as client A.
4608        fs.insert_tree(
4609            "/a",
4610            json!({
4611                ".zed.toml": r#"collaborators = ["user_b", "user_c"]"#,
4612            }),
4613        )
4614        .await;
4615
4616        let project_a = cx_a.update(|cx| {
4617            Project::local(
4618                client_a.clone(),
4619                client_a.user_store.clone(),
4620                lang_registry.clone(),
4621                fs.clone(),
4622                cx,
4623            )
4624        });
4625        let (worktree_a, _) = project_a
4626            .update(cx_a, |p, cx| {
4627                p.find_or_create_local_worktree("/a", true, cx)
4628            })
4629            .await
4630            .unwrap();
4631        worktree_a
4632            .read_with(cx_a, |tree, _| tree.as_local().unwrap().scan_complete())
4633            .await;
4634
4635        client_a
4636            .user_store
4637            .condition(&cx_a, |user_store, _| {
4638                contacts(user_store) == vec![("user_a", vec![("a", false, vec![])])]
4639            })
4640            .await;
4641        client_b
4642            .user_store
4643            .condition(&cx_b, |user_store, _| {
4644                contacts(user_store) == vec![("user_a", vec![("a", false, vec![])])]
4645            })
4646            .await;
4647        client_c
4648            .user_store
4649            .condition(&cx_c, |user_store, _| {
4650                contacts(user_store) == vec![("user_a", vec![("a", false, vec![])])]
4651            })
4652            .await;
4653
4654        let project_id = project_a
4655            .update(cx_a, |project, _| project.next_remote_id())
4656            .await;
4657        project_a
4658            .update(cx_a, |project, cx| project.share(cx))
4659            .await
4660            .unwrap();
4661        client_a
4662            .user_store
4663            .condition(&cx_a, |user_store, _| {
4664                contacts(user_store) == vec![("user_a", vec![("a", true, vec![])])]
4665            })
4666            .await;
4667        client_b
4668            .user_store
4669            .condition(&cx_b, |user_store, _| {
4670                contacts(user_store) == vec![("user_a", vec![("a", true, vec![])])]
4671            })
4672            .await;
4673        client_c
4674            .user_store
4675            .condition(&cx_c, |user_store, _| {
4676                contacts(user_store) == vec![("user_a", vec![("a", true, vec![])])]
4677            })
4678            .await;
4679
4680        let _project_b = Project::remote(
4681            project_id,
4682            client_b.clone(),
4683            client_b.user_store.clone(),
4684            lang_registry.clone(),
4685            fs.clone(),
4686            &mut cx_b.to_async(),
4687        )
4688        .await
4689        .unwrap();
4690
4691        client_a
4692            .user_store
4693            .condition(&cx_a, |user_store, _| {
4694                contacts(user_store) == vec![("user_a", vec![("a", true, vec!["user_b"])])]
4695            })
4696            .await;
4697        client_b
4698            .user_store
4699            .condition(&cx_b, |user_store, _| {
4700                contacts(user_store) == vec![("user_a", vec![("a", true, vec!["user_b"])])]
4701            })
4702            .await;
4703        client_c
4704            .user_store
4705            .condition(&cx_c, |user_store, _| {
4706                contacts(user_store) == vec![("user_a", vec![("a", true, vec!["user_b"])])]
4707            })
4708            .await;
4709
4710        project_a
4711            .condition(&cx_a, |project, _| {
4712                project.collaborators().contains_key(&client_b.peer_id)
4713            })
4714            .await;
4715
4716        cx_a.update(move |_| drop(project_a));
4717        client_a
4718            .user_store
4719            .condition(&cx_a, |user_store, _| contacts(user_store) == vec![])
4720            .await;
4721        client_b
4722            .user_store
4723            .condition(&cx_b, |user_store, _| contacts(user_store) == vec![])
4724            .await;
4725        client_c
4726            .user_store
4727            .condition(&cx_c, |user_store, _| contacts(user_store) == vec![])
4728            .await;
4729
4730        fn contacts(user_store: &UserStore) -> Vec<(&str, Vec<(&str, bool, Vec<&str>)>)> {
4731            user_store
4732                .contacts()
4733                .iter()
4734                .map(|contact| {
4735                    let worktrees = contact
4736                        .projects
4737                        .iter()
4738                        .map(|p| {
4739                            (
4740                                p.worktree_root_names[0].as_str(),
4741                                p.is_shared,
4742                                p.guests.iter().map(|p| p.github_login.as_str()).collect(),
4743                            )
4744                        })
4745                        .collect();
4746                    (contact.user.github_login.as_str(), worktrees)
4747                })
4748                .collect()
4749        }
4750    }
4751
4752    #[gpui::test(iterations = 10)]
4753    async fn test_following(cx_a: &mut TestAppContext, cx_b: &mut TestAppContext) {
4754        cx_a.foreground().forbid_parking();
4755        let fs = FakeFs::new(cx_a.background());
4756
4757        // 2 clients connect to a server.
4758        let mut server = TestServer::start(cx_a.foreground(), cx_a.background()).await;
4759        let mut client_a = server.create_client(cx_a, "user_a").await;
4760        let mut client_b = server.create_client(cx_b, "user_b").await;
4761        cx_a.update(editor::init);
4762        cx_b.update(editor::init);
4763
4764        // Client A shares a project.
4765        fs.insert_tree(
4766            "/a",
4767            json!({
4768                ".zed.toml": r#"collaborators = ["user_b"]"#,
4769                "1.txt": "one",
4770                "2.txt": "two",
4771                "3.txt": "three",
4772            }),
4773        )
4774        .await;
4775        let (project_a, worktree_id) = client_a.build_local_project(fs.clone(), "/a", cx_a).await;
4776        project_a
4777            .update(cx_a, |project, cx| project.share(cx))
4778            .await
4779            .unwrap();
4780
4781        // Client B joins the project.
4782        let project_b = client_b
4783            .build_remote_project(
4784                project_a
4785                    .read_with(cx_a, |project, _| project.remote_id())
4786                    .unwrap(),
4787                cx_b,
4788            )
4789            .await;
4790
4791        // Client A opens some editors.
4792        let workspace_a = client_a.build_workspace(&project_a, cx_a);
4793        let pane_a = workspace_a.read_with(cx_a, |workspace, _| workspace.active_pane().clone());
4794        let editor_a1 = workspace_a
4795            .update(cx_a, |workspace, cx| {
4796                workspace.open_path((worktree_id, "1.txt"), cx)
4797            })
4798            .await
4799            .unwrap()
4800            .downcast::<Editor>()
4801            .unwrap();
4802        let editor_a2 = workspace_a
4803            .update(cx_a, |workspace, cx| {
4804                workspace.open_path((worktree_id, "2.txt"), cx)
4805            })
4806            .await
4807            .unwrap()
4808            .downcast::<Editor>()
4809            .unwrap();
4810
4811        // Client B opens an editor.
4812        let workspace_b = client_b.build_workspace(&project_b, cx_b);
4813        let editor_b1 = workspace_b
4814            .update(cx_b, |workspace, cx| {
4815                workspace.open_path((worktree_id, "1.txt"), cx)
4816            })
4817            .await
4818            .unwrap()
4819            .downcast::<Editor>()
4820            .unwrap();
4821
4822        let client_a_id = project_b.read_with(cx_b, |project, _| {
4823            project.collaborators().values().next().unwrap().peer_id
4824        });
4825        let client_b_id = project_a.read_with(cx_a, |project, _| {
4826            project.collaborators().values().next().unwrap().peer_id
4827        });
4828
4829        // When client B starts following client A, all visible view states are replicated to client B.
4830        editor_a1.update(cx_a, |editor, cx| editor.select_ranges([0..1], None, cx));
4831        editor_a2.update(cx_a, |editor, cx| editor.select_ranges([2..3], None, cx));
4832        workspace_b
4833            .update(cx_b, |workspace, cx| {
4834                workspace
4835                    .toggle_follow(&ToggleFollow(client_a_id), cx)
4836                    .unwrap()
4837            })
4838            .await
4839            .unwrap();
4840        let editor_b2 = workspace_b.read_with(cx_b, |workspace, cx| {
4841            workspace
4842                .active_item(cx)
4843                .unwrap()
4844                .downcast::<Editor>()
4845                .unwrap()
4846        });
4847        assert!(cx_b.read(|cx| editor_b2.is_focused(cx)));
4848        assert_eq!(
4849            editor_b2.read_with(cx_b, |editor, cx| editor.project_path(cx)),
4850            Some((worktree_id, "2.txt").into())
4851        );
4852        assert_eq!(
4853            editor_b2.read_with(cx_b, |editor, cx| editor.selected_ranges(cx)),
4854            vec![2..3]
4855        );
4856        assert_eq!(
4857            editor_b1.read_with(cx_b, |editor, cx| editor.selected_ranges(cx)),
4858            vec![0..1]
4859        );
4860
4861        // When client A activates a different editor, client B does so as well.
4862        workspace_a.update(cx_a, |workspace, cx| {
4863            workspace.activate_item(&editor_a1, cx)
4864        });
4865        workspace_b
4866            .condition(cx_b, |workspace, cx| {
4867                workspace.active_item(cx).unwrap().id() == editor_b1.id()
4868            })
4869            .await;
4870
4871        // When client A navigates back and forth, client B does so as well.
4872        workspace_a
4873            .update(cx_a, |workspace, cx| {
4874                workspace::Pane::go_back(workspace, None, cx)
4875            })
4876            .await;
4877        workspace_b
4878            .condition(cx_b, |workspace, cx| {
4879                workspace.active_item(cx).unwrap().id() == editor_b2.id()
4880            })
4881            .await;
4882
4883        workspace_a
4884            .update(cx_a, |workspace, cx| {
4885                workspace::Pane::go_forward(workspace, None, cx)
4886            })
4887            .await;
4888        workspace_b
4889            .condition(cx_b, |workspace, cx| {
4890                workspace.active_item(cx).unwrap().id() == editor_b1.id()
4891            })
4892            .await;
4893
4894        // Changes to client A's editor are reflected on client B.
4895        editor_a1.update(cx_a, |editor, cx| {
4896            editor.select_ranges([1..1, 2..2], None, cx);
4897        });
4898        editor_b1
4899            .condition(cx_b, |editor, cx| {
4900                editor.selected_ranges(cx) == vec![1..1, 2..2]
4901            })
4902            .await;
4903
4904        editor_a1.update(cx_a, |editor, cx| editor.set_text("TWO", cx));
4905        editor_b1
4906            .condition(cx_b, |editor, cx| editor.text(cx) == "TWO")
4907            .await;
4908
4909        editor_a1.update(cx_a, |editor, cx| {
4910            editor.select_ranges([3..3], None, cx);
4911            editor.set_scroll_position(vec2f(0., 100.), cx);
4912        });
4913        editor_b1
4914            .condition(cx_b, |editor, cx| editor.selected_ranges(cx) == vec![3..3])
4915            .await;
4916
4917        // After unfollowing, client B stops receiving updates from client A.
4918        workspace_b.update(cx_b, |workspace, cx| {
4919            workspace.unfollow(&workspace.active_pane().clone(), cx)
4920        });
4921        workspace_a.update(cx_a, |workspace, cx| {
4922            workspace.activate_item(&editor_a2, cx)
4923        });
4924        cx_a.foreground().run_until_parked();
4925        assert_eq!(
4926            workspace_b.read_with(cx_b, |workspace, cx| workspace
4927                .active_item(cx)
4928                .unwrap()
4929                .id()),
4930            editor_b1.id()
4931        );
4932
4933        // Client A starts following client B.
4934        workspace_a
4935            .update(cx_a, |workspace, cx| {
4936                workspace
4937                    .toggle_follow(&ToggleFollow(client_b_id), cx)
4938                    .unwrap()
4939            })
4940            .await
4941            .unwrap();
4942        assert_eq!(
4943            workspace_a.read_with(cx_a, |workspace, _| workspace.leader_for_pane(&pane_a)),
4944            Some(client_b_id)
4945        );
4946        assert_eq!(
4947            workspace_a.read_with(cx_a, |workspace, cx| workspace
4948                .active_item(cx)
4949                .unwrap()
4950                .id()),
4951            editor_a1.id()
4952        );
4953
4954        // Following interrupts when client B disconnects.
4955        client_b.disconnect(&cx_b.to_async()).unwrap();
4956        cx_a.foreground().run_until_parked();
4957        assert_eq!(
4958            workspace_a.read_with(cx_a, |workspace, _| workspace.leader_for_pane(&pane_a)),
4959            None
4960        );
4961    }
4962
4963    #[gpui::test(iterations = 10)]
4964    async fn test_peers_following_each_other(cx_a: &mut TestAppContext, cx_b: &mut TestAppContext) {
4965        cx_a.foreground().forbid_parking();
4966        let fs = FakeFs::new(cx_a.background());
4967
4968        // 2 clients connect to a server.
4969        let mut server = TestServer::start(cx_a.foreground(), cx_a.background()).await;
4970        let mut client_a = server.create_client(cx_a, "user_a").await;
4971        let mut client_b = server.create_client(cx_b, "user_b").await;
4972        cx_a.update(editor::init);
4973        cx_b.update(editor::init);
4974
4975        // Client A shares a project.
4976        fs.insert_tree(
4977            "/a",
4978            json!({
4979                ".zed.toml": r#"collaborators = ["user_b"]"#,
4980                "1.txt": "one",
4981                "2.txt": "two",
4982                "3.txt": "three",
4983                "4.txt": "four",
4984            }),
4985        )
4986        .await;
4987        let (project_a, worktree_id) = client_a.build_local_project(fs.clone(), "/a", cx_a).await;
4988        project_a
4989            .update(cx_a, |project, cx| project.share(cx))
4990            .await
4991            .unwrap();
4992
4993        // Client B joins the project.
4994        let project_b = client_b
4995            .build_remote_project(
4996                project_a
4997                    .read_with(cx_a, |project, _| project.remote_id())
4998                    .unwrap(),
4999                cx_b,
5000            )
5001            .await;
5002
5003        // Client A opens some editors.
5004        let workspace_a = client_a.build_workspace(&project_a, cx_a);
5005        let pane_a1 = workspace_a.read_with(cx_a, |workspace, _| workspace.active_pane().clone());
5006        let _editor_a1 = workspace_a
5007            .update(cx_a, |workspace, cx| {
5008                workspace.open_path((worktree_id, "1.txt"), cx)
5009            })
5010            .await
5011            .unwrap()
5012            .downcast::<Editor>()
5013            .unwrap();
5014
5015        // Client B opens an editor.
5016        let workspace_b = client_b.build_workspace(&project_b, cx_b);
5017        let pane_b1 = workspace_b.read_with(cx_b, |workspace, _| workspace.active_pane().clone());
5018        let _editor_b1 = workspace_b
5019            .update(cx_b, |workspace, cx| {
5020                workspace.open_path((worktree_id, "2.txt"), cx)
5021            })
5022            .await
5023            .unwrap()
5024            .downcast::<Editor>()
5025            .unwrap();
5026
5027        // Clients A and B follow each other in split panes
5028        workspace_a
5029            .update(cx_a, |workspace, cx| {
5030                workspace.split_pane(workspace.active_pane().clone(), SplitDirection::Right, cx);
5031                assert_ne!(*workspace.active_pane(), pane_a1);
5032                let leader_id = *project_a.read(cx).collaborators().keys().next().unwrap();
5033                workspace
5034                    .toggle_follow(&workspace::ToggleFollow(leader_id), cx)
5035                    .unwrap()
5036            })
5037            .await
5038            .unwrap();
5039        workspace_b
5040            .update(cx_b, |workspace, cx| {
5041                workspace.split_pane(workspace.active_pane().clone(), SplitDirection::Right, cx);
5042                assert_ne!(*workspace.active_pane(), pane_b1);
5043                let leader_id = *project_b.read(cx).collaborators().keys().next().unwrap();
5044                workspace
5045                    .toggle_follow(&workspace::ToggleFollow(leader_id), cx)
5046                    .unwrap()
5047            })
5048            .await
5049            .unwrap();
5050
5051        workspace_a
5052            .update(cx_a, |workspace, cx| {
5053                workspace.activate_next_pane(cx);
5054                assert_eq!(*workspace.active_pane(), pane_a1);
5055                workspace.open_path((worktree_id, "3.txt"), cx)
5056            })
5057            .await
5058            .unwrap();
5059        workspace_b
5060            .update(cx_b, |workspace, cx| {
5061                workspace.activate_next_pane(cx);
5062                assert_eq!(*workspace.active_pane(), pane_b1);
5063                workspace.open_path((worktree_id, "4.txt"), cx)
5064            })
5065            .await
5066            .unwrap();
5067        cx_a.foreground().run_until_parked();
5068
5069        // Ensure leader updates don't change the active pane of followers
5070        workspace_a.read_with(cx_a, |workspace, _| {
5071            assert_eq!(*workspace.active_pane(), pane_a1);
5072        });
5073        workspace_b.read_with(cx_b, |workspace, _| {
5074            assert_eq!(*workspace.active_pane(), pane_b1);
5075        });
5076
5077        // Ensure peers following each other doesn't cause an infinite loop.
5078        assert_eq!(
5079            workspace_a.read_with(cx_a, |workspace, cx| workspace
5080                .active_item(cx)
5081                .unwrap()
5082                .project_path(cx)),
5083            Some((worktree_id, "3.txt").into())
5084        );
5085        workspace_a.update(cx_a, |workspace, cx| {
5086            assert_eq!(
5087                workspace.active_item(cx).unwrap().project_path(cx),
5088                Some((worktree_id, "3.txt").into())
5089            );
5090            workspace.activate_next_pane(cx);
5091            assert_eq!(
5092                workspace.active_item(cx).unwrap().project_path(cx),
5093                Some((worktree_id, "4.txt").into())
5094            );
5095        });
5096        workspace_b.update(cx_b, |workspace, cx| {
5097            assert_eq!(
5098                workspace.active_item(cx).unwrap().project_path(cx),
5099                Some((worktree_id, "4.txt").into())
5100            );
5101            workspace.activate_next_pane(cx);
5102            assert_eq!(
5103                workspace.active_item(cx).unwrap().project_path(cx),
5104                Some((worktree_id, "3.txt").into())
5105            );
5106        });
5107    }
5108
5109    #[gpui::test(iterations = 10)]
5110    async fn test_auto_unfollowing(cx_a: &mut TestAppContext, cx_b: &mut TestAppContext) {
5111        cx_a.foreground().forbid_parking();
5112        let fs = FakeFs::new(cx_a.background());
5113
5114        // 2 clients connect to a server.
5115        let mut server = TestServer::start(cx_a.foreground(), cx_a.background()).await;
5116        let mut client_a = server.create_client(cx_a, "user_a").await;
5117        let mut client_b = server.create_client(cx_b, "user_b").await;
5118        cx_a.update(editor::init);
5119        cx_b.update(editor::init);
5120
5121        // Client A shares a project.
5122        fs.insert_tree(
5123            "/a",
5124            json!({
5125                ".zed.toml": r#"collaborators = ["user_b"]"#,
5126                "1.txt": "one",
5127                "2.txt": "two",
5128                "3.txt": "three",
5129            }),
5130        )
5131        .await;
5132        let (project_a, worktree_id) = client_a.build_local_project(fs.clone(), "/a", cx_a).await;
5133        project_a
5134            .update(cx_a, |project, cx| project.share(cx))
5135            .await
5136            .unwrap();
5137
5138        // Client B joins the project.
5139        let project_b = client_b
5140            .build_remote_project(
5141                project_a
5142                    .read_with(cx_a, |project, _| project.remote_id())
5143                    .unwrap(),
5144                cx_b,
5145            )
5146            .await;
5147
5148        // Client A opens some editors.
5149        let workspace_a = client_a.build_workspace(&project_a, cx_a);
5150        let _editor_a1 = workspace_a
5151            .update(cx_a, |workspace, cx| {
5152                workspace.open_path((worktree_id, "1.txt"), cx)
5153            })
5154            .await
5155            .unwrap()
5156            .downcast::<Editor>()
5157            .unwrap();
5158
5159        // Client B starts following client A.
5160        let workspace_b = client_b.build_workspace(&project_b, cx_b);
5161        let pane_b = workspace_b.read_with(cx_b, |workspace, _| workspace.active_pane().clone());
5162        let leader_id = project_b.read_with(cx_b, |project, _| {
5163            project.collaborators().values().next().unwrap().peer_id
5164        });
5165        workspace_b
5166            .update(cx_b, |workspace, cx| {
5167                workspace
5168                    .toggle_follow(&ToggleFollow(leader_id), cx)
5169                    .unwrap()
5170            })
5171            .await
5172            .unwrap();
5173        assert_eq!(
5174            workspace_b.read_with(cx_b, |workspace, _| workspace.leader_for_pane(&pane_b)),
5175            Some(leader_id)
5176        );
5177        let editor_b2 = workspace_b.read_with(cx_b, |workspace, cx| {
5178            workspace
5179                .active_item(cx)
5180                .unwrap()
5181                .downcast::<Editor>()
5182                .unwrap()
5183        });
5184
5185        // When client B moves, it automatically stops following client A.
5186        editor_b2.update(cx_b, |editor, cx| editor.move_right(&editor::MoveRight, cx));
5187        assert_eq!(
5188            workspace_b.read_with(cx_b, |workspace, _| workspace.leader_for_pane(&pane_b)),
5189            None
5190        );
5191
5192        workspace_b
5193            .update(cx_b, |workspace, cx| {
5194                workspace
5195                    .toggle_follow(&ToggleFollow(leader_id), cx)
5196                    .unwrap()
5197            })
5198            .await
5199            .unwrap();
5200        assert_eq!(
5201            workspace_b.read_with(cx_b, |workspace, _| workspace.leader_for_pane(&pane_b)),
5202            Some(leader_id)
5203        );
5204
5205        // When client B edits, it automatically stops following client A.
5206        editor_b2.update(cx_b, |editor, cx| editor.insert("X", cx));
5207        assert_eq!(
5208            workspace_b.read_with(cx_b, |workspace, _| workspace.leader_for_pane(&pane_b)),
5209            None
5210        );
5211
5212        workspace_b
5213            .update(cx_b, |workspace, cx| {
5214                workspace
5215                    .toggle_follow(&ToggleFollow(leader_id), cx)
5216                    .unwrap()
5217            })
5218            .await
5219            .unwrap();
5220        assert_eq!(
5221            workspace_b.read_with(cx_b, |workspace, _| workspace.leader_for_pane(&pane_b)),
5222            Some(leader_id)
5223        );
5224
5225        // When client B scrolls, it automatically stops following client A.
5226        editor_b2.update(cx_b, |editor, cx| {
5227            editor.set_scroll_position(vec2f(0., 3.), cx)
5228        });
5229        assert_eq!(
5230            workspace_b.read_with(cx_b, |workspace, _| workspace.leader_for_pane(&pane_b)),
5231            None
5232        );
5233
5234        workspace_b
5235            .update(cx_b, |workspace, cx| {
5236                workspace
5237                    .toggle_follow(&ToggleFollow(leader_id), cx)
5238                    .unwrap()
5239            })
5240            .await
5241            .unwrap();
5242        assert_eq!(
5243            workspace_b.read_with(cx_b, |workspace, _| workspace.leader_for_pane(&pane_b)),
5244            Some(leader_id)
5245        );
5246
5247        // When client B activates a different pane, it continues following client A in the original pane.
5248        workspace_b.update(cx_b, |workspace, cx| {
5249            workspace.split_pane(pane_b.clone(), SplitDirection::Right, cx)
5250        });
5251        assert_eq!(
5252            workspace_b.read_with(cx_b, |workspace, _| workspace.leader_for_pane(&pane_b)),
5253            Some(leader_id)
5254        );
5255
5256        workspace_b.update(cx_b, |workspace, cx| workspace.activate_next_pane(cx));
5257        assert_eq!(
5258            workspace_b.read_with(cx_b, |workspace, _| workspace.leader_for_pane(&pane_b)),
5259            Some(leader_id)
5260        );
5261
5262        // When client B activates a different item in the original pane, it automatically stops following client A.
5263        workspace_b
5264            .update(cx_b, |workspace, cx| {
5265                workspace.open_path((worktree_id, "2.txt"), cx)
5266            })
5267            .await
5268            .unwrap();
5269        assert_eq!(
5270            workspace_b.read_with(cx_b, |workspace, _| workspace.leader_for_pane(&pane_b)),
5271            None
5272        );
5273    }
5274
5275    #[gpui::test(iterations = 100)]
5276    async fn test_random_collaboration(
5277        cx: &mut TestAppContext,
5278        deterministic: Arc<Deterministic>,
5279        rng: StdRng,
5280    ) {
5281        cx.foreground().forbid_parking();
5282        let max_peers = env::var("MAX_PEERS")
5283            .map(|i| i.parse().expect("invalid `MAX_PEERS` variable"))
5284            .unwrap_or(5);
5285        assert!(max_peers <= 5);
5286
5287        let max_operations = env::var("OPERATIONS")
5288            .map(|i| i.parse().expect("invalid `OPERATIONS` variable"))
5289            .unwrap_or(10);
5290
5291        let rng = Arc::new(Mutex::new(rng));
5292
5293        let guest_lang_registry = Arc::new(LanguageRegistry::test());
5294        let host_language_registry = Arc::new(LanguageRegistry::test());
5295
5296        let fs = FakeFs::new(cx.background());
5297        fs.insert_tree(
5298            "/_collab",
5299            json!({
5300                ".zed.toml": r#"collaborators = ["guest-1", "guest-2", "guest-3", "guest-4"]"#
5301            }),
5302        )
5303        .await;
5304
5305        let mut server = TestServer::start(cx.foreground(), cx.background()).await;
5306        let mut clients = Vec::new();
5307        let mut user_ids = Vec::new();
5308        let mut op_start_signals = Vec::new();
5309        let files = Arc::new(Mutex::new(Vec::new()));
5310
5311        let mut next_entity_id = 100000;
5312        let mut host_cx = TestAppContext::new(
5313            cx.foreground_platform(),
5314            cx.platform(),
5315            deterministic.build_foreground(next_entity_id),
5316            deterministic.build_background(),
5317            cx.font_cache(),
5318            cx.leak_detector(),
5319            next_entity_id,
5320        );
5321        let host = server.create_client(&mut host_cx, "host").await;
5322        let host_project = host_cx.update(|cx| {
5323            Project::local(
5324                host.client.clone(),
5325                host.user_store.clone(),
5326                host_language_registry.clone(),
5327                fs.clone(),
5328                cx,
5329            )
5330        });
5331        let host_project_id = host_project
5332            .update(&mut host_cx, |p, _| p.next_remote_id())
5333            .await;
5334
5335        let (collab_worktree, _) = host_project
5336            .update(&mut host_cx, |project, cx| {
5337                project.find_or_create_local_worktree("/_collab", true, cx)
5338            })
5339            .await
5340            .unwrap();
5341        collab_worktree
5342            .read_with(&host_cx, |tree, _| tree.as_local().unwrap().scan_complete())
5343            .await;
5344        host_project
5345            .update(&mut host_cx, |project, cx| project.share(cx))
5346            .await
5347            .unwrap();
5348
5349        // Set up fake language servers.
5350        let mut language = Language::new(
5351            LanguageConfig {
5352                name: "Rust".into(),
5353                path_suffixes: vec!["rs".to_string()],
5354                ..Default::default()
5355            },
5356            None,
5357        );
5358        let _fake_servers = language.set_fake_lsp_adapter(FakeLspAdapter {
5359            name: "the-fake-language-server",
5360            capabilities: lsp::LanguageServer::full_capabilities(),
5361            initializer: Some(Box::new({
5362                let rng = rng.clone();
5363                let files = files.clone();
5364                let project = host_project.downgrade();
5365                move |fake_server: &mut FakeLanguageServer| {
5366                    fake_server.handle_request::<lsp::request::Completion, _, _>(
5367                        |_, _| async move {
5368                            Ok(Some(lsp::CompletionResponse::Array(vec![
5369                                lsp::CompletionItem {
5370                                    text_edit: Some(lsp::CompletionTextEdit::Edit(lsp::TextEdit {
5371                                        range: lsp::Range::new(
5372                                            lsp::Position::new(0, 0),
5373                                            lsp::Position::new(0, 0),
5374                                        ),
5375                                        new_text: "the-new-text".to_string(),
5376                                    })),
5377                                    ..Default::default()
5378                                },
5379                            ])))
5380                        },
5381                    );
5382
5383                    fake_server.handle_request::<lsp::request::CodeActionRequest, _, _>(
5384                        |_, _| async move {
5385                            Ok(Some(vec![lsp::CodeActionOrCommand::CodeAction(
5386                                lsp::CodeAction {
5387                                    title: "the-code-action".to_string(),
5388                                    ..Default::default()
5389                                },
5390                            )]))
5391                        },
5392                    );
5393
5394                    fake_server.handle_request::<lsp::request::PrepareRenameRequest, _, _>(
5395                        |params, _| async move {
5396                            Ok(Some(lsp::PrepareRenameResponse::Range(lsp::Range::new(
5397                                params.position,
5398                                params.position,
5399                            ))))
5400                        },
5401                    );
5402
5403                    fake_server.handle_request::<lsp::request::GotoDefinition, _, _>({
5404                        let files = files.clone();
5405                        let rng = rng.clone();
5406                        move |_, _| {
5407                            let files = files.clone();
5408                            let rng = rng.clone();
5409                            async move {
5410                                let files = files.lock();
5411                                let mut rng = rng.lock();
5412                                let count = rng.gen_range::<usize, _>(1..3);
5413                                let files = (0..count)
5414                                    .map(|_| files.choose(&mut *rng).unwrap())
5415                                    .collect::<Vec<_>>();
5416                                log::info!("LSP: Returning definitions in files {:?}", &files);
5417                                Ok(Some(lsp::GotoDefinitionResponse::Array(
5418                                    files
5419                                        .into_iter()
5420                                        .map(|file| lsp::Location {
5421                                            uri: lsp::Url::from_file_path(file).unwrap(),
5422                                            range: Default::default(),
5423                                        })
5424                                        .collect(),
5425                                )))
5426                            }
5427                        }
5428                    });
5429
5430                    fake_server.handle_request::<lsp::request::DocumentHighlightRequest, _, _>({
5431                        let rng = rng.clone();
5432                        let project = project.clone();
5433                        move |params, mut cx| {
5434                            let highlights = if let Some(project) = project.upgrade(&cx) {
5435                                project.update(&mut cx, |project, cx| {
5436                                    let path = params
5437                                        .text_document_position_params
5438                                        .text_document
5439                                        .uri
5440                                        .to_file_path()
5441                                        .unwrap();
5442                                    let (worktree, relative_path) =
5443                                        project.find_local_worktree(&path, cx)?;
5444                                    let project_path =
5445                                        ProjectPath::from((worktree.read(cx).id(), relative_path));
5446                                    let buffer =
5447                                        project.get_open_buffer(&project_path, cx)?.read(cx);
5448
5449                                    let mut highlights = Vec::new();
5450                                    let highlight_count = rng.lock().gen_range(1..=5);
5451                                    let mut prev_end = 0;
5452                                    for _ in 0..highlight_count {
5453                                        let range =
5454                                            buffer.random_byte_range(prev_end, &mut *rng.lock());
5455
5456                                        highlights.push(lsp::DocumentHighlight {
5457                                            range: range_to_lsp(range.to_point_utf16(buffer)),
5458                                            kind: Some(lsp::DocumentHighlightKind::READ),
5459                                        });
5460                                        prev_end = range.end;
5461                                    }
5462                                    Some(highlights)
5463                                })
5464                            } else {
5465                                None
5466                            };
5467                            async move { Ok(highlights) }
5468                        }
5469                    });
5470                }
5471            })),
5472            ..Default::default()
5473        });
5474        host_language_registry.add(Arc::new(language));
5475
5476        let op_start_signal = futures::channel::mpsc::unbounded();
5477        user_ids.push(host.current_user_id(&host_cx));
5478        op_start_signals.push(op_start_signal.0);
5479        clients.push(host_cx.foreground().spawn(host.simulate_host(
5480            host_project,
5481            files,
5482            op_start_signal.1,
5483            rng.clone(),
5484            host_cx,
5485        )));
5486
5487        let disconnect_host_at = if rng.lock().gen_bool(0.2) {
5488            rng.lock().gen_range(0..max_operations)
5489        } else {
5490            max_operations
5491        };
5492        let mut available_guests = vec![
5493            "guest-1".to_string(),
5494            "guest-2".to_string(),
5495            "guest-3".to_string(),
5496            "guest-4".to_string(),
5497        ];
5498        let mut operations = 0;
5499        while operations < max_operations {
5500            if operations == disconnect_host_at {
5501                server.disconnect_client(user_ids[0]);
5502                cx.foreground().advance_clock(RECEIVE_TIMEOUT);
5503                drop(op_start_signals);
5504                let mut clients = futures::future::join_all(clients).await;
5505                cx.foreground().run_until_parked();
5506
5507                let (host, mut host_cx, host_err) = clients.remove(0);
5508                if let Some(host_err) = host_err {
5509                    log::error!("host error - {}", host_err);
5510                }
5511                host.project
5512                    .as_ref()
5513                    .unwrap()
5514                    .read_with(&host_cx, |project, _| assert!(!project.is_shared()));
5515                for (guest, mut guest_cx, guest_err) in clients {
5516                    if let Some(guest_err) = guest_err {
5517                        log::error!("{} error - {}", guest.username, guest_err);
5518                    }
5519                    let contacts = server
5520                        .store
5521                        .read()
5522                        .await
5523                        .contacts_for_user(guest.current_user_id(&guest_cx));
5524                    assert!(!contacts
5525                        .iter()
5526                        .flat_map(|contact| &contact.projects)
5527                        .any(|project| project.id == host_project_id));
5528                    guest
5529                        .project
5530                        .as_ref()
5531                        .unwrap()
5532                        .read_with(&guest_cx, |project, _| assert!(project.is_read_only()));
5533                    guest_cx.update(|_| drop(guest));
5534                }
5535                host_cx.update(|_| drop(host));
5536
5537                return;
5538            }
5539
5540            let distribution = rng.lock().gen_range(0..100);
5541            match distribution {
5542                0..=19 if !available_guests.is_empty() => {
5543                    let guest_ix = rng.lock().gen_range(0..available_guests.len());
5544                    let guest_username = available_guests.remove(guest_ix);
5545                    log::info!("Adding new connection for {}", guest_username);
5546                    next_entity_id += 100000;
5547                    let mut guest_cx = TestAppContext::new(
5548                        cx.foreground_platform(),
5549                        cx.platform(),
5550                        deterministic.build_foreground(next_entity_id),
5551                        deterministic.build_background(),
5552                        cx.font_cache(),
5553                        cx.leak_detector(),
5554                        next_entity_id,
5555                    );
5556                    let guest = server.create_client(&mut guest_cx, &guest_username).await;
5557                    let guest_project = Project::remote(
5558                        host_project_id,
5559                        guest.client.clone(),
5560                        guest.user_store.clone(),
5561                        guest_lang_registry.clone(),
5562                        FakeFs::new(cx.background()),
5563                        &mut guest_cx.to_async(),
5564                    )
5565                    .await
5566                    .unwrap();
5567                    let op_start_signal = futures::channel::mpsc::unbounded();
5568                    user_ids.push(guest.current_user_id(&guest_cx));
5569                    op_start_signals.push(op_start_signal.0);
5570                    clients.push(guest_cx.foreground().spawn(guest.simulate_guest(
5571                        guest_username.clone(),
5572                        guest_project,
5573                        op_start_signal.1,
5574                        rng.clone(),
5575                        guest_cx,
5576                    )));
5577
5578                    log::info!("Added connection for {}", guest_username);
5579                    operations += 1;
5580                }
5581                20..=29 if clients.len() > 1 => {
5582                    log::info!("Removing guest");
5583                    let guest_ix = rng.lock().gen_range(1..clients.len());
5584                    let removed_guest_id = user_ids.remove(guest_ix);
5585                    let guest = clients.remove(guest_ix);
5586                    op_start_signals.remove(guest_ix);
5587                    server.disconnect_client(removed_guest_id);
5588                    cx.foreground().advance_clock(RECEIVE_TIMEOUT);
5589                    let (guest, mut guest_cx, guest_err) = guest.await;
5590                    if let Some(guest_err) = guest_err {
5591                        log::error!("{} error - {}", guest.username, guest_err);
5592                    }
5593                    guest
5594                        .project
5595                        .as_ref()
5596                        .unwrap()
5597                        .read_with(&guest_cx, |project, _| assert!(project.is_read_only()));
5598                    for user_id in &user_ids {
5599                        for contact in server.store.read().await.contacts_for_user(*user_id) {
5600                            assert_ne!(
5601                                contact.user_id, removed_guest_id.0 as u64,
5602                                "removed guest is still a contact of another peer"
5603                            );
5604                            for project in contact.projects {
5605                                for project_guest_id in project.guests {
5606                                    assert_ne!(
5607                                        project_guest_id, removed_guest_id.0 as u64,
5608                                        "removed guest appears as still participating on a project"
5609                                    );
5610                                }
5611                            }
5612                        }
5613                    }
5614
5615                    log::info!("{} removed", guest.username);
5616                    available_guests.push(guest.username.clone());
5617                    guest_cx.update(|_| drop(guest));
5618
5619                    operations += 1;
5620                }
5621                _ => {
5622                    while operations < max_operations && rng.lock().gen_bool(0.7) {
5623                        op_start_signals
5624                            .choose(&mut *rng.lock())
5625                            .unwrap()
5626                            .unbounded_send(())
5627                            .unwrap();
5628                        operations += 1;
5629                    }
5630
5631                    if rng.lock().gen_bool(0.8) {
5632                        cx.foreground().run_until_parked();
5633                    }
5634                }
5635            }
5636        }
5637
5638        drop(op_start_signals);
5639        let mut clients = futures::future::join_all(clients).await;
5640        cx.foreground().run_until_parked();
5641
5642        let (host_client, mut host_cx, host_err) = clients.remove(0);
5643        if let Some(host_err) = host_err {
5644            panic!("host error - {}", host_err);
5645        }
5646        let host_project = host_client.project.as_ref().unwrap();
5647        let host_worktree_snapshots = host_project.read_with(&host_cx, |project, cx| {
5648            project
5649                .worktrees(cx)
5650                .map(|worktree| {
5651                    let snapshot = worktree.read(cx).snapshot();
5652                    (snapshot.id(), snapshot)
5653                })
5654                .collect::<BTreeMap<_, _>>()
5655        });
5656
5657        host_client
5658            .project
5659            .as_ref()
5660            .unwrap()
5661            .read_with(&host_cx, |project, cx| project.check_invariants(cx));
5662
5663        for (guest_client, mut guest_cx, guest_err) in clients.into_iter() {
5664            if let Some(guest_err) = guest_err {
5665                panic!("{} error - {}", guest_client.username, guest_err);
5666            }
5667            let worktree_snapshots =
5668                guest_client
5669                    .project
5670                    .as_ref()
5671                    .unwrap()
5672                    .read_with(&guest_cx, |project, cx| {
5673                        project
5674                            .worktrees(cx)
5675                            .map(|worktree| {
5676                                let worktree = worktree.read(cx);
5677                                (worktree.id(), worktree.snapshot())
5678                            })
5679                            .collect::<BTreeMap<_, _>>()
5680                    });
5681
5682            assert_eq!(
5683                worktree_snapshots.keys().collect::<Vec<_>>(),
5684                host_worktree_snapshots.keys().collect::<Vec<_>>(),
5685                "{} has different worktrees than the host",
5686                guest_client.username
5687            );
5688            for (id, host_snapshot) in &host_worktree_snapshots {
5689                let guest_snapshot = &worktree_snapshots[id];
5690                assert_eq!(
5691                    guest_snapshot.root_name(),
5692                    host_snapshot.root_name(),
5693                    "{} has different root name than the host for worktree {}",
5694                    guest_client.username,
5695                    id
5696                );
5697                assert_eq!(
5698                    guest_snapshot.entries(false).collect::<Vec<_>>(),
5699                    host_snapshot.entries(false).collect::<Vec<_>>(),
5700                    "{} has different snapshot than the host for worktree {}",
5701                    guest_client.username,
5702                    id
5703                );
5704            }
5705
5706            guest_client
5707                .project
5708                .as_ref()
5709                .unwrap()
5710                .read_with(&guest_cx, |project, cx| project.check_invariants(cx));
5711
5712            for guest_buffer in &guest_client.buffers {
5713                let buffer_id = guest_buffer.read_with(&guest_cx, |buffer, _| buffer.remote_id());
5714                let host_buffer = host_project.read_with(&host_cx, |project, cx| {
5715                    project.buffer_for_id(buffer_id, cx).expect(&format!(
5716                        "host does not have buffer for guest:{}, peer:{}, id:{}",
5717                        guest_client.username, guest_client.peer_id, buffer_id
5718                    ))
5719                });
5720                let path = host_buffer
5721                    .read_with(&host_cx, |buffer, cx| buffer.file().unwrap().full_path(cx));
5722
5723                assert_eq!(
5724                    guest_buffer.read_with(&guest_cx, |buffer, _| buffer.deferred_ops_len()),
5725                    0,
5726                    "{}, buffer {}, path {:?} has deferred operations",
5727                    guest_client.username,
5728                    buffer_id,
5729                    path,
5730                );
5731                assert_eq!(
5732                    guest_buffer.read_with(&guest_cx, |buffer, _| buffer.text()),
5733                    host_buffer.read_with(&host_cx, |buffer, _| buffer.text()),
5734                    "{}, buffer {}, path {:?}, differs from the host's buffer",
5735                    guest_client.username,
5736                    buffer_id,
5737                    path
5738                );
5739            }
5740
5741            guest_cx.update(|_| drop(guest_client));
5742        }
5743
5744        host_cx.update(|_| drop(host_client));
5745    }
5746
5747    struct TestServer {
5748        peer: Arc<Peer>,
5749        app_state: Arc<AppState>,
5750        server: Arc<Server>,
5751        foreground: Rc<executor::Foreground>,
5752        notifications: mpsc::UnboundedReceiver<()>,
5753        connection_killers: Arc<Mutex<HashMap<UserId, Arc<AtomicBool>>>>,
5754        forbid_connections: Arc<AtomicBool>,
5755        _test_db: TestDb,
5756    }
5757
5758    impl TestServer {
5759        async fn start(
5760            foreground: Rc<executor::Foreground>,
5761            background: Arc<executor::Background>,
5762        ) -> Self {
5763            let test_db = TestDb::fake(background);
5764            let app_state = Self::build_app_state(&test_db).await;
5765            let peer = Peer::new();
5766            let notifications = mpsc::unbounded();
5767            let server = Server::new(app_state.clone(), Some(notifications.0));
5768            Self {
5769                peer,
5770                app_state,
5771                server,
5772                foreground,
5773                notifications: notifications.1,
5774                connection_killers: Default::default(),
5775                forbid_connections: Default::default(),
5776                _test_db: test_db,
5777            }
5778        }
5779
5780        async fn create_client(&mut self, cx: &mut TestAppContext, name: &str) -> TestClient {
5781            cx.update(|cx| {
5782                let settings = Settings::test(cx);
5783                cx.set_global(settings);
5784            });
5785
5786            let http = FakeHttpClient::with_404_response();
5787            let user_id = self.app_state.db.create_user(name, false).await.unwrap();
5788            let client_name = name.to_string();
5789            let mut client = Client::new(http.clone());
5790            let server = self.server.clone();
5791            let connection_killers = self.connection_killers.clone();
5792            let forbid_connections = self.forbid_connections.clone();
5793            let (connection_id_tx, mut connection_id_rx) = mpsc::channel(16);
5794
5795            Arc::get_mut(&mut client)
5796                .unwrap()
5797                .override_authenticate(move |cx| {
5798                    cx.spawn(|_| async move {
5799                        let access_token = "the-token".to_string();
5800                        Ok(Credentials {
5801                            user_id: user_id.0 as u64,
5802                            access_token,
5803                        })
5804                    })
5805                })
5806                .override_establish_connection(move |credentials, cx| {
5807                    assert_eq!(credentials.user_id, user_id.0 as u64);
5808                    assert_eq!(credentials.access_token, "the-token");
5809
5810                    let server = server.clone();
5811                    let connection_killers = connection_killers.clone();
5812                    let forbid_connections = forbid_connections.clone();
5813                    let client_name = client_name.clone();
5814                    let connection_id_tx = connection_id_tx.clone();
5815                    cx.spawn(move |cx| async move {
5816                        if forbid_connections.load(SeqCst) {
5817                            Err(EstablishConnectionError::other(anyhow!(
5818                                "server is forbidding connections"
5819                            )))
5820                        } else {
5821                            let (client_conn, server_conn, killed) =
5822                                Connection::in_memory(cx.background());
5823                            connection_killers.lock().insert(user_id, killed);
5824                            cx.background()
5825                                .spawn(server.handle_connection(
5826                                    server_conn,
5827                                    client_name,
5828                                    user_id,
5829                                    Some(connection_id_tx),
5830                                    cx.background(),
5831                                ))
5832                                .detach();
5833                            Ok(client_conn)
5834                        }
5835                    })
5836                });
5837
5838            client
5839                .authenticate_and_connect(false, &cx.to_async())
5840                .await
5841                .unwrap();
5842
5843            Channel::init(&client);
5844            Project::init(&client);
5845            cx.update(|cx| {
5846                workspace::init(&client, cx);
5847            });
5848
5849            let peer_id = PeerId(connection_id_rx.next().await.unwrap().0);
5850            let user_store = cx.add_model(|cx| UserStore::new(client.clone(), http, cx));
5851
5852            let client = TestClient {
5853                client,
5854                peer_id,
5855                username: name.to_string(),
5856                user_store,
5857                language_registry: Arc::new(LanguageRegistry::test()),
5858                project: Default::default(),
5859                buffers: Default::default(),
5860            };
5861            client.wait_for_current_user(cx).await;
5862            client
5863        }
5864
5865        fn disconnect_client(&self, user_id: UserId) {
5866            self.connection_killers
5867                .lock()
5868                .remove(&user_id)
5869                .unwrap()
5870                .store(true, SeqCst);
5871        }
5872
5873        fn forbid_connections(&self) {
5874            self.forbid_connections.store(true, SeqCst);
5875        }
5876
5877        fn allow_connections(&self) {
5878            self.forbid_connections.store(false, SeqCst);
5879        }
5880
5881        async fn build_app_state(test_db: &TestDb) -> Arc<AppState> {
5882            Arc::new(AppState {
5883                db: test_db.db().clone(),
5884                api_token: Default::default(),
5885            })
5886        }
5887
5888        async fn state<'a>(&'a self) -> RwLockReadGuard<'a, Store> {
5889            self.server.store.read().await
5890        }
5891
5892        async fn condition<F>(&mut self, mut predicate: F)
5893        where
5894            F: FnMut(&Store) -> bool,
5895        {
5896            assert!(
5897                self.foreground.parking_forbidden(),
5898                "you must call forbid_parking to use server conditions so we don't block indefinitely"
5899            );
5900            while !(predicate)(&*self.server.store.read().await) {
5901                self.foreground.start_waiting();
5902                self.notifications.next().await;
5903                self.foreground.finish_waiting();
5904            }
5905        }
5906    }
5907
5908    impl Deref for TestServer {
5909        type Target = Server;
5910
5911        fn deref(&self) -> &Self::Target {
5912            &self.server
5913        }
5914    }
5915
5916    impl Drop for TestServer {
5917        fn drop(&mut self) {
5918            self.peer.reset();
5919        }
5920    }
5921
5922    struct TestClient {
5923        client: Arc<Client>,
5924        username: String,
5925        pub peer_id: PeerId,
5926        pub user_store: ModelHandle<UserStore>,
5927        language_registry: Arc<LanguageRegistry>,
5928        project: Option<ModelHandle<Project>>,
5929        buffers: HashSet<ModelHandle<language::Buffer>>,
5930    }
5931
5932    impl Deref for TestClient {
5933        type Target = Arc<Client>;
5934
5935        fn deref(&self) -> &Self::Target {
5936            &self.client
5937        }
5938    }
5939
5940    impl TestClient {
5941        pub fn current_user_id(&self, cx: &TestAppContext) -> UserId {
5942            UserId::from_proto(
5943                self.user_store
5944                    .read_with(cx, |user_store, _| user_store.current_user().unwrap().id),
5945            )
5946        }
5947
5948        async fn wait_for_current_user(&self, cx: &TestAppContext) {
5949            let mut authed_user = self
5950                .user_store
5951                .read_with(cx, |user_store, _| user_store.watch_current_user());
5952            while authed_user.next().await.unwrap().is_none() {}
5953        }
5954
5955        async fn build_local_project(
5956            &mut self,
5957            fs: Arc<FakeFs>,
5958            root_path: impl AsRef<Path>,
5959            cx: &mut TestAppContext,
5960        ) -> (ModelHandle<Project>, WorktreeId) {
5961            let project = cx.update(|cx| {
5962                Project::local(
5963                    self.client.clone(),
5964                    self.user_store.clone(),
5965                    self.language_registry.clone(),
5966                    fs,
5967                    cx,
5968                )
5969            });
5970            self.project = Some(project.clone());
5971            let (worktree, _) = project
5972                .update(cx, |p, cx| {
5973                    p.find_or_create_local_worktree(root_path, true, cx)
5974                })
5975                .await
5976                .unwrap();
5977            worktree
5978                .read_with(cx, |tree, _| tree.as_local().unwrap().scan_complete())
5979                .await;
5980            project
5981                .update(cx, |project, _| project.next_remote_id())
5982                .await;
5983            (project, worktree.read_with(cx, |tree, _| tree.id()))
5984        }
5985
5986        async fn build_remote_project(
5987            &mut self,
5988            project_id: u64,
5989            cx: &mut TestAppContext,
5990        ) -> ModelHandle<Project> {
5991            let project = Project::remote(
5992                project_id,
5993                self.client.clone(),
5994                self.user_store.clone(),
5995                self.language_registry.clone(),
5996                FakeFs::new(cx.background()),
5997                &mut cx.to_async(),
5998            )
5999            .await
6000            .unwrap();
6001            self.project = Some(project.clone());
6002            project
6003        }
6004
6005        fn build_workspace(
6006            &self,
6007            project: &ModelHandle<Project>,
6008            cx: &mut TestAppContext,
6009        ) -> ViewHandle<Workspace> {
6010            let (window_id, _) = cx.add_window(|_| EmptyView);
6011            cx.add_view(window_id, |cx| {
6012                let fs = project.read(cx).fs().clone();
6013                Workspace::new(
6014                    &WorkspaceParams {
6015                        fs,
6016                        project: project.clone(),
6017                        user_store: self.user_store.clone(),
6018                        languages: self.language_registry.clone(),
6019                        themes: ThemeRegistry::new((), cx.font_cache().clone()),
6020                        channel_list: cx.add_model(|cx| {
6021                            ChannelList::new(self.user_store.clone(), self.client.clone(), cx)
6022                        }),
6023                        client: self.client.clone(),
6024                    },
6025                    cx,
6026                )
6027            })
6028        }
6029
6030        async fn simulate_host(
6031            mut self,
6032            project: ModelHandle<Project>,
6033            files: Arc<Mutex<Vec<PathBuf>>>,
6034            op_start_signal: futures::channel::mpsc::UnboundedReceiver<()>,
6035            rng: Arc<Mutex<StdRng>>,
6036            mut cx: TestAppContext,
6037        ) -> (Self, TestAppContext, Option<anyhow::Error>) {
6038            async fn simulate_host_internal(
6039                client: &mut TestClient,
6040                project: ModelHandle<Project>,
6041                files: Arc<Mutex<Vec<PathBuf>>>,
6042                mut op_start_signal: futures::channel::mpsc::UnboundedReceiver<()>,
6043                rng: Arc<Mutex<StdRng>>,
6044                cx: &mut TestAppContext,
6045            ) -> anyhow::Result<()> {
6046                let fs = project.read_with(cx, |project, _| project.fs().clone());
6047
6048                while op_start_signal.next().await.is_some() {
6049                    let distribution = rng.lock().gen_range::<usize, _>(0..100);
6050                    match distribution {
6051                        0..=20 if !files.lock().is_empty() => {
6052                            let path = files.lock().choose(&mut *rng.lock()).unwrap().clone();
6053                            let mut path = path.as_path();
6054                            while let Some(parent_path) = path.parent() {
6055                                path = parent_path;
6056                                if rng.lock().gen() {
6057                                    break;
6058                                }
6059                            }
6060
6061                            log::info!("Host: find/create local worktree {:?}", path);
6062                            let find_or_create_worktree = project.update(cx, |project, cx| {
6063                                project.find_or_create_local_worktree(path, true, cx)
6064                            });
6065                            if rng.lock().gen() {
6066                                cx.background().spawn(find_or_create_worktree).detach();
6067                            } else {
6068                                find_or_create_worktree.await?;
6069                            }
6070                        }
6071                        10..=80 if !files.lock().is_empty() => {
6072                            let buffer = if client.buffers.is_empty() || rng.lock().gen() {
6073                                let file = files.lock().choose(&mut *rng.lock()).unwrap().clone();
6074                                let (worktree, path) = project
6075                                    .update(cx, |project, cx| {
6076                                        project.find_or_create_local_worktree(
6077                                            file.clone(),
6078                                            true,
6079                                            cx,
6080                                        )
6081                                    })
6082                                    .await?;
6083                                let project_path =
6084                                    worktree.read_with(cx, |worktree, _| (worktree.id(), path));
6085                                log::info!(
6086                                    "Host: opening path {:?}, worktree {}, relative_path {:?}",
6087                                    file,
6088                                    project_path.0,
6089                                    project_path.1
6090                                );
6091                                let buffer = project
6092                                    .update(cx, |project, cx| project.open_buffer(project_path, cx))
6093                                    .await
6094                                    .unwrap();
6095                                client.buffers.insert(buffer.clone());
6096                                buffer
6097                            } else {
6098                                client
6099                                    .buffers
6100                                    .iter()
6101                                    .choose(&mut *rng.lock())
6102                                    .unwrap()
6103                                    .clone()
6104                            };
6105
6106                            if rng.lock().gen_bool(0.1) {
6107                                cx.update(|cx| {
6108                                    log::info!(
6109                                        "Host: dropping buffer {:?}",
6110                                        buffer.read(cx).file().unwrap().full_path(cx)
6111                                    );
6112                                    client.buffers.remove(&buffer);
6113                                    drop(buffer);
6114                                });
6115                            } else {
6116                                buffer.update(cx, |buffer, cx| {
6117                                    log::info!(
6118                                        "Host: updating buffer {:?} ({})",
6119                                        buffer.file().unwrap().full_path(cx),
6120                                        buffer.remote_id()
6121                                    );
6122
6123                                    if rng.lock().gen_bool(0.7) {
6124                                        buffer.randomly_edit(&mut *rng.lock(), 5, cx);
6125                                    } else {
6126                                        buffer.randomly_undo_redo(&mut *rng.lock(), cx);
6127                                    }
6128                                });
6129                            }
6130                        }
6131                        _ => loop {
6132                            let path_component_count = rng.lock().gen_range::<usize, _>(1..=5);
6133                            let mut path = PathBuf::new();
6134                            path.push("/");
6135                            for _ in 0..path_component_count {
6136                                let letter = rng.lock().gen_range(b'a'..=b'z');
6137                                path.push(std::str::from_utf8(&[letter]).unwrap());
6138                            }
6139                            path.set_extension("rs");
6140                            let parent_path = path.parent().unwrap();
6141
6142                            log::info!("Host: creating file {:?}", path,);
6143
6144                            if fs.create_dir(&parent_path).await.is_ok()
6145                                && fs.create_file(&path, Default::default()).await.is_ok()
6146                            {
6147                                files.lock().push(path);
6148                                break;
6149                            } else {
6150                                log::info!("Host: cannot create file");
6151                            }
6152                        },
6153                    }
6154
6155                    cx.background().simulate_random_delay().await;
6156                }
6157
6158                Ok(())
6159            }
6160
6161            let result = simulate_host_internal(
6162                &mut self,
6163                project.clone(),
6164                files,
6165                op_start_signal,
6166                rng,
6167                &mut cx,
6168            )
6169            .await;
6170            log::info!("Host done");
6171            self.project = Some(project);
6172            (self, cx, result.err())
6173        }
6174
6175        pub async fn simulate_guest(
6176            mut self,
6177            guest_username: String,
6178            project: ModelHandle<Project>,
6179            op_start_signal: futures::channel::mpsc::UnboundedReceiver<()>,
6180            rng: Arc<Mutex<StdRng>>,
6181            mut cx: TestAppContext,
6182        ) -> (Self, TestAppContext, Option<anyhow::Error>) {
6183            async fn simulate_guest_internal(
6184                client: &mut TestClient,
6185                guest_username: &str,
6186                project: ModelHandle<Project>,
6187                mut op_start_signal: futures::channel::mpsc::UnboundedReceiver<()>,
6188                rng: Arc<Mutex<StdRng>>,
6189                cx: &mut TestAppContext,
6190            ) -> anyhow::Result<()> {
6191                while op_start_signal.next().await.is_some() {
6192                    let buffer = if client.buffers.is_empty() || rng.lock().gen() {
6193                        let worktree = if let Some(worktree) =
6194                            project.read_with(cx, |project, cx| {
6195                                project
6196                                    .worktrees(&cx)
6197                                    .filter(|worktree| {
6198                                        let worktree = worktree.read(cx);
6199                                        worktree.is_visible()
6200                                            && worktree.entries(false).any(|e| e.is_file())
6201                                    })
6202                                    .choose(&mut *rng.lock())
6203                            }) {
6204                            worktree
6205                        } else {
6206                            cx.background().simulate_random_delay().await;
6207                            continue;
6208                        };
6209
6210                        let (worktree_root_name, project_path) =
6211                            worktree.read_with(cx, |worktree, _| {
6212                                let entry = worktree
6213                                    .entries(false)
6214                                    .filter(|e| e.is_file())
6215                                    .choose(&mut *rng.lock())
6216                                    .unwrap();
6217                                (
6218                                    worktree.root_name().to_string(),
6219                                    (worktree.id(), entry.path.clone()),
6220                                )
6221                            });
6222                        log::info!(
6223                            "{}: opening path {:?} in worktree {} ({})",
6224                            guest_username,
6225                            project_path.1,
6226                            project_path.0,
6227                            worktree_root_name,
6228                        );
6229                        let buffer = project
6230                            .update(cx, |project, cx| {
6231                                project.open_buffer(project_path.clone(), cx)
6232                            })
6233                            .await?;
6234                        log::info!(
6235                            "{}: opened path {:?} in worktree {} ({}) with buffer id {}",
6236                            guest_username,
6237                            project_path.1,
6238                            project_path.0,
6239                            worktree_root_name,
6240                            buffer.read_with(cx, |buffer, _| buffer.remote_id())
6241                        );
6242                        client.buffers.insert(buffer.clone());
6243                        buffer
6244                    } else {
6245                        client
6246                            .buffers
6247                            .iter()
6248                            .choose(&mut *rng.lock())
6249                            .unwrap()
6250                            .clone()
6251                    };
6252
6253                    let choice = rng.lock().gen_range(0..100);
6254                    match choice {
6255                        0..=9 => {
6256                            cx.update(|cx| {
6257                                log::info!(
6258                                    "{}: dropping buffer {:?}",
6259                                    guest_username,
6260                                    buffer.read(cx).file().unwrap().full_path(cx)
6261                                );
6262                                client.buffers.remove(&buffer);
6263                                drop(buffer);
6264                            });
6265                        }
6266                        10..=19 => {
6267                            let completions = project.update(cx, |project, cx| {
6268                                log::info!(
6269                                    "{}: requesting completions for buffer {} ({:?})",
6270                                    guest_username,
6271                                    buffer.read(cx).remote_id(),
6272                                    buffer.read(cx).file().unwrap().full_path(cx)
6273                                );
6274                                let offset = rng.lock().gen_range(0..=buffer.read(cx).len());
6275                                project.completions(&buffer, offset, cx)
6276                            });
6277                            let completions = cx.background().spawn(async move {
6278                                completions
6279                                    .await
6280                                    .map_err(|err| anyhow!("completions request failed: {:?}", err))
6281                            });
6282                            if rng.lock().gen_bool(0.3) {
6283                                log::info!("{}: detaching completions request", guest_username);
6284                                cx.update(|cx| completions.detach_and_log_err(cx));
6285                            } else {
6286                                completions.await?;
6287                            }
6288                        }
6289                        20..=29 => {
6290                            let code_actions = project.update(cx, |project, cx| {
6291                                log::info!(
6292                                    "{}: requesting code actions for buffer {} ({:?})",
6293                                    guest_username,
6294                                    buffer.read(cx).remote_id(),
6295                                    buffer.read(cx).file().unwrap().full_path(cx)
6296                                );
6297                                let range = buffer.read(cx).random_byte_range(0, &mut *rng.lock());
6298                                project.code_actions(&buffer, range, cx)
6299                            });
6300                            let code_actions = cx.background().spawn(async move {
6301                                code_actions.await.map_err(|err| {
6302                                    anyhow!("code actions request failed: {:?}", err)
6303                                })
6304                            });
6305                            if rng.lock().gen_bool(0.3) {
6306                                log::info!("{}: detaching code actions request", guest_username);
6307                                cx.update(|cx| code_actions.detach_and_log_err(cx));
6308                            } else {
6309                                code_actions.await?;
6310                            }
6311                        }
6312                        30..=39 if buffer.read_with(cx, |buffer, _| buffer.is_dirty()) => {
6313                            let (requested_version, save) = buffer.update(cx, |buffer, cx| {
6314                                log::info!(
6315                                    "{}: saving buffer {} ({:?})",
6316                                    guest_username,
6317                                    buffer.remote_id(),
6318                                    buffer.file().unwrap().full_path(cx)
6319                                );
6320                                (buffer.version(), buffer.save(cx))
6321                            });
6322                            let save = cx.background().spawn(async move {
6323                                let (saved_version, _) = save
6324                                    .await
6325                                    .map_err(|err| anyhow!("save request failed: {:?}", err))?;
6326                                assert!(saved_version.observed_all(&requested_version));
6327                                Ok::<_, anyhow::Error>(())
6328                            });
6329                            if rng.lock().gen_bool(0.3) {
6330                                log::info!("{}: detaching save request", guest_username);
6331                                cx.update(|cx| save.detach_and_log_err(cx));
6332                            } else {
6333                                save.await?;
6334                            }
6335                        }
6336                        40..=44 => {
6337                            let prepare_rename = project.update(cx, |project, cx| {
6338                                log::info!(
6339                                    "{}: preparing rename for buffer {} ({:?})",
6340                                    guest_username,
6341                                    buffer.read(cx).remote_id(),
6342                                    buffer.read(cx).file().unwrap().full_path(cx)
6343                                );
6344                                let offset = rng.lock().gen_range(0..=buffer.read(cx).len());
6345                                project.prepare_rename(buffer, offset, cx)
6346                            });
6347                            let prepare_rename = cx.background().spawn(async move {
6348                                prepare_rename.await.map_err(|err| {
6349                                    anyhow!("prepare rename request failed: {:?}", err)
6350                                })
6351                            });
6352                            if rng.lock().gen_bool(0.3) {
6353                                log::info!("{}: detaching prepare rename request", guest_username);
6354                                cx.update(|cx| prepare_rename.detach_and_log_err(cx));
6355                            } else {
6356                                prepare_rename.await?;
6357                            }
6358                        }
6359                        45..=49 => {
6360                            let definitions = project.update(cx, |project, cx| {
6361                                log::info!(
6362                                    "{}: requesting definitions for buffer {} ({:?})",
6363                                    guest_username,
6364                                    buffer.read(cx).remote_id(),
6365                                    buffer.read(cx).file().unwrap().full_path(cx)
6366                                );
6367                                let offset = rng.lock().gen_range(0..=buffer.read(cx).len());
6368                                project.definition(&buffer, offset, cx)
6369                            });
6370                            let definitions = cx.background().spawn(async move {
6371                                definitions
6372                                    .await
6373                                    .map_err(|err| anyhow!("definitions request failed: {:?}", err))
6374                            });
6375                            if rng.lock().gen_bool(0.3) {
6376                                log::info!("{}: detaching definitions request", guest_username);
6377                                cx.update(|cx| definitions.detach_and_log_err(cx));
6378                            } else {
6379                                client
6380                                    .buffers
6381                                    .extend(definitions.await?.into_iter().map(|loc| loc.buffer));
6382                            }
6383                        }
6384                        50..=54 => {
6385                            let highlights = project.update(cx, |project, cx| {
6386                                log::info!(
6387                                    "{}: requesting highlights for buffer {} ({:?})",
6388                                    guest_username,
6389                                    buffer.read(cx).remote_id(),
6390                                    buffer.read(cx).file().unwrap().full_path(cx)
6391                                );
6392                                let offset = rng.lock().gen_range(0..=buffer.read(cx).len());
6393                                project.document_highlights(&buffer, offset, cx)
6394                            });
6395                            let highlights = cx.background().spawn(async move {
6396                                highlights
6397                                    .await
6398                                    .map_err(|err| anyhow!("highlights request failed: {:?}", err))
6399                            });
6400                            if rng.lock().gen_bool(0.3) {
6401                                log::info!("{}: detaching highlights request", guest_username);
6402                                cx.update(|cx| highlights.detach_and_log_err(cx));
6403                            } else {
6404                                highlights.await?;
6405                            }
6406                        }
6407                        55..=59 => {
6408                            let search = project.update(cx, |project, cx| {
6409                                let query = rng.lock().gen_range('a'..='z');
6410                                log::info!("{}: project-wide search {:?}", guest_username, query);
6411                                project.search(SearchQuery::text(query, false, false), cx)
6412                            });
6413                            let search = cx.background().spawn(async move {
6414                                search
6415                                    .await
6416                                    .map_err(|err| anyhow!("search request failed: {:?}", err))
6417                            });
6418                            if rng.lock().gen_bool(0.3) {
6419                                log::info!("{}: detaching search request", guest_username);
6420                                cx.update(|cx| search.detach_and_log_err(cx));
6421                            } else {
6422                                client.buffers.extend(search.await?.into_keys());
6423                            }
6424                        }
6425                        _ => {
6426                            buffer.update(cx, |buffer, cx| {
6427                                log::info!(
6428                                    "{}: updating buffer {} ({:?})",
6429                                    guest_username,
6430                                    buffer.remote_id(),
6431                                    buffer.file().unwrap().full_path(cx)
6432                                );
6433                                if rng.lock().gen_bool(0.7) {
6434                                    buffer.randomly_edit(&mut *rng.lock(), 5, cx);
6435                                } else {
6436                                    buffer.randomly_undo_redo(&mut *rng.lock(), cx);
6437                                }
6438                            });
6439                        }
6440                    }
6441                    cx.background().simulate_random_delay().await;
6442                }
6443                Ok(())
6444            }
6445
6446            let result = simulate_guest_internal(
6447                &mut self,
6448                &guest_username,
6449                project.clone(),
6450                op_start_signal,
6451                rng,
6452                &mut cx,
6453            )
6454            .await;
6455            log::info!("{}: done", guest_username);
6456
6457            self.project = Some(project);
6458            (self, cx, result.err())
6459        }
6460    }
6461
6462    impl Drop for TestClient {
6463        fn drop(&mut self) {
6464            self.client.tear_down();
6465        }
6466    }
6467
6468    impl Executor for Arc<gpui::executor::Background> {
6469        type Sleep = gpui::executor::Timer;
6470
6471        fn spawn_detached<F: 'static + Send + Future<Output = ()>>(&self, future: F) {
6472            self.spawn(future).detach();
6473        }
6474
6475        fn sleep(&self, duration: Duration) -> Self::Sleep {
6476            self.as_ref().timer(duration)
6477        }
6478    }
6479
6480    fn channel_messages(channel: &Channel) -> Vec<(String, String, bool)> {
6481        channel
6482            .messages()
6483            .cursor::<()>()
6484            .map(|m| {
6485                (
6486                    m.sender.github_login.clone(),
6487                    m.body.clone(),
6488                    m.is_pending(),
6489                )
6490            })
6491            .collect()
6492    }
6493
6494    struct EmptyView;
6495
6496    impl gpui::Entity for EmptyView {
6497        type Event = ();
6498    }
6499
6500    impl gpui::View for EmptyView {
6501        fn ui_name() -> &'static str {
6502            "empty view"
6503        }
6504
6505        fn render(&mut self, _: &mut gpui::RenderContext<Self>) -> gpui::ElementBox {
6506            gpui::Element::boxed(gpui::elements::Empty)
6507        }
6508    }
6509}