rpc.rs

   1mod store;
   2
   3use crate::{
   4    auth,
   5    db::{ChannelId, MessageId, UserId},
   6    AppState, Result,
   7};
   8use anyhow::anyhow;
   9use async_tungstenite::tungstenite::{
  10    protocol::CloseFrame as TungsteniteCloseFrame, Message as TungsteniteMessage,
  11};
  12use axum::{
  13    body::Body,
  14    extract::{
  15        ws::{CloseFrame as AxumCloseFrame, Message as AxumMessage},
  16        ConnectInfo, WebSocketUpgrade,
  17    },
  18    headers::{Header, HeaderName},
  19    http::StatusCode,
  20    middleware,
  21    response::{IntoResponse, Response},
  22    routing::get,
  23    Extension, Router, TypedHeader,
  24};
  25use collections::{HashMap, HashSet};
  26use futures::{channel::mpsc, future::BoxFuture, FutureExt, SinkExt, StreamExt, TryStreamExt};
  27use lazy_static::lazy_static;
  28use rpc::{
  29    proto::{self, AnyTypedEnvelope, EntityMessage, EnvelopedMessage, RequestMessage},
  30    Connection, ConnectionId, Peer, TypedEnvelope,
  31};
  32use std::{
  33    any::TypeId,
  34    future::Future,
  35    marker::PhantomData,
  36    net::SocketAddr,
  37    ops::{Deref, DerefMut},
  38    rc::Rc,
  39    sync::Arc,
  40    time::Duration,
  41};
  42use store::{Store, Worktree};
  43use time::OffsetDateTime;
  44use tokio::{
  45    sync::{RwLock, RwLockReadGuard, RwLockWriteGuard},
  46    time::Sleep,
  47};
  48use tower::ServiceBuilder;
  49use tracing::{info_span, instrument, Instrument};
  50
  51type MessageHandler =
  52    Box<dyn Send + Sync + Fn(Arc<Server>, Box<dyn AnyTypedEnvelope>) -> BoxFuture<'static, ()>>;
  53
/// State shared by every connection handled by the RPC server.
pub struct Server {
    /// Peer used to add and disconnect connections and to send, forward and answer messages.
    peer: Arc<Peer>,
    /// The store, guarded by an async read/write lock.
    store: RwLock<Store>,
    /// Shared application state; `register_worktree` reaches the database through it.
    app_state: Arc<AppState>,
    /// Registered handlers, keyed by the `TypeId` of the message type each one accepts.
    handlers: HashMap<TypeId, MessageHandler>,
    /// Optional channel on which `()` is sent after each message has been handled.
    notifications: Option<mpsc::UnboundedSender<()>>,
}
  61
  62pub trait Executor: Send + Clone {
  63    type Sleep: Send + Future;
  64    fn spawn_detached<F: 'static + Send + Future<Output = ()>>(&self, future: F);
  65    fn sleep(&self, duration: Duration) -> Self::Sleep;
  66}
  67
/// Cloneable unit type naming an executor.
// NOTE(review): presumably the production `Executor` implementation; its `impl` block is
// not visible in this section, so confirm there.
#[derive(Clone)]
pub struct RealExecutor;
  70
// NOTE(review): presumably the page size used when listing channel messages; the use site
// is outside this section, so confirm there.
const MESSAGE_COUNT_PER_PAGE: usize = 100;
// NOTE(review): presumably the maximum accepted length of a channel message body; confirm
// the unit (bytes vs. characters) at the use site.
const MAX_MESSAGE_LEN: usize = 1024;
  73
/// Shared (read) guard over the server's `Store`.
struct StoreReadGuard<'a> {
    /// The underlying read guard of the store's `RwLock`.
    guard: RwLockReadGuard<'a, Store>,
    /// `Rc<()>` is `!Send`, so this marker makes the whole guard `!Send`; a future that
    /// keeps the guard alive across an `.await` therefore cannot be `Send`.
    _not_send: PhantomData<Rc<()>>,
}
  78
/// Exclusive (write) guard over the server's `Store`.
struct StoreWriteGuard<'a> {
    /// The underlying write guard of the store's `RwLock`.
    guard: RwLockWriteGuard<'a, Store>,
    /// `Rc<()>` is `!Send`, so this marker makes the whole guard `!Send`; a future that
    /// keeps the guard alive across an `.await` therefore cannot be `Send`.
    _not_send: PhantomData<Rc<()>>,
}
  83
  84impl Server {
  85    pub fn new(
  86        app_state: Arc<AppState>,
  87        notifications: Option<mpsc::UnboundedSender<()>>,
  88    ) -> Arc<Self> {
  89        let mut server = Self {
  90            peer: Peer::new(),
  91            app_state,
  92            store: Default::default(),
  93            handlers: Default::default(),
  94            notifications,
  95        };
  96
  97        server
  98            .add_request_handler(Server::ping)
  99            .add_request_handler(Server::register_project)
 100            .add_message_handler(Server::unregister_project)
 101            .add_request_handler(Server::share_project)
 102            .add_message_handler(Server::unshare_project)
 103            .add_sync_request_handler(Server::join_project)
 104            .add_message_handler(Server::leave_project)
 105            .add_request_handler(Server::register_worktree)
 106            .add_message_handler(Server::unregister_worktree)
 107            .add_request_handler(Server::update_worktree)
 108            .add_message_handler(Server::start_language_server)
 109            .add_message_handler(Server::update_language_server)
 110            .add_message_handler(Server::update_diagnostic_summary)
 111            .add_request_handler(Server::forward_project_request::<proto::GetDefinition>)
 112            .add_request_handler(Server::forward_project_request::<proto::GetReferences>)
 113            .add_request_handler(Server::forward_project_request::<proto::SearchProject>)
 114            .add_request_handler(Server::forward_project_request::<proto::GetDocumentHighlights>)
 115            .add_request_handler(Server::forward_project_request::<proto::GetProjectSymbols>)
 116            .add_request_handler(Server::forward_project_request::<proto::OpenBufferForSymbol>)
 117            .add_request_handler(Server::forward_project_request::<proto::OpenBufferById>)
 118            .add_request_handler(Server::forward_project_request::<proto::OpenBufferByPath>)
 119            .add_request_handler(Server::forward_project_request::<proto::GetCompletions>)
 120            .add_request_handler(
 121                Server::forward_project_request::<proto::ApplyCompletionAdditionalEdits>,
 122            )
 123            .add_request_handler(Server::forward_project_request::<proto::GetCodeActions>)
 124            .add_request_handler(Server::forward_project_request::<proto::ApplyCodeAction>)
 125            .add_request_handler(Server::forward_project_request::<proto::PrepareRename>)
 126            .add_request_handler(Server::forward_project_request::<proto::PerformRename>)
 127            .add_request_handler(Server::forward_project_request::<proto::ReloadBuffers>)
 128            .add_request_handler(Server::forward_project_request::<proto::FormatBuffers>)
 129            .add_request_handler(Server::forward_project_request::<proto::CreateProjectEntry>)
 130            .add_request_handler(Server::forward_project_request::<proto::RenameProjectEntry>)
 131            .add_request_handler(Server::forward_project_request::<proto::DeleteProjectEntry>)
 132            .add_request_handler(Server::update_buffer)
 133            .add_message_handler(Server::update_buffer_file)
 134            .add_message_handler(Server::buffer_reloaded)
 135            .add_message_handler(Server::buffer_saved)
 136            .add_request_handler(Server::save_buffer)
 137            .add_request_handler(Server::get_channels)
 138            .add_request_handler(Server::get_users)
 139            .add_request_handler(Server::join_channel)
 140            .add_message_handler(Server::leave_channel)
 141            .add_request_handler(Server::send_channel_message)
 142            .add_request_handler(Server::follow)
 143            .add_message_handler(Server::unfollow)
 144            .add_message_handler(Server::update_followers)
 145            .add_request_handler(Server::get_channel_messages);
 146
 147        Arc::new(server)
 148    }
 149
 150    fn add_message_handler<F, Fut, M>(&mut self, handler: F) -> &mut Self
 151    where
 152        F: 'static + Send + Sync + Fn(Arc<Self>, TypedEnvelope<M>) -> Fut,
 153        Fut: 'static + Send + Future<Output = Result<()>>,
 154        M: EnvelopedMessage,
 155    {
 156        let prev_handler = self.handlers.insert(
 157            TypeId::of::<M>(),
 158            Box::new(move |server, envelope| {
 159                let envelope = envelope.into_any().downcast::<TypedEnvelope<M>>().unwrap();
 160                let span = info_span!(
 161                    "handle message",
 162                    payload_type = envelope.payload_type_name(),
 163                    payload = format!("{:?}", envelope.payload).as_str(),
 164                );
 165                let future = (handler)(server, *envelope);
 166                async move {
 167                    if let Err(error) = future.await {
 168                        tracing::error!(%error, "error handling message");
 169                    }
 170                }
 171                .instrument(span)
 172                .boxed()
 173            }),
 174        );
 175        if prev_handler.is_some() {
 176            panic!("registered a handler for the same message twice");
 177        }
 178        self
 179    }
 180
 181    fn add_request_handler<F, Fut, M>(&mut self, handler: F) -> &mut Self
 182    where
 183        F: 'static + Send + Sync + Fn(Arc<Self>, TypedEnvelope<M>) -> Fut,
 184        Fut: 'static + Send + Future<Output = Result<M::Response>>,
 185        M: RequestMessage,
 186    {
 187        self.add_message_handler(move |server, envelope| {
 188            let receipt = envelope.receipt();
 189            let response = (handler)(server.clone(), envelope);
 190            async move {
 191                match response.await {
 192                    Ok(response) => {
 193                        server.peer.respond(receipt, response)?;
 194                        Ok(())
 195                    }
 196                    Err(error) => {
 197                        server.peer.respond_with_error(
 198                            receipt,
 199                            proto::Error {
 200                                message: error.to_string(),
 201                            },
 202                        )?;
 203                        Err(error)
 204                    }
 205                }
 206            }
 207        })
 208    }
 209
 210    /// Handle a request while holding a lock to the store. This is useful when we're registering
 211    /// a connection but we want to respond on the connection before anybody else can send on it.
 212    fn add_sync_request_handler<F, M>(&mut self, handler: F) -> &mut Self
 213    where
 214        F: 'static
 215            + Send
 216            + Sync
 217            + Fn(Arc<Self>, &mut Store, TypedEnvelope<M>) -> Result<M::Response>,
 218        M: RequestMessage,
 219    {
 220        let handler = Arc::new(handler);
 221        self.add_message_handler(move |server, envelope| {
 222            let receipt = envelope.receipt();
 223            let handler = handler.clone();
 224            async move {
 225                let mut store = server.state_mut().await;
 226                let response = (handler)(server.clone(), &mut *store, envelope);
 227                match response {
 228                    Ok(response) => {
 229                        server.peer.respond(receipt, response)?;
 230                        Ok(())
 231                    }
 232                    Err(error) => {
 233                        server.peer.respond_with_error(
 234                            receipt,
 235                            proto::Error {
 236                                message: error.to_string(),
 237                            },
 238                        )?;
 239                        Err(error)
 240                    }
 241                }
 242            }
 243        })
 244    }
 245
 246    pub fn handle_connection<E: Executor>(
 247        self: &Arc<Self>,
 248        connection: Connection,
 249        address: String,
 250        user_id: UserId,
 251        mut send_connection_id: Option<mpsc::Sender<ConnectionId>>,
 252        executor: E,
 253    ) -> impl Future<Output = ()> {
 254        let mut this = self.clone();
 255        let span = info_span!("handle connection", %user_id, %address);
 256        async move {
 257            let (connection_id, handle_io, mut incoming_rx) = this
 258                .peer
 259                .add_connection(connection, {
 260                    let executor = executor.clone();
 261                    move |duration| {
 262                        let timer = executor.sleep(duration);
 263                        async move {
 264                            timer.await;
 265                        }
 266                    }
 267                })
 268                .await;
 269
 270            tracing::info!(%user_id, %connection_id, %address, "connection opened");
 271
 272            if let Some(send_connection_id) = send_connection_id.as_mut() {
 273                let _ = send_connection_id.send(connection_id).await;
 274            }
 275
 276            {
 277                let mut state = this.state_mut().await;
 278                state.add_connection(connection_id, user_id);
 279                this.update_contacts_for_users(&*state, &[user_id]);
 280            }
 281
 282            let handle_io = handle_io.fuse();
 283            futures::pin_mut!(handle_io);
 284            loop {
 285                let next_message = incoming_rx.next().fuse();
 286                futures::pin_mut!(next_message);
 287                futures::select_biased! {
 288                    result = handle_io => {
 289                        if let Err(error) = result {
 290                            tracing::error!(%error, "error handling I/O");
 291                        }
 292                        break;
 293                    }
 294                    message = next_message => {
 295                        if let Some(message) = message {
 296                            let type_name = message.payload_type_name();
 297                            let span = tracing::info_span!("receive message", %user_id, %connection_id, %address, type_name);
 298                            async {
 299                                if let Some(handler) = this.handlers.get(&message.payload_type_id()) {
 300                                    let notifications = this.notifications.clone();
 301                                    let is_background = message.is_background();
 302                                    let handle_message = (handler)(this.clone(), message);
 303                                    let handle_message = async move {
 304                                        handle_message.await;
 305                                        if let Some(mut notifications) = notifications {
 306                                            let _ = notifications.send(()).await;
 307                                        }
 308                                    };
 309                                    if is_background {
 310                                        executor.spawn_detached(handle_message);
 311                                    } else {
 312                                        handle_message.await;
 313                                    }
 314                                } else {
 315                                    tracing::error!("no message handler");
 316                                }
 317                            }.instrument(span).await;
 318                        } else {
 319                            tracing::info!(%user_id, %connection_id, %address, "connection closed");
 320                            break;
 321                        }
 322                    }
 323                }
 324            }
 325
 326            if let Err(error) = this.sign_out(connection_id).await {
 327                tracing::error!(%error, "error signing out");
 328            }
 329        }.instrument(span)
 330    }
 331
 332    async fn sign_out(self: &mut Arc<Self>, connection_id: ConnectionId) -> Result<()> {
 333        self.peer.disconnect(connection_id);
 334        let mut state = self.state_mut().await;
 335        let removed_connection = state.remove_connection(connection_id)?;
 336
 337        for (project_id, project) in removed_connection.hosted_projects {
 338            if let Some(share) = project.share {
 339                broadcast(
 340                    connection_id,
 341                    share.guests.keys().copied().collect(),
 342                    |conn_id| {
 343                        self.peer
 344                            .send(conn_id, proto::UnshareProject { project_id })
 345                    },
 346                );
 347            }
 348        }
 349
 350        for (project_id, peer_ids) in removed_connection.guest_project_ids {
 351            broadcast(connection_id, peer_ids, |conn_id| {
 352                self.peer.send(
 353                    conn_id,
 354                    proto::RemoveProjectCollaborator {
 355                        project_id,
 356                        peer_id: connection_id.0,
 357                    },
 358                )
 359            });
 360        }
 361
 362        self.update_contacts_for_users(&*state, removed_connection.contact_ids.iter());
 363        Ok(())
 364    }
 365
 366    async fn ping(self: Arc<Server>, _: TypedEnvelope<proto::Ping>) -> Result<proto::Ack> {
 367        Ok(proto::Ack {})
 368    }
 369
 370    async fn register_project(
 371        self: Arc<Server>,
 372        request: TypedEnvelope<proto::RegisterProject>,
 373    ) -> Result<proto::RegisterProjectResponse> {
 374        let project_id = {
 375            let mut state = self.state_mut().await;
 376            let user_id = state.user_id_for_connection(request.sender_id)?;
 377            state.register_project(request.sender_id, user_id)
 378        };
 379        Ok(proto::RegisterProjectResponse { project_id })
 380    }
 381
 382    async fn unregister_project(
 383        self: Arc<Server>,
 384        request: TypedEnvelope<proto::UnregisterProject>,
 385    ) -> Result<()> {
 386        let mut state = self.state_mut().await;
 387        let project = state.unregister_project(request.payload.project_id, request.sender_id)?;
 388        self.update_contacts_for_users(&*state, &project.authorized_user_ids());
 389        Ok(())
 390    }
 391
 392    async fn share_project(
 393        self: Arc<Server>,
 394        request: TypedEnvelope<proto::ShareProject>,
 395    ) -> Result<proto::Ack> {
 396        let mut state = self.state_mut().await;
 397        let project = state.share_project(request.payload.project_id, request.sender_id)?;
 398        self.update_contacts_for_users(&mut *state, &project.authorized_user_ids);
 399        Ok(proto::Ack {})
 400    }
 401
 402    async fn unshare_project(
 403        self: Arc<Server>,
 404        request: TypedEnvelope<proto::UnshareProject>,
 405    ) -> Result<()> {
 406        let project_id = request.payload.project_id;
 407        let mut state = self.state_mut().await;
 408        let project = state.unshare_project(project_id, request.sender_id)?;
 409        broadcast(request.sender_id, project.connection_ids, |conn_id| {
 410            self.peer
 411                .send(conn_id, proto::UnshareProject { project_id })
 412        });
 413        self.update_contacts_for_users(&mut *state, &project.authorized_user_ids);
 414        Ok(())
 415    }
 416
 417    fn join_project(
 418        self: Arc<Server>,
 419        state: &mut Store,
 420        request: TypedEnvelope<proto::JoinProject>,
 421    ) -> Result<proto::JoinProjectResponse> {
 422        let project_id = request.payload.project_id;
 423
 424        let user_id = state.user_id_for_connection(request.sender_id)?;
 425        let (response, connection_ids, contact_user_ids) = state
 426            .join_project(request.sender_id, user_id, project_id)
 427            .and_then(|joined| {
 428                let share = joined.project.share()?;
 429                let peer_count = share.guests.len();
 430                let mut collaborators = Vec::with_capacity(peer_count);
 431                collaborators.push(proto::Collaborator {
 432                    peer_id: joined.project.host_connection_id.0,
 433                    replica_id: 0,
 434                    user_id: joined.project.host_user_id.to_proto(),
 435                });
 436                let worktrees = share
 437                    .worktrees
 438                    .iter()
 439                    .filter_map(|(id, shared_worktree)| {
 440                        let worktree = joined.project.worktrees.get(&id)?;
 441                        Some(proto::Worktree {
 442                            id: *id,
 443                            root_name: worktree.root_name.clone(),
 444                            entries: shared_worktree.entries.values().cloned().collect(),
 445                            diagnostic_summaries: shared_worktree
 446                                .diagnostic_summaries
 447                                .values()
 448                                .cloned()
 449                                .collect(),
 450                            visible: worktree.visible,
 451                            scan_id: shared_worktree.scan_id,
 452                        })
 453                    })
 454                    .collect();
 455                for (peer_conn_id, (peer_replica_id, peer_user_id)) in &share.guests {
 456                    if *peer_conn_id != request.sender_id {
 457                        collaborators.push(proto::Collaborator {
 458                            peer_id: peer_conn_id.0,
 459                            replica_id: *peer_replica_id as u32,
 460                            user_id: peer_user_id.to_proto(),
 461                        });
 462                    }
 463                }
 464                let response = proto::JoinProjectResponse {
 465                    worktrees,
 466                    replica_id: joined.replica_id as u32,
 467                    collaborators,
 468                    language_servers: joined.project.language_servers.clone(),
 469                };
 470                let connection_ids = joined.project.connection_ids();
 471                let contact_user_ids = joined.project.authorized_user_ids();
 472                Ok((response, connection_ids, contact_user_ids))
 473            })?;
 474
 475        broadcast(request.sender_id, connection_ids, |conn_id| {
 476            self.peer.send(
 477                conn_id,
 478                proto::AddProjectCollaborator {
 479                    project_id,
 480                    collaborator: Some(proto::Collaborator {
 481                        peer_id: request.sender_id.0,
 482                        replica_id: response.replica_id,
 483                        user_id: user_id.to_proto(),
 484                    }),
 485                },
 486            )
 487        });
 488        self.update_contacts_for_users(state, &contact_user_ids);
 489        Ok(response)
 490    }
 491
 492    async fn leave_project(
 493        self: Arc<Server>,
 494        request: TypedEnvelope<proto::LeaveProject>,
 495    ) -> Result<()> {
 496        let sender_id = request.sender_id;
 497        let project_id = request.payload.project_id;
 498        let mut state = self.state_mut().await;
 499        let worktree = state.leave_project(sender_id, project_id)?;
 500        broadcast(sender_id, worktree.connection_ids, |conn_id| {
 501            self.peer.send(
 502                conn_id,
 503                proto::RemoveProjectCollaborator {
 504                    project_id,
 505                    peer_id: sender_id.0,
 506                },
 507            )
 508        });
 509        self.update_contacts_for_users(&*state, &worktree.authorized_user_ids);
 510        Ok(())
 511    }
 512
 513    async fn register_worktree(
 514        self: Arc<Server>,
 515        request: TypedEnvelope<proto::RegisterWorktree>,
 516    ) -> Result<proto::Ack> {
 517        let mut contact_user_ids = HashSet::default();
 518        for github_login in &request.payload.authorized_logins {
 519            let contact_user_id = self.app_state.db.create_user(github_login, false).await?;
 520            contact_user_ids.insert(contact_user_id);
 521        }
 522
 523        let mut state = self.state_mut().await;
 524        let host_user_id = state.user_id_for_connection(request.sender_id)?;
 525        contact_user_ids.insert(host_user_id);
 526
 527        let contact_user_ids = contact_user_ids.into_iter().collect::<Vec<_>>();
 528        let guest_connection_ids = state
 529            .read_project(request.payload.project_id, request.sender_id)?
 530            .guest_connection_ids();
 531        state.register_worktree(
 532            request.payload.project_id,
 533            request.payload.worktree_id,
 534            request.sender_id,
 535            Worktree {
 536                authorized_user_ids: contact_user_ids.clone(),
 537                root_name: request.payload.root_name.clone(),
 538                visible: request.payload.visible,
 539            },
 540        )?;
 541
 542        broadcast(request.sender_id, guest_connection_ids, |connection_id| {
 543            self.peer
 544                .forward_send(request.sender_id, connection_id, request.payload.clone())
 545        });
 546        self.update_contacts_for_users(&*state, &contact_user_ids);
 547        Ok(proto::Ack {})
 548    }
 549
 550    async fn unregister_worktree(
 551        self: Arc<Server>,
 552        request: TypedEnvelope<proto::UnregisterWorktree>,
 553    ) -> Result<()> {
 554        let project_id = request.payload.project_id;
 555        let worktree_id = request.payload.worktree_id;
 556        let mut state = self.state_mut().await;
 557        let (worktree, guest_connection_ids) =
 558            state.unregister_worktree(project_id, worktree_id, request.sender_id)?;
 559        broadcast(request.sender_id, guest_connection_ids, |conn_id| {
 560            self.peer.send(
 561                conn_id,
 562                proto::UnregisterWorktree {
 563                    project_id,
 564                    worktree_id,
 565                },
 566            )
 567        });
 568        self.update_contacts_for_users(&*state, &worktree.authorized_user_ids);
 569        Ok(())
 570    }
 571
 572    async fn update_worktree(
 573        self: Arc<Server>,
 574        request: TypedEnvelope<proto::UpdateWorktree>,
 575    ) -> Result<proto::Ack> {
 576        let connection_ids = self.state_mut().await.update_worktree(
 577            request.sender_id,
 578            request.payload.project_id,
 579            request.payload.worktree_id,
 580            &request.payload.removed_entries,
 581            &request.payload.updated_entries,
 582            request.payload.scan_id,
 583        )?;
 584
 585        broadcast(request.sender_id, connection_ids, |connection_id| {
 586            self.peer
 587                .forward_send(request.sender_id, connection_id, request.payload.clone())
 588        });
 589
 590        Ok(proto::Ack {})
 591    }
 592
 593    async fn update_diagnostic_summary(
 594        self: Arc<Server>,
 595        request: TypedEnvelope<proto::UpdateDiagnosticSummary>,
 596    ) -> Result<()> {
 597        let summary = request
 598            .payload
 599            .summary
 600            .clone()
 601            .ok_or_else(|| anyhow!("invalid summary"))?;
 602        let receiver_ids = self.state_mut().await.update_diagnostic_summary(
 603            request.payload.project_id,
 604            request.payload.worktree_id,
 605            request.sender_id,
 606            summary,
 607        )?;
 608
 609        broadcast(request.sender_id, receiver_ids, |connection_id| {
 610            self.peer
 611                .forward_send(request.sender_id, connection_id, request.payload.clone())
 612        });
 613        Ok(())
 614    }
 615
 616    async fn start_language_server(
 617        self: Arc<Server>,
 618        request: TypedEnvelope<proto::StartLanguageServer>,
 619    ) -> Result<()> {
 620        let receiver_ids = self.state_mut().await.start_language_server(
 621            request.payload.project_id,
 622            request.sender_id,
 623            request
 624                .payload
 625                .server
 626                .clone()
 627                .ok_or_else(|| anyhow!("invalid language server"))?,
 628        )?;
 629        broadcast(request.sender_id, receiver_ids, |connection_id| {
 630            self.peer
 631                .forward_send(request.sender_id, connection_id, request.payload.clone())
 632        });
 633        Ok(())
 634    }
 635
 636    async fn update_language_server(
 637        self: Arc<Server>,
 638        request: TypedEnvelope<proto::UpdateLanguageServer>,
 639    ) -> Result<()> {
 640        let receiver_ids = self
 641            .state()
 642            .await
 643            .project_connection_ids(request.payload.project_id, request.sender_id)?;
 644        broadcast(request.sender_id, receiver_ids, |connection_id| {
 645            self.peer
 646                .forward_send(request.sender_id, connection_id, request.payload.clone())
 647        });
 648        Ok(())
 649    }
 650
 651    async fn forward_project_request<T>(
 652        self: Arc<Server>,
 653        request: TypedEnvelope<T>,
 654    ) -> Result<T::Response>
 655    where
 656        T: EntityMessage + RequestMessage,
 657    {
 658        let host_connection_id = self
 659            .state()
 660            .await
 661            .read_project(request.payload.remote_entity_id(), request.sender_id)?
 662            .host_connection_id;
 663        Ok(self
 664            .peer
 665            .forward_request(request.sender_id, host_connection_id, request.payload)
 666            .await?)
 667    }
 668
 669    async fn save_buffer(
 670        self: Arc<Server>,
 671        request: TypedEnvelope<proto::SaveBuffer>,
 672    ) -> Result<proto::BufferSaved> {
 673        let host = self
 674            .state()
 675            .await
 676            .read_project(request.payload.project_id, request.sender_id)?
 677            .host_connection_id;
 678        let response = self
 679            .peer
 680            .forward_request(request.sender_id, host, request.payload.clone())
 681            .await?;
 682
 683        let mut guests = self
 684            .state()
 685            .await
 686            .read_project(request.payload.project_id, request.sender_id)?
 687            .connection_ids();
 688        guests.retain(|guest_connection_id| *guest_connection_id != request.sender_id);
 689        broadcast(host, guests, |conn_id| {
 690            self.peer.forward_send(host, conn_id, response.clone())
 691        });
 692
 693        Ok(response)
 694    }
 695
 696    async fn update_buffer(
 697        self: Arc<Server>,
 698        request: TypedEnvelope<proto::UpdateBuffer>,
 699    ) -> Result<proto::Ack> {
 700        let receiver_ids = self
 701            .state()
 702            .await
 703            .project_connection_ids(request.payload.project_id, request.sender_id)?;
 704        broadcast(request.sender_id, receiver_ids, |connection_id| {
 705            self.peer
 706                .forward_send(request.sender_id, connection_id, request.payload.clone())
 707        });
 708        Ok(proto::Ack {})
 709    }
 710
 711    async fn update_buffer_file(
 712        self: Arc<Server>,
 713        request: TypedEnvelope<proto::UpdateBufferFile>,
 714    ) -> Result<()> {
 715        let receiver_ids = self
 716            .state()
 717            .await
 718            .project_connection_ids(request.payload.project_id, request.sender_id)?;
 719        broadcast(request.sender_id, receiver_ids, |connection_id| {
 720            self.peer
 721                .forward_send(request.sender_id, connection_id, request.payload.clone())
 722        });
 723        Ok(())
 724    }
 725
 726    async fn buffer_reloaded(
 727        self: Arc<Server>,
 728        request: TypedEnvelope<proto::BufferReloaded>,
 729    ) -> Result<()> {
 730        let receiver_ids = self
 731            .state()
 732            .await
 733            .project_connection_ids(request.payload.project_id, request.sender_id)?;
 734        broadcast(request.sender_id, receiver_ids, |connection_id| {
 735            self.peer
 736                .forward_send(request.sender_id, connection_id, request.payload.clone())
 737        });
 738        Ok(())
 739    }
 740
 741    async fn buffer_saved(
 742        self: Arc<Server>,
 743        request: TypedEnvelope<proto::BufferSaved>,
 744    ) -> Result<()> {
 745        let receiver_ids = self
 746            .state()
 747            .await
 748            .project_connection_ids(request.payload.project_id, request.sender_id)?;
 749        broadcast(request.sender_id, receiver_ids, |connection_id| {
 750            self.peer
 751                .forward_send(request.sender_id, connection_id, request.payload.clone())
 752        });
 753        Ok(())
 754    }
 755
 756    async fn follow(
 757        self: Arc<Self>,
 758        request: TypedEnvelope<proto::Follow>,
 759    ) -> Result<proto::FollowResponse> {
 760        let leader_id = ConnectionId(request.payload.leader_id);
 761        let follower_id = request.sender_id;
 762        if !self
 763            .state()
 764            .await
 765            .project_connection_ids(request.payload.project_id, follower_id)?
 766            .contains(&leader_id)
 767        {
 768            Err(anyhow!("no such peer"))?;
 769        }
 770        let mut response = self
 771            .peer
 772            .forward_request(request.sender_id, leader_id, request.payload)
 773            .await?;
 774        response
 775            .views
 776            .retain(|view| view.leader_id != Some(follower_id.0));
 777        Ok(response)
 778    }
 779
 780    async fn unfollow(self: Arc<Self>, request: TypedEnvelope<proto::Unfollow>) -> Result<()> {
 781        let leader_id = ConnectionId(request.payload.leader_id);
 782        if !self
 783            .state()
 784            .await
 785            .project_connection_ids(request.payload.project_id, request.sender_id)?
 786            .contains(&leader_id)
 787        {
 788            Err(anyhow!("no such peer"))?;
 789        }
 790        self.peer
 791            .forward_send(request.sender_id, leader_id, request.payload)?;
 792        Ok(())
 793    }
 794
 795    async fn update_followers(
 796        self: Arc<Self>,
 797        request: TypedEnvelope<proto::UpdateFollowers>,
 798    ) -> Result<()> {
 799        let connection_ids = self
 800            .state()
 801            .await
 802            .project_connection_ids(request.payload.project_id, request.sender_id)?;
 803        let leader_id = request
 804            .payload
 805            .variant
 806            .as_ref()
 807            .and_then(|variant| match variant {
 808                proto::update_followers::Variant::CreateView(payload) => payload.leader_id,
 809                proto::update_followers::Variant::UpdateView(payload) => payload.leader_id,
 810                proto::update_followers::Variant::UpdateActiveView(payload) => payload.leader_id,
 811            });
 812        for follower_id in &request.payload.follower_ids {
 813            let follower_id = ConnectionId(*follower_id);
 814            if connection_ids.contains(&follower_id) && Some(follower_id.0) != leader_id {
 815                self.peer
 816                    .forward_send(request.sender_id, follower_id, request.payload.clone())?;
 817            }
 818        }
 819        Ok(())
 820    }
 821
 822    async fn get_channels(
 823        self: Arc<Server>,
 824        request: TypedEnvelope<proto::GetChannels>,
 825    ) -> Result<proto::GetChannelsResponse> {
 826        let user_id = self
 827            .state()
 828            .await
 829            .user_id_for_connection(request.sender_id)?;
 830        let channels = self.app_state.db.get_accessible_channels(user_id).await?;
 831        Ok(proto::GetChannelsResponse {
 832            channels: channels
 833                .into_iter()
 834                .map(|chan| proto::Channel {
 835                    id: chan.id.to_proto(),
 836                    name: chan.name,
 837                })
 838                .collect(),
 839        })
 840    }
 841
 842    async fn get_users(
 843        self: Arc<Server>,
 844        request: TypedEnvelope<proto::GetUsers>,
 845    ) -> Result<proto::GetUsersResponse> {
 846        let user_ids = request
 847            .payload
 848            .user_ids
 849            .into_iter()
 850            .map(UserId::from_proto)
 851            .collect();
 852        let users = self
 853            .app_state
 854            .db
 855            .get_users_by_ids(user_ids)
 856            .await?
 857            .into_iter()
 858            .map(|user| proto::User {
 859                id: user.id.to_proto(),
 860                avatar_url: format!("https://github.com/{}.png?size=128", user.github_login),
 861                github_login: user.github_login,
 862            })
 863            .collect();
 864        Ok(proto::GetUsersResponse { users })
 865    }
 866
 867    #[instrument(skip(self, state, user_ids))]
 868    fn update_contacts_for_users<'a>(
 869        self: &Arc<Self>,
 870        state: &Store,
 871        user_ids: impl IntoIterator<Item = &'a UserId>,
 872    ) {
 873        for user_id in user_ids {
 874            let contacts = state.contacts_for_user(*user_id);
 875            for connection_id in state.connection_ids_for_user(*user_id) {
 876                self.peer
 877                    .send(
 878                        connection_id,
 879                        proto::UpdateContacts {
 880                            contacts: contacts.clone(),
 881                        },
 882                    )
 883                    .trace_err();
 884            }
 885        }
 886    }
 887
 888    async fn join_channel(
 889        self: Arc<Self>,
 890        request: TypedEnvelope<proto::JoinChannel>,
 891    ) -> Result<proto::JoinChannelResponse> {
 892        let user_id = self
 893            .state()
 894            .await
 895            .user_id_for_connection(request.sender_id)?;
 896        let channel_id = ChannelId::from_proto(request.payload.channel_id);
 897        if !self
 898            .app_state
 899            .db
 900            .can_user_access_channel(user_id, channel_id)
 901            .await?
 902        {
 903            Err(anyhow!("access denied"))?;
 904        }
 905
 906        self.state_mut()
 907            .await
 908            .join_channel(request.sender_id, channel_id);
 909        let messages = self
 910            .app_state
 911            .db
 912            .get_channel_messages(channel_id, MESSAGE_COUNT_PER_PAGE, None)
 913            .await?
 914            .into_iter()
 915            .map(|msg| proto::ChannelMessage {
 916                id: msg.id.to_proto(),
 917                body: msg.body,
 918                timestamp: msg.sent_at.unix_timestamp() as u64,
 919                sender_id: msg.sender_id.to_proto(),
 920                nonce: Some(msg.nonce.as_u128().into()),
 921            })
 922            .collect::<Vec<_>>();
 923        Ok(proto::JoinChannelResponse {
 924            done: messages.len() < MESSAGE_COUNT_PER_PAGE,
 925            messages,
 926        })
 927    }
 928
 929    async fn leave_channel(
 930        self: Arc<Self>,
 931        request: TypedEnvelope<proto::LeaveChannel>,
 932    ) -> Result<()> {
 933        let user_id = self
 934            .state()
 935            .await
 936            .user_id_for_connection(request.sender_id)?;
 937        let channel_id = ChannelId::from_proto(request.payload.channel_id);
 938        if !self
 939            .app_state
 940            .db
 941            .can_user_access_channel(user_id, channel_id)
 942            .await?
 943        {
 944            Err(anyhow!("access denied"))?;
 945        }
 946
 947        self.state_mut()
 948            .await
 949            .leave_channel(request.sender_id, channel_id);
 950
 951        Ok(())
 952    }
 953
 954    async fn send_channel_message(
 955        self: Arc<Self>,
 956        request: TypedEnvelope<proto::SendChannelMessage>,
 957    ) -> Result<proto::SendChannelMessageResponse> {
 958        let channel_id = ChannelId::from_proto(request.payload.channel_id);
 959        let user_id;
 960        let connection_ids;
 961        {
 962            let state = self.state().await;
 963            user_id = state.user_id_for_connection(request.sender_id)?;
 964            connection_ids = state.channel_connection_ids(channel_id)?;
 965        }
 966
 967        // Validate the message body.
 968        let body = request.payload.body.trim().to_string();
 969        if body.len() > MAX_MESSAGE_LEN {
 970            return Err(anyhow!("message is too long"))?;
 971        }
 972        if body.is_empty() {
 973            return Err(anyhow!("message can't be blank"))?;
 974        }
 975
 976        let timestamp = OffsetDateTime::now_utc();
 977        let nonce = request
 978            .payload
 979            .nonce
 980            .ok_or_else(|| anyhow!("nonce can't be blank"))?;
 981
 982        let message_id = self
 983            .app_state
 984            .db
 985            .create_channel_message(channel_id, user_id, &body, timestamp, nonce.clone().into())
 986            .await?
 987            .to_proto();
 988        let message = proto::ChannelMessage {
 989            sender_id: user_id.to_proto(),
 990            id: message_id,
 991            body,
 992            timestamp: timestamp.unix_timestamp() as u64,
 993            nonce: Some(nonce),
 994        };
 995        broadcast(request.sender_id, connection_ids, |conn_id| {
 996            self.peer.send(
 997                conn_id,
 998                proto::ChannelMessageSent {
 999                    channel_id: channel_id.to_proto(),
1000                    message: Some(message.clone()),
1001                },
1002            )
1003        });
1004        Ok(proto::SendChannelMessageResponse {
1005            message: Some(message),
1006        })
1007    }
1008
1009    async fn get_channel_messages(
1010        self: Arc<Self>,
1011        request: TypedEnvelope<proto::GetChannelMessages>,
1012    ) -> Result<proto::GetChannelMessagesResponse> {
1013        let user_id = self
1014            .state()
1015            .await
1016            .user_id_for_connection(request.sender_id)?;
1017        let channel_id = ChannelId::from_proto(request.payload.channel_id);
1018        if !self
1019            .app_state
1020            .db
1021            .can_user_access_channel(user_id, channel_id)
1022            .await?
1023        {
1024            Err(anyhow!("access denied"))?;
1025        }
1026
1027        let messages = self
1028            .app_state
1029            .db
1030            .get_channel_messages(
1031                channel_id,
1032                MESSAGE_COUNT_PER_PAGE,
1033                Some(MessageId::from_proto(request.payload.before_message_id)),
1034            )
1035            .await?
1036            .into_iter()
1037            .map(|msg| proto::ChannelMessage {
1038                id: msg.id.to_proto(),
1039                body: msg.body,
1040                timestamp: msg.sent_at.unix_timestamp() as u64,
1041                sender_id: msg.sender_id.to_proto(),
1042                nonce: Some(msg.nonce.as_u128().into()),
1043            })
1044            .collect::<Vec<_>>();
1045
1046        Ok(proto::GetChannelMessagesResponse {
1047            done: messages.len() < MESSAGE_COUNT_PER_PAGE,
1048            messages,
1049        })
1050    }
1051
1052    async fn state<'a>(self: &'a Arc<Self>) -> StoreReadGuard<'a> {
1053        #[cfg(test)]
1054        tokio::task::yield_now().await;
1055        let guard = self.store.read().await;
1056        #[cfg(test)]
1057        tokio::task::yield_now().await;
1058        StoreReadGuard {
1059            guard,
1060            _not_send: PhantomData,
1061        }
1062    }
1063
1064    async fn state_mut<'a>(self: &'a Arc<Self>) -> StoreWriteGuard<'a> {
1065        #[cfg(test)]
1066        tokio::task::yield_now().await;
1067        let guard = self.store.write().await;
1068        #[cfg(test)]
1069        tokio::task::yield_now().await;
1070        StoreWriteGuard {
1071            guard,
1072            _not_send: PhantomData,
1073        }
1074    }
1075}
1076
1077impl<'a> Deref for StoreReadGuard<'a> {
1078    type Target = Store;
1079
1080    fn deref(&self) -> &Self::Target {
1081        &*self.guard
1082    }
1083}
1084
1085impl<'a> Deref for StoreWriteGuard<'a> {
1086    type Target = Store;
1087
1088    fn deref(&self) -> &Self::Target {
1089        &*self.guard
1090    }
1091}
1092
1093impl<'a> DerefMut for StoreWriteGuard<'a> {
1094    fn deref_mut(&mut self) -> &mut Self::Target {
1095        &mut *self.guard
1096    }
1097}
1098
1099impl<'a> Drop for StoreWriteGuard<'a> {
1100    fn drop(&mut self) {
1101        #[cfg(test)]
1102        self.check_invariants();
1103
1104        let metrics = self.metrics();
1105        tracing::info!(
1106            connections = metrics.connections,
1107            registered_projects = metrics.registered_projects,
1108            shared_projects = metrics.shared_projects,
1109            "metrics"
1110        );
1111    }
1112}
1113
1114impl Executor for RealExecutor {
1115    type Sleep = Sleep;
1116
1117    fn spawn_detached<F: 'static + Send + Future<Output = ()>>(&self, future: F) {
1118        tokio::task::spawn(future);
1119    }
1120
1121    fn sleep(&self, duration: Duration) -> Self::Sleep {
1122        tokio::time::sleep(duration)
1123    }
1124}
1125
1126#[instrument(skip(f))]
1127fn broadcast<F>(sender_id: ConnectionId, receiver_ids: Vec<ConnectionId>, mut f: F)
1128where
1129    F: FnMut(ConnectionId) -> anyhow::Result<()>,
1130{
1131    for receiver_id in receiver_ids {
1132        if receiver_id != sender_id {
1133            f(receiver_id).trace_err();
1134        }
1135    }
1136}
1137
// Name of the HTTP header that `ProtocolVersion` is decoded from and encoded to.
lazy_static! {
    static ref ZED_PROTOCOL_VERSION: HeaderName = HeaderName::from_static("x-zed-protocol-version");
}
1141
/// Typed header holding the numeric value of the `x-zed-protocol-version` header.
pub struct ProtocolVersion(u32);
1143
1144impl Header for ProtocolVersion {
1145    fn name() -> &'static HeaderName {
1146        &ZED_PROTOCOL_VERSION
1147    }
1148
1149    fn decode<'i, I>(values: &mut I) -> Result<Self, axum::headers::Error>
1150    where
1151        Self: Sized,
1152        I: Iterator<Item = &'i axum::http::HeaderValue>,
1153    {
1154        let version = values
1155            .next()
1156            .ok_or_else(|| axum::headers::Error::invalid())?
1157            .to_str()
1158            .map_err(|_| axum::headers::Error::invalid())?
1159            .parse()
1160            .map_err(|_| axum::headers::Error::invalid())?;
1161        Ok(Self(version))
1162    }
1163
1164    fn encode<E: Extend<axum::http::HeaderValue>>(&self, values: &mut E) {
1165        values.extend([self.0.to_string().parse().unwrap()]);
1166    }
1167}
1168
1169pub fn routes(app_state: Arc<AppState>) -> Router<Body> {
1170    let server = Server::new(app_state.clone(), None);
1171    Router::new()
1172        .route("/rpc", get(handle_websocket_request))
1173        .layer(
1174            ServiceBuilder::new()
1175                .layer(Extension(app_state))
1176                .layer(middleware::from_fn(auth::validate_header))
1177                .layer(Extension(server)),
1178        )
1179}
1180
1181pub async fn handle_websocket_request(
1182    TypedHeader(ProtocolVersion(protocol_version)): TypedHeader<ProtocolVersion>,
1183    ConnectInfo(socket_address): ConnectInfo<SocketAddr>,
1184    Extension(server): Extension<Arc<Server>>,
1185    Extension(user_id): Extension<UserId>,
1186    ws: WebSocketUpgrade,
1187) -> Response {
1188    if protocol_version != rpc::PROTOCOL_VERSION {
1189        return (
1190            StatusCode::UPGRADE_REQUIRED,
1191            "client must be upgraded".to_string(),
1192        )
1193            .into_response();
1194    }
1195    let socket_address = socket_address.to_string();
1196    ws.on_upgrade(move |socket| {
1197        let socket = socket
1198            .map_ok(to_tungstenite_message)
1199            .err_into()
1200            .with(|message| async move { Ok(to_axum_message(message)) });
1201        let connection = Connection::new(Box::pin(socket));
1202        server.handle_connection(connection, socket_address, user_id, None, RealExecutor)
1203    })
1204}
1205
1206fn to_axum_message(message: TungsteniteMessage) -> AxumMessage {
1207    match message {
1208        TungsteniteMessage::Text(payload) => AxumMessage::Text(payload),
1209        TungsteniteMessage::Binary(payload) => AxumMessage::Binary(payload),
1210        TungsteniteMessage::Ping(payload) => AxumMessage::Ping(payload),
1211        TungsteniteMessage::Pong(payload) => AxumMessage::Pong(payload),
1212        TungsteniteMessage::Close(frame) => AxumMessage::Close(frame.map(|frame| AxumCloseFrame {
1213            code: frame.code.into(),
1214            reason: frame.reason,
1215        })),
1216    }
1217}
1218
1219fn to_tungstenite_message(message: AxumMessage) -> TungsteniteMessage {
1220    match message {
1221        AxumMessage::Text(payload) => TungsteniteMessage::Text(payload),
1222        AxumMessage::Binary(payload) => TungsteniteMessage::Binary(payload),
1223        AxumMessage::Ping(payload) => TungsteniteMessage::Ping(payload),
1224        AxumMessage::Pong(payload) => TungsteniteMessage::Pong(payload),
1225        AxumMessage::Close(frame) => {
1226            TungsteniteMessage::Close(frame.map(|frame| TungsteniteCloseFrame {
1227                code: frame.code.into(),
1228                reason: frame.reason,
1229            }))
1230        }
1231    }
1232}
1233
/// Extension trait for turning a fallible value into an `Option`.
pub trait ResultExt {
    /// The success type carried by the implementor.
    type Ok;

    /// Consumes `self` and returns the success value, if any, as an `Option`.
    fn trace_err(self) -> Option<Self::Ok>;
}
1239
1240impl<T, E> ResultExt for Result<T, E>
1241where
1242    E: std::fmt::Debug,
1243{
1244    type Ok = T;
1245
1246    fn trace_err(self) -> Option<T> {
1247        match self {
1248            Ok(value) => Some(value),
1249            Err(error) => {
1250                tracing::error!("{:?}", error);
1251                None
1252            }
1253        }
1254    }
1255}
1256
1257#[cfg(test)]
1258mod tests {
1259    use super::*;
1260    use crate::{
1261        db::{tests::TestDb, UserId},
1262        AppState,
1263    };
1264    use ::rpc::Peer;
1265    use client::{
1266        self, test::FakeHttpClient, Channel, ChannelDetails, ChannelList, Client, Credentials,
1267        EstablishConnectionError, UserStore, RECEIVE_TIMEOUT,
1268    };
1269    use collections::BTreeMap;
1270    use editor::{
1271        self, ConfirmCodeAction, ConfirmCompletion, ConfirmRename, Editor, Input, Redo, Rename,
1272        ToOffset, ToggleCodeActions, Undo,
1273    };
1274    use gpui::{
1275        executor::{self, Deterministic},
1276        geometry::vector::vec2f,
1277        ModelHandle, TestAppContext, ViewHandle,
1278    };
1279    use language::{
1280        range_to_lsp, tree_sitter_rust, Diagnostic, DiagnosticEntry, FakeLspAdapter, Language,
1281        LanguageConfig, LanguageRegistry, OffsetRangeExt, Point, Rope,
1282    };
1283    use lsp::{self, FakeLanguageServer};
1284    use parking_lot::Mutex;
1285    use project::{
1286        fs::{FakeFs, Fs as _},
1287        search::SearchQuery,
1288        worktree::WorktreeHandle,
1289        DiagnosticSummary, Project, ProjectPath, WorktreeId,
1290    };
1291    use rand::prelude::*;
1292    use rpc::PeerId;
1293    use serde_json::json;
1294    use settings::Settings;
1295    use sqlx::types::time::OffsetDateTime;
1296    use std::{
1297        env,
1298        ops::Deref,
1299        path::{Path, PathBuf},
1300        rc::Rc,
1301        sync::{
1302            atomic::{AtomicBool, Ordering::SeqCst},
1303            Arc,
1304        },
1305        time::Duration,
1306    };
1307    use theme::ThemeRegistry;
1308    use workspace::{Item, SplitDirection, ToggleFollow, Workspace, WorkspaceParams};
1309
1310    #[cfg(test)]
1311    #[ctor::ctor]
1312    fn init_logger() {
1313        if std::env::var("RUST_LOG").is_ok() {
1314            env_logger::init();
1315        }
1316    }
1317
    // End-to-end check of project sharing between two clients: client A shares a
    // local project, client B joins it remotely, both sides see each other as
    // collaborators, an edit made by B reaches A's copy of the buffer, and B
    // disappears from A's collaborators once B drops its project.
    #[gpui::test(iterations = 10)]
    async fn test_share_project(cx_a: &mut TestAppContext, cx_b: &mut TestAppContext) {
        let (window_b, _) = cx_b.add_window(|_| EmptyView);
        let lang_registry = Arc::new(LanguageRegistry::test());
        let fs = FakeFs::new(cx_a.background());
        cx_a.foreground().forbid_parking();

        // Connect to a server as 2 clients.
        let mut server = TestServer::start(cx_a.foreground(), cx_a.background()).await;
        let client_a = server.create_client(cx_a, "user_a").await;
        let client_b = server.create_client(cx_b, "user_b").await;

        // Share a project as client A
        fs.insert_tree(
            "/a",
            json!({
                ".zed.toml": r#"collaborators = ["user_b"]"#,
                "a.txt": "a-contents",
                "b.txt": "b-contents",
            }),
        )
        .await;
        let project_a = cx_a.update(|cx| {
            Project::local(
                client_a.clone(),
                client_a.user_store.clone(),
                lang_registry.clone(),
                fs.clone(),
                cx,
            )
        });
        let (worktree_a, _) = project_a
            .update(cx_a, |p, cx| {
                p.find_or_create_local_worktree("/a", true, cx)
            })
            .await
            .unwrap();
        let worktree_id = worktree_a.read_with(cx_a, |tree, _| tree.id());
        // Wait for the initial worktree scan before sharing.
        worktree_a
            .read_with(cx_a, |tree, _| tree.as_local().unwrap().scan_complete())
            .await;
        let project_id = project_a.update(cx_a, |p, _| p.next_remote_id()).await;
        project_a.update(cx_a, |p, cx| p.share(cx)).await.unwrap();

        // Join that project as client B
        let project_b = Project::remote(
            project_id,
            client_b.clone(),
            client_b.user_store.clone(),
            lang_registry.clone(),
            fs.clone(),
            &mut cx_b.to_async(),
        )
        .await
        .unwrap();

        // B must see A as a collaborator; remember B's replica id so it can be
        // matched against what A observes below.
        let replica_id_b = project_b.read_with(cx_b, |project, _| {
            assert_eq!(
                project
                    .collaborators()
                    .get(&client_a.peer_id)
                    .unwrap()
                    .user
                    .github_login,
                "user_a"
            );
            project.replica_id()
        });
        // Wait until A sees B as a collaborator with the same replica id.
        project_a
            .condition(&cx_a, |tree, _| {
                tree.collaborators()
                    .get(&client_b.peer_id)
                    .map_or(false, |collaborator| {
                        collaborator.replica_id == replica_id_b
                            && collaborator.user.github_login == "user_b"
                    })
            })
            .await;

        // Open the same file as client B and client A.
        let buffer_b = project_b
            .update(cx_b, |p, cx| p.open_buffer((worktree_id, "b.txt"), cx))
            .await
            .unwrap();
        buffer_b.read_with(cx_b, |buf, _| assert_eq!(buf.text(), "b-contents"));
        // The host is expected to report the buffer as open before it opens the
        // buffer itself below.
        project_a.read_with(cx_a, |project, cx| {
            assert!(project.has_open_buffer((worktree_id, "b.txt"), cx))
        });
        let buffer_a = project_a
            .update(cx_a, |p, cx| p.open_buffer((worktree_id, "b.txt"), cx))
            .await
            .unwrap();

        let editor_b = cx_b.add_view(window_b, |cx| Editor::for_buffer(buffer_b, None, cx));

        // TODO
        // // Create a selection set as client B and see that selection set as client A.
        // buffer_a
        //     .condition(&cx_a, |buffer, _| buffer.selection_sets().count() == 1)
        //     .await;

        // Edit the buffer as client B and see that edit as client A.
        editor_b.update(cx_b, |editor, cx| {
            editor.handle_input(&Input("ok, ".into()), cx)
        });
        buffer_a
            .condition(&cx_a, |buffer, _| buffer.text() == "ok, b-contents")
            .await;

        // TODO
        // // Remove the selection set as client B, see those selections disappear as client A.
        cx_b.update(move |_| drop(editor_b));
        // buffer_a
        //     .condition(&cx_a, |buffer, _| buffer.selection_sets().count() == 0)
        //     .await;

        // Dropping the client B's project removes client B from client A's collaborators.
        cx_b.update(move |_| drop(project_b));
        project_a
            .condition(&cx_a, |project, _| project.collaborators().is_empty())
            .await;
    }
1440
    // Checks that unsharing a project makes a joined guest's project read-only
    // and marks the host's worktree as not shared, and that the project can
    // then be shared again and joined by a guest under the same project id.
    #[gpui::test(iterations = 10)]
    async fn test_unshare_project(cx_a: &mut TestAppContext, cx_b: &mut TestAppContext) {
        let lang_registry = Arc::new(LanguageRegistry::test());
        let fs = FakeFs::new(cx_a.background());
        cx_a.foreground().forbid_parking();

        // Connect to a server as 2 clients.
        let mut server = TestServer::start(cx_a.foreground(), cx_a.background()).await;
        let client_a = server.create_client(cx_a, "user_a").await;
        let client_b = server.create_client(cx_b, "user_b").await;

        // Share a project as client A
        fs.insert_tree(
            "/a",
            json!({
                ".zed.toml": r#"collaborators = ["user_b"]"#,
                "a.txt": "a-contents",
                "b.txt": "b-contents",
            }),
        )
        .await;
        let project_a = cx_a.update(|cx| {
            Project::local(
                client_a.clone(),
                client_a.user_store.clone(),
                lang_registry.clone(),
                fs.clone(),
                cx,
            )
        });
        let (worktree_a, _) = project_a
            .update(cx_a, |p, cx| {
                p.find_or_create_local_worktree("/a", true, cx)
            })
            .await
            .unwrap();
        // Wait for the initial worktree scan before sharing.
        worktree_a
            .read_with(cx_a, |tree, _| tree.as_local().unwrap().scan_complete())
            .await;
        let project_id = project_a.update(cx_a, |p, _| p.next_remote_id()).await;
        let worktree_id = worktree_a.read_with(cx_a, |tree, _| tree.id());
        project_a.update(cx_a, |p, cx| p.share(cx)).await.unwrap();
        assert!(worktree_a.read_with(cx_a, |tree, _| tree.as_local().unwrap().is_shared()));

        // Join that project as client B
        let project_b = Project::remote(
            project_id,
            client_b.clone(),
            client_b.user_store.clone(),
            lang_registry.clone(),
            fs.clone(),
            &mut cx_b.to_async(),
        )
        .await
        .unwrap();
        // Opening a buffer must succeed while the project is shared.
        project_b
            .update(cx_b, |p, cx| p.open_buffer((worktree_id, "a.txt"), cx))
            .await
            .unwrap();

        // Unshare the project as client A
        project_a.update(cx_a, |project, cx| project.unshare(cx));
        project_b
            .condition(cx_b, |project, _| project.is_read_only())
            .await;
        assert!(worktree_a.read_with(cx_a, |tree, _| !tree.as_local().unwrap().is_shared()));
        cx_b.update(|_| {
            drop(project_b);
        });

        // Share the project again and ensure guests can still join.
        project_a
            .update(cx_a, |project, cx| project.share(cx))
            .await
            .unwrap();
        assert!(worktree_a.read_with(cx_a, |tree, _| tree.as_local().unwrap().is_shared()));

        let project_b2 = Project::remote(
            project_id,
            client_b.clone(),
            client_b.user_store.clone(),
            lang_registry.clone(),
            fs.clone(),
            &mut cx_b.to_async(),
        )
        .await
        .unwrap();
        project_b2
            .update(cx_b, |p, cx| p.open_buffer((worktree_id, "a.txt"), cx))
            .await
            .unwrap();
    }
1533
    // Checks what happens when the host's connection is dropped: the host loses
    // its collaborators and shared status, the guest's project becomes
    // read-only, and afterwards the host can share again and a guest can join
    // under the newly obtained project id.
    #[gpui::test(iterations = 10)]
    async fn test_host_disconnect(cx_a: &mut TestAppContext, cx_b: &mut TestAppContext) {
        let lang_registry = Arc::new(LanguageRegistry::test());
        let fs = FakeFs::new(cx_a.background());
        cx_a.foreground().forbid_parking();

        // Connect to a server as 2 clients.
        let mut server = TestServer::start(cx_a.foreground(), cx_a.background()).await;
        let client_a = server.create_client(cx_a, "user_a").await;
        let client_b = server.create_client(cx_b, "user_b").await;

        // Share a project as client A
        fs.insert_tree(
            "/a",
            json!({
                ".zed.toml": r#"collaborators = ["user_b"]"#,
                "a.txt": "a-contents",
                "b.txt": "b-contents",
            }),
        )
        .await;
        let project_a = cx_a.update(|cx| {
            Project::local(
                client_a.clone(),
                client_a.user_store.clone(),
                lang_registry.clone(),
                fs.clone(),
                cx,
            )
        });
        let (worktree_a, _) = project_a
            .update(cx_a, |p, cx| {
                p.find_or_create_local_worktree("/a", true, cx)
            })
            .await
            .unwrap();
        // Wait for the initial worktree scan before sharing.
        worktree_a
            .read_with(cx_a, |tree, _| tree.as_local().unwrap().scan_complete())
            .await;
        let project_id = project_a.update(cx_a, |p, _| p.next_remote_id()).await;
        let worktree_id = worktree_a.read_with(cx_a, |tree, _| tree.id());
        project_a.update(cx_a, |p, cx| p.share(cx)).await.unwrap();
        assert!(worktree_a.read_with(cx_a, |tree, _| tree.as_local().unwrap().is_shared()));

        // Join that project as client B
        let project_b = Project::remote(
            project_id,
            client_b.clone(),
            client_b.user_store.clone(),
            lang_registry.clone(),
            fs.clone(),
            &mut cx_b.to_async(),
        )
        .await
        .unwrap();
        project_b
            .update(cx_b, |p, cx| p.open_buffer((worktree_id, "a.txt"), cx))
            .await
            .unwrap();

        // Drop client A's connection. Collaborators should disappear and the project should not be shown as shared.
        server.disconnect_client(client_a.current_user_id(cx_a));
        // Advance the fake clock by the receive timeout.
        // NOTE(review): presumably this is what lets the dropped connection be detected; confirm.
        cx_a.foreground().advance_clock(rpc::RECEIVE_TIMEOUT);
        project_a
            .condition(cx_a, |project, _| project.collaborators().is_empty())
            .await;
        project_a.read_with(cx_a, |project, _| assert!(!project.is_shared()));
        project_b
            .condition(cx_b, |project, _| project.is_read_only())
            .await;
        assert!(worktree_a.read_with(cx_a, |tree, _| !tree.as_local().unwrap().is_shared()));
        cx_b.update(|_| {
            drop(project_b);
        });

        // Await reconnection
        let project_id = project_a.update(cx_a, |p, _| p.next_remote_id()).await;

        // Share the project again and ensure guests can still join.
        project_a
            .update(cx_a, |project, cx| project.share(cx))
            .await
            .unwrap();
        assert!(worktree_a.read_with(cx_a, |tree, _| tree.as_local().unwrap().is_shared()));

        let project_b2 = Project::remote(
            project_id,
            client_b.clone(),
            client_b.user_store.clone(),
            lang_registry.clone(),
            fs.clone(),
            &mut cx_b.to_async(),
        )
        .await
        .unwrap();
        project_b2
            .update(cx_b, |p, cx| p.open_buffer((worktree_id, "a.txt"), cx))
            .await
            .unwrap();
    }
1634
1635    #[gpui::test(iterations = 10)]
1636    async fn test_propagate_saves_and_fs_changes(
1637        cx_a: &mut TestAppContext,
1638        cx_b: &mut TestAppContext,
1639        cx_c: &mut TestAppContext,
1640    ) {
1641        let lang_registry = Arc::new(LanguageRegistry::test());
1642        let fs = FakeFs::new(cx_a.background());
1643        cx_a.foreground().forbid_parking();
1644
1645        // Connect to a server as 3 clients.
1646        let mut server = TestServer::start(cx_a.foreground(), cx_a.background()).await;
1647        let client_a = server.create_client(cx_a, "user_a").await;
1648        let client_b = server.create_client(cx_b, "user_b").await;
1649        let client_c = server.create_client(cx_c, "user_c").await;
1650
1651        // Share a worktree as client A.
1652        fs.insert_tree(
1653            "/a",
1654            json!({
1655                ".zed.toml": r#"collaborators = ["user_b", "user_c"]"#,
1656                "file1": "",
1657                "file2": ""
1658            }),
1659        )
1660        .await;
1661        let project_a = cx_a.update(|cx| {
1662            Project::local(
1663                client_a.clone(),
1664                client_a.user_store.clone(),
1665                lang_registry.clone(),
1666                fs.clone(),
1667                cx,
1668            )
1669        });
1670        let (worktree_a, _) = project_a
1671            .update(cx_a, |p, cx| {
1672                p.find_or_create_local_worktree("/a", true, cx)
1673            })
1674            .await
1675            .unwrap();
1676        worktree_a
1677            .read_with(cx_a, |tree, _| tree.as_local().unwrap().scan_complete())
1678            .await;
1679        let project_id = project_a.update(cx_a, |p, _| p.next_remote_id()).await;
1680        let worktree_id = worktree_a.read_with(cx_a, |tree, _| tree.id());
1681        project_a.update(cx_a, |p, cx| p.share(cx)).await.unwrap();
1682
1683        // Join that worktree as clients B and C.
1684        let project_b = Project::remote(
1685            project_id,
1686            client_b.clone(),
1687            client_b.user_store.clone(),
1688            lang_registry.clone(),
1689            fs.clone(),
1690            &mut cx_b.to_async(),
1691        )
1692        .await
1693        .unwrap();
1694        let project_c = Project::remote(
1695            project_id,
1696            client_c.clone(),
1697            client_c.user_store.clone(),
1698            lang_registry.clone(),
1699            fs.clone(),
1700            &mut cx_c.to_async(),
1701        )
1702        .await
1703        .unwrap();
1704        let worktree_b = project_b.read_with(cx_b, |p, cx| p.worktrees(cx).next().unwrap());
1705        let worktree_c = project_c.read_with(cx_c, |p, cx| p.worktrees(cx).next().unwrap());
1706
1707        // Open and edit a buffer as both guests B and C.
1708        let buffer_b = project_b
1709            .update(cx_b, |p, cx| p.open_buffer((worktree_id, "file1"), cx))
1710            .await
1711            .unwrap();
1712        let buffer_c = project_c
1713            .update(cx_c, |p, cx| p.open_buffer((worktree_id, "file1"), cx))
1714            .await
1715            .unwrap();
1716        buffer_b.update(cx_b, |buf, cx| buf.edit([(0..0, "i-am-b, ")], cx));
1717        buffer_c.update(cx_c, |buf, cx| buf.edit([(0..0, "i-am-c, ")], cx));
1718
1719        // Open and edit that buffer as the host.
1720        let buffer_a = project_a
1721            .update(cx_a, |p, cx| p.open_buffer((worktree_id, "file1"), cx))
1722            .await
1723            .unwrap();
1724
1725        buffer_a
1726            .condition(cx_a, |buf, _| buf.text() == "i-am-c, i-am-b, ")
1727            .await;
1728        buffer_a.update(cx_a, |buf, cx| {
1729            buf.edit([(buf.len()..buf.len(), "i-am-a")], cx)
1730        });
1731
1732        // Wait for edits to propagate
1733        buffer_a
1734            .condition(cx_a, |buf, _| buf.text() == "i-am-c, i-am-b, i-am-a")
1735            .await;
1736        buffer_b
1737            .condition(cx_b, |buf, _| buf.text() == "i-am-c, i-am-b, i-am-a")
1738            .await;
1739        buffer_c
1740            .condition(cx_c, |buf, _| buf.text() == "i-am-c, i-am-b, i-am-a")
1741            .await;
1742
1743        // Edit the buffer as the host and concurrently save as guest B.
1744        let save_b = buffer_b.update(cx_b, |buf, cx| buf.save(cx));
1745        buffer_a.update(cx_a, |buf, cx| buf.edit([(0..0, "hi-a, ")], cx));
1746        save_b.await.unwrap();
1747        assert_eq!(
1748            fs.load("/a/file1".as_ref()).await.unwrap(),
1749            "hi-a, i-am-c, i-am-b, i-am-a"
1750        );
1751        buffer_a.read_with(cx_a, |buf, _| assert!(!buf.is_dirty()));
1752        buffer_b.read_with(cx_b, |buf, _| assert!(!buf.is_dirty()));
1753        buffer_c.condition(cx_c, |buf, _| !buf.is_dirty()).await;
1754
1755        worktree_a.flush_fs_events(cx_a).await;
1756
1757        // Make changes on host's file system, see those changes on guest worktrees.
1758        fs.rename(
1759            "/a/file1".as_ref(),
1760            "/a/file1-renamed".as_ref(),
1761            Default::default(),
1762        )
1763        .await
1764        .unwrap();
1765
1766        fs.rename("/a/file2".as_ref(), "/a/file3".as_ref(), Default::default())
1767            .await
1768            .unwrap();
1769        fs.insert_file(Path::new("/a/file4"), "4".into()).await;
1770
1771        worktree_a
1772            .condition(&cx_a, |tree, _| {
1773                tree.paths()
1774                    .map(|p| p.to_string_lossy())
1775                    .collect::<Vec<_>>()
1776                    == [".zed.toml", "file1-renamed", "file3", "file4"]
1777            })
1778            .await;
1779        worktree_b
1780            .condition(&cx_b, |tree, _| {
1781                tree.paths()
1782                    .map(|p| p.to_string_lossy())
1783                    .collect::<Vec<_>>()
1784                    == [".zed.toml", "file1-renamed", "file3", "file4"]
1785            })
1786            .await;
1787        worktree_c
1788            .condition(&cx_c, |tree, _| {
1789                tree.paths()
1790                    .map(|p| p.to_string_lossy())
1791                    .collect::<Vec<_>>()
1792                    == [".zed.toml", "file1-renamed", "file3", "file4"]
1793            })
1794            .await;
1795
1796        // Ensure buffer files are updated as well.
1797        buffer_a
1798            .condition(&cx_a, |buf, _| {
1799                buf.file().unwrap().path().to_str() == Some("file1-renamed")
1800            })
1801            .await;
1802        buffer_b
1803            .condition(&cx_b, |buf, _| {
1804                buf.file().unwrap().path().to_str() == Some("file1-renamed")
1805            })
1806            .await;
1807        buffer_c
1808            .condition(&cx_c, |buf, _| {
1809                buf.file().unwrap().path().to_str() == Some("file1-renamed")
1810            })
1811            .await;
1812    }
1813
1814    #[gpui::test(iterations = 10)]
1815    async fn test_fs_operations(
1816        executor: Arc<Deterministic>,
1817        cx_a: &mut TestAppContext,
1818        cx_b: &mut TestAppContext,
1819    ) {
1820        executor.forbid_parking();
1821        let fs = FakeFs::new(cx_a.background());
1822
1823        // Connect to a server as 2 clients.
1824        let mut server = TestServer::start(cx_a.foreground(), cx_a.background()).await;
1825        let mut client_a = server.create_client(cx_a, "user_a").await;
1826        let mut client_b = server.create_client(cx_b, "user_b").await;
1827
1828        // Share a project as client A
1829        fs.insert_tree(
1830            "/dir",
1831            json!({
1832                ".zed.toml": r#"collaborators = ["user_b"]"#,
1833                "a.txt": "a-contents",
1834                "b.txt": "b-contents",
1835            }),
1836        )
1837        .await;
1838
1839        let (project_a, worktree_id) = client_a.build_local_project(fs, "/dir", cx_a).await;
1840        let project_id = project_a.read_with(cx_a, |project, _| project.remote_id().unwrap());
1841        project_a
1842            .update(cx_a, |project, cx| project.share(cx))
1843            .await
1844            .unwrap();
1845
1846        let project_b = client_b.build_remote_project(project_id, cx_b).await;
1847
1848        let worktree_a =
1849            project_a.read_with(cx_a, |project, cx| project.worktrees(cx).next().unwrap());
1850        let worktree_b =
1851            project_b.read_with(cx_b, |project, cx| project.worktrees(cx).next().unwrap());
1852
1853        let entry = project_b
1854            .update(cx_b, |project, cx| {
1855                project
1856                    .create_entry((worktree_id, "c.txt"), false, cx)
1857                    .unwrap()
1858            })
1859            .await
1860            .unwrap();
1861        worktree_a.read_with(cx_a, |worktree, _| {
1862            assert_eq!(
1863                worktree
1864                    .paths()
1865                    .map(|p| p.to_string_lossy())
1866                    .collect::<Vec<_>>(),
1867                [".zed.toml", "a.txt", "b.txt", "c.txt"]
1868            );
1869        });
1870        worktree_b.read_with(cx_b, |worktree, _| {
1871            assert_eq!(
1872                worktree
1873                    .paths()
1874                    .map(|p| p.to_string_lossy())
1875                    .collect::<Vec<_>>(),
1876                [".zed.toml", "a.txt", "b.txt", "c.txt"]
1877            );
1878        });
1879
1880        project_b
1881            .update(cx_b, |project, cx| {
1882                project.rename_entry(entry.id, Path::new("d.txt"), cx)
1883            })
1884            .unwrap()
1885            .await
1886            .unwrap();
1887        worktree_a.read_with(cx_a, |worktree, _| {
1888            assert_eq!(
1889                worktree
1890                    .paths()
1891                    .map(|p| p.to_string_lossy())
1892                    .collect::<Vec<_>>(),
1893                [".zed.toml", "a.txt", "b.txt", "d.txt"]
1894            );
1895        });
1896        worktree_b.read_with(cx_b, |worktree, _| {
1897            assert_eq!(
1898                worktree
1899                    .paths()
1900                    .map(|p| p.to_string_lossy())
1901                    .collect::<Vec<_>>(),
1902                [".zed.toml", "a.txt", "b.txt", "d.txt"]
1903            );
1904        });
1905
1906        let dir_entry = project_b
1907            .update(cx_b, |project, cx| {
1908                project
1909                    .create_entry((worktree_id, "DIR"), true, cx)
1910                    .unwrap()
1911            })
1912            .await
1913            .unwrap();
1914        worktree_a.read_with(cx_a, |worktree, _| {
1915            assert_eq!(
1916                worktree
1917                    .paths()
1918                    .map(|p| p.to_string_lossy())
1919                    .collect::<Vec<_>>(),
1920                [".zed.toml", "DIR", "a.txt", "b.txt", "d.txt"]
1921            );
1922        });
1923        worktree_b.read_with(cx_b, |worktree, _| {
1924            assert_eq!(
1925                worktree
1926                    .paths()
1927                    .map(|p| p.to_string_lossy())
1928                    .collect::<Vec<_>>(),
1929                [".zed.toml", "DIR", "a.txt", "b.txt", "d.txt"]
1930            );
1931        });
1932
1933        project_b
1934            .update(cx_b, |project, cx| {
1935                project.delete_entry(dir_entry.id, cx).unwrap()
1936            })
1937            .await
1938            .unwrap();
1939        worktree_a.read_with(cx_a, |worktree, _| {
1940            assert_eq!(
1941                worktree
1942                    .paths()
1943                    .map(|p| p.to_string_lossy())
1944                    .collect::<Vec<_>>(),
1945                [".zed.toml", "a.txt", "b.txt", "d.txt"]
1946            );
1947        });
1948        worktree_b.read_with(cx_b, |worktree, _| {
1949            assert_eq!(
1950                worktree
1951                    .paths()
1952                    .map(|p| p.to_string_lossy())
1953                    .collect::<Vec<_>>(),
1954                [".zed.toml", "a.txt", "b.txt", "d.txt"]
1955            );
1956        });
1957
1958        project_b
1959            .update(cx_b, |project, cx| {
1960                project.delete_entry(entry.id, cx).unwrap()
1961            })
1962            .await
1963            .unwrap();
1964        worktree_a.read_with(cx_a, |worktree, _| {
1965            assert_eq!(
1966                worktree
1967                    .paths()
1968                    .map(|p| p.to_string_lossy())
1969                    .collect::<Vec<_>>(),
1970                [".zed.toml", "a.txt", "b.txt"]
1971            );
1972        });
1973        worktree_b.read_with(cx_b, |worktree, _| {
1974            assert_eq!(
1975                worktree
1976                    .paths()
1977                    .map(|p| p.to_string_lossy())
1978                    .collect::<Vec<_>>(),
1979                [".zed.toml", "a.txt", "b.txt"]
1980            );
1981        });
1982    }
1983
1984    #[gpui::test(iterations = 10)]
1985    async fn test_buffer_conflict_after_save(cx_a: &mut TestAppContext, cx_b: &mut TestAppContext) {
1986        cx_a.foreground().forbid_parking();
1987        let lang_registry = Arc::new(LanguageRegistry::test());
1988        let fs = FakeFs::new(cx_a.background());
1989
1990        // Connect to a server as 2 clients.
1991        let mut server = TestServer::start(cx_a.foreground(), cx_a.background()).await;
1992        let client_a = server.create_client(cx_a, "user_a").await;
1993        let client_b = server.create_client(cx_b, "user_b").await;
1994
1995        // Share a project as client A
1996        fs.insert_tree(
1997            "/dir",
1998            json!({
1999                ".zed.toml": r#"collaborators = ["user_b", "user_c"]"#,
2000                "a.txt": "a-contents",
2001            }),
2002        )
2003        .await;
2004
2005        let project_a = cx_a.update(|cx| {
2006            Project::local(
2007                client_a.clone(),
2008                client_a.user_store.clone(),
2009                lang_registry.clone(),
2010                fs.clone(),
2011                cx,
2012            )
2013        });
2014        let (worktree_a, _) = project_a
2015            .update(cx_a, |p, cx| {
2016                p.find_or_create_local_worktree("/dir", true, cx)
2017            })
2018            .await
2019            .unwrap();
2020        worktree_a
2021            .read_with(cx_a, |tree, _| tree.as_local().unwrap().scan_complete())
2022            .await;
2023        let project_id = project_a.update(cx_a, |p, _| p.next_remote_id()).await;
2024        let worktree_id = worktree_a.read_with(cx_a, |tree, _| tree.id());
2025        project_a.update(cx_a, |p, cx| p.share(cx)).await.unwrap();
2026
2027        // Join that project as client B
2028        let project_b = Project::remote(
2029            project_id,
2030            client_b.clone(),
2031            client_b.user_store.clone(),
2032            lang_registry.clone(),
2033            fs.clone(),
2034            &mut cx_b.to_async(),
2035        )
2036        .await
2037        .unwrap();
2038
2039        // Open a buffer as client B
2040        let buffer_b = project_b
2041            .update(cx_b, |p, cx| p.open_buffer((worktree_id, "a.txt"), cx))
2042            .await
2043            .unwrap();
2044
2045        buffer_b.update(cx_b, |buf, cx| buf.edit([(0..0, "world ")], cx));
2046        buffer_b.read_with(cx_b, |buf, _| {
2047            assert!(buf.is_dirty());
2048            assert!(!buf.has_conflict());
2049        });
2050
2051        buffer_b.update(cx_b, |buf, cx| buf.save(cx)).await.unwrap();
2052        buffer_b
2053            .condition(&cx_b, |buffer_b, _| !buffer_b.is_dirty())
2054            .await;
2055        buffer_b.read_with(cx_b, |buf, _| {
2056            assert!(!buf.has_conflict());
2057        });
2058
2059        buffer_b.update(cx_b, |buf, cx| buf.edit([(0..0, "hello ")], cx));
2060        buffer_b.read_with(cx_b, |buf, _| {
2061            assert!(buf.is_dirty());
2062            assert!(!buf.has_conflict());
2063        });
2064    }
2065
2066    #[gpui::test(iterations = 10)]
2067    async fn test_buffer_reloading(cx_a: &mut TestAppContext, cx_b: &mut TestAppContext) {
2068        cx_a.foreground().forbid_parking();
2069        let lang_registry = Arc::new(LanguageRegistry::test());
2070        let fs = FakeFs::new(cx_a.background());
2071
2072        // Connect to a server as 2 clients.
2073        let mut server = TestServer::start(cx_a.foreground(), cx_a.background()).await;
2074        let client_a = server.create_client(cx_a, "user_a").await;
2075        let client_b = server.create_client(cx_b, "user_b").await;
2076
2077        // Share a project as client A
2078        fs.insert_tree(
2079            "/dir",
2080            json!({
2081                ".zed.toml": r#"collaborators = ["user_b", "user_c"]"#,
2082                "a.txt": "a-contents",
2083            }),
2084        )
2085        .await;
2086
2087        let project_a = cx_a.update(|cx| {
2088            Project::local(
2089                client_a.clone(),
2090                client_a.user_store.clone(),
2091                lang_registry.clone(),
2092                fs.clone(),
2093                cx,
2094            )
2095        });
2096        let (worktree_a, _) = project_a
2097            .update(cx_a, |p, cx| {
2098                p.find_or_create_local_worktree("/dir", true, cx)
2099            })
2100            .await
2101            .unwrap();
2102        worktree_a
2103            .read_with(cx_a, |tree, _| tree.as_local().unwrap().scan_complete())
2104            .await;
2105        let project_id = project_a.update(cx_a, |p, _| p.next_remote_id()).await;
2106        let worktree_id = worktree_a.read_with(cx_a, |tree, _| tree.id());
2107        project_a.update(cx_a, |p, cx| p.share(cx)).await.unwrap();
2108
2109        // Join that project as client B
2110        let project_b = Project::remote(
2111            project_id,
2112            client_b.clone(),
2113            client_b.user_store.clone(),
2114            lang_registry.clone(),
2115            fs.clone(),
2116            &mut cx_b.to_async(),
2117        )
2118        .await
2119        .unwrap();
2120        let _worktree_b = project_b.update(cx_b, |p, cx| p.worktrees(cx).next().unwrap());
2121
2122        // Open a buffer as client B
2123        let buffer_b = project_b
2124            .update(cx_b, |p, cx| p.open_buffer((worktree_id, "a.txt"), cx))
2125            .await
2126            .unwrap();
2127        buffer_b.read_with(cx_b, |buf, _| {
2128            assert!(!buf.is_dirty());
2129            assert!(!buf.has_conflict());
2130        });
2131
2132        fs.save(Path::new("/dir/a.txt"), &"new contents".into())
2133            .await
2134            .unwrap();
2135        buffer_b
2136            .condition(&cx_b, |buf, _| {
2137                buf.text() == "new contents" && !buf.is_dirty()
2138            })
2139            .await;
2140        buffer_b.read_with(cx_b, |buf, _| {
2141            assert!(!buf.has_conflict());
2142        });
2143    }
2144
    // Checks that a guest's buffer converges on the host's text when the host edits the
    // buffer while the guest's `open_buffer` request is still in progress.
    #[gpui::test(iterations = 10)]
    async fn test_editing_while_guest_opens_buffer(
        cx_a: &mut TestAppContext,
        cx_b: &mut TestAppContext,
    ) {
        cx_a.foreground().forbid_parking();
        let lang_registry = Arc::new(LanguageRegistry::test());
        let fs = FakeFs::new(cx_a.background());

        // Connect to a server as 2 clients.
        let mut server = TestServer::start(cx_a.foreground(), cx_a.background()).await;
        let client_a = server.create_client(cx_a, "user_a").await;
        let client_b = server.create_client(cx_b, "user_b").await;

        // Share a project as client A
        fs.insert_tree(
            "/dir",
            json!({
                ".zed.toml": r#"collaborators = ["user_b"]"#,
                "a.txt": "a-contents",
            }),
        )
        .await;
        let project_a = cx_a.update(|cx| {
            Project::local(
                client_a.clone(),
                client_a.user_store.clone(),
                lang_registry.clone(),
                fs.clone(),
                cx,
            )
        });
        let (worktree_a, _) = project_a
            .update(cx_a, |p, cx| {
                p.find_or_create_local_worktree("/dir", true, cx)
            })
            .await
            .unwrap();
        // Wait for the initial scan of the local worktree before sharing.
        worktree_a
            .read_with(cx_a, |tree, _| tree.as_local().unwrap().scan_complete())
            .await;
        let project_id = project_a.update(cx_a, |p, _| p.next_remote_id()).await;
        let worktree_id = worktree_a.read_with(cx_a, |tree, _| tree.id());
        project_a.update(cx_a, |p, cx| p.share(cx)).await.unwrap();

        // Join that project as client B
        let project_b = Project::remote(
            project_id,
            client_b.clone(),
            client_b.user_store.clone(),
            lang_registry.clone(),
            fs.clone(),
            &mut cx_b.to_async(),
        )
        .await
        .unwrap();

        // Open a buffer as client A
        let buffer_a = project_a
            .update(cx_a, |p, cx| p.open_buffer((worktree_id, "a.txt"), cx))
            .await
            .unwrap();

        // Start opening the same buffer as client B
        // The open is spawned on the background executor and only awaited after the
        // host's edits below, so the two can overlap.
        let buffer_b = cx_b
            .background()
            .spawn(project_b.update(cx_b, |p, cx| p.open_buffer((worktree_id, "a.txt"), cx)));

        // Edit the buffer as client A while client B is still opening it.
        // Each edit is preceded by a call to the executor's `simulate_random_delay`.
        cx_b.background().simulate_random_delay().await;
        buffer_a.update(cx_a, |buf, cx| buf.edit([(0..0, "X")], cx));
        cx_b.background().simulate_random_delay().await;
        buffer_a.update(cx_a, |buf, cx| buf.edit([(1..1, "Y")], cx));

        // Snapshot the host's text, then wait until the guest's buffer matches it.
        let text = buffer_a.read_with(cx_a, |buf, _| buf.text());
        let buffer_b = buffer_b.await.unwrap();
        buffer_b.condition(&cx_b, |buf, _| buf.text() == text).await;
    }
2223
2224    #[gpui::test(iterations = 10)]
2225    async fn test_leaving_worktree_while_opening_buffer(
2226        cx_a: &mut TestAppContext,
2227        cx_b: &mut TestAppContext,
2228    ) {
2229        cx_a.foreground().forbid_parking();
2230        let lang_registry = Arc::new(LanguageRegistry::test());
2231        let fs = FakeFs::new(cx_a.background());
2232
2233        // Connect to a server as 2 clients.
2234        let mut server = TestServer::start(cx_a.foreground(), cx_a.background()).await;
2235        let client_a = server.create_client(cx_a, "user_a").await;
2236        let client_b = server.create_client(cx_b, "user_b").await;
2237
2238        // Share a project as client A
2239        fs.insert_tree(
2240            "/dir",
2241            json!({
2242                ".zed.toml": r#"collaborators = ["user_b"]"#,
2243                "a.txt": "a-contents",
2244            }),
2245        )
2246        .await;
2247        let project_a = cx_a.update(|cx| {
2248            Project::local(
2249                client_a.clone(),
2250                client_a.user_store.clone(),
2251                lang_registry.clone(),
2252                fs.clone(),
2253                cx,
2254            )
2255        });
2256        let (worktree_a, _) = project_a
2257            .update(cx_a, |p, cx| {
2258                p.find_or_create_local_worktree("/dir", true, cx)
2259            })
2260            .await
2261            .unwrap();
2262        worktree_a
2263            .read_with(cx_a, |tree, _| tree.as_local().unwrap().scan_complete())
2264            .await;
2265        let project_id = project_a.update(cx_a, |p, _| p.next_remote_id()).await;
2266        let worktree_id = worktree_a.read_with(cx_a, |tree, _| tree.id());
2267        project_a.update(cx_a, |p, cx| p.share(cx)).await.unwrap();
2268
2269        // Join that project as client B
2270        let project_b = Project::remote(
2271            project_id,
2272            client_b.clone(),
2273            client_b.user_store.clone(),
2274            lang_registry.clone(),
2275            fs.clone(),
2276            &mut cx_b.to_async(),
2277        )
2278        .await
2279        .unwrap();
2280
2281        // See that a guest has joined as client A.
2282        project_a
2283            .condition(&cx_a, |p, _| p.collaborators().len() == 1)
2284            .await;
2285
2286        // Begin opening a buffer as client B, but leave the project before the open completes.
2287        let buffer_b = cx_b
2288            .background()
2289            .spawn(project_b.update(cx_b, |p, cx| p.open_buffer((worktree_id, "a.txt"), cx)));
2290        cx_b.update(|_| drop(project_b));
2291        drop(buffer_b);
2292
2293        // See that the guest has left.
2294        project_a
2295            .condition(&cx_a, |p, _| p.collaborators().len() == 0)
2296            .await;
2297    }
2298
2299    #[gpui::test(iterations = 10)]
2300    async fn test_leaving_project(cx_a: &mut TestAppContext, cx_b: &mut TestAppContext) {
2301        cx_a.foreground().forbid_parking();
2302        let lang_registry = Arc::new(LanguageRegistry::test());
2303        let fs = FakeFs::new(cx_a.background());
2304
2305        // Connect to a server as 2 clients.
2306        let mut server = TestServer::start(cx_a.foreground(), cx_a.background()).await;
2307        let client_a = server.create_client(cx_a, "user_a").await;
2308        let client_b = server.create_client(cx_b, "user_b").await;
2309
2310        // Share a project as client A
2311        fs.insert_tree(
2312            "/a",
2313            json!({
2314                ".zed.toml": r#"collaborators = ["user_b"]"#,
2315                "a.txt": "a-contents",
2316                "b.txt": "b-contents",
2317            }),
2318        )
2319        .await;
2320        let project_a = cx_a.update(|cx| {
2321            Project::local(
2322                client_a.clone(),
2323                client_a.user_store.clone(),
2324                lang_registry.clone(),
2325                fs.clone(),
2326                cx,
2327            )
2328        });
2329        let (worktree_a, _) = project_a
2330            .update(cx_a, |p, cx| {
2331                p.find_or_create_local_worktree("/a", true, cx)
2332            })
2333            .await
2334            .unwrap();
2335        worktree_a
2336            .read_with(cx_a, |tree, _| tree.as_local().unwrap().scan_complete())
2337            .await;
2338        let project_id = project_a
2339            .update(cx_a, |project, _| project.next_remote_id())
2340            .await;
2341        project_a
2342            .update(cx_a, |project, cx| project.share(cx))
2343            .await
2344            .unwrap();
2345
2346        // Join that project as client B
2347        let _project_b = Project::remote(
2348            project_id,
2349            client_b.clone(),
2350            client_b.user_store.clone(),
2351            lang_registry.clone(),
2352            fs.clone(),
2353            &mut cx_b.to_async(),
2354        )
2355        .await
2356        .unwrap();
2357
2358        // Client A sees that a guest has joined.
2359        project_a
2360            .condition(cx_a, |p, _| p.collaborators().len() == 1)
2361            .await;
2362
2363        // Drop client B's connection and ensure client A observes client B leaving the project.
2364        client_b.disconnect(&cx_b.to_async()).unwrap();
2365        project_a
2366            .condition(cx_a, |p, _| p.collaborators().len() == 0)
2367            .await;
2368
2369        // Rejoin the project as client B
2370        let _project_b = Project::remote(
2371            project_id,
2372            client_b.clone(),
2373            client_b.user_store.clone(),
2374            lang_registry.clone(),
2375            fs.clone(),
2376            &mut cx_b.to_async(),
2377        )
2378        .await
2379        .unwrap();
2380
2381        // Client A sees that a guest has re-joined.
2382        project_a
2383            .condition(cx_a, |p, _| p.collaborators().len() == 1)
2384            .await;
2385
2386        // Simulate connection loss for client B and ensure client A observes client B leaving the project.
2387        client_b.wait_for_current_user(cx_b).await;
2388        server.disconnect_client(client_b.current_user_id(cx_b));
2389        cx_a.foreground().advance_clock(Duration::from_secs(3));
2390        project_a
2391            .condition(cx_a, |p, _| p.collaborators().len() == 0)
2392            .await;
2393    }
2394
2395    #[gpui::test(iterations = 10)]
2396    async fn test_collaborating_with_diagnostics(
2397        cx_a: &mut TestAppContext,
2398        cx_b: &mut TestAppContext,
2399    ) {
2400        cx_a.foreground().forbid_parking();
2401        let lang_registry = Arc::new(LanguageRegistry::test());
2402        let fs = FakeFs::new(cx_a.background());
2403
2404        // Set up a fake language server.
2405        let mut language = Language::new(
2406            LanguageConfig {
2407                name: "Rust".into(),
2408                path_suffixes: vec!["rs".to_string()],
2409                ..Default::default()
2410            },
2411            Some(tree_sitter_rust::language()),
2412        );
2413        let mut fake_language_servers = language.set_fake_lsp_adapter(Default::default());
2414        lang_registry.add(Arc::new(language));
2415
2416        // Connect to a server as 2 clients.
2417        let mut server = TestServer::start(cx_a.foreground(), cx_a.background()).await;
2418        let client_a = server.create_client(cx_a, "user_a").await;
2419        let client_b = server.create_client(cx_b, "user_b").await;
2420
2421        // Share a project as client A
2422        fs.insert_tree(
2423            "/a",
2424            json!({
2425                ".zed.toml": r#"collaborators = ["user_b"]"#,
2426                "a.rs": "let one = two",
2427                "other.rs": "",
2428            }),
2429        )
2430        .await;
2431        let project_a = cx_a.update(|cx| {
2432            Project::local(
2433                client_a.clone(),
2434                client_a.user_store.clone(),
2435                lang_registry.clone(),
2436                fs.clone(),
2437                cx,
2438            )
2439        });
2440        let (worktree_a, _) = project_a
2441            .update(cx_a, |p, cx| {
2442                p.find_or_create_local_worktree("/a", true, cx)
2443            })
2444            .await
2445            .unwrap();
2446        worktree_a
2447            .read_with(cx_a, |tree, _| tree.as_local().unwrap().scan_complete())
2448            .await;
2449        let project_id = project_a.update(cx_a, |p, _| p.next_remote_id()).await;
2450        let worktree_id = worktree_a.read_with(cx_a, |tree, _| tree.id());
2451        project_a.update(cx_a, |p, cx| p.share(cx)).await.unwrap();
2452
2453        // Cause the language server to start.
2454        let _ = cx_a
2455            .background()
2456            .spawn(project_a.update(cx_a, |project, cx| {
2457                project.open_buffer(
2458                    ProjectPath {
2459                        worktree_id,
2460                        path: Path::new("other.rs").into(),
2461                    },
2462                    cx,
2463                )
2464            }))
2465            .await
2466            .unwrap();
2467
2468        // Simulate a language server reporting errors for a file.
2469        let mut fake_language_server = fake_language_servers.next().await.unwrap();
2470        fake_language_server
2471            .receive_notification::<lsp::notification::DidOpenTextDocument>()
2472            .await;
2473        fake_language_server.notify::<lsp::notification::PublishDiagnostics>(
2474            lsp::PublishDiagnosticsParams {
2475                uri: lsp::Url::from_file_path("/a/a.rs").unwrap(),
2476                version: None,
2477                diagnostics: vec![lsp::Diagnostic {
2478                    severity: Some(lsp::DiagnosticSeverity::ERROR),
2479                    range: lsp::Range::new(lsp::Position::new(0, 4), lsp::Position::new(0, 7)),
2480                    message: "message 1".to_string(),
2481                    ..Default::default()
2482                }],
2483            },
2484        );
2485
2486        // Wait for server to see the diagnostics update.
2487        server
2488            .condition(|store| {
2489                let worktree = store
2490                    .project(project_id)
2491                    .unwrap()
2492                    .share
2493                    .as_ref()
2494                    .unwrap()
2495                    .worktrees
2496                    .get(&worktree_id.to_proto())
2497                    .unwrap();
2498
2499                !worktree.diagnostic_summaries.is_empty()
2500            })
2501            .await;
2502
2503        // Join the worktree as client B.
2504        let project_b = Project::remote(
2505            project_id,
2506            client_b.clone(),
2507            client_b.user_store.clone(),
2508            lang_registry.clone(),
2509            fs.clone(),
2510            &mut cx_b.to_async(),
2511        )
2512        .await
2513        .unwrap();
2514
2515        project_b.read_with(cx_b, |project, cx| {
2516            assert_eq!(
2517                project.diagnostic_summaries(cx).collect::<Vec<_>>(),
2518                &[(
2519                    ProjectPath {
2520                        worktree_id,
2521                        path: Arc::from(Path::new("a.rs")),
2522                    },
2523                    DiagnosticSummary {
2524                        error_count: 1,
2525                        warning_count: 0,
2526                        ..Default::default()
2527                    },
2528                )]
2529            )
2530        });
2531
2532        // Simulate a language server reporting more errors for a file.
2533        fake_language_server.notify::<lsp::notification::PublishDiagnostics>(
2534            lsp::PublishDiagnosticsParams {
2535                uri: lsp::Url::from_file_path("/a/a.rs").unwrap(),
2536                version: None,
2537                diagnostics: vec![
2538                    lsp::Diagnostic {
2539                        severity: Some(lsp::DiagnosticSeverity::ERROR),
2540                        range: lsp::Range::new(lsp::Position::new(0, 4), lsp::Position::new(0, 7)),
2541                        message: "message 1".to_string(),
2542                        ..Default::default()
2543                    },
2544                    lsp::Diagnostic {
2545                        severity: Some(lsp::DiagnosticSeverity::WARNING),
2546                        range: lsp::Range::new(
2547                            lsp::Position::new(0, 10),
2548                            lsp::Position::new(0, 13),
2549                        ),
2550                        message: "message 2".to_string(),
2551                        ..Default::default()
2552                    },
2553                ],
2554            },
2555        );
2556
2557        // Client b gets the updated summaries
2558        project_b
2559            .condition(&cx_b, |project, cx| {
2560                project.diagnostic_summaries(cx).collect::<Vec<_>>()
2561                    == &[(
2562                        ProjectPath {
2563                            worktree_id,
2564                            path: Arc::from(Path::new("a.rs")),
2565                        },
2566                        DiagnosticSummary {
2567                            error_count: 1,
2568                            warning_count: 1,
2569                            ..Default::default()
2570                        },
2571                    )]
2572            })
2573            .await;
2574
2575        // Open the file with the errors on client B. They should be present.
2576        let buffer_b = cx_b
2577            .background()
2578            .spawn(project_b.update(cx_b, |p, cx| p.open_buffer((worktree_id, "a.rs"), cx)))
2579            .await
2580            .unwrap();
2581
2582        buffer_b.read_with(cx_b, |buffer, _| {
2583            assert_eq!(
2584                buffer
2585                    .snapshot()
2586                    .diagnostics_in_range::<_, Point>(0..buffer.len(), false)
2587                    .map(|entry| entry)
2588                    .collect::<Vec<_>>(),
2589                &[
2590                    DiagnosticEntry {
2591                        range: Point::new(0, 4)..Point::new(0, 7),
2592                        diagnostic: Diagnostic {
2593                            group_id: 0,
2594                            message: "message 1".to_string(),
2595                            severity: lsp::DiagnosticSeverity::ERROR,
2596                            is_primary: true,
2597                            ..Default::default()
2598                        }
2599                    },
2600                    DiagnosticEntry {
2601                        range: Point::new(0, 10)..Point::new(0, 13),
2602                        diagnostic: Diagnostic {
2603                            group_id: 1,
2604                            severity: lsp::DiagnosticSeverity::WARNING,
2605                            message: "message 2".to_string(),
2606                            is_primary: true,
2607                            ..Default::default()
2608                        }
2609                    }
2610                ]
2611            );
2612        });
2613
2614        // Simulate a language server reporting no errors for a file.
2615        fake_language_server.notify::<lsp::notification::PublishDiagnostics>(
2616            lsp::PublishDiagnosticsParams {
2617                uri: lsp::Url::from_file_path("/a/a.rs").unwrap(),
2618                version: None,
2619                diagnostics: vec![],
2620            },
2621        );
2622        project_a
2623            .condition(cx_a, |project, cx| {
2624                project.diagnostic_summaries(cx).collect::<Vec<_>>() == &[]
2625            })
2626            .await;
2627        project_b
2628            .condition(cx_b, |project, cx| {
2629                project.diagnostic_summaries(cx).collect::<Vec<_>>() == &[]
2630            })
2631            .await;
2632    }
2633
2634    #[gpui::test(iterations = 10)]
2635    async fn test_collaborating_with_completion(
2636        cx_a: &mut TestAppContext,
2637        cx_b: &mut TestAppContext,
2638    ) {
2639        cx_a.foreground().forbid_parking();
2640        let lang_registry = Arc::new(LanguageRegistry::test());
2641        let fs = FakeFs::new(cx_a.background());
2642
2643        // Set up a fake language server.
2644        let mut language = Language::new(
2645            LanguageConfig {
2646                name: "Rust".into(),
2647                path_suffixes: vec!["rs".to_string()],
2648                ..Default::default()
2649            },
2650            Some(tree_sitter_rust::language()),
2651        );
2652        let mut fake_language_servers = language.set_fake_lsp_adapter(FakeLspAdapter {
2653            capabilities: lsp::ServerCapabilities {
2654                completion_provider: Some(lsp::CompletionOptions {
2655                    trigger_characters: Some(vec![".".to_string()]),
2656                    ..Default::default()
2657                }),
2658                ..Default::default()
2659            },
2660            ..Default::default()
2661        });
2662        lang_registry.add(Arc::new(language));
2663
2664        // Connect to a server as 2 clients.
2665        let mut server = TestServer::start(cx_a.foreground(), cx_a.background()).await;
2666        let client_a = server.create_client(cx_a, "user_a").await;
2667        let client_b = server.create_client(cx_b, "user_b").await;
2668
2669        // Share a project as client A
2670        fs.insert_tree(
2671            "/a",
2672            json!({
2673                ".zed.toml": r#"collaborators = ["user_b"]"#,
2674                "main.rs": "fn main() { a }",
2675                "other.rs": "",
2676            }),
2677        )
2678        .await;
2679        let project_a = cx_a.update(|cx| {
2680            Project::local(
2681                client_a.clone(),
2682                client_a.user_store.clone(),
2683                lang_registry.clone(),
2684                fs.clone(),
2685                cx,
2686            )
2687        });
2688        let (worktree_a, _) = project_a
2689            .update(cx_a, |p, cx| {
2690                p.find_or_create_local_worktree("/a", true, cx)
2691            })
2692            .await
2693            .unwrap();
2694        worktree_a
2695            .read_with(cx_a, |tree, _| tree.as_local().unwrap().scan_complete())
2696            .await;
2697        let project_id = project_a.update(cx_a, |p, _| p.next_remote_id()).await;
2698        let worktree_id = worktree_a.read_with(cx_a, |tree, _| tree.id());
2699        project_a.update(cx_a, |p, cx| p.share(cx)).await.unwrap();
2700
2701        // Join the worktree as client B.
2702        let project_b = Project::remote(
2703            project_id,
2704            client_b.clone(),
2705            client_b.user_store.clone(),
2706            lang_registry.clone(),
2707            fs.clone(),
2708            &mut cx_b.to_async(),
2709        )
2710        .await
2711        .unwrap();
2712
2713        // Open a file in an editor as the guest.
2714        let buffer_b = project_b
2715            .update(cx_b, |p, cx| p.open_buffer((worktree_id, "main.rs"), cx))
2716            .await
2717            .unwrap();
2718        let (window_b, _) = cx_b.add_window(|_| EmptyView);
2719        let editor_b = cx_b.add_view(window_b, |cx| {
2720            Editor::for_buffer(buffer_b.clone(), Some(project_b.clone()), cx)
2721        });
2722
2723        let fake_language_server = fake_language_servers.next().await.unwrap();
2724        buffer_b
2725            .condition(&cx_b, |buffer, _| !buffer.completion_triggers().is_empty())
2726            .await;
2727
2728        // Type a completion trigger character as the guest.
2729        editor_b.update(cx_b, |editor, cx| {
2730            editor.select_ranges([13..13], None, cx);
2731            editor.handle_input(&Input(".".into()), cx);
2732            cx.focus(&editor_b);
2733        });
2734
2735        // Receive a completion request as the host's language server.
2736        // Return some completions from the host's language server.
2737        cx_a.foreground().start_waiting();
2738        fake_language_server
2739            .handle_request::<lsp::request::Completion, _, _>(|params, _| async move {
2740                assert_eq!(
2741                    params.text_document_position.text_document.uri,
2742                    lsp::Url::from_file_path("/a/main.rs").unwrap(),
2743                );
2744                assert_eq!(
2745                    params.text_document_position.position,
2746                    lsp::Position::new(0, 14),
2747                );
2748
2749                Ok(Some(lsp::CompletionResponse::Array(vec![
2750                    lsp::CompletionItem {
2751                        label: "first_method(…)".into(),
2752                        detail: Some("fn(&mut self, B) -> C".into()),
2753                        text_edit: Some(lsp::CompletionTextEdit::Edit(lsp::TextEdit {
2754                            new_text: "first_method($1)".to_string(),
2755                            range: lsp::Range::new(
2756                                lsp::Position::new(0, 14),
2757                                lsp::Position::new(0, 14),
2758                            ),
2759                        })),
2760                        insert_text_format: Some(lsp::InsertTextFormat::SNIPPET),
2761                        ..Default::default()
2762                    },
2763                    lsp::CompletionItem {
2764                        label: "second_method(…)".into(),
2765                        detail: Some("fn(&mut self, C) -> D<E>".into()),
2766                        text_edit: Some(lsp::CompletionTextEdit::Edit(lsp::TextEdit {
2767                            new_text: "second_method()".to_string(),
2768                            range: lsp::Range::new(
2769                                lsp::Position::new(0, 14),
2770                                lsp::Position::new(0, 14),
2771                            ),
2772                        })),
2773                        insert_text_format: Some(lsp::InsertTextFormat::SNIPPET),
2774                        ..Default::default()
2775                    },
2776                ])))
2777            })
2778            .next()
2779            .await
2780            .unwrap();
2781        cx_a.foreground().finish_waiting();
2782
2783        // Open the buffer on the host.
2784        let buffer_a = project_a
2785            .update(cx_a, |p, cx| p.open_buffer((worktree_id, "main.rs"), cx))
2786            .await
2787            .unwrap();
2788        buffer_a
2789            .condition(&cx_a, |buffer, _| buffer.text() == "fn main() { a. }")
2790            .await;
2791
2792        // Confirm a completion on the guest.
2793        editor_b
2794            .condition(&cx_b, |editor, _| editor.context_menu_visible())
2795            .await;
2796        editor_b.update(cx_b, |editor, cx| {
2797            editor.confirm_completion(&ConfirmCompletion { item_ix: Some(0) }, cx);
2798            assert_eq!(editor.text(cx), "fn main() { a.first_method() }");
2799        });
2800
2801        // Return a resolved completion from the host's language server.
2802        // The resolved completion has an additional text edit.
2803        fake_language_server.handle_request::<lsp::request::ResolveCompletionItem, _, _>(
2804            |params, _| async move {
2805                assert_eq!(params.label, "first_method(…)");
2806                Ok(lsp::CompletionItem {
2807                    label: "first_method(…)".into(),
2808                    detail: Some("fn(&mut self, B) -> C".into()),
2809                    text_edit: Some(lsp::CompletionTextEdit::Edit(lsp::TextEdit {
2810                        new_text: "first_method($1)".to_string(),
2811                        range: lsp::Range::new(
2812                            lsp::Position::new(0, 14),
2813                            lsp::Position::new(0, 14),
2814                        ),
2815                    })),
2816                    additional_text_edits: Some(vec![lsp::TextEdit {
2817                        new_text: "use d::SomeTrait;\n".to_string(),
2818                        range: lsp::Range::new(lsp::Position::new(0, 0), lsp::Position::new(0, 0)),
2819                    }]),
2820                    insert_text_format: Some(lsp::InsertTextFormat::SNIPPET),
2821                    ..Default::default()
2822                })
2823            },
2824        );
2825
2826        // The additional edit is applied.
2827        buffer_a
2828            .condition(&cx_a, |buffer, _| {
2829                buffer.text() == "use d::SomeTrait;\nfn main() { a.first_method() }"
2830            })
2831            .await;
2832        buffer_b
2833            .condition(&cx_b, |buffer, _| {
2834                buffer.text() == "use d::SomeTrait;\nfn main() { a.first_method() }"
2835            })
2836            .await;
2837    }
2838
2839    #[gpui::test(iterations = 10)]
2840    async fn test_reloading_buffer_manually(cx_a: &mut TestAppContext, cx_b: &mut TestAppContext) {
2841        cx_a.foreground().forbid_parking();
2842        let lang_registry = Arc::new(LanguageRegistry::test());
2843        let fs = FakeFs::new(cx_a.background());
2844
2845        // Connect to a server as 2 clients.
2846        let mut server = TestServer::start(cx_a.foreground(), cx_a.background()).await;
2847        let client_a = server.create_client(cx_a, "user_a").await;
2848        let client_b = server.create_client(cx_b, "user_b").await;
2849
2850        // Share a project as client A
2851        fs.insert_tree(
2852            "/a",
2853            json!({
2854                ".zed.toml": r#"collaborators = ["user_b"]"#,
2855                "a.rs": "let one = 1;",
2856            }),
2857        )
2858        .await;
2859        let project_a = cx_a.update(|cx| {
2860            Project::local(
2861                client_a.clone(),
2862                client_a.user_store.clone(),
2863                lang_registry.clone(),
2864                fs.clone(),
2865                cx,
2866            )
2867        });
2868        let (worktree_a, _) = project_a
2869            .update(cx_a, |p, cx| {
2870                p.find_or_create_local_worktree("/a", true, cx)
2871            })
2872            .await
2873            .unwrap();
2874        worktree_a
2875            .read_with(cx_a, |tree, _| tree.as_local().unwrap().scan_complete())
2876            .await;
2877        let project_id = project_a.update(cx_a, |p, _| p.next_remote_id()).await;
2878        let worktree_id = worktree_a.read_with(cx_a, |tree, _| tree.id());
2879        project_a.update(cx_a, |p, cx| p.share(cx)).await.unwrap();
2880        let buffer_a = project_a
2881            .update(cx_a, |p, cx| p.open_buffer((worktree_id, "a.rs"), cx))
2882            .await
2883            .unwrap();
2884
2885        // Join the worktree as client B.
2886        let project_b = Project::remote(
2887            project_id,
2888            client_b.clone(),
2889            client_b.user_store.clone(),
2890            lang_registry.clone(),
2891            fs.clone(),
2892            &mut cx_b.to_async(),
2893        )
2894        .await
2895        .unwrap();
2896
2897        let buffer_b = cx_b
2898            .background()
2899            .spawn(project_b.update(cx_b, |p, cx| p.open_buffer((worktree_id, "a.rs"), cx)))
2900            .await
2901            .unwrap();
2902        buffer_b.update(cx_b, |buffer, cx| {
2903            buffer.edit([(4..7, "six")], cx);
2904            buffer.edit([(10..11, "6")], cx);
2905            assert_eq!(buffer.text(), "let six = 6;");
2906            assert!(buffer.is_dirty());
2907            assert!(!buffer.has_conflict());
2908        });
2909        buffer_a
2910            .condition(cx_a, |buffer, _| buffer.text() == "let six = 6;")
2911            .await;
2912
2913        fs.save(Path::new("/a/a.rs"), &Rope::from("let seven = 7;"))
2914            .await
2915            .unwrap();
2916        buffer_a
2917            .condition(cx_a, |buffer, _| buffer.has_conflict())
2918            .await;
2919        buffer_b
2920            .condition(cx_b, |buffer, _| buffer.has_conflict())
2921            .await;
2922
2923        project_b
2924            .update(cx_b, |project, cx| {
2925                project.reload_buffers(HashSet::from_iter([buffer_b.clone()]), true, cx)
2926            })
2927            .await
2928            .unwrap();
2929        buffer_a.read_with(cx_a, |buffer, _| {
2930            assert_eq!(buffer.text(), "let seven = 7;");
2931            assert!(!buffer.is_dirty());
2932            assert!(!buffer.has_conflict());
2933        });
2934        buffer_b.read_with(cx_b, |buffer, _| {
2935            assert_eq!(buffer.text(), "let seven = 7;");
2936            assert!(!buffer.is_dirty());
2937            assert!(!buffer.has_conflict());
2938        });
2939
2940        buffer_a.update(cx_a, |buffer, cx| {
2941            // Undoing on the host is a no-op when the reload was initiated by the guest.
2942            buffer.undo(cx);
2943            assert_eq!(buffer.text(), "let seven = 7;");
2944            assert!(!buffer.is_dirty());
2945            assert!(!buffer.has_conflict());
2946        });
2947        buffer_b.update(cx_b, |buffer, cx| {
2948            // Undoing on the guest rolls back the buffer to before it was reloaded but the conflict gets cleared.
2949            buffer.undo(cx);
2950            assert_eq!(buffer.text(), "let six = 6;");
2951            assert!(buffer.is_dirty());
2952            assert!(!buffer.has_conflict());
2953        });
2954    }
2955
2956    #[gpui::test(iterations = 10)]
2957    async fn test_formatting_buffer(cx_a: &mut TestAppContext, cx_b: &mut TestAppContext) {
2958        cx_a.foreground().forbid_parking();
2959        let lang_registry = Arc::new(LanguageRegistry::test());
2960        let fs = FakeFs::new(cx_a.background());
2961
2962        // Set up a fake language server.
2963        let mut language = Language::new(
2964            LanguageConfig {
2965                name: "Rust".into(),
2966                path_suffixes: vec!["rs".to_string()],
2967                ..Default::default()
2968            },
2969            Some(tree_sitter_rust::language()),
2970        );
2971        let mut fake_language_servers = language.set_fake_lsp_adapter(Default::default());
2972        lang_registry.add(Arc::new(language));
2973
2974        // Connect to a server as 2 clients.
2975        let mut server = TestServer::start(cx_a.foreground(), cx_a.background()).await;
2976        let client_a = server.create_client(cx_a, "user_a").await;
2977        let client_b = server.create_client(cx_b, "user_b").await;
2978
2979        // Share a project as client A
2980        fs.insert_tree(
2981            "/a",
2982            json!({
2983                ".zed.toml": r#"collaborators = ["user_b"]"#,
2984                "a.rs": "let one = two",
2985            }),
2986        )
2987        .await;
2988        let project_a = cx_a.update(|cx| {
2989            Project::local(
2990                client_a.clone(),
2991                client_a.user_store.clone(),
2992                lang_registry.clone(),
2993                fs.clone(),
2994                cx,
2995            )
2996        });
2997        let (worktree_a, _) = project_a
2998            .update(cx_a, |p, cx| {
2999                p.find_or_create_local_worktree("/a", true, cx)
3000            })
3001            .await
3002            .unwrap();
3003        worktree_a
3004            .read_with(cx_a, |tree, _| tree.as_local().unwrap().scan_complete())
3005            .await;
3006        let project_id = project_a.update(cx_a, |p, _| p.next_remote_id()).await;
3007        let worktree_id = worktree_a.read_with(cx_a, |tree, _| tree.id());
3008        project_a.update(cx_a, |p, cx| p.share(cx)).await.unwrap();
3009
3010        // Join the worktree as client B.
3011        let project_b = Project::remote(
3012            project_id,
3013            client_b.clone(),
3014            client_b.user_store.clone(),
3015            lang_registry.clone(),
3016            fs.clone(),
3017            &mut cx_b.to_async(),
3018        )
3019        .await
3020        .unwrap();
3021
3022        let buffer_b = cx_b
3023            .background()
3024            .spawn(project_b.update(cx_b, |p, cx| p.open_buffer((worktree_id, "a.rs"), cx)))
3025            .await
3026            .unwrap();
3027
3028        let fake_language_server = fake_language_servers.next().await.unwrap();
3029        fake_language_server.handle_request::<lsp::request::Formatting, _, _>(|_, _| async move {
3030            Ok(Some(vec![
3031                lsp::TextEdit {
3032                    range: lsp::Range::new(lsp::Position::new(0, 4), lsp::Position::new(0, 4)),
3033                    new_text: "h".to_string(),
3034                },
3035                lsp::TextEdit {
3036                    range: lsp::Range::new(lsp::Position::new(0, 7), lsp::Position::new(0, 7)),
3037                    new_text: "y".to_string(),
3038                },
3039            ]))
3040        });
3041
3042        project_b
3043            .update(cx_b, |project, cx| {
3044                project.format(HashSet::from_iter([buffer_b.clone()]), true, cx)
3045            })
3046            .await
3047            .unwrap();
3048        assert_eq!(
3049            buffer_b.read_with(cx_b, |buffer, _| buffer.text()),
3050            "let honey = two"
3051        );
3052    }
3053
3054    #[gpui::test(iterations = 10)]
3055    async fn test_definition(cx_a: &mut TestAppContext, cx_b: &mut TestAppContext) {
3056        cx_a.foreground().forbid_parking();
3057        let lang_registry = Arc::new(LanguageRegistry::test());
3058        let fs = FakeFs::new(cx_a.background());
3059        fs.insert_tree(
3060            "/root-1",
3061            json!({
3062                ".zed.toml": r#"collaborators = ["user_b"]"#,
3063                "a.rs": "const ONE: usize = b::TWO + b::THREE;",
3064            }),
3065        )
3066        .await;
3067        fs.insert_tree(
3068            "/root-2",
3069            json!({
3070                "b.rs": "const TWO: usize = 2;\nconst THREE: usize = 3;",
3071            }),
3072        )
3073        .await;
3074
3075        // Set up a fake language server.
3076        let mut language = Language::new(
3077            LanguageConfig {
3078                name: "Rust".into(),
3079                path_suffixes: vec!["rs".to_string()],
3080                ..Default::default()
3081            },
3082            Some(tree_sitter_rust::language()),
3083        );
3084        let mut fake_language_servers = language.set_fake_lsp_adapter(Default::default());
3085        lang_registry.add(Arc::new(language));
3086
3087        // Connect to a server as 2 clients.
3088        let mut server = TestServer::start(cx_a.foreground(), cx_a.background()).await;
3089        let client_a = server.create_client(cx_a, "user_a").await;
3090        let client_b = server.create_client(cx_b, "user_b").await;
3091
3092        // Share a project as client A
3093        let project_a = cx_a.update(|cx| {
3094            Project::local(
3095                client_a.clone(),
3096                client_a.user_store.clone(),
3097                lang_registry.clone(),
3098                fs.clone(),
3099                cx,
3100            )
3101        });
3102        let (worktree_a, _) = project_a
3103            .update(cx_a, |p, cx| {
3104                p.find_or_create_local_worktree("/root-1", true, cx)
3105            })
3106            .await
3107            .unwrap();
3108        worktree_a
3109            .read_with(cx_a, |tree, _| tree.as_local().unwrap().scan_complete())
3110            .await;
3111        let project_id = project_a.update(cx_a, |p, _| p.next_remote_id()).await;
3112        let worktree_id = worktree_a.read_with(cx_a, |tree, _| tree.id());
3113        project_a.update(cx_a, |p, cx| p.share(cx)).await.unwrap();
3114
3115        // Join the worktree as client B.
3116        let project_b = Project::remote(
3117            project_id,
3118            client_b.clone(),
3119            client_b.user_store.clone(),
3120            lang_registry.clone(),
3121            fs.clone(),
3122            &mut cx_b.to_async(),
3123        )
3124        .await
3125        .unwrap();
3126
3127        // Open the file on client B.
3128        let buffer_b = cx_b
3129            .background()
3130            .spawn(project_b.update(cx_b, |p, cx| p.open_buffer((worktree_id, "a.rs"), cx)))
3131            .await
3132            .unwrap();
3133
3134        // Request the definition of a symbol as the guest.
3135        let fake_language_server = fake_language_servers.next().await.unwrap();
3136        fake_language_server.handle_request::<lsp::request::GotoDefinition, _, _>(
3137            |_, _| async move {
3138                Ok(Some(lsp::GotoDefinitionResponse::Scalar(
3139                    lsp::Location::new(
3140                        lsp::Url::from_file_path("/root-2/b.rs").unwrap(),
3141                        lsp::Range::new(lsp::Position::new(0, 6), lsp::Position::new(0, 9)),
3142                    ),
3143                )))
3144            },
3145        );
3146
3147        let definitions_1 = project_b
3148            .update(cx_b, |p, cx| p.definition(&buffer_b, 23, cx))
3149            .await
3150            .unwrap();
3151        cx_b.read(|cx| {
3152            assert_eq!(definitions_1.len(), 1);
3153            assert_eq!(project_b.read(cx).worktrees(cx).count(), 2);
3154            let target_buffer = definitions_1[0].buffer.read(cx);
3155            assert_eq!(
3156                target_buffer.text(),
3157                "const TWO: usize = 2;\nconst THREE: usize = 3;"
3158            );
3159            assert_eq!(
3160                definitions_1[0].range.to_point(target_buffer),
3161                Point::new(0, 6)..Point::new(0, 9)
3162            );
3163        });
3164
3165        // Try getting more definitions for the same buffer, ensuring the buffer gets reused from
3166        // the previous call to `definition`.
3167        fake_language_server.handle_request::<lsp::request::GotoDefinition, _, _>(
3168            |_, _| async move {
3169                Ok(Some(lsp::GotoDefinitionResponse::Scalar(
3170                    lsp::Location::new(
3171                        lsp::Url::from_file_path("/root-2/b.rs").unwrap(),
3172                        lsp::Range::new(lsp::Position::new(1, 6), lsp::Position::new(1, 11)),
3173                    ),
3174                )))
3175            },
3176        );
3177
3178        let definitions_2 = project_b
3179            .update(cx_b, |p, cx| p.definition(&buffer_b, 33, cx))
3180            .await
3181            .unwrap();
3182        cx_b.read(|cx| {
3183            assert_eq!(definitions_2.len(), 1);
3184            assert_eq!(project_b.read(cx).worktrees(cx).count(), 2);
3185            let target_buffer = definitions_2[0].buffer.read(cx);
3186            assert_eq!(
3187                target_buffer.text(),
3188                "const TWO: usize = 2;\nconst THREE: usize = 3;"
3189            );
3190            assert_eq!(
3191                definitions_2[0].range.to_point(target_buffer),
3192                Point::new(1, 6)..Point::new(1, 11)
3193            );
3194        });
3195        assert_eq!(definitions_1[0].buffer, definitions_2[0].buffer);
3196    }
3197
3198    #[gpui::test(iterations = 10)]
        // Checks that a guest (client B) can request the references to a symbol in a
        // project shared by a host (client A). The fake language server answers with
        // three locations: two in `/root-1/two.rs` and one in `/root-2/three.rs`. Only
        // `/root-1` is added to the host project as a worktree in this test.
3199    async fn test_references(cx_a: &mut TestAppContext, cx_b: &mut TestAppContext) {
3200        cx_a.foreground().forbid_parking();
3201        let lang_registry = Arc::new(LanguageRegistry::test());
3202        let fs = FakeFs::new(cx_a.background());
            // `/root-1` is the directory the host opens as a worktree below.
3203        fs.insert_tree(
3204            "/root-1",
3205            json!({
3206                ".zed.toml": r#"collaborators = ["user_b"]"#,
3207                "one.rs": "const ONE: usize = 1;",
3208                "two.rs": "const TWO: usize = one::ONE + one::ONE;",
3209            }),
3210        )
3211        .await;
            // `/root-2` exists in the fake file system but is never opened explicitly;
            // it is only reached through a location returned by the language server.
3212        fs.insert_tree(
3213            "/root-2",
3214            json!({
3215                "three.rs": "const THREE: usize = two::TWO + one::ONE;",
3216            }),
3217        )
3218        .await;
3219
3220        // Set up a fake language server.
3221        let mut language = Language::new(
3222            LanguageConfig {
3223                name: "Rust".into(),
3224                path_suffixes: vec!["rs".to_string()],
3225                ..Default::default()
3226            },
3227            Some(tree_sitter_rust::language()),
3228        );
3229        let mut fake_language_servers = language.set_fake_lsp_adapter(Default::default());
3230        lang_registry.add(Arc::new(language));
3231
3232        // Connect to a server as 2 clients.
3233        let mut server = TestServer::start(cx_a.foreground(), cx_a.background()).await;
3234        let client_a = server.create_client(cx_a, "user_a").await;
3235        let client_b = server.create_client(cx_b, "user_b").await;
3236
3237        // Share a project as client A
3238        let project_a = cx_a.update(|cx| {
3239            Project::local(
3240                client_a.clone(),
3241                client_a.user_store.clone(),
3242                lang_registry.clone(),
3243                fs.clone(),
3244                cx,
3245            )
3246        });
3247        let (worktree_a, _) = project_a
3248            .update(cx_a, |p, cx| {
3249                p.find_or_create_local_worktree("/root-1", true, cx)
3250            })
3251            .await
3252            .unwrap();
            // Wait on the worktree's `scan_complete()` future before sharing.
3253        worktree_a
3254            .read_with(cx_a, |tree, _| tree.as_local().unwrap().scan_complete())
3255            .await;
3256        let project_id = project_a.update(cx_a, |p, _| p.next_remote_id()).await;
3257        let worktree_id = worktree_a.read_with(cx_a, |tree, _| tree.id());
3258        project_a.update(cx_a, |p, cx| p.share(cx)).await.unwrap();
3259
3260        // Join the worktree as client B.
3261        let project_b = Project::remote(
3262            project_id,
3263            client_b.clone(),
3264            client_b.user_store.clone(),
3265            lang_registry.clone(),
3266            fs.clone(),
3267            &mut cx_b.to_async(),
3268        )
3269        .await
3270        .unwrap();
3271
3272        // Open the file on client B.
3273        let buffer_b = cx_b
3274            .background()
3275            .spawn(project_b.update(cx_b, |p, cx| p.open_buffer((worktree_id, "one.rs"), cx)))
3276            .await
3277            .unwrap();
3278
3279        // Request references to a symbol as the guest.
            // The handler checks that the request targets the host-side path of
            // `one.rs` and then returns three fixed locations.
3280        let fake_language_server = fake_language_servers.next().await.unwrap();
3281        fake_language_server.handle_request::<lsp::request::References, _, _>(
3282            |params, _| async move {
3283                assert_eq!(
3284                    params.text_document_position.text_document.uri.as_str(),
3285                    "file:///root-1/one.rs"
3286                );
3287                Ok(Some(vec![
3288                    lsp::Location {
3289                        uri: lsp::Url::from_file_path("/root-1/two.rs").unwrap(),
3290                        range: lsp::Range::new(
3291                            lsp::Position::new(0, 24),
3292                            lsp::Position::new(0, 27),
3293                        ),
3294                    },
3295                    lsp::Location {
3296                        uri: lsp::Url::from_file_path("/root-1/two.rs").unwrap(),
3297                        range: lsp::Range::new(
3298                            lsp::Position::new(0, 35),
3299                            lsp::Position::new(0, 38),
3300                        ),
3301                    },
3302                    lsp::Location {
3303                        uri: lsp::Url::from_file_path("/root-2/three.rs").unwrap(),
3304                        range: lsp::Range::new(
3305                            lsp::Position::new(0, 37),
3306                            lsp::Position::new(0, 40),
3307                        ),
3308                    },
3309                ]))
3310            },
3311        );
3312
            // The second argument (7) is presumably a buffer offset; offset 7 falls
            // inside `ONE` in "const ONE: usize = 1;" — TODO confirm the position type.
3313        let references = project_b
3314            .update(cx_b, |p, cx| p.references(&buffer_b, 7, cx))
3315            .await
3316            .unwrap();
3317        cx_b.read(|cx| {
3318            assert_eq!(references.len(), 3);
                // The guest is expected to see two worktrees after the request —
                // presumably the shared one plus one added for `/root-2/three.rs`;
                // verify against `Project::references`.
3319            assert_eq!(project_b.read(cx).worktrees(cx).count(), 2);
3320
3321            let two_buffer = references[0].buffer.read(cx);
3322            let three_buffer = references[2].buffer.read(cx);
3323            assert_eq!(
3324                two_buffer.file().unwrap().path().as_ref(),
3325                Path::new("two.rs")
3326            );
                // Both locations in `two.rs` must resolve to the same buffer.
3327            assert_eq!(references[1].buffer, references[0].buffer);
3328            assert_eq!(
3329                three_buffer.file().unwrap().full_path(cx),
3330                Path::new("three.rs")
3331            );
3332
                // Every fixture file is a single line, so the expected offsets equal
                // the LSP columns returned by the handler above.
3333            assert_eq!(references[0].range.to_offset(&two_buffer), 24..27);
3334            assert_eq!(references[1].range.to_offset(&two_buffer), 35..38);
3335            assert_eq!(references[2].range.to_offset(&three_buffer), 37..40);
3336        });
3337    }
3338
3339    #[gpui::test(iterations = 10)]
        // Checks that a guest (client B) can run a text search over a project shared
        // by a host (client A). The host project contains two worktrees, `/root-1`
        // and `/root-2`; the search for "world" must report matches from both.
3340    async fn test_project_search(cx_a: &mut TestAppContext, cx_b: &mut TestAppContext) {
3341        cx_a.foreground().forbid_parking();
3342        let lang_registry = Arc::new(LanguageRegistry::test());
3343        let fs = FakeFs::new(cx_a.background());
            // Files `a`, `c` and `d` contain "world"; file `b` does not and therefore
            // does not appear in the expected results at the end of the test.
3344        fs.insert_tree(
3345            "/root-1",
3346            json!({
3347                ".zed.toml": r#"collaborators = ["user_b"]"#,
3348                "a": "hello world",
3349                "b": "goodnight moon",
3350                "c": "a world of goo",
3351                "d": "world champion of clown world",
3352            }),
3353        )
3354        .await;
3355        fs.insert_tree(
3356            "/root-2",
3357            json!({
3358                "e": "disney world is fun",
3359            }),
3360        )
3361        .await;
3362
3363        // Connect to a server as 2 clients.
3364        let mut server = TestServer::start(cx_a.foreground(), cx_a.background()).await;
3365        let client_a = server.create_client(cx_a, "user_a").await;
3366        let client_b = server.create_client(cx_b, "user_b").await;
3367
3368        // Share a project as client A
3369        let project_a = cx_a.update(|cx| {
3370            Project::local(
3371                client_a.clone(),
3372                client_a.user_store.clone(),
3373                lang_registry.clone(),
3374                fs.clone(),
3375                cx,
3376            )
3377        });
3378        let project_id = project_a.update(cx_a, |p, _| p.next_remote_id()).await;
3379
            // Add both directories as worktrees, awaiting each worktree's
            // `scan_complete()` future before moving on.
3380        let (worktree_1, _) = project_a
3381            .update(cx_a, |p, cx| {
3382                p.find_or_create_local_worktree("/root-1", true, cx)
3383            })
3384            .await
3385            .unwrap();
3386        worktree_1
3387            .read_with(cx_a, |tree, _| tree.as_local().unwrap().scan_complete())
3388            .await;
3389        let (worktree_2, _) = project_a
3390            .update(cx_a, |p, cx| {
3391                p.find_or_create_local_worktree("/root-2", true, cx)
3392            })
3393            .await
3394            .unwrap();
3395        worktree_2
3396            .read_with(cx_a, |tree, _| tree.as_local().unwrap().scan_complete())
3397            .await;
3398
3399        project_a.update(cx_a, |p, cx| p.share(cx)).await.unwrap();
3400
3401        // Join the worktree as client B.
3402        let project_b = Project::remote(
3403            project_id,
3404            client_b.clone(),
3405            client_b.user_store.clone(),
3406            lang_registry.clone(),
3407            fs.clone(),
3408            &mut cx_b.to_async(),
3409        )
3410        .await
3411        .unwrap();
3412
            // Run the search as the guest.
            // NOTE(review): the two `false` arguments are presumably the whole-word
            // and case-sensitive flags — confirm against `SearchQuery::text`.
3413        let results = project_b
3414            .update(cx_b, |project, cx| {
3415                project.search(SearchQuery::text("world", false, false), cx)
3416            })
3417            .await
3418            .unwrap();
3419
            // Flatten each result into (full path, offset ranges) so it can be
            // compared against plain values.
3420        let mut ranges_by_path = results
3421            .into_iter()
3422            .map(|(buffer, ranges)| {
3423                buffer.read_with(cx_b, |buffer, cx| {
3424                    let path = buffer.file().unwrap().full_path(cx);
3425                    let offset_ranges = ranges
3426                        .into_iter()
3427                        .map(|range| range.to_offset(buffer))
3428                        .collect::<Vec<_>>();
3429                    (path, offset_ranges)
3430                })
3431            })
3432            .collect::<Vec<_>>();
            // Sort by path so the comparison below does not depend on the order in
            // which the results were returned.
3433        ranges_by_path.sort_by_key(|(path, _)| path.clone());
3434
3435        assert_eq!(
3436            ranges_by_path,
3437            &[
3438                (PathBuf::from("root-1/a"), vec![6..11]),
3439                (PathBuf::from("root-1/c"), vec![2..7]),
3440                (PathBuf::from("root-1/d"), vec![0..5, 24..29]),
3441                (PathBuf::from("root-2/e"), vec![7..12]),
3442            ]
3443        );
3444    }
3445
3446    #[gpui::test(iterations = 10)]
        // Checks that a guest (client B) can request document highlights for a
        // buffer in a project shared by a host (client A). The fake language server
        // returns one WRITE and two READ highlights for `number` in `main.rs`, and
        // the guest must receive the same kinds and ranges.
3447    async fn test_document_highlights(cx_a: &mut TestAppContext, cx_b: &mut TestAppContext) {
3448        cx_a.foreground().forbid_parking();
3449        let lang_registry = Arc::new(LanguageRegistry::test());
3450        let fs = FakeFs::new(cx_a.background());
3451        fs.insert_tree(
3452            "/root-1",
3453            json!({
3454                ".zed.toml": r#"collaborators = ["user_b"]"#,
3455                "main.rs": "fn double(number: i32) -> i32 { number + number }",
3456            }),
3457        )
3458        .await;
3459
3460        // Set up a fake language server.
3461        let mut language = Language::new(
3462            LanguageConfig {
3463                name: "Rust".into(),
3464                path_suffixes: vec!["rs".to_string()],
3465                ..Default::default()
3466            },
3467            Some(tree_sitter_rust::language()),
3468        );
3469        let mut fake_language_servers = language.set_fake_lsp_adapter(Default::default());
3470        lang_registry.add(Arc::new(language));
3471
3472        // Connect to a server as 2 clients.
3473        let mut server = TestServer::start(cx_a.foreground(), cx_a.background()).await;
3474        let client_a = server.create_client(cx_a, "user_a").await;
3475        let client_b = server.create_client(cx_b, "user_b").await;
3476
3477        // Share a project as client A
3478        let project_a = cx_a.update(|cx| {
3479            Project::local(
3480                client_a.clone(),
3481                client_a.user_store.clone(),
3482                lang_registry.clone(),
3483                fs.clone(),
3484                cx,
3485            )
3486        });
3487        let (worktree_a, _) = project_a
3488            .update(cx_a, |p, cx| {
3489                p.find_or_create_local_worktree("/root-1", true, cx)
3490            })
3491            .await
3492            .unwrap();
            // Wait on the worktree's `scan_complete()` future before sharing.
3493        worktree_a
3494            .read_with(cx_a, |tree, _| tree.as_local().unwrap().scan_complete())
3495            .await;
3496        let project_id = project_a.update(cx_a, |p, _| p.next_remote_id()).await;
3497        let worktree_id = worktree_a.read_with(cx_a, |tree, _| tree.id());
3498        project_a.update(cx_a, |p, cx| p.share(cx)).await.unwrap();
3499
3500        // Join the worktree as client B.
3501        let project_b = Project::remote(
3502            project_id,
3503            client_b.clone(),
3504            client_b.user_store.clone(),
3505            lang_registry.clone(),
3506            fs.clone(),
3507            &mut cx_b.to_async(),
3508        )
3509        .await
3510        .unwrap();
3511
3512        // Open the file on client B.
3513        let buffer_b = cx_b
3514            .background()
3515            .spawn(project_b.update(cx_b, |p, cx| p.open_buffer((worktree_id, "main.rs"), cx)))
3516            .await
3517            .unwrap();
3518
3519        // Request document highlights as the guest.
            // The handler verifies that the request carries the host-side path of
            // `main.rs` and the position (0, 34) that the guest asks for below, then
            // returns three fixed highlights.
3520        let fake_language_server = fake_language_servers.next().await.unwrap();
3521        fake_language_server.handle_request::<lsp::request::DocumentHighlightRequest, _, _>(
3522            |params, _| async move {
3523                assert_eq!(
3524                    params
3525                        .text_document_position_params
3526                        .text_document
3527                        .uri
3528                        .as_str(),
3529                    "file:///root-1/main.rs"
3530                );
3531                assert_eq!(
3532                    params.text_document_position_params.position,
3533                    lsp::Position::new(0, 34)
3534                );
3535                Ok(Some(vec![
3536                    lsp::DocumentHighlight {
3537                        kind: Some(lsp::DocumentHighlightKind::WRITE),
3538                        range: lsp::Range::new(
3539                            lsp::Position::new(0, 10),
3540                            lsp::Position::new(0, 16),
3541                        ),
3542                    },
3543                    lsp::DocumentHighlight {
3544                        kind: Some(lsp::DocumentHighlightKind::READ),
3545                        range: lsp::Range::new(
3546                            lsp::Position::new(0, 32),
3547                            lsp::Position::new(0, 38),
3548                        ),
3549                    },
3550                    lsp::DocumentHighlight {
3551                        kind: Some(lsp::DocumentHighlightKind::READ),
3552                        range: lsp::Range::new(
3553                            lsp::Position::new(0, 41),
3554                            lsp::Position::new(0, 47),
3555                        ),
3556                    },
3557                ]))
3558            },
3559        );
3560
            // 34 lies inside the second `number` (columns 32..38) of `main.rs`; the
            // handler above expects it to arrive as LSP position (0, 34).
3561        let highlights = project_b
3562            .update(cx_b, |p, cx| p.document_highlights(&buffer_b, 34, cx))
3563            .await
3564            .unwrap();
3565        buffer_b.read_with(cx_b, |buffer, _| {
3566            let snapshot = buffer.snapshot();
3567
                // Convert each highlight to (kind, offset range) for comparison.
                // `main.rs` is a single line, so the offsets equal the LSP columns.
3568            let highlights = highlights
3569                .into_iter()
3570                .map(|highlight| (highlight.kind, highlight.range.to_offset(&snapshot)))
3571                .collect::<Vec<_>>();
3572            assert_eq!(
3573                highlights,
3574                &[
3575                    (lsp::DocumentHighlightKind::WRITE, 10..16),
3576                    (lsp::DocumentHighlightKind::READ, 32..38),
3577                    (lsp::DocumentHighlightKind::READ, 41..47)
3578                ]
3579            )
3580        });
3581    }
3582
3583    #[gpui::test(iterations = 10)]
        // Checks that a guest (client B) can query project symbols in a project
        // shared by a host (client A) and open the buffer a returned symbol points
        // to, even when that file (`/code/crate-2/two.rs`) is outside the shared
        // worktree (`/code/crate-1`). Also checks that opening a symbol whose path
        // was altered by the guest fails with "invalid symbol signature".
3584    async fn test_project_symbols(cx_a: &mut TestAppContext, cx_b: &mut TestAppContext) {
3585        cx_a.foreground().forbid_parking();
3586        let lang_registry = Arc::new(LanguageRegistry::test());
3587        let fs = FakeFs::new(cx_a.background());
3588        fs.insert_tree(
3589            "/code",
3590            json!({
3591                "crate-1": {
3592                    ".zed.toml": r#"collaborators = ["user_b"]"#,
3593                    "one.rs": "const ONE: usize = 1;",
3594                },
3595                "crate-2": {
3596                    "two.rs": "const TWO: usize = 2; const THREE: usize = 3;",
3597                },
3598                "private": {
3599                    "passwords.txt": "the-password",
3600                }
3601            }),
3602        )
3603        .await;
3604
3605        // Set up a fake language server.
3606        let mut language = Language::new(
3607            LanguageConfig {
3608                name: "Rust".into(),
3609                path_suffixes: vec!["rs".to_string()],
3610                ..Default::default()
3611            },
3612            Some(tree_sitter_rust::language()),
3613        );
3614        let mut fake_language_servers = language.set_fake_lsp_adapter(Default::default());
3615        lang_registry.add(Arc::new(language));
3616
3617        // Connect to a server as 2 clients.
3618        let mut server = TestServer::start(cx_a.foreground(), cx_a.background()).await;
3619        let client_a = server.create_client(cx_a, "user_a").await;
3620        let client_b = server.create_client(cx_b, "user_b").await;
3621
3622        // Share a project as client A
3623        let project_a = cx_a.update(|cx| {
3624            Project::local(
3625                client_a.clone(),
3626                client_a.user_store.clone(),
3627                lang_registry.clone(),
3628                fs.clone(),
3629                cx,
3630            )
3631        });
            // Only `/code/crate-1` becomes a worktree of the shared project.
3632        let (worktree_a, _) = project_a
3633            .update(cx_a, |p, cx| {
3634                p.find_or_create_local_worktree("/code/crate-1", true, cx)
3635            })
3636            .await
3637            .unwrap();
3638        worktree_a
3639            .read_with(cx_a, |tree, _| tree.as_local().unwrap().scan_complete())
3640            .await;
3641        let project_id = project_a.update(cx_a, |p, _| p.next_remote_id()).await;
3642        let worktree_id = worktree_a.read_with(cx_a, |tree, _| tree.id());
3643        project_a.update(cx_a, |p, cx| p.share(cx)).await.unwrap();
3644
3645        // Join the worktree as client B.
3646        let project_b = Project::remote(
3647            project_id,
3648            client_b.clone(),
3649            client_b.user_store.clone(),
3650            lang_registry.clone(),
3651            fs.clone(),
3652            &mut cx_b.to_async(),
3653        )
3654        .await
3655        .unwrap();
3656
3657        // Cause the language server to start.
3658        let _buffer = cx_b
3659            .background()
3660            .spawn(project_b.update(cx_b, |p, cx| p.open_buffer((worktree_id, "one.rs"), cx)))
3661            .await
3662            .unwrap();
3663
            // Whatever the query, the fake server reports a single symbol `TWO`
            // located in `/code/crate-2/two.rs`.
3664        let fake_language_server = fake_language_servers.next().await.unwrap();
3665        fake_language_server.handle_request::<lsp::request::WorkspaceSymbol, _, _>(
3666            |_, _| async move {
                    // Presumably needed because the `deprecated` field set below is
                    // itself marked deprecated in the `lsp` types — TODO confirm.
3667                #[allow(deprecated)]
3668                Ok(Some(vec![lsp::SymbolInformation {
3669                    name: "TWO".into(),
3670                    location: lsp::Location {
3671                        uri: lsp::Url::from_file_path("/code/crate-2/two.rs").unwrap(),
3672                        range: lsp::Range::new(lsp::Position::new(0, 6), lsp::Position::new(0, 9)),
3673                    },
3674                    kind: lsp::SymbolKind::CONSTANT,
3675                    tags: None,
3676                    container_name: None,
3677                    deprecated: None,
3678                }]))
3679            },
3680        );
3681
3682        // Request project symbols matching the query "two" as the guest.
3683        let symbols = project_b
3684            .update(cx_b, |p, cx| p.symbols("two", cx))
3685            .await
3686            .unwrap();
3687        assert_eq!(symbols.len(), 1);
3688        assert_eq!(symbols[0].name, "TWO");
3689
3690        // Open one of the returned symbols.
3691        let buffer_b_2 = project_b
3692            .update(cx_b, |project, cx| {
3693                project.open_buffer_for_symbol(&symbols[0], cx)
3694            })
3695            .await
3696            .unwrap();
            // The opened buffer's path is expected to be expressed with a `..`
            // component, i.e. pointing out of the `crate-1` directory.
3697        buffer_b_2.read_with(cx_b, |buffer, _| {
3698            assert_eq!(
3699                buffer.file().unwrap().path().as_ref(),
3700                Path::new("../crate-2/two.rs")
3701            );
3702        });
3703
3704        // Attempt to craft a symbol and violate host's privacy by opening an arbitrary file.
            // NOTE(review): the fixture defines `private/passwords.txt`, but the
            // forged path is `/code/secrets`, which is not in the fake file tree.
            // The assertion below only requires that the request is rejected with
            // an "invalid symbol signature" error.
3705        let mut fake_symbol = symbols[0].clone();
3706        fake_symbol.path = Path::new("/code/secrets").into();
3707        let error = project_b
3708            .update(cx_b, |project, cx| {
3709                project.open_buffer_for_symbol(&fake_symbol, cx)
3710            })
3711            .await
3712            .unwrap_err();
3713        assert!(error.to_string().contains("invalid symbol signature"));
3714    }
3715
3716    #[gpui::test(iterations = 10)]
        // Checks that, on a guest (client B), opening `b.rs` directly while a
        // go-to-definition request that resolves to `b.rs` is also in flight yields
        // the same buffer from both operations. `rng` decides which of the two
        // operations is started first.
3717    async fn test_open_buffer_while_getting_definition_pointing_to_it(
3718        cx_a: &mut TestAppContext,
3719        cx_b: &mut TestAppContext,
3720        mut rng: StdRng,
3721    ) {
3722        cx_a.foreground().forbid_parking();
3723        let lang_registry = Arc::new(LanguageRegistry::test());
3724        let fs = FakeFs::new(cx_a.background());
3725        fs.insert_tree(
3726            "/root",
3727            json!({
3728                ".zed.toml": r#"collaborators = ["user_b"]"#,
3729                "a.rs": "const ONE: usize = b::TWO;",
3730                "b.rs": "const TWO: usize = 2",
3731            }),
3732        )
3733        .await;
3734
3735        // Set up a fake language server.
3736        let mut language = Language::new(
3737            LanguageConfig {
3738                name: "Rust".into(),
3739                path_suffixes: vec!["rs".to_string()],
3740                ..Default::default()
3741            },
3742            Some(tree_sitter_rust::language()),
3743        );
3744        let mut fake_language_servers = language.set_fake_lsp_adapter(Default::default());
3745        lang_registry.add(Arc::new(language));
3746
3747        // Connect to a server as 2 clients.
3748        let mut server = TestServer::start(cx_a.foreground(), cx_a.background()).await;
3749        let client_a = server.create_client(cx_a, "user_a").await;
3750        let client_b = server.create_client(cx_b, "user_b").await;
3751
3752        // Share a project as client A
3753        let project_a = cx_a.update(|cx| {
3754            Project::local(
3755                client_a.clone(),
3756                client_a.user_store.clone(),
3757                lang_registry.clone(),
3758                fs.clone(),
3759                cx,
3760            )
3761        });
3762
3763        let (worktree_a, _) = project_a
3764            .update(cx_a, |p, cx| {
3765                p.find_or_create_local_worktree("/root", true, cx)
3766            })
3767            .await
3768            .unwrap();
3769        worktree_a
3770            .read_with(cx_a, |tree, _| tree.as_local().unwrap().scan_complete())
3771            .await;
3772        let project_id = project_a.update(cx_a, |p, _| p.next_remote_id()).await;
3773        let worktree_id = worktree_a.read_with(cx_a, |tree, _| tree.id());
3774        project_a.update(cx_a, |p, cx| p.share(cx)).await.unwrap();
3775
3776        // Join the worktree as client B.
3777        let project_b = Project::remote(
3778            project_id,
3779            client_b.clone(),
3780            client_b.user_store.clone(),
3781            lang_registry.clone(),
3782            fs.clone(),
3783            &mut cx_b.to_async(),
3784        )
3785        .await
3786        .unwrap();
3787
            // Open `a.rs` on the guest; the definition request is issued from it.
3788        let buffer_b1 = cx_b
3789            .background()
3790            .spawn(project_b.update(cx_b, |p, cx| p.open_buffer((worktree_id, "a.rs"), cx)))
3791            .await
3792            .unwrap();
3793
            // Every definition request is answered with the location of `TWO`
            // in `/root/b.rs`.
3794        let fake_language_server = fake_language_servers.next().await.unwrap();
3795        fake_language_server.handle_request::<lsp::request::GotoDefinition, _, _>(
3796            |_, _| async move {
3797                Ok(Some(lsp::GotoDefinitionResponse::Scalar(
3798                    lsp::Location::new(
3799                        lsp::Url::from_file_path("/root/b.rs").unwrap(),
3800                        lsp::Range::new(lsp::Position::new(0, 6), lsp::Position::new(0, 9)),
3801                    ),
3802                )))
3803            },
3804        );
3805
            // Start both operations before awaiting either one; `rng` picks which
            // is started first. 23 is presumably a buffer offset — it falls inside
            // `TWO` in "const ONE: usize = b::TWO;" (TODO confirm the position type).
3806        let definitions;
3807        let buffer_b2;
3808        if rng.gen() {
3809            definitions = project_b.update(cx_b, |p, cx| p.definition(&buffer_b1, 23, cx));
3810            buffer_b2 = project_b.update(cx_b, |p, cx| p.open_buffer((worktree_id, "b.rs"), cx));
3811        } else {
3812            buffer_b2 = project_b.update(cx_b, |p, cx| p.open_buffer((worktree_id, "b.rs"), cx));
3813            definitions = project_b.update(cx_b, |p, cx| p.definition(&buffer_b1, 23, cx));
3814        }
3815
            // Whichever was started first, the definition must point at the very
            // buffer returned by the explicit `open_buffer` call.
3816        let buffer_b2 = buffer_b2.await.unwrap();
3817        let definitions = definitions.await.unwrap();
3818        assert_eq!(definitions.len(), 1);
3819        assert_eq!(definitions[0].buffer, buffer_b2);
3820    }
3821
3822    #[gpui::test(iterations = 10)]
3823    async fn test_collaborating_with_code_actions(
3824        cx_a: &mut TestAppContext,
3825        cx_b: &mut TestAppContext,
3826    ) {
3827        cx_a.foreground().forbid_parking();
3828        let lang_registry = Arc::new(LanguageRegistry::test());
3829        let fs = FakeFs::new(cx_a.background());
3830        cx_b.update(|cx| editor::init(cx));
3831
3832        // Set up a fake language server.
3833        let mut language = Language::new(
3834            LanguageConfig {
3835                name: "Rust".into(),
3836                path_suffixes: vec!["rs".to_string()],
3837                ..Default::default()
3838            },
3839            Some(tree_sitter_rust::language()),
3840        );
3841        let mut fake_language_servers = language.set_fake_lsp_adapter(Default::default());
3842        lang_registry.add(Arc::new(language));
3843
3844        // Connect to a server as 2 clients.
3845        let mut server = TestServer::start(cx_a.foreground(), cx_a.background()).await;
3846        let client_a = server.create_client(cx_a, "user_a").await;
3847        let client_b = server.create_client(cx_b, "user_b").await;
3848
3849        // Share a project as client A
3850        fs.insert_tree(
3851            "/a",
3852            json!({
3853                ".zed.toml": r#"collaborators = ["user_b"]"#,
3854                "main.rs": "mod other;\nfn main() { let foo = other::foo(); }",
3855                "other.rs": "pub fn foo() -> usize { 4 }",
3856            }),
3857        )
3858        .await;
3859        let project_a = cx_a.update(|cx| {
3860            Project::local(
3861                client_a.clone(),
3862                client_a.user_store.clone(),
3863                lang_registry.clone(),
3864                fs.clone(),
3865                cx,
3866            )
3867        });
3868        let (worktree_a, _) = project_a
3869            .update(cx_a, |p, cx| {
3870                p.find_or_create_local_worktree("/a", true, cx)
3871            })
3872            .await
3873            .unwrap();
3874        worktree_a
3875            .read_with(cx_a, |tree, _| tree.as_local().unwrap().scan_complete())
3876            .await;
3877        let project_id = project_a.update(cx_a, |p, _| p.next_remote_id()).await;
3878        let worktree_id = worktree_a.read_with(cx_a, |tree, _| tree.id());
3879        project_a.update(cx_a, |p, cx| p.share(cx)).await.unwrap();
3880
3881        // Join the worktree as client B.
3882        let project_b = Project::remote(
3883            project_id,
3884            client_b.clone(),
3885            client_b.user_store.clone(),
3886            lang_registry.clone(),
3887            fs.clone(),
3888            &mut cx_b.to_async(),
3889        )
3890        .await
3891        .unwrap();
3892        let mut params = cx_b.update(WorkspaceParams::test);
3893        params.languages = lang_registry.clone();
3894        params.client = client_b.client.clone();
3895        params.user_store = client_b.user_store.clone();
3896        params.project = project_b;
3897
3898        let (_window_b, workspace_b) = cx_b.add_window(|cx| Workspace::new(&params, cx));
3899        let editor_b = workspace_b
3900            .update(cx_b, |workspace, cx| {
3901                workspace.open_path((worktree_id, "main.rs"), true, cx)
3902            })
3903            .await
3904            .unwrap()
3905            .downcast::<Editor>()
3906            .unwrap();
3907
3908        let mut fake_language_server = fake_language_servers.next().await.unwrap();
3909        fake_language_server
3910            .handle_request::<lsp::request::CodeActionRequest, _, _>(|params, _| async move {
3911                assert_eq!(
3912                    params.text_document.uri,
3913                    lsp::Url::from_file_path("/a/main.rs").unwrap(),
3914                );
3915                assert_eq!(params.range.start, lsp::Position::new(0, 0));
3916                assert_eq!(params.range.end, lsp::Position::new(0, 0));
3917                Ok(None)
3918            })
3919            .next()
3920            .await;
3921
3922        // Move cursor to a location that contains code actions.
3923        editor_b.update(cx_b, |editor, cx| {
3924            editor.select_ranges([Point::new(1, 31)..Point::new(1, 31)], None, cx);
3925            cx.focus(&editor_b);
3926        });
3927
3928        fake_language_server
3929            .handle_request::<lsp::request::CodeActionRequest, _, _>(|params, _| async move {
3930                assert_eq!(
3931                    params.text_document.uri,
3932                    lsp::Url::from_file_path("/a/main.rs").unwrap(),
3933                );
3934                assert_eq!(params.range.start, lsp::Position::new(1, 31));
3935                assert_eq!(params.range.end, lsp::Position::new(1, 31));
3936
3937                Ok(Some(vec![lsp::CodeActionOrCommand::CodeAction(
3938                    lsp::CodeAction {
3939                        title: "Inline into all callers".to_string(),
3940                        edit: Some(lsp::WorkspaceEdit {
3941                            changes: Some(
3942                                [
3943                                    (
3944                                        lsp::Url::from_file_path("/a/main.rs").unwrap(),
3945                                        vec![lsp::TextEdit::new(
3946                                            lsp::Range::new(
3947                                                lsp::Position::new(1, 22),
3948                                                lsp::Position::new(1, 34),
3949                                            ),
3950                                            "4".to_string(),
3951                                        )],
3952                                    ),
3953                                    (
3954                                        lsp::Url::from_file_path("/a/other.rs").unwrap(),
3955                                        vec![lsp::TextEdit::new(
3956                                            lsp::Range::new(
3957                                                lsp::Position::new(0, 0),
3958                                                lsp::Position::new(0, 27),
3959                                            ),
3960                                            "".to_string(),
3961                                        )],
3962                                    ),
3963                                ]
3964                                .into_iter()
3965                                .collect(),
3966                            ),
3967                            ..Default::default()
3968                        }),
3969                        data: Some(json!({
3970                            "codeActionParams": {
3971                                "range": {
3972                                    "start": {"line": 1, "column": 31},
3973                                    "end": {"line": 1, "column": 31},
3974                                }
3975                            }
3976                        })),
3977                        ..Default::default()
3978                    },
3979                )]))
3980            })
3981            .next()
3982            .await;
3983
3984        // Toggle code actions and wait for them to display.
3985        editor_b.update(cx_b, |editor, cx| {
3986            editor.toggle_code_actions(
3987                &ToggleCodeActions {
3988                    deployed_from_indicator: false,
3989                },
3990                cx,
3991            );
3992        });
3993        editor_b
3994            .condition(&cx_b, |editor, _| editor.context_menu_visible())
3995            .await;
3996
3997        fake_language_server.remove_request_handler::<lsp::request::CodeActionRequest>();
3998
3999        // Confirming the code action will trigger a resolve request.
4000        let confirm_action = workspace_b
4001            .update(cx_b, |workspace, cx| {
4002                Editor::confirm_code_action(workspace, &ConfirmCodeAction { item_ix: Some(0) }, cx)
4003            })
4004            .unwrap();
4005        fake_language_server.handle_request::<lsp::request::CodeActionResolveRequest, _, _>(
4006            |_, _| async move {
4007                Ok(lsp::CodeAction {
4008                    title: "Inline into all callers".to_string(),
4009                    edit: Some(lsp::WorkspaceEdit {
4010                        changes: Some(
4011                            [
4012                                (
4013                                    lsp::Url::from_file_path("/a/main.rs").unwrap(),
4014                                    vec![lsp::TextEdit::new(
4015                                        lsp::Range::new(
4016                                            lsp::Position::new(1, 22),
4017                                            lsp::Position::new(1, 34),
4018                                        ),
4019                                        "4".to_string(),
4020                                    )],
4021                                ),
4022                                (
4023                                    lsp::Url::from_file_path("/a/other.rs").unwrap(),
4024                                    vec![lsp::TextEdit::new(
4025                                        lsp::Range::new(
4026                                            lsp::Position::new(0, 0),
4027                                            lsp::Position::new(0, 27),
4028                                        ),
4029                                        "".to_string(),
4030                                    )],
4031                                ),
4032                            ]
4033                            .into_iter()
4034                            .collect(),
4035                        ),
4036                        ..Default::default()
4037                    }),
4038                    ..Default::default()
4039                })
4040            },
4041        );
4042
4043        // After the action is confirmed, an editor containing both modified files is opened.
4044        confirm_action.await.unwrap();
4045        let code_action_editor = workspace_b.read_with(cx_b, |workspace, cx| {
4046            workspace
4047                .active_item(cx)
4048                .unwrap()
4049                .downcast::<Editor>()
4050                .unwrap()
4051        });
4052        code_action_editor.update(cx_b, |editor, cx| {
4053            assert_eq!(editor.text(cx), "mod other;\nfn main() { let foo = 4; }\n");
4054            editor.undo(&Undo, cx);
4055            assert_eq!(
4056                editor.text(cx),
4057                "mod other;\nfn main() { let foo = other::foo(); }\npub fn foo() -> usize { 4 }"
4058            );
4059            editor.redo(&Redo, cx);
4060            assert_eq!(editor.text(cx), "mod other;\nfn main() { let foo = 4; }\n");
4061        });
4062    }
4063
4064    #[gpui::test(iterations = 10)]
        // Exercises a symbol rename across two collaborating clients: client A shares a local
        // project, client B joins it as a remote project and renames `ONE` from its own editor.
        // A fake language server answers the prepare-rename and rename requests. The test then
        // checks the text of the editor that is active after the rename (it contains the contents
        // of both edited files), that editor's undo/redo behavior, and the undo/redo behavior of
        // the original `one.rs` editor.
4065    async fn test_collaborating_with_renames(cx_a: &mut TestAppContext, cx_b: &mut TestAppContext) {
4066        cx_a.foreground().forbid_parking();
4067        let lang_registry = Arc::new(LanguageRegistry::test());
4068        let fs = FakeFs::new(cx_a.background());
            // Only client B opens editors in this test, so `editor::init` is run in B's context.
4069        cx_b.update(|cx| editor::init(cx));
4070
4071        // Set up a fake language server.
4072        let mut language = Language::new(
4073            LanguageConfig {
4074                name: "Rust".into(),
4075                path_suffixes: vec!["rs".to_string()],
4076                ..Default::default()
4077            },
4078            Some(tree_sitter_rust::language()),
4079        );
            // The fake adapter advertises rename support with prepare-rename enabled.
4080        let mut fake_language_servers = language.set_fake_lsp_adapter(FakeLspAdapter {
4081            capabilities: lsp::ServerCapabilities {
4082                rename_provider: Some(lsp::OneOf::Right(lsp::RenameOptions {
4083                    prepare_provider: Some(true),
4084                    work_done_progress_options: Default::default(),
4085                })),
4086                ..Default::default()
4087            },
4088            ..Default::default()
4089        });
4090        lang_registry.add(Arc::new(language));
4091
4092        // Connect to a server as 2 clients.
4093        let mut server = TestServer::start(cx_a.foreground(), cx_a.background()).await;
4094        let client_a = server.create_client(cx_a, "user_a").await;
4095        let client_b = server.create_client(cx_b, "user_b").await;
4096
4097        // Share a project as client A
            // NOTE(review): the `.zed.toml` entry presumably authorizes `user_b` to join — confirm.
4098        fs.insert_tree(
4099            "/dir",
4100            json!({
4101                ".zed.toml": r#"collaborators = ["user_b"]"#,
4102                "one.rs": "const ONE: usize = 1;",
4103                "two.rs": "const TWO: usize = one::ONE + one::ONE;"
4104            }),
4105        )
4106        .await;
4107        let project_a = cx_a.update(|cx| {
4108            Project::local(
4109                client_a.clone(),
4110                client_a.user_store.clone(),
4111                lang_registry.clone(),
4112                fs.clone(),
4113                cx,
4114            )
4115        });
4116        let (worktree_a, _) = project_a
4117            .update(cx_a, |p, cx| {
4118                p.find_or_create_local_worktree("/dir", true, cx)
4119            })
4120            .await
4121            .unwrap();
            // Wait for the worktree scan to finish before sharing the project.
4122        worktree_a
4123            .read_with(cx_a, |tree, _| tree.as_local().unwrap().scan_complete())
4124            .await;
4125        let project_id = project_a.update(cx_a, |p, _| p.next_remote_id()).await;
4126        let worktree_id = worktree_a.read_with(cx_a, |tree, _| tree.id());
4127        project_a.update(cx_a, |p, cx| p.share(cx)).await.unwrap();
4128
4129        // Join the worktree as client B.
4130        let project_b = Project::remote(
4131            project_id,
4132            client_b.clone(),
4133            client_b.user_store.clone(),
4134            lang_registry.clone(),
4135            fs.clone(),
4136            &mut cx_b.to_async(),
4137        )
4138        .await
4139        .unwrap();
            // Build a workspace for client B around the remote project.
4140        let mut params = cx_b.update(WorkspaceParams::test);
4141        params.languages = lang_registry.clone();
4142        params.client = client_b.client.clone();
4143        params.user_store = client_b.user_store.clone();
4144        params.project = project_b;
4145
4146        let (_window_b, workspace_b) = cx_b.add_window(|cx| Workspace::new(&params, cx));
4147        let editor_b = workspace_b
4148            .update(cx_b, |workspace, cx| {
4149                workspace.open_path((worktree_id, "one.rs"), true, cx)
4150            })
4151            .await
4152            .unwrap()
4153            .downcast::<Editor>()
4154            .unwrap();
4155        let fake_language_server = fake_language_servers.next().await.unwrap();
4156
4157        // Move cursor to a location that can be renamed.
            // Offset 7 lies inside `ONE` (which spans offsets 6..9 of `one.rs`). The returned task
            // is awaited only after the fake server has handled the prepare-rename request below.
4158        let prepare_rename = editor_b.update(cx_b, |editor, cx| {
4159            editor.select_ranges([7..7], None, cx);
4160            editor.rename(&Rename, cx).unwrap()
4161        });
4162
            // The server checks the requested document/position and replies with the range of `ONE`.
4163        fake_language_server
4164            .handle_request::<lsp::request::PrepareRenameRequest, _, _>(|params, _| async move {
4165                assert_eq!(params.text_document.uri.as_str(), "file:///dir/one.rs");
4166                assert_eq!(params.position, lsp::Position::new(0, 7));
4167                Ok(Some(lsp::PrepareRenameResponse::Range(lsp::Range::new(
4168                    lsp::Position::new(0, 6),
4169                    lsp::Position::new(0, 9),
4170                ))))
4171            })
4172            .next()
4173            .await
4174            .unwrap();
4175        prepare_rename.await.unwrap();
            // The pending rename must cover the range reported by the server; then type the new
            // name into the rename editor (presumably its buffer initially holds `ONE` — confirm).
4176        editor_b.update(cx_b, |editor, cx| {
4177            let rename = editor.pending_rename().unwrap();
4178            let buffer = editor.buffer().read(cx).snapshot(cx);
4179            assert_eq!(
4180                rename.range.start.to_offset(&buffer)..rename.range.end.to_offset(&buffer),
4181                6..9
4182            );
4183            rename.editor.update(cx, |rename_editor, cx| {
4184                rename_editor.buffer().update(cx, |rename_buffer, cx| {
4185                    rename_buffer.edit([(0..3, "THREE")], cx);
4186                });
4187            });
4188        });
4189
            // Confirm the rename; as above, the task is awaited after the fake server has replied.
4190        let confirm_rename = workspace_b.update(cx_b, |workspace, cx| {
4191            Editor::confirm_rename(workspace, &ConfirmRename, cx).unwrap()
4192        });
            // The rename request is expected at the start of the symbol (0, 6) with the new name.
            // The reply edits the definition in `one.rs` and both `ONE` references in `two.rs`.
4193        fake_language_server
4194            .handle_request::<lsp::request::Rename, _, _>(|params, _| async move {
4195                assert_eq!(
4196                    params.text_document_position.text_document.uri.as_str(),
4197                    "file:///dir/one.rs"
4198                );
4199                assert_eq!(
4200                    params.text_document_position.position,
4201                    lsp::Position::new(0, 6)
4202                );
4203                assert_eq!(params.new_name, "THREE");
4204                Ok(Some(lsp::WorkspaceEdit {
4205                    changes: Some(
4206                        [
4207                            (
4208                                lsp::Url::from_file_path("/dir/one.rs").unwrap(),
4209                                vec![lsp::TextEdit::new(
4210                                    lsp::Range::new(
4211                                        lsp::Position::new(0, 6),
4212                                        lsp::Position::new(0, 9),
4213                                    ),
4214                                    "THREE".to_string(),
4215                                )],
4216                            ),
4217                            (
4218                                lsp::Url::from_file_path("/dir/two.rs").unwrap(),
4219                                vec![
4220                                    lsp::TextEdit::new(
4221                                        lsp::Range::new(
4222                                            lsp::Position::new(0, 24),
4223                                            lsp::Position::new(0, 27),
4224                                        ),
4225                                        "THREE".to_string(),
4226                                    ),
4227                                    lsp::TextEdit::new(
4228                                        lsp::Range::new(
4229                                            lsp::Position::new(0, 35),
4230                                            lsp::Position::new(0, 38),
4231                                        ),
4232                                        "THREE".to_string(),
4233                                    ),
4234                                ],
4235                            ),
4236                        ]
4237                        .into_iter()
4238                        .collect(),
4239                    ),
4240                    ..Default::default()
4241                }))
4242            })
4243            .next()
4244            .await
4245            .unwrap();
4246        confirm_rename.await.unwrap();
4247
            // After the rename, the workspace's active item is expected to be an editor whose text
            // is the renamed contents of `one.rs` followed by those of `two.rs`.
4248        let rename_editor = workspace_b.read_with(cx_b, |workspace, cx| {
4249            workspace
4250                .active_item(cx)
4251                .unwrap()
4252                .downcast::<Editor>()
4253                .unwrap()
4254        });
            // A single undo reverts the rename in both files; a single redo re-applies it.
4255        rename_editor.update(cx_b, |editor, cx| {
4256            assert_eq!(
4257                editor.text(cx),
4258                "const THREE: usize = 1;\nconst TWO: usize = one::THREE + one::THREE;"
4259            );
4260            editor.undo(&Undo, cx);
4261            assert_eq!(
4262                editor.text(cx),
4263                "const ONE: usize = 1;\nconst TWO: usize = one::ONE + one::ONE;"
4264            );
4265            editor.redo(&Redo, cx);
4266            assert_eq!(
4267                editor.text(cx),
4268                "const THREE: usize = 1;\nconst TWO: usize = one::THREE + one::THREE;"
4269            );
4270        });
4271
4272        // Ensure temporary rename edits cannot be undone/redone.
            // In the original `one.rs` editor the first undo reverts the rename, a second undo
            // leaves the text unchanged, and redo brings the renamed text back.
4273        editor_b.update(cx_b, |editor, cx| {
4274            editor.undo(&Undo, cx);
4275            assert_eq!(editor.text(cx), "const ONE: usize = 1;");
4276            editor.undo(&Undo, cx);
4277            assert_eq!(editor.text(cx), "const ONE: usize = 1;");
4278            editor.redo(&Redo, cx);
4279            assert_eq!(editor.text(cx), "const THREE: usize = 1;");
4280        })
4281    }
4282
4283    #[gpui::test(iterations = 10)]
        // Exercises chat between two clients that are members of the same channel: both load a
        // message that already exists in the database, client A sends two messages that then
        // appear on client B, and the server's record of the channel shrinks and finally
        // disappears as each client drops its channel handle.
        //
        // `channel_messages` yields `(sender login, body, flag)` tuples. NOTE(review): the flag
        // is presumably "still pending" (it is `true` right after a local send and `false` once
        // delivered) — confirm against the helper's definition.
4284    async fn test_basic_chat(cx_a: &mut TestAppContext, cx_b: &mut TestAppContext) {
4285        cx_a.foreground().forbid_parking();
4286
4287        // Connect to a server as 2 clients.
4288        let mut server = TestServer::start(cx_a.foreground(), cx_a.background()).await;
4289        let client_a = server.create_client(cx_a, "user_a").await;
4290        let client_b = server.create_client(cx_b, "user_b").await;
4291
4292        // Create an org that includes these 2 users.
4293        let db = &server.app_state.db;
4294        let org_id = db.create_org("Test Org", "test-org").await.unwrap();
4295        db.add_org_member(org_id, client_a.current_user_id(&cx_a), false)
4296            .await
4297            .unwrap();
4298        db.add_org_member(org_id, client_b.current_user_id(&cx_b), false)
4299            .await
4300            .unwrap();
4301
4302        // Create a channel that includes all the users.
4303        let channel_id = db.create_org_channel(org_id, "test-channel").await.unwrap();
4304        db.add_channel_member(channel_id, client_a.current_user_id(&cx_a), false)
4305            .await
4306            .unwrap();
4307        db.add_channel_member(channel_id, client_b.current_user_id(&cx_b), false)
4308            .await
4309            .unwrap();
            // Seed the channel with one message from B, written directly to the database.
            // NOTE(review): the trailing `1` is presumably a nonce — confirm.
4310        db.create_channel_message(
4311            channel_id,
4312            client_b.current_user_id(&cx_b),
4313            "hello A, it's B.",
4314            OffsetDateTime::now_utc(),
4315            1,
4316        )
4317        .await
4318        .unwrap();
4319
            // Client A: wait for the channel list to load, then check it holds only the test channel.
4320        let channels_a = cx_a
4321            .add_model(|cx| ChannelList::new(client_a.user_store.clone(), client_a.clone(), cx));
4322        channels_a
4323            .condition(cx_a, |list, _| list.available_channels().is_some())
4324            .await;
4325        channels_a.read_with(cx_a, |list, _| {
4326            assert_eq!(
4327                list.available_channels().unwrap(),
4328                &[ChannelDetails {
4329                    id: channel_id.to_proto(),
4330                    name: "test-channel".to_string()
4331                }]
4332            )
4333        });
            // A freshly obtained channel has no messages yet; the seeded message shows up later,
            // which is what the `condition` below waits for.
4334        let channel_a = channels_a.update(cx_a, |this, cx| {
4335            this.get_channel(channel_id.to_proto(), cx).unwrap()
4336        });
4337        channel_a.read_with(cx_a, |channel, _| assert!(channel.messages().is_empty()));
4338        channel_a
4339            .condition(&cx_a, |channel, _| {
4340                channel_messages(channel)
4341                    == [("user_b".to_string(), "hello A, it's B.".to_string(), false)]
4342            })
4343            .await;
4344
            // Client B: the same sequence of checks as for client A.
4345        let channels_b = cx_b
4346            .add_model(|cx| ChannelList::new(client_b.user_store.clone(), client_b.clone(), cx));
4347        channels_b
4348            .condition(cx_b, |list, _| list.available_channels().is_some())
4349            .await;
4350        channels_b.read_with(cx_b, |list, _| {
4351            assert_eq!(
4352                list.available_channels().unwrap(),
4353                &[ChannelDetails {
4354                    id: channel_id.to_proto(),
4355                    name: "test-channel".to_string()
4356                }]
4357            )
4358        });
4359
4360        let channel_b = channels_b.update(cx_b, |this, cx| {
4361            this.get_channel(channel_id.to_proto(), cx).unwrap()
4362        });
4363        channel_b.read_with(cx_b, |channel, _| assert!(channel.messages().is_empty()));
4364        channel_b
4365            .condition(&cx_b, |channel, _| {
4366                channel_messages(channel)
4367                    == [("user_b".to_string(), "hello A, it's B.".to_string(), false)]
4368            })
4369            .await;
4370
            // A sends two messages back to back: the first send task is detached, the second is
            // returned from the closure and awaited. Before either completes, both messages are
            // already listed locally with the flag set to `true`.
4371        channel_a
4372            .update(cx_a, |channel, cx| {
4373                channel
4374                    .send_message("oh, hi B.".to_string(), cx)
4375                    .unwrap()
4376                    .detach();
4377                let task = channel.send_message("sup".to_string(), cx).unwrap();
4378                assert_eq!(
4379                    channel_messages(channel),
4380                    &[
4381                        ("user_b".to_string(), "hello A, it's B.".to_string(), false),
4382                        ("user_a".to_string(), "oh, hi B.".to_string(), true),
4383                        ("user_a".to_string(), "sup".to_string(), true)
4384                    ]
4385                );
4386                task
4387            })
4388            .await
4389            .unwrap();
4390
            // On B's side the same messages arrive, in order, with the flag `false`.
4391        channel_b
4392            .condition(&cx_b, |channel, _| {
4393                channel_messages(channel)
4394                    == [
4395                        ("user_b".to_string(), "hello A, it's B.".to_string(), false),
4396                        ("user_a".to_string(), "oh, hi B.".to_string(), false),
4397                        ("user_a".to_string(), "sup".to_string(), false),
4398                    ]
4399            })
4400            .await;
4401
            // While both channel handles are alive the server tracks two connection ids for the
            // channel. Dropping B's handle is expected to bring that down to one, and dropping
            // A's handle is expected to remove the server's channel entry altogether.
4402        assert_eq!(
4403            server
4404                .state()
4405                .await
4406                .channel(channel_id)
4407                .unwrap()
4408                .connection_ids
4409                .len(),
4410            2
4411        );
4412        cx_b.update(|_| drop(channel_b));
4413        server
4414            .condition(|state| state.channel(channel_id).unwrap().connection_ids.len() == 1)
4415            .await;
4416
4417        cx_a.update(|_| drop(channel_a));
4418        server
4419            .condition(|state| state.channel(channel_id).is_none())
4420            .await;
4421    }
4422
4423    #[gpui::test(iterations = 10)]
        // Checks validation of chat message bodies for a single client: an over-long message
        // fails when its send task is awaited, a blank message is rejected by `send_message`
        // itself, and leading/trailing whitespace is stripped from the body that ends up stored
        // in the database.
4424    async fn test_chat_message_validation(cx_a: &mut TestAppContext) {
4425        cx_a.foreground().forbid_parking();
4426
4427        let mut server = TestServer::start(cx_a.foreground(), cx_a.background()).await;
4428        let client_a = server.create_client(cx_a, "user_a").await;
4429
            // Create an org and a channel, and make user A a member of both.
4430        let db = &server.app_state.db;
4431        let org_id = db.create_org("Test Org", "test-org").await.unwrap();
4432        let channel_id = db.create_org_channel(org_id, "test-channel").await.unwrap();
4433        db.add_org_member(org_id, client_a.current_user_id(&cx_a), false)
4434            .await
4435            .unwrap();
4436        db.add_channel_member(channel_id, client_a.current_user_id(&cx_a), false)
4437            .await
4438            .unwrap();
4439
            // Wait for A's channel list to load before fetching the channel handle.
4440        let channels_a = cx_a
4441            .add_model(|cx| ChannelList::new(client_a.user_store.clone(), client_a.clone(), cx));
4442        channels_a
4443            .condition(cx_a, |list, _| list.available_channels().is_some())
4444            .await;
4445        let channel_a = channels_a.update(cx_a, |this, cx| {
4446            this.get_channel(channel_id.to_proto(), cx).unwrap()
4447        });
4448
4449        // Messages aren't allowed to be too long.
            // The body is 14 bytes repeated 1024 times (14,336 bytes); the actual limit is not
            // visible in this test. `send_message` itself succeeds here — the error only surfaces
            // when the returned task is awaited.
4450        channel_a
4451            .update(cx_a, |channel, cx| {
4452                let long_body = "this is long.\n".repeat(1024);
4453                channel.send_message(long_body, cx).unwrap()
4454            })
4455            .await
4456            .unwrap_err();
4457
4458        // Messages aren't allowed to be blank.
            // Unlike the over-long case, the error is returned synchronously by `send_message`.
4459        channel_a.update(cx_a, |channel, cx| {
4460            channel.send_message(String::new(), cx).unwrap_err()
4461        });
4462
4463        // Leading and trailing whitespace are trimmed.
4464        channel_a
4465            .update(cx_a, |channel, cx| {
4466                channel
4467                    .send_message("\n surrounded by whitespace  \n".to_string(), cx)
4468                    .unwrap()
4469            })
4470            .await
4471            .unwrap();
            // Only the trimmed message is stored; neither rejected message reached the database.
4472        assert_eq!(
4473            db.get_channel_messages(channel_id, 10, None)
4474                .await
4475                .unwrap()
4476                .iter()
4477                .map(|m| &m.body)
4478                .collect::<Vec<_>>(),
4479            &["surrounded by whitespace"]
4480        );
4481    }
4482
4483    #[gpui::test(iterations = 10)]
        // Exercises chat behavior across a disconnect: client B is disconnected while the server
        // refuses new connections, keeps its cached channel list and messages, and gets an error
        // when it tries to send while offline. Once connections are allowed again, B is expected
        // to see the messages A sent in the meantime as well as its own offline message, and both
        // clients can then exchange messages normally.
        //
        // `channel_messages` yields `(sender login, body, flag)` tuples. NOTE(review): the flag
        // is presumably "still pending" (it is `true` right after a local send and `false` once
        // delivered) — confirm against the helper's definition.
4484    async fn test_chat_reconnection(cx_a: &mut TestAppContext, cx_b: &mut TestAppContext) {
4485        cx_a.foreground().forbid_parking();
4486
4487        // Connect to a server as 2 clients.
4488        let mut server = TestServer::start(cx_a.foreground(), cx_a.background()).await;
4489        let client_a = server.create_client(cx_a, "user_a").await;
4490        let client_b = server.create_client(cx_b, "user_b").await;
            // Stream of B's connection status; used below to wait for a reconnection error.
4491        let mut status_b = client_b.status();
4492
4493        // Create an org that includes these 2 users.
4494        let db = &server.app_state.db;
4495        let org_id = db.create_org("Test Org", "test-org").await.unwrap();
4496        db.add_org_member(org_id, client_a.current_user_id(&cx_a), false)
4497            .await
4498            .unwrap();
4499        db.add_org_member(org_id, client_b.current_user_id(&cx_b), false)
4500            .await
4501            .unwrap();
4502
4503        // Create a channel that includes all the users.
4504        let channel_id = db.create_org_channel(org_id, "test-channel").await.unwrap();
4505        db.add_channel_member(channel_id, client_a.current_user_id(&cx_a), false)
4506            .await
4507            .unwrap();
4508        db.add_channel_member(channel_id, client_b.current_user_id(&cx_b), false)
4509            .await
4510            .unwrap();
            // Seed the channel with one message from B, written directly to the database.
            // NOTE(review): the trailing `2` is presumably a nonce — confirm.
4511        db.create_channel_message(
4512            channel_id,
4513            client_b.current_user_id(&cx_b),
4514            "hello A, it's B.",
4515            OffsetDateTime::now_utc(),
4516            2,
4517        )
4518        .await
4519        .unwrap();
4520
            // Client A: wait for the channel list to load, then check it holds only the test channel.
4521        let channels_a = cx_a
4522            .add_model(|cx| ChannelList::new(client_a.user_store.clone(), client_a.clone(), cx));
4523        channels_a
4524            .condition(cx_a, |list, _| list.available_channels().is_some())
4525            .await;
4526
4527        channels_a.read_with(cx_a, |list, _| {
4528            assert_eq!(
4529                list.available_channels().unwrap(),
4530                &[ChannelDetails {
4531                    id: channel_id.to_proto(),
4532                    name: "test-channel".to_string()
4533                }]
4534            )
4535        });
            // A freshly obtained channel has no messages yet; the seeded message shows up later,
            // which is what the `condition` below waits for.
4536        let channel_a = channels_a.update(cx_a, |this, cx| {
4537            this.get_channel(channel_id.to_proto(), cx).unwrap()
4538        });
4539        channel_a.read_with(cx_a, |channel, _| assert!(channel.messages().is_empty()));
4540        channel_a
4541            .condition(&cx_a, |channel, _| {
4542                channel_messages(channel)
4543                    == [("user_b".to_string(), "hello A, it's B.".to_string(), false)]
4544            })
4545            .await;
4546
            // Client B: the same sequence of checks as for client A.
4547        let channels_b = cx_b
4548            .add_model(|cx| ChannelList::new(client_b.user_store.clone(), client_b.clone(), cx));
4549        channels_b
4550            .condition(cx_b, |list, _| list.available_channels().is_some())
4551            .await;
4552        channels_b.read_with(cx_b, |list, _| {
4553            assert_eq!(
4554                list.available_channels().unwrap(),
4555                &[ChannelDetails {
4556                    id: channel_id.to_proto(),
4557                    name: "test-channel".to_string()
4558                }]
4559            )
4560        });
4561
4562        let channel_b = channels_b.update(cx_b, |this, cx| {
4563            this.get_channel(channel_id.to_proto(), cx).unwrap()
4564        });
4565        channel_b.read_with(cx_b, |channel, _| assert!(channel.messages().is_empty()));
4566        channel_b
4567            .condition(&cx_b, |channel, _| {
4568                channel_messages(channel)
4569                    == [("user_b".to_string(), "hello A, it's B.".to_string(), false)]
4570            })
4571            .await;
4572
4573        // Disconnect client B, ensuring we can still access its cached channel data.
            // Connections are forbidden first so that B cannot immediately reconnect; the fake
            // clock is then advanced and status updates are drained until B reports a
            // reconnection error.
            // NOTE(review): a `None` from `status_b.next()` does not match the pattern either, so
            // if the status stream ever ended this loop would keep polling — confirm that cannot
            // happen, or bail out on `None`.
4574        server.forbid_connections();
4575        server.disconnect_client(client_b.current_user_id(&cx_b));
4576        cx_b.foreground().advance_clock(Duration::from_secs(3));
4577        while !matches!(
4578            status_b.next().await,
4579            Some(client::Status::ReconnectionError { .. })
4580        ) {}
4581
            // B's channel list and message history are unchanged while it is offline.
4582        channels_b.read_with(cx_b, |channels, _| {
4583            assert_eq!(
4584                channels.available_channels().unwrap(),
4585                [ChannelDetails {
4586                    id: channel_id.to_proto(),
4587                    name: "test-channel".to_string()
4588                }]
4589            )
4590        });
4591        channel_b.read_with(cx_b, |channel, _| {
4592            assert_eq!(
4593                channel_messages(channel),
4594                [("user_b".to_string(), "hello A, it's B.".to_string(), false)]
4595            )
4596        });
4597
4598        // Send a message from client B while it is disconnected.
            // The message is listed locally right away (flag `true`), but awaiting the send task
            // yields an error.
4599        channel_b
4600            .update(cx_b, |channel, cx| {
4601                let task = channel
4602                    .send_message("can you see this?".to_string(), cx)
4603                    .unwrap();
4604                assert_eq!(
4605                    channel_messages(channel),
4606                    &[
4607                        ("user_b".to_string(), "hello A, it's B.".to_string(), false),
4608                        ("user_b".to_string(), "can you see this?".to_string(), true)
4609                    ]
4610                );
4611                task
4612            })
4613            .await
4614            .unwrap_err();
4615
4616        // Send a message from client A while B is disconnected.
            // The first send task is detached, the second is returned and awaited; A is still
            // connected, so the awaited send succeeds.
4617        channel_a
4618            .update(cx_a, |channel, cx| {
4619                channel
4620                    .send_message("oh, hi B.".to_string(), cx)
4621                    .unwrap()
4622                    .detach();
4623                let task = channel.send_message("sup".to_string(), cx).unwrap();
4624                assert_eq!(
4625                    channel_messages(channel),
4626                    &[
4627                        ("user_b".to_string(), "hello A, it's B.".to_string(), false),
4628                        ("user_a".to_string(), "oh, hi B.".to_string(), true),
4629                        ("user_a".to_string(), "sup".to_string(), true)
4630                    ]
4631                );
4632                task
4633            })
4634            .await
4635            .unwrap();
4636
4637        // Give client B a chance to reconnect.
4638        server.allow_connections();
4639        cx_b.foreground().advance_clock(Duration::from_secs(10));
4640
4641        // Verify that B sees the new messages upon reconnection, as well as the message client B
4642        // sent while offline.
            // B's offline message is expected after A's two messages, with its flag now `false`.
4643        channel_b
4644            .condition(&cx_b, |channel, _| {
4645                channel_messages(channel)
4646                    == [
4647                        ("user_b".to_string(), "hello A, it's B.".to_string(), false),
4648                        ("user_a".to_string(), "oh, hi B.".to_string(), false),
4649                        ("user_a".to_string(), "sup".to_string(), false),
4650                        ("user_b".to_string(), "can you see this?".to_string(), false),
4651                    ]
4652            })
4653            .await;
4654
4655        // Ensure client A and B can communicate normally after reconnection.
4656        channel_a
4657            .update(cx_a, |channel, cx| {
4658                channel.send_message("you online?".to_string(), cx).unwrap()
4659            })
4660            .await
4661            .unwrap();
4662        channel_b
4663            .condition(&cx_b, |channel, _| {
4664                channel_messages(channel)
4665                    == [
4666                        ("user_b".to_string(), "hello A, it's B.".to_string(), false),
4667                        ("user_a".to_string(), "oh, hi B.".to_string(), false),
4668                        ("user_a".to_string(), "sup".to_string(), false),
4669                        ("user_b".to_string(), "can you see this?".to_string(), false),
4670                        ("user_a".to_string(), "you online?".to_string(), false),
4671                    ]
4672            })
4673            .await;
4674
            // And in the other direction: B's reply must reach A, whose history by now also
            // contains B's formerly offline message.
4675        channel_b
4676            .update(cx_b, |channel, cx| {
4677                channel.send_message("yep".to_string(), cx).unwrap()
4678            })
4679            .await
4680            .unwrap();
4681        channel_a
4682            .condition(&cx_a, |channel, _| {
4683                channel_messages(channel)
4684                    == [
4685                        ("user_b".to_string(), "hello A, it's B.".to_string(), false),
4686                        ("user_a".to_string(), "oh, hi B.".to_string(), false),
4687                        ("user_a".to_string(), "sup".to_string(), false),
4688                        ("user_b".to_string(), "can you see this?".to_string(), false),
4689                        ("user_a".to_string(), "you online?".to_string(), false),
4690                        ("user_b".to_string(), "yep".to_string(), false),
4691                    ]
4692            })
4693            .await;
4694    }
4695
4696    #[gpui::test(iterations = 10)]
4697    async fn test_contacts(
4698        cx_a: &mut TestAppContext,
4699        cx_b: &mut TestAppContext,
4700        cx_c: &mut TestAppContext,
4701    ) {
4702        cx_a.foreground().forbid_parking();
4703        let lang_registry = Arc::new(LanguageRegistry::test());
4704        let fs = FakeFs::new(cx_a.background());
4705
4706        // Connect to a server as 3 clients.
4707        let mut server = TestServer::start(cx_a.foreground(), cx_a.background()).await;
4708        let client_a = server.create_client(cx_a, "user_a").await;
4709        let client_b = server.create_client(cx_b, "user_b").await;
4710        let client_c = server.create_client(cx_c, "user_c").await;
4711
4712        // Share a worktree as client A.
4713        fs.insert_tree(
4714            "/a",
4715            json!({
4716                ".zed.toml": r#"collaborators = ["user_b", "user_c"]"#,
4717            }),
4718        )
4719        .await;
4720
4721        let project_a = cx_a.update(|cx| {
4722            Project::local(
4723                client_a.clone(),
4724                client_a.user_store.clone(),
4725                lang_registry.clone(),
4726                fs.clone(),
4727                cx,
4728            )
4729        });
4730        let (worktree_a, _) = project_a
4731            .update(cx_a, |p, cx| {
4732                p.find_or_create_local_worktree("/a", true, cx)
4733            })
4734            .await
4735            .unwrap();
4736        worktree_a
4737            .read_with(cx_a, |tree, _| tree.as_local().unwrap().scan_complete())
4738            .await;
4739
4740        client_a
4741            .user_store
4742            .condition(&cx_a, |user_store, _| {
4743                contacts(user_store) == vec![("user_a", vec![("a", false, vec![])])]
4744            })
4745            .await;
4746        client_b
4747            .user_store
4748            .condition(&cx_b, |user_store, _| {
4749                contacts(user_store) == vec![("user_a", vec![("a", false, vec![])])]
4750            })
4751            .await;
4752        client_c
4753            .user_store
4754            .condition(&cx_c, |user_store, _| {
4755                contacts(user_store) == vec![("user_a", vec![("a", false, vec![])])]
4756            })
4757            .await;
4758
4759        let project_id = project_a
4760            .update(cx_a, |project, _| project.next_remote_id())
4761            .await;
4762        project_a
4763            .update(cx_a, |project, cx| project.share(cx))
4764            .await
4765            .unwrap();
4766        client_a
4767            .user_store
4768            .condition(&cx_a, |user_store, _| {
4769                contacts(user_store) == vec![("user_a", vec![("a", true, vec![])])]
4770            })
4771            .await;
4772        client_b
4773            .user_store
4774            .condition(&cx_b, |user_store, _| {
4775                contacts(user_store) == vec![("user_a", vec![("a", true, vec![])])]
4776            })
4777            .await;
4778        client_c
4779            .user_store
4780            .condition(&cx_c, |user_store, _| {
4781                contacts(user_store) == vec![("user_a", vec![("a", true, vec![])])]
4782            })
4783            .await;
4784
4785        let _project_b = Project::remote(
4786            project_id,
4787            client_b.clone(),
4788            client_b.user_store.clone(),
4789            lang_registry.clone(),
4790            fs.clone(),
4791            &mut cx_b.to_async(),
4792        )
4793        .await
4794        .unwrap();
4795
4796        client_a
4797            .user_store
4798            .condition(&cx_a, |user_store, _| {
4799                contacts(user_store) == vec![("user_a", vec![("a", true, vec!["user_b"])])]
4800            })
4801            .await;
4802        client_b
4803            .user_store
4804            .condition(&cx_b, |user_store, _| {
4805                contacts(user_store) == vec![("user_a", vec![("a", true, vec!["user_b"])])]
4806            })
4807            .await;
4808        client_c
4809            .user_store
4810            .condition(&cx_c, |user_store, _| {
4811                contacts(user_store) == vec![("user_a", vec![("a", true, vec!["user_b"])])]
4812            })
4813            .await;
4814
4815        project_a
4816            .condition(&cx_a, |project, _| {
4817                project.collaborators().contains_key(&client_b.peer_id)
4818            })
4819            .await;
4820
4821        cx_a.update(move |_| drop(project_a));
4822        client_a
4823            .user_store
4824            .condition(&cx_a, |user_store, _| contacts(user_store) == vec![])
4825            .await;
4826        client_b
4827            .user_store
4828            .condition(&cx_b, |user_store, _| contacts(user_store) == vec![])
4829            .await;
4830        client_c
4831            .user_store
4832            .condition(&cx_c, |user_store, _| contacts(user_store) == vec![])
4833            .await;
4834
4835        fn contacts(user_store: &UserStore) -> Vec<(&str, Vec<(&str, bool, Vec<&str>)>)> {
4836            user_store
4837                .contacts()
4838                .iter()
4839                .map(|contact| {
4840                    let worktrees = contact
4841                        .projects
4842                        .iter()
4843                        .map(|p| {
4844                            (
4845                                p.worktree_root_names[0].as_str(),
4846                                p.is_shared,
4847                                p.guests.iter().map(|p| p.github_login.as_str()).collect(),
4848                            )
4849                        })
4850                        .collect();
4851                    (contact.user.github_login.as_str(), worktrees)
4852                })
4853                .collect()
4854        }
4855    }
4856
4857    #[gpui::test(iterations = 10)]
4858    async fn test_following(cx_a: &mut TestAppContext, cx_b: &mut TestAppContext) {
4859        cx_a.foreground().forbid_parking();
4860        let fs = FakeFs::new(cx_a.background());
4861
4862        // 2 clients connect to a server.
4863        let mut server = TestServer::start(cx_a.foreground(), cx_a.background()).await;
4864        let mut client_a = server.create_client(cx_a, "user_a").await;
4865        let mut client_b = server.create_client(cx_b, "user_b").await;
4866        cx_a.update(editor::init);
4867        cx_b.update(editor::init);
4868
4869        // Client A shares a project.
4870        fs.insert_tree(
4871            "/a",
4872            json!({
4873                ".zed.toml": r#"collaborators = ["user_b"]"#,
4874                "1.txt": "one",
4875                "2.txt": "two",
4876                "3.txt": "three",
4877            }),
4878        )
4879        .await;
4880        let (project_a, worktree_id) = client_a.build_local_project(fs.clone(), "/a", cx_a).await;
4881        project_a
4882            .update(cx_a, |project, cx| project.share(cx))
4883            .await
4884            .unwrap();
4885
4886        // Client B joins the project.
4887        let project_b = client_b
4888            .build_remote_project(
4889                project_a
4890                    .read_with(cx_a, |project, _| project.remote_id())
4891                    .unwrap(),
4892                cx_b,
4893            )
4894            .await;
4895
4896        // Client A opens some editors.
4897        let workspace_a = client_a.build_workspace(&project_a, cx_a);
4898        let pane_a = workspace_a.read_with(cx_a, |workspace, _| workspace.active_pane().clone());
4899        let editor_a1 = workspace_a
4900            .update(cx_a, |workspace, cx| {
4901                workspace.open_path((worktree_id, "1.txt"), true, cx)
4902            })
4903            .await
4904            .unwrap()
4905            .downcast::<Editor>()
4906            .unwrap();
4907        let editor_a2 = workspace_a
4908            .update(cx_a, |workspace, cx| {
4909                workspace.open_path((worktree_id, "2.txt"), true, cx)
4910            })
4911            .await
4912            .unwrap()
4913            .downcast::<Editor>()
4914            .unwrap();
4915
4916        // Client B opens an editor.
4917        let workspace_b = client_b.build_workspace(&project_b, cx_b);
4918        let editor_b1 = workspace_b
4919            .update(cx_b, |workspace, cx| {
4920                workspace.open_path((worktree_id, "1.txt"), true, cx)
4921            })
4922            .await
4923            .unwrap()
4924            .downcast::<Editor>()
4925            .unwrap();
4926
4927        let client_a_id = project_b.read_with(cx_b, |project, _| {
4928            project.collaborators().values().next().unwrap().peer_id
4929        });
4930        let client_b_id = project_a.read_with(cx_a, |project, _| {
4931            project.collaborators().values().next().unwrap().peer_id
4932        });
4933
4934        // When client B starts following client A, all visible view states are replicated to client B.
4935        editor_a1.update(cx_a, |editor, cx| editor.select_ranges([0..1], None, cx));
4936        editor_a2.update(cx_a, |editor, cx| editor.select_ranges([2..3], None, cx));
4937        workspace_b
4938            .update(cx_b, |workspace, cx| {
4939                workspace
4940                    .toggle_follow(&ToggleFollow(client_a_id), cx)
4941                    .unwrap()
4942            })
4943            .await
4944            .unwrap();
4945        let editor_b2 = workspace_b.read_with(cx_b, |workspace, cx| {
4946            workspace
4947                .active_item(cx)
4948                .unwrap()
4949                .downcast::<Editor>()
4950                .unwrap()
4951        });
4952        assert!(cx_b.read(|cx| editor_b2.is_focused(cx)));
4953        assert_eq!(
4954            editor_b2.read_with(cx_b, |editor, cx| editor.project_path(cx)),
4955            Some((worktree_id, "2.txt").into())
4956        );
4957        assert_eq!(
4958            editor_b2.read_with(cx_b, |editor, cx| editor.selected_ranges(cx)),
4959            vec![2..3]
4960        );
4961        assert_eq!(
4962            editor_b1.read_with(cx_b, |editor, cx| editor.selected_ranges(cx)),
4963            vec![0..1]
4964        );
4965
4966        // When client A activates a different editor, client B does so as well.
4967        workspace_a.update(cx_a, |workspace, cx| {
4968            workspace.activate_item(&editor_a1, cx)
4969        });
4970        workspace_b
4971            .condition(cx_b, |workspace, cx| {
4972                workspace.active_item(cx).unwrap().id() == editor_b1.id()
4973            })
4974            .await;
4975
4976        // When client A navigates back and forth, client B does so as well.
4977        workspace_a
4978            .update(cx_a, |workspace, cx| {
4979                workspace::Pane::go_back(workspace, None, cx)
4980            })
4981            .await;
4982        workspace_b
4983            .condition(cx_b, |workspace, cx| {
4984                workspace.active_item(cx).unwrap().id() == editor_b2.id()
4985            })
4986            .await;
4987
4988        workspace_a
4989            .update(cx_a, |workspace, cx| {
4990                workspace::Pane::go_forward(workspace, None, cx)
4991            })
4992            .await;
4993        workspace_b
4994            .condition(cx_b, |workspace, cx| {
4995                workspace.active_item(cx).unwrap().id() == editor_b1.id()
4996            })
4997            .await;
4998
4999        // Changes to client A's editor are reflected on client B.
5000        editor_a1.update(cx_a, |editor, cx| {
5001            editor.select_ranges([1..1, 2..2], None, cx);
5002        });
5003        editor_b1
5004            .condition(cx_b, |editor, cx| {
5005                editor.selected_ranges(cx) == vec![1..1, 2..2]
5006            })
5007            .await;
5008
5009        editor_a1.update(cx_a, |editor, cx| editor.set_text("TWO", cx));
5010        editor_b1
5011            .condition(cx_b, |editor, cx| editor.text(cx) == "TWO")
5012            .await;
5013
5014        editor_a1.update(cx_a, |editor, cx| {
5015            editor.select_ranges([3..3], None, cx);
5016            editor.set_scroll_position(vec2f(0., 100.), cx);
5017        });
5018        editor_b1
5019            .condition(cx_b, |editor, cx| editor.selected_ranges(cx) == vec![3..3])
5020            .await;
5021
5022        // After unfollowing, client B stops receiving updates from client A.
5023        workspace_b.update(cx_b, |workspace, cx| {
5024            workspace.unfollow(&workspace.active_pane().clone(), cx)
5025        });
5026        workspace_a.update(cx_a, |workspace, cx| {
5027            workspace.activate_item(&editor_a2, cx)
5028        });
5029        cx_a.foreground().run_until_parked();
5030        assert_eq!(
5031            workspace_b.read_with(cx_b, |workspace, cx| workspace
5032                .active_item(cx)
5033                .unwrap()
5034                .id()),
5035            editor_b1.id()
5036        );
5037
5038        // Client A starts following client B.
5039        workspace_a
5040            .update(cx_a, |workspace, cx| {
5041                workspace
5042                    .toggle_follow(&ToggleFollow(client_b_id), cx)
5043                    .unwrap()
5044            })
5045            .await
5046            .unwrap();
5047        assert_eq!(
5048            workspace_a.read_with(cx_a, |workspace, _| workspace.leader_for_pane(&pane_a)),
5049            Some(client_b_id)
5050        );
5051        assert_eq!(
5052            workspace_a.read_with(cx_a, |workspace, cx| workspace
5053                .active_item(cx)
5054                .unwrap()
5055                .id()),
5056            editor_a1.id()
5057        );
5058
5059        // Following interrupts when client B disconnects.
5060        client_b.disconnect(&cx_b.to_async()).unwrap();
5061        cx_a.foreground().run_until_parked();
5062        assert_eq!(
5063            workspace_a.read_with(cx_a, |workspace, _| workspace.leader_for_pane(&pane_a)),
5064            None
5065        );
5066    }
5067
5068    #[gpui::test(iterations = 10)]
5069    async fn test_peers_following_each_other(cx_a: &mut TestAppContext, cx_b: &mut TestAppContext) {
5070        cx_a.foreground().forbid_parking();
5071        let fs = FakeFs::new(cx_a.background());
5072
5073        // 2 clients connect to a server.
5074        let mut server = TestServer::start(cx_a.foreground(), cx_a.background()).await;
5075        let mut client_a = server.create_client(cx_a, "user_a").await;
5076        let mut client_b = server.create_client(cx_b, "user_b").await;
5077        cx_a.update(editor::init);
5078        cx_b.update(editor::init);
5079
5080        // Client A shares a project.
5081        fs.insert_tree(
5082            "/a",
5083            json!({
5084                ".zed.toml": r#"collaborators = ["user_b"]"#,
5085                "1.txt": "one",
5086                "2.txt": "two",
5087                "3.txt": "three",
5088                "4.txt": "four",
5089            }),
5090        )
5091        .await;
5092        let (project_a, worktree_id) = client_a.build_local_project(fs.clone(), "/a", cx_a).await;
5093        project_a
5094            .update(cx_a, |project, cx| project.share(cx))
5095            .await
5096            .unwrap();
5097
5098        // Client B joins the project.
5099        let project_b = client_b
5100            .build_remote_project(
5101                project_a
5102                    .read_with(cx_a, |project, _| project.remote_id())
5103                    .unwrap(),
5104                cx_b,
5105            )
5106            .await;
5107
5108        // Client A opens some editors.
5109        let workspace_a = client_a.build_workspace(&project_a, cx_a);
5110        let pane_a1 = workspace_a.read_with(cx_a, |workspace, _| workspace.active_pane().clone());
5111        let _editor_a1 = workspace_a
5112            .update(cx_a, |workspace, cx| {
5113                workspace.open_path((worktree_id, "1.txt"), true, cx)
5114            })
5115            .await
5116            .unwrap()
5117            .downcast::<Editor>()
5118            .unwrap();
5119
5120        // Client B opens an editor.
5121        let workspace_b = client_b.build_workspace(&project_b, cx_b);
5122        let pane_b1 = workspace_b.read_with(cx_b, |workspace, _| workspace.active_pane().clone());
5123        let _editor_b1 = workspace_b
5124            .update(cx_b, |workspace, cx| {
5125                workspace.open_path((worktree_id, "2.txt"), true, cx)
5126            })
5127            .await
5128            .unwrap()
5129            .downcast::<Editor>()
5130            .unwrap();
5131
5132        // Clients A and B follow each other in split panes
5133        workspace_a
5134            .update(cx_a, |workspace, cx| {
5135                workspace.split_pane(workspace.active_pane().clone(), SplitDirection::Right, cx);
5136                assert_ne!(*workspace.active_pane(), pane_a1);
5137                let leader_id = *project_a.read(cx).collaborators().keys().next().unwrap();
5138                workspace
5139                    .toggle_follow(&workspace::ToggleFollow(leader_id), cx)
5140                    .unwrap()
5141            })
5142            .await
5143            .unwrap();
5144        workspace_b
5145            .update(cx_b, |workspace, cx| {
5146                workspace.split_pane(workspace.active_pane().clone(), SplitDirection::Right, cx);
5147                assert_ne!(*workspace.active_pane(), pane_b1);
5148                let leader_id = *project_b.read(cx).collaborators().keys().next().unwrap();
5149                workspace
5150                    .toggle_follow(&workspace::ToggleFollow(leader_id), cx)
5151                    .unwrap()
5152            })
5153            .await
5154            .unwrap();
5155
5156        workspace_a
5157            .update(cx_a, |workspace, cx| {
5158                workspace.activate_next_pane(cx);
5159                assert_eq!(*workspace.active_pane(), pane_a1);
5160                workspace.open_path((worktree_id, "3.txt"), true, cx)
5161            })
5162            .await
5163            .unwrap();
5164        workspace_b
5165            .update(cx_b, |workspace, cx| {
5166                workspace.activate_next_pane(cx);
5167                assert_eq!(*workspace.active_pane(), pane_b1);
5168                workspace.open_path((worktree_id, "4.txt"), true, cx)
5169            })
5170            .await
5171            .unwrap();
5172        cx_a.foreground().run_until_parked();
5173
5174        // Ensure leader updates don't change the active pane of followers
5175        workspace_a.read_with(cx_a, |workspace, _| {
5176            assert_eq!(*workspace.active_pane(), pane_a1);
5177        });
5178        workspace_b.read_with(cx_b, |workspace, _| {
5179            assert_eq!(*workspace.active_pane(), pane_b1);
5180        });
5181
5182        // Ensure peers following each other doesn't cause an infinite loop.
5183        assert_eq!(
5184            workspace_a.read_with(cx_a, |workspace, cx| workspace
5185                .active_item(cx)
5186                .unwrap()
5187                .project_path(cx)),
5188            Some((worktree_id, "3.txt").into())
5189        );
5190        workspace_a.update(cx_a, |workspace, cx| {
5191            assert_eq!(
5192                workspace.active_item(cx).unwrap().project_path(cx),
5193                Some((worktree_id, "3.txt").into())
5194            );
5195            workspace.activate_next_pane(cx);
5196            assert_eq!(
5197                workspace.active_item(cx).unwrap().project_path(cx),
5198                Some((worktree_id, "4.txt").into())
5199            );
5200        });
5201        workspace_b.update(cx_b, |workspace, cx| {
5202            assert_eq!(
5203                workspace.active_item(cx).unwrap().project_path(cx),
5204                Some((worktree_id, "4.txt").into())
5205            );
5206            workspace.activate_next_pane(cx);
5207            assert_eq!(
5208                workspace.active_item(cx).unwrap().project_path(cx),
5209                Some((worktree_id, "3.txt").into())
5210            );
5211        });
5212    }
5213
5214    #[gpui::test(iterations = 10)]
5215    async fn test_auto_unfollowing(cx_a: &mut TestAppContext, cx_b: &mut TestAppContext) {
5216        cx_a.foreground().forbid_parking();
5217        let fs = FakeFs::new(cx_a.background());
5218
5219        // 2 clients connect to a server.
5220        let mut server = TestServer::start(cx_a.foreground(), cx_a.background()).await;
5221        let mut client_a = server.create_client(cx_a, "user_a").await;
5222        let mut client_b = server.create_client(cx_b, "user_b").await;
5223        cx_a.update(editor::init);
5224        cx_b.update(editor::init);
5225
5226        // Client A shares a project.
5227        fs.insert_tree(
5228            "/a",
5229            json!({
5230                ".zed.toml": r#"collaborators = ["user_b"]"#,
5231                "1.txt": "one",
5232                "2.txt": "two",
5233                "3.txt": "three",
5234            }),
5235        )
5236        .await;
5237        let (project_a, worktree_id) = client_a.build_local_project(fs.clone(), "/a", cx_a).await;
5238        project_a
5239            .update(cx_a, |project, cx| project.share(cx))
5240            .await
5241            .unwrap();
5242
5243        // Client B joins the project.
5244        let project_b = client_b
5245            .build_remote_project(
5246                project_a
5247                    .read_with(cx_a, |project, _| project.remote_id())
5248                    .unwrap(),
5249                cx_b,
5250            )
5251            .await;
5252
5253        // Client A opens some editors.
5254        let workspace_a = client_a.build_workspace(&project_a, cx_a);
5255        let _editor_a1 = workspace_a
5256            .update(cx_a, |workspace, cx| {
5257                workspace.open_path((worktree_id, "1.txt"), true, cx)
5258            })
5259            .await
5260            .unwrap()
5261            .downcast::<Editor>()
5262            .unwrap();
5263
5264        // Client B starts following client A.
5265        let workspace_b = client_b.build_workspace(&project_b, cx_b);
5266        let pane_b = workspace_b.read_with(cx_b, |workspace, _| workspace.active_pane().clone());
5267        let leader_id = project_b.read_with(cx_b, |project, _| {
5268            project.collaborators().values().next().unwrap().peer_id
5269        });
5270        workspace_b
5271            .update(cx_b, |workspace, cx| {
5272                workspace
5273                    .toggle_follow(&ToggleFollow(leader_id), cx)
5274                    .unwrap()
5275            })
5276            .await
5277            .unwrap();
5278        assert_eq!(
5279            workspace_b.read_with(cx_b, |workspace, _| workspace.leader_for_pane(&pane_b)),
5280            Some(leader_id)
5281        );
5282        let editor_b2 = workspace_b.read_with(cx_b, |workspace, cx| {
5283            workspace
5284                .active_item(cx)
5285                .unwrap()
5286                .downcast::<Editor>()
5287                .unwrap()
5288        });
5289
5290        // When client B moves, it automatically stops following client A.
5291        editor_b2.update(cx_b, |editor, cx| editor.move_right(&editor::MoveRight, cx));
5292        assert_eq!(
5293            workspace_b.read_with(cx_b, |workspace, _| workspace.leader_for_pane(&pane_b)),
5294            None
5295        );
5296
5297        workspace_b
5298            .update(cx_b, |workspace, cx| {
5299                workspace
5300                    .toggle_follow(&ToggleFollow(leader_id), cx)
5301                    .unwrap()
5302            })
5303            .await
5304            .unwrap();
5305        assert_eq!(
5306            workspace_b.read_with(cx_b, |workspace, _| workspace.leader_for_pane(&pane_b)),
5307            Some(leader_id)
5308        );
5309
5310        // When client B edits, it automatically stops following client A.
5311        editor_b2.update(cx_b, |editor, cx| editor.insert("X", cx));
5312        assert_eq!(
5313            workspace_b.read_with(cx_b, |workspace, _| workspace.leader_for_pane(&pane_b)),
5314            None
5315        );
5316
5317        workspace_b
5318            .update(cx_b, |workspace, cx| {
5319                workspace
5320                    .toggle_follow(&ToggleFollow(leader_id), cx)
5321                    .unwrap()
5322            })
5323            .await
5324            .unwrap();
5325        assert_eq!(
5326            workspace_b.read_with(cx_b, |workspace, _| workspace.leader_for_pane(&pane_b)),
5327            Some(leader_id)
5328        );
5329
5330        // When client B scrolls, it automatically stops following client A.
5331        editor_b2.update(cx_b, |editor, cx| {
5332            editor.set_scroll_position(vec2f(0., 3.), cx)
5333        });
5334        assert_eq!(
5335            workspace_b.read_with(cx_b, |workspace, _| workspace.leader_for_pane(&pane_b)),
5336            None
5337        );
5338
5339        workspace_b
5340            .update(cx_b, |workspace, cx| {
5341                workspace
5342                    .toggle_follow(&ToggleFollow(leader_id), cx)
5343                    .unwrap()
5344            })
5345            .await
5346            .unwrap();
5347        assert_eq!(
5348            workspace_b.read_with(cx_b, |workspace, _| workspace.leader_for_pane(&pane_b)),
5349            Some(leader_id)
5350        );
5351
5352        // When client B activates a different pane, it continues following client A in the original pane.
5353        workspace_b.update(cx_b, |workspace, cx| {
5354            workspace.split_pane(pane_b.clone(), SplitDirection::Right, cx)
5355        });
5356        assert_eq!(
5357            workspace_b.read_with(cx_b, |workspace, _| workspace.leader_for_pane(&pane_b)),
5358            Some(leader_id)
5359        );
5360
5361        workspace_b.update(cx_b, |workspace, cx| workspace.activate_next_pane(cx));
5362        assert_eq!(
5363            workspace_b.read_with(cx_b, |workspace, _| workspace.leader_for_pane(&pane_b)),
5364            Some(leader_id)
5365        );
5366
5367        // When client B activates a different item in the original pane, it automatically stops following client A.
5368        workspace_b
5369            .update(cx_b, |workspace, cx| {
5370                workspace.open_path((worktree_id, "2.txt"), true, cx)
5371            })
5372            .await
5373            .unwrap();
5374        assert_eq!(
5375            workspace_b.read_with(cx_b, |workspace, _| workspace.leader_for_pane(&pane_b)),
5376            None
5377        );
5378    }
5379
5380    #[gpui::test(iterations = 100)]
5381    async fn test_random_collaboration(
5382        cx: &mut TestAppContext,
5383        deterministic: Arc<Deterministic>,
5384        rng: StdRng,
5385    ) {
5386        cx.foreground().forbid_parking();
5387        let max_peers = env::var("MAX_PEERS")
5388            .map(|i| i.parse().expect("invalid `MAX_PEERS` variable"))
5389            .unwrap_or(5);
5390        assert!(max_peers <= 5);
5391
5392        let max_operations = env::var("OPERATIONS")
5393            .map(|i| i.parse().expect("invalid `OPERATIONS` variable"))
5394            .unwrap_or(10);
5395
5396        let rng = Arc::new(Mutex::new(rng));
5397
5398        let guest_lang_registry = Arc::new(LanguageRegistry::test());
5399        let host_language_registry = Arc::new(LanguageRegistry::test());
5400
5401        let fs = FakeFs::new(cx.background());
5402        fs.insert_tree(
5403            "/_collab",
5404            json!({
5405                ".zed.toml": r#"collaborators = ["guest-1", "guest-2", "guest-3", "guest-4"]"#
5406            }),
5407        )
5408        .await;
5409
5410        let mut server = TestServer::start(cx.foreground(), cx.background()).await;
5411        let mut clients = Vec::new();
5412        let mut user_ids = Vec::new();
5413        let mut op_start_signals = Vec::new();
5414        let files = Arc::new(Mutex::new(Vec::new()));
5415
5416        let mut next_entity_id = 100000;
5417        let mut host_cx = TestAppContext::new(
5418            cx.foreground_platform(),
5419            cx.platform(),
5420            deterministic.build_foreground(next_entity_id),
5421            deterministic.build_background(),
5422            cx.font_cache(),
5423            cx.leak_detector(),
5424            next_entity_id,
5425        );
5426        let host = server.create_client(&mut host_cx, "host").await;
5427        let host_project = host_cx.update(|cx| {
5428            Project::local(
5429                host.client.clone(),
5430                host.user_store.clone(),
5431                host_language_registry.clone(),
5432                fs.clone(),
5433                cx,
5434            )
5435        });
5436        let host_project_id = host_project
5437            .update(&mut host_cx, |p, _| p.next_remote_id())
5438            .await;
5439
5440        let (collab_worktree, _) = host_project
5441            .update(&mut host_cx, |project, cx| {
5442                project.find_or_create_local_worktree("/_collab", true, cx)
5443            })
5444            .await
5445            .unwrap();
5446        collab_worktree
5447            .read_with(&host_cx, |tree, _| tree.as_local().unwrap().scan_complete())
5448            .await;
5449        host_project
5450            .update(&mut host_cx, |project, cx| project.share(cx))
5451            .await
5452            .unwrap();
5453
5454        // Set up fake language servers.
5455        let mut language = Language::new(
5456            LanguageConfig {
5457                name: "Rust".into(),
5458                path_suffixes: vec!["rs".to_string()],
5459                ..Default::default()
5460            },
5461            None,
5462        );
5463        let _fake_servers = language.set_fake_lsp_adapter(FakeLspAdapter {
5464            name: "the-fake-language-server",
5465            capabilities: lsp::LanguageServer::full_capabilities(),
5466            initializer: Some(Box::new({
5467                let rng = rng.clone();
5468                let files = files.clone();
5469                let project = host_project.downgrade();
5470                move |fake_server: &mut FakeLanguageServer| {
5471                    fake_server.handle_request::<lsp::request::Completion, _, _>(
5472                        |_, _| async move {
5473                            Ok(Some(lsp::CompletionResponse::Array(vec![
5474                                lsp::CompletionItem {
5475                                    text_edit: Some(lsp::CompletionTextEdit::Edit(lsp::TextEdit {
5476                                        range: lsp::Range::new(
5477                                            lsp::Position::new(0, 0),
5478                                            lsp::Position::new(0, 0),
5479                                        ),
5480                                        new_text: "the-new-text".to_string(),
5481                                    })),
5482                                    ..Default::default()
5483                                },
5484                            ])))
5485                        },
5486                    );
5487
5488                    fake_server.handle_request::<lsp::request::CodeActionRequest, _, _>(
5489                        |_, _| async move {
5490                            Ok(Some(vec![lsp::CodeActionOrCommand::CodeAction(
5491                                lsp::CodeAction {
5492                                    title: "the-code-action".to_string(),
5493                                    ..Default::default()
5494                                },
5495                            )]))
5496                        },
5497                    );
5498
5499                    fake_server.handle_request::<lsp::request::PrepareRenameRequest, _, _>(
5500                        |params, _| async move {
5501                            Ok(Some(lsp::PrepareRenameResponse::Range(lsp::Range::new(
5502                                params.position,
5503                                params.position,
5504                            ))))
5505                        },
5506                    );
5507
5508                    fake_server.handle_request::<lsp::request::GotoDefinition, _, _>({
5509                        let files = files.clone();
5510                        let rng = rng.clone();
5511                        move |_, _| {
5512                            let files = files.clone();
5513                            let rng = rng.clone();
5514                            async move {
5515                                let files = files.lock();
5516                                let mut rng = rng.lock();
5517                                let count = rng.gen_range::<usize, _>(1..3);
5518                                let files = (0..count)
5519                                    .map(|_| files.choose(&mut *rng).unwrap())
5520                                    .collect::<Vec<_>>();
5521                                log::info!("LSP: Returning definitions in files {:?}", &files);
5522                                Ok(Some(lsp::GotoDefinitionResponse::Array(
5523                                    files
5524                                        .into_iter()
5525                                        .map(|file| lsp::Location {
5526                                            uri: lsp::Url::from_file_path(file).unwrap(),
5527                                            range: Default::default(),
5528                                        })
5529                                        .collect(),
5530                                )))
5531                            }
5532                        }
5533                    });
5534
5535                    fake_server.handle_request::<lsp::request::DocumentHighlightRequest, _, _>({
5536                        let rng = rng.clone();
5537                        let project = project.clone();
5538                        move |params, mut cx| {
5539                            let highlights = if let Some(project) = project.upgrade(&cx) {
5540                                project.update(&mut cx, |project, cx| {
5541                                    let path = params
5542                                        .text_document_position_params
5543                                        .text_document
5544                                        .uri
5545                                        .to_file_path()
5546                                        .unwrap();
5547                                    let (worktree, relative_path) =
5548                                        project.find_local_worktree(&path, cx)?;
5549                                    let project_path =
5550                                        ProjectPath::from((worktree.read(cx).id(), relative_path));
5551                                    let buffer =
5552                                        project.get_open_buffer(&project_path, cx)?.read(cx);
5553
5554                                    let mut highlights = Vec::new();
5555                                    let highlight_count = rng.lock().gen_range(1..=5);
5556                                    let mut prev_end = 0;
5557                                    for _ in 0..highlight_count {
5558                                        let range =
5559                                            buffer.random_byte_range(prev_end, &mut *rng.lock());
5560
5561                                        highlights.push(lsp::DocumentHighlight {
5562                                            range: range_to_lsp(range.to_point_utf16(buffer)),
5563                                            kind: Some(lsp::DocumentHighlightKind::READ),
5564                                        });
5565                                        prev_end = range.end;
5566                                    }
5567                                    Some(highlights)
5568                                })
5569                            } else {
5570                                None
5571                            };
5572                            async move { Ok(highlights) }
5573                        }
5574                    });
5575                }
5576            })),
5577            ..Default::default()
5578        });
5579        host_language_registry.add(Arc::new(language));
5580
5581        let op_start_signal = futures::channel::mpsc::unbounded();
5582        user_ids.push(host.current_user_id(&host_cx));
5583        op_start_signals.push(op_start_signal.0);
5584        clients.push(host_cx.foreground().spawn(host.simulate_host(
5585            host_project,
5586            files,
5587            op_start_signal.1,
5588            rng.clone(),
5589            host_cx,
5590        )));
5591
5592        let disconnect_host_at = if rng.lock().gen_bool(0.2) {
5593            rng.lock().gen_range(0..max_operations)
5594        } else {
5595            max_operations
5596        };
5597        let mut available_guests = vec![
5598            "guest-1".to_string(),
5599            "guest-2".to_string(),
5600            "guest-3".to_string(),
5601            "guest-4".to_string(),
5602        ];
5603        let mut operations = 0;
5604        while operations < max_operations {
5605            if operations == disconnect_host_at {
5606                server.disconnect_client(user_ids[0]);
5607                cx.foreground().advance_clock(RECEIVE_TIMEOUT);
5608                drop(op_start_signals);
5609                let mut clients = futures::future::join_all(clients).await;
5610                cx.foreground().run_until_parked();
5611
5612                let (host, mut host_cx, host_err) = clients.remove(0);
5613                if let Some(host_err) = host_err {
5614                    log::error!("host error - {}", host_err);
5615                }
5616                host.project
5617                    .as_ref()
5618                    .unwrap()
5619                    .read_with(&host_cx, |project, _| assert!(!project.is_shared()));
5620                for (guest, mut guest_cx, guest_err) in clients {
5621                    if let Some(guest_err) = guest_err {
5622                        log::error!("{} error - {}", guest.username, guest_err);
5623                    }
5624                    let contacts = server
5625                        .store
5626                        .read()
5627                        .await
5628                        .contacts_for_user(guest.current_user_id(&guest_cx));
5629                    assert!(!contacts
5630                        .iter()
5631                        .flat_map(|contact| &contact.projects)
5632                        .any(|project| project.id == host_project_id));
5633                    guest
5634                        .project
5635                        .as_ref()
5636                        .unwrap()
5637                        .read_with(&guest_cx, |project, _| assert!(project.is_read_only()));
5638                    guest_cx.update(|_| drop(guest));
5639                }
5640                host_cx.update(|_| drop(host));
5641
5642                return;
5643            }
5644
5645            let distribution = rng.lock().gen_range(0..100);
5646            match distribution {
5647                0..=19 if !available_guests.is_empty() => {
5648                    let guest_ix = rng.lock().gen_range(0..available_guests.len());
5649                    let guest_username = available_guests.remove(guest_ix);
5650                    log::info!("Adding new connection for {}", guest_username);
5651                    next_entity_id += 100000;
5652                    let mut guest_cx = TestAppContext::new(
5653                        cx.foreground_platform(),
5654                        cx.platform(),
5655                        deterministic.build_foreground(next_entity_id),
5656                        deterministic.build_background(),
5657                        cx.font_cache(),
5658                        cx.leak_detector(),
5659                        next_entity_id,
5660                    );
5661                    let guest = server.create_client(&mut guest_cx, &guest_username).await;
5662                    let guest_project = Project::remote(
5663                        host_project_id,
5664                        guest.client.clone(),
5665                        guest.user_store.clone(),
5666                        guest_lang_registry.clone(),
5667                        FakeFs::new(cx.background()),
5668                        &mut guest_cx.to_async(),
5669                    )
5670                    .await
5671                    .unwrap();
5672                    let op_start_signal = futures::channel::mpsc::unbounded();
5673                    user_ids.push(guest.current_user_id(&guest_cx));
5674                    op_start_signals.push(op_start_signal.0);
5675                    clients.push(guest_cx.foreground().spawn(guest.simulate_guest(
5676                        guest_username.clone(),
5677                        guest_project,
5678                        op_start_signal.1,
5679                        rng.clone(),
5680                        guest_cx,
5681                    )));
5682
5683                    log::info!("Added connection for {}", guest_username);
5684                    operations += 1;
5685                }
5686                20..=29 if clients.len() > 1 => {
5687                    log::info!("Removing guest");
5688                    let guest_ix = rng.lock().gen_range(1..clients.len());
5689                    let removed_guest_id = user_ids.remove(guest_ix);
5690                    let guest = clients.remove(guest_ix);
5691                    op_start_signals.remove(guest_ix);
5692                    server.disconnect_client(removed_guest_id);
5693                    cx.foreground().advance_clock(RECEIVE_TIMEOUT);
5694                    let (guest, mut guest_cx, guest_err) = guest.await;
5695                    if let Some(guest_err) = guest_err {
5696                        log::error!("{} error - {}", guest.username, guest_err);
5697                    }
5698                    guest
5699                        .project
5700                        .as_ref()
5701                        .unwrap()
5702                        .read_with(&guest_cx, |project, _| assert!(project.is_read_only()));
5703                    for user_id in &user_ids {
5704                        for contact in server.store.read().await.contacts_for_user(*user_id) {
5705                            assert_ne!(
5706                                contact.user_id, removed_guest_id.0 as u64,
5707                                "removed guest is still a contact of another peer"
5708                            );
5709                            for project in contact.projects {
5710                                for project_guest_id in project.guests {
5711                                    assert_ne!(
5712                                        project_guest_id, removed_guest_id.0 as u64,
5713                                        "removed guest appears as still participating on a project"
5714                                    );
5715                                }
5716                            }
5717                        }
5718                    }
5719
5720                    log::info!("{} removed", guest.username);
5721                    available_guests.push(guest.username.clone());
5722                    guest_cx.update(|_| drop(guest));
5723
5724                    operations += 1;
5725                }
5726                _ => {
5727                    while operations < max_operations && rng.lock().gen_bool(0.7) {
5728                        op_start_signals
5729                            .choose(&mut *rng.lock())
5730                            .unwrap()
5731                            .unbounded_send(())
5732                            .unwrap();
5733                        operations += 1;
5734                    }
5735
5736                    if rng.lock().gen_bool(0.8) {
5737                        cx.foreground().run_until_parked();
5738                    }
5739                }
5740            }
5741        }
5742
5743        drop(op_start_signals);
5744        let mut clients = futures::future::join_all(clients).await;
5745        cx.foreground().run_until_parked();
5746
5747        let (host_client, mut host_cx, host_err) = clients.remove(0);
5748        if let Some(host_err) = host_err {
5749            panic!("host error - {}", host_err);
5750        }
5751        let host_project = host_client.project.as_ref().unwrap();
5752        let host_worktree_snapshots = host_project.read_with(&host_cx, |project, cx| {
5753            project
5754                .worktrees(cx)
5755                .map(|worktree| {
5756                    let snapshot = worktree.read(cx).snapshot();
5757                    (snapshot.id(), snapshot)
5758                })
5759                .collect::<BTreeMap<_, _>>()
5760        });
5761
5762        host_client
5763            .project
5764            .as_ref()
5765            .unwrap()
5766            .read_with(&host_cx, |project, cx| project.check_invariants(cx));
5767
5768        for (guest_client, mut guest_cx, guest_err) in clients.into_iter() {
5769            if let Some(guest_err) = guest_err {
5770                panic!("{} error - {}", guest_client.username, guest_err);
5771            }
5772            let worktree_snapshots =
5773                guest_client
5774                    .project
5775                    .as_ref()
5776                    .unwrap()
5777                    .read_with(&guest_cx, |project, cx| {
5778                        project
5779                            .worktrees(cx)
5780                            .map(|worktree| {
5781                                let worktree = worktree.read(cx);
5782                                (worktree.id(), worktree.snapshot())
5783                            })
5784                            .collect::<BTreeMap<_, _>>()
5785                    });
5786
5787            assert_eq!(
5788                worktree_snapshots.keys().collect::<Vec<_>>(),
5789                host_worktree_snapshots.keys().collect::<Vec<_>>(),
5790                "{} has different worktrees than the host",
5791                guest_client.username
5792            );
5793            for (id, host_snapshot) in &host_worktree_snapshots {
5794                let guest_snapshot = &worktree_snapshots[id];
5795                assert_eq!(
5796                    guest_snapshot.root_name(),
5797                    host_snapshot.root_name(),
5798                    "{} has different root name than the host for worktree {}",
5799                    guest_client.username,
5800                    id
5801                );
5802                assert_eq!(
5803                    guest_snapshot.entries(false).collect::<Vec<_>>(),
5804                    host_snapshot.entries(false).collect::<Vec<_>>(),
5805                    "{} has different snapshot than the host for worktree {}",
5806                    guest_client.username,
5807                    id
5808                );
5809                assert_eq!(guest_snapshot.scan_id(), host_snapshot.scan_id());
5810            }
5811
5812            guest_client
5813                .project
5814                .as_ref()
5815                .unwrap()
5816                .read_with(&guest_cx, |project, cx| project.check_invariants(cx));
5817
5818            for guest_buffer in &guest_client.buffers {
5819                let buffer_id = guest_buffer.read_with(&guest_cx, |buffer, _| buffer.remote_id());
5820                let host_buffer = host_project.read_with(&host_cx, |project, cx| {
5821                    project.buffer_for_id(buffer_id, cx).expect(&format!(
5822                        "host does not have buffer for guest:{}, peer:{}, id:{}",
5823                        guest_client.username, guest_client.peer_id, buffer_id
5824                    ))
5825                });
5826                let path = host_buffer
5827                    .read_with(&host_cx, |buffer, cx| buffer.file().unwrap().full_path(cx));
5828
5829                assert_eq!(
5830                    guest_buffer.read_with(&guest_cx, |buffer, _| buffer.deferred_ops_len()),
5831                    0,
5832                    "{}, buffer {}, path {:?} has deferred operations",
5833                    guest_client.username,
5834                    buffer_id,
5835                    path,
5836                );
5837                assert_eq!(
5838                    guest_buffer.read_with(&guest_cx, |buffer, _| buffer.text()),
5839                    host_buffer.read_with(&host_cx, |buffer, _| buffer.text()),
5840                    "{}, buffer {}, path {:?}, differs from the host's buffer",
5841                    guest_client.username,
5842                    buffer_id,
5843                    path
5844                );
5845            }
5846
5847            guest_cx.update(|_| drop(guest_client));
5848        }
5849
5850        host_cx.update(|_| drop(host_client));
5851    }
5852
    /// In-process server fixture used by the tests in this module.
    ///
    /// Bundles the `Server` under test with the handles the tests use to drive
    /// it: a notification receiver, per-user connection kill flags, and a flag
    /// that makes new connection attempts fail.
    struct TestServer {
        /// Peer created in `start`; `reset` is called on it when the fixture is dropped.
        peer: Arc<Peer>,
        /// State built from the test database; `create_client` uses its `db` to create users.
        app_state: Arc<AppState>,
        /// The server under test; `TestServer` dereferences to it.
        server: Arc<Server>,
        /// Foreground executor consulted by `condition` while it waits.
        foreground: Rc<executor::Foreground>,
        /// Receiving half of the channel whose sender is handed to `Server::new`.
        notifications: mpsc::UnboundedReceiver<()>,
        /// Per-user flags returned by `Connection::in_memory`; `disconnect_client`
        /// removes a user's flag and sets it to `true`.
        connection_killers: Arc<Mutex<HashMap<UserId, Arc<AtomicBool>>>>,
        /// While `true`, connection attempts made by test clients fail with an error.
        forbid_connections: Arc<AtomicBool>,
        /// Test database handle; stored but never read in the code shown here
        /// (presumably kept so the database outlives the server — TODO confirm).
        _test_db: TestDb,
    }
5863
5864    impl TestServer {
5865        async fn start(
5866            foreground: Rc<executor::Foreground>,
5867            background: Arc<executor::Background>,
5868        ) -> Self {
5869            let test_db = TestDb::fake(background);
5870            let app_state = Self::build_app_state(&test_db).await;
5871            let peer = Peer::new();
5872            let notifications = mpsc::unbounded();
5873            let server = Server::new(app_state.clone(), Some(notifications.0));
5874            Self {
5875                peer,
5876                app_state,
5877                server,
5878                foreground,
5879                notifications: notifications.1,
5880                connection_killers: Default::default(),
5881                forbid_connections: Default::default(),
5882                _test_db: test_db,
5883            }
5884        }
5885
        /// Creates a user named `name` in the test database and returns a
        /// `TestClient` connected to this server over an in-memory connection.
        ///
        /// Panics if creating the user, connecting, or receiving the connection
        /// id fails.
        async fn create_client(&mut self, cx: &mut TestAppContext, name: &str) -> TestClient {
            // Install test settings as a global on this app context.
            cx.update(|cx| {
                let settings = Settings::test(cx);
                cx.set_global(settings);
            });

            let http = FakeHttpClient::with_404_response();
            let user_id = self.app_state.db.create_user(name, false).await.unwrap();
            let client_name = name.to_string();
            let mut client = Client::new(http.clone());
            // Handles captured by the connection override below.
            let server = self.server.clone();
            let connection_killers = self.connection_killers.clone();
            let forbid_connections = self.forbid_connections.clone();
            // The sender is passed to `handle_connection`; the first value read
            // from the receiver further down becomes the client's `PeerId`.
            let (connection_id_tx, mut connection_id_rx) = mpsc::channel(16);

            Arc::get_mut(&mut client)
                .unwrap()
                // Authentication always yields a fixed token for the user created above.
                .override_authenticate(move |cx| {
                    cx.spawn(|_| async move {
                        let access_token = "the-token".to_string();
                        Ok(Credentials {
                            user_id: user_id.0 as u64,
                            access_token,
                        })
                    })
                })
                // Connections are established in memory instead of over the network.
                .override_establish_connection(move |credentials, cx| {
                    // The credentials must be the ones produced by the override above.
                    assert_eq!(credentials.user_id, user_id.0 as u64);
                    assert_eq!(credentials.access_token, "the-token");

                    // Clone the captured handles so they can be moved into the spawned future.
                    let server = server.clone();
                    let connection_killers = connection_killers.clone();
                    let forbid_connections = forbid_connections.clone();
                    let client_name = client_name.clone();
                    let connection_id_tx = connection_id_tx.clone();
                    cx.spawn(move |cx| async move {
                        if forbid_connections.load(SeqCst) {
                            Err(EstablishConnectionError::other(anyhow!(
                                "server is forbidding connections"
                            )))
                        } else {
                            let (client_conn, server_conn, killed) =
                                Connection::in_memory(cx.background());
                            // Remember the kill flag so `disconnect_client` can find it by user id.
                            connection_killers.lock().insert(user_id, killed);
                            // Run the server side of the connection on the background executor.
                            cx.background()
                                .spawn(server.handle_connection(
                                    server_conn,
                                    client_name,
                                    user_id,
                                    Some(connection_id_tx),
                                    cx.background(),
                                ))
                                .detach();
                            Ok(client_conn)
                        }
                    })
                });

            client
                .authenticate_and_connect(false, &cx.to_async())
                .await
                .unwrap();

            Channel::init(&client);
            Project::init(&client);
            cx.update(|cx| {
                workspace::init(&client, cx);
            });

            // First value received on the channel whose sender went to `handle_connection`.
            let peer_id = PeerId(connection_id_rx.next().await.unwrap().0);
            let user_store = cx.add_model(|cx| UserStore::new(client.clone(), http, cx));

            let client = TestClient {
                client,
                peer_id,
                username: name.to_string(),
                user_store,
                language_registry: Arc::new(LanguageRegistry::test()),
                project: Default::default(),
                buffers: Default::default(),
            };
            // Do not hand the client back until its user store reports a current user.
            client.wait_for_current_user(cx).await;
            client
        }
5970
5971        fn disconnect_client(&self, user_id: UserId) {
5972            self.connection_killers
5973                .lock()
5974                .remove(&user_id)
5975                .unwrap()
5976                .store(true, SeqCst);
5977        }
5978
5979        fn forbid_connections(&self) {
5980            self.forbid_connections.store(true, SeqCst);
5981        }
5982
5983        fn allow_connections(&self) {
5984            self.forbid_connections.store(false, SeqCst);
5985        }
5986
5987        async fn build_app_state(test_db: &TestDb) -> Arc<AppState> {
5988            Arc::new(AppState {
5989                db: test_db.db().clone(),
5990                api_token: Default::default(),
5991            })
5992        }
5993
5994        async fn state<'a>(&'a self) -> RwLockReadGuard<'a, Store> {
5995            self.server.store.read().await
5996        }
5997
5998        async fn condition<F>(&mut self, mut predicate: F)
5999        where
6000            F: FnMut(&Store) -> bool,
6001        {
6002            assert!(
6003                self.foreground.parking_forbidden(),
6004                "you must call forbid_parking to use server conditions so we don't block indefinitely"
6005            );
6006            while !(predicate)(&*self.server.store.read().await) {
6007                self.foreground.start_waiting();
6008                self.notifications.next().await;
6009                self.foreground.finish_waiting();
6010            }
6011        }
6012    }
6013
6014    impl Deref for TestServer {
6015        type Target = Server;
6016
6017        fn deref(&self) -> &Self::Target {
6018            &self.server
6019        }
6020    }
6021
6022    impl Drop for TestServer {
6023        fn drop(&mut self) {
6024            self.peer.reset();
6025        }
6026    }
6027
    /// A connected client plus the per-client state the tests track for it.
    struct TestClient {
        /// The underlying client; `TestClient` dereferences to it.
        client: Arc<Client>,
        /// Name the client was created with in `create_client`.
        username: String,
        /// Built in `create_client` from the connection id received after connecting.
        pub peer_id: PeerId,
        /// User store model created for this client.
        pub user_store: ModelHandle<UserStore>,
        /// Language registry handed to projects and workspaces built by this client.
        language_registry: Arc<LanguageRegistry>,
        /// Set by `build_local_project` / `build_remote_project`; starts out as the default.
        project: Option<ModelHandle<Project>>,
        /// Buffer handles tracked for this client; `simulate_host` inserts the
        /// buffers it opens and removes the ones it drops.
        buffers: HashSet<ModelHandle<language::Buffer>>,
    }
6037
6038    impl Deref for TestClient {
6039        type Target = Arc<Client>;
6040
6041        fn deref(&self) -> &Self::Target {
6042            &self.client
6043        }
6044    }
6045
6046    impl TestClient {
6047        pub fn current_user_id(&self, cx: &TestAppContext) -> UserId {
6048            UserId::from_proto(
6049                self.user_store
6050                    .read_with(cx, |user_store, _| user_store.current_user().unwrap().id),
6051            )
6052        }
6053
6054        async fn wait_for_current_user(&self, cx: &TestAppContext) {
6055            let mut authed_user = self
6056                .user_store
6057                .read_with(cx, |user_store, _| user_store.watch_current_user());
6058            while authed_user.next().await.unwrap().is_none() {}
6059        }
6060
6061        async fn build_local_project(
6062            &mut self,
6063            fs: Arc<FakeFs>,
6064            root_path: impl AsRef<Path>,
6065            cx: &mut TestAppContext,
6066        ) -> (ModelHandle<Project>, WorktreeId) {
6067            let project = cx.update(|cx| {
6068                Project::local(
6069                    self.client.clone(),
6070                    self.user_store.clone(),
6071                    self.language_registry.clone(),
6072                    fs,
6073                    cx,
6074                )
6075            });
6076            self.project = Some(project.clone());
6077            let (worktree, _) = project
6078                .update(cx, |p, cx| {
6079                    p.find_or_create_local_worktree(root_path, true, cx)
6080                })
6081                .await
6082                .unwrap();
6083            worktree
6084                .read_with(cx, |tree, _| tree.as_local().unwrap().scan_complete())
6085                .await;
6086            project
6087                .update(cx, |project, _| project.next_remote_id())
6088                .await;
6089            (project, worktree.read_with(cx, |tree, _| tree.id()))
6090        }
6091
6092        async fn build_remote_project(
6093            &mut self,
6094            project_id: u64,
6095            cx: &mut TestAppContext,
6096        ) -> ModelHandle<Project> {
6097            let project = Project::remote(
6098                project_id,
6099                self.client.clone(),
6100                self.user_store.clone(),
6101                self.language_registry.clone(),
6102                FakeFs::new(cx.background()),
6103                &mut cx.to_async(),
6104            )
6105            .await
6106            .unwrap();
6107            self.project = Some(project.clone());
6108            project
6109        }
6110
6111        fn build_workspace(
6112            &self,
6113            project: &ModelHandle<Project>,
6114            cx: &mut TestAppContext,
6115        ) -> ViewHandle<Workspace> {
6116            let (window_id, _) = cx.add_window(|_| EmptyView);
6117            cx.add_view(window_id, |cx| {
6118                let fs = project.read(cx).fs().clone();
6119                Workspace::new(
6120                    &WorkspaceParams {
6121                        fs,
6122                        project: project.clone(),
6123                        user_store: self.user_store.clone(),
6124                        languages: self.language_registry.clone(),
6125                        themes: ThemeRegistry::new((), cx.font_cache().clone()),
6126                        channel_list: cx.add_model(|cx| {
6127                            ChannelList::new(self.user_store.clone(), self.client.clone(), cx)
6128                        }),
6129                        client: self.client.clone(),
6130                    },
6131                    cx,
6132                )
6133            })
6134        }
6135
6136        async fn simulate_host(
6137            mut self,
6138            project: ModelHandle<Project>,
6139            files: Arc<Mutex<Vec<PathBuf>>>,
6140            op_start_signal: futures::channel::mpsc::UnboundedReceiver<()>,
6141            rng: Arc<Mutex<StdRng>>,
6142            mut cx: TestAppContext,
6143        ) -> (Self, TestAppContext, Option<anyhow::Error>) {
6144            async fn simulate_host_internal(
6145                client: &mut TestClient,
6146                project: ModelHandle<Project>,
6147                files: Arc<Mutex<Vec<PathBuf>>>,
6148                mut op_start_signal: futures::channel::mpsc::UnboundedReceiver<()>,
6149                rng: Arc<Mutex<StdRng>>,
6150                cx: &mut TestAppContext,
6151            ) -> anyhow::Result<()> {
6152                let fs = project.read_with(cx, |project, _| project.fs().clone());
6153
6154                while op_start_signal.next().await.is_some() {
6155                    let distribution = rng.lock().gen_range::<usize, _>(0..100);
6156                    match distribution {
6157                        0..=20 if !files.lock().is_empty() => {
6158                            let path = files.lock().choose(&mut *rng.lock()).unwrap().clone();
6159                            let mut path = path.as_path();
6160                            while let Some(parent_path) = path.parent() {
6161                                path = parent_path;
6162                                if rng.lock().gen() {
6163                                    break;
6164                                }
6165                            }
6166
6167                            log::info!("Host: find/create local worktree {:?}", path);
6168                            let find_or_create_worktree = project.update(cx, |project, cx| {
6169                                project.find_or_create_local_worktree(path, true, cx)
6170                            });
6171                            if rng.lock().gen() {
6172                                cx.background().spawn(find_or_create_worktree).detach();
6173                            } else {
6174                                find_or_create_worktree.await?;
6175                            }
6176                        }
6177                        10..=80 if !files.lock().is_empty() => {
6178                            let buffer = if client.buffers.is_empty() || rng.lock().gen() {
6179                                let file = files.lock().choose(&mut *rng.lock()).unwrap().clone();
6180                                let (worktree, path) = project
6181                                    .update(cx, |project, cx| {
6182                                        project.find_or_create_local_worktree(
6183                                            file.clone(),
6184                                            true,
6185                                            cx,
6186                                        )
6187                                    })
6188                                    .await?;
6189                                let project_path =
6190                                    worktree.read_with(cx, |worktree, _| (worktree.id(), path));
6191                                log::info!(
6192                                    "Host: opening path {:?}, worktree {}, relative_path {:?}",
6193                                    file,
6194                                    project_path.0,
6195                                    project_path.1
6196                                );
6197                                let buffer = project
6198                                    .update(cx, |project, cx| project.open_buffer(project_path, cx))
6199                                    .await
6200                                    .unwrap();
6201                                client.buffers.insert(buffer.clone());
6202                                buffer
6203                            } else {
6204                                client
6205                                    .buffers
6206                                    .iter()
6207                                    .choose(&mut *rng.lock())
6208                                    .unwrap()
6209                                    .clone()
6210                            };
6211
6212                            if rng.lock().gen_bool(0.1) {
6213                                cx.update(|cx| {
6214                                    log::info!(
6215                                        "Host: dropping buffer {:?}",
6216                                        buffer.read(cx).file().unwrap().full_path(cx)
6217                                    );
6218                                    client.buffers.remove(&buffer);
6219                                    drop(buffer);
6220                                });
6221                            } else {
6222                                buffer.update(cx, |buffer, cx| {
6223                                    log::info!(
6224                                        "Host: updating buffer {:?} ({})",
6225                                        buffer.file().unwrap().full_path(cx),
6226                                        buffer.remote_id()
6227                                    );
6228
6229                                    if rng.lock().gen_bool(0.7) {
6230                                        buffer.randomly_edit(&mut *rng.lock(), 5, cx);
6231                                    } else {
6232                                        buffer.randomly_undo_redo(&mut *rng.lock(), cx);
6233                                    }
6234                                });
6235                            }
6236                        }
6237                        _ => loop {
6238                            let path_component_count = rng.lock().gen_range::<usize, _>(1..=5);
6239                            let mut path = PathBuf::new();
6240                            path.push("/");
6241                            for _ in 0..path_component_count {
6242                                let letter = rng.lock().gen_range(b'a'..=b'z');
6243                                path.push(std::str::from_utf8(&[letter]).unwrap());
6244                            }
6245                            path.set_extension("rs");
6246                            let parent_path = path.parent().unwrap();
6247
6248                            log::info!("Host: creating file {:?}", path,);
6249
6250                            if fs.create_dir(&parent_path).await.is_ok()
6251                                && fs.create_file(&path, Default::default()).await.is_ok()
6252                            {
6253                                files.lock().push(path);
6254                                break;
6255                            } else {
6256                                log::info!("Host: cannot create file");
6257                            }
6258                        },
6259                    }
6260
6261                    cx.background().simulate_random_delay().await;
6262                }
6263
6264                Ok(())
6265            }
6266
6267            let result = simulate_host_internal(
6268                &mut self,
6269                project.clone(),
6270                files,
6271                op_start_signal,
6272                rng,
6273                &mut cx,
6274            )
6275            .await;
6276            log::info!("Host done");
6277            self.project = Some(project);
6278            (self, cx, result.err())
6279        }
6280
6281        pub async fn simulate_guest(
6282            mut self,
6283            guest_username: String,
6284            project: ModelHandle<Project>,
6285            op_start_signal: futures::channel::mpsc::UnboundedReceiver<()>,
6286            rng: Arc<Mutex<StdRng>>,
6287            mut cx: TestAppContext,
6288        ) -> (Self, TestAppContext, Option<anyhow::Error>) {
6289            async fn simulate_guest_internal(
6290                client: &mut TestClient,
6291                guest_username: &str,
6292                project: ModelHandle<Project>,
6293                mut op_start_signal: futures::channel::mpsc::UnboundedReceiver<()>,
6294                rng: Arc<Mutex<StdRng>>,
6295                cx: &mut TestAppContext,
6296            ) -> anyhow::Result<()> {
6297                while op_start_signal.next().await.is_some() {
6298                    let buffer = if client.buffers.is_empty() || rng.lock().gen() {
6299                        let worktree = if let Some(worktree) =
6300                            project.read_with(cx, |project, cx| {
6301                                project
6302                                    .worktrees(&cx)
6303                                    .filter(|worktree| {
6304                                        let worktree = worktree.read(cx);
6305                                        worktree.is_visible()
6306                                            && worktree.entries(false).any(|e| e.is_file())
6307                                    })
6308                                    .choose(&mut *rng.lock())
6309                            }) {
6310                            worktree
6311                        } else {
6312                            cx.background().simulate_random_delay().await;
6313                            continue;
6314                        };
6315
6316                        let (worktree_root_name, project_path) =
6317                            worktree.read_with(cx, |worktree, _| {
6318                                let entry = worktree
6319                                    .entries(false)
6320                                    .filter(|e| e.is_file())
6321                                    .choose(&mut *rng.lock())
6322                                    .unwrap();
6323                                (
6324                                    worktree.root_name().to_string(),
6325                                    (worktree.id(), entry.path.clone()),
6326                                )
6327                            });
6328                        log::info!(
6329                            "{}: opening path {:?} in worktree {} ({})",
6330                            guest_username,
6331                            project_path.1,
6332                            project_path.0,
6333                            worktree_root_name,
6334                        );
6335                        let buffer = project
6336                            .update(cx, |project, cx| {
6337                                project.open_buffer(project_path.clone(), cx)
6338                            })
6339                            .await?;
6340                        log::info!(
6341                            "{}: opened path {:?} in worktree {} ({}) with buffer id {}",
6342                            guest_username,
6343                            project_path.1,
6344                            project_path.0,
6345                            worktree_root_name,
6346                            buffer.read_with(cx, |buffer, _| buffer.remote_id())
6347                        );
6348                        client.buffers.insert(buffer.clone());
6349                        buffer
6350                    } else {
6351                        client
6352                            .buffers
6353                            .iter()
6354                            .choose(&mut *rng.lock())
6355                            .unwrap()
6356                            .clone()
6357                    };
6358
6359                    let choice = rng.lock().gen_range(0..100);
6360                    match choice {
6361                        0..=9 => {
6362                            cx.update(|cx| {
6363                                log::info!(
6364                                    "{}: dropping buffer {:?}",
6365                                    guest_username,
6366                                    buffer.read(cx).file().unwrap().full_path(cx)
6367                                );
6368                                client.buffers.remove(&buffer);
6369                                drop(buffer);
6370                            });
6371                        }
6372                        10..=19 => {
6373                            let completions = project.update(cx, |project, cx| {
6374                                log::info!(
6375                                    "{}: requesting completions for buffer {} ({:?})",
6376                                    guest_username,
6377                                    buffer.read(cx).remote_id(),
6378                                    buffer.read(cx).file().unwrap().full_path(cx)
6379                                );
6380                                let offset = rng.lock().gen_range(0..=buffer.read(cx).len());
6381                                project.completions(&buffer, offset, cx)
6382                            });
6383                            let completions = cx.background().spawn(async move {
6384                                completions
6385                                    .await
6386                                    .map_err(|err| anyhow!("completions request failed: {:?}", err))
6387                            });
6388                            if rng.lock().gen_bool(0.3) {
6389                                log::info!("{}: detaching completions request", guest_username);
6390                                cx.update(|cx| completions.detach_and_log_err(cx));
6391                            } else {
6392                                completions.await?;
6393                            }
6394                        }
6395                        20..=29 => {
6396                            let code_actions = project.update(cx, |project, cx| {
6397                                log::info!(
6398                                    "{}: requesting code actions for buffer {} ({:?})",
6399                                    guest_username,
6400                                    buffer.read(cx).remote_id(),
6401                                    buffer.read(cx).file().unwrap().full_path(cx)
6402                                );
6403                                let range = buffer.read(cx).random_byte_range(0, &mut *rng.lock());
6404                                project.code_actions(&buffer, range, cx)
6405                            });
6406                            let code_actions = cx.background().spawn(async move {
6407                                code_actions.await.map_err(|err| {
6408                                    anyhow!("code actions request failed: {:?}", err)
6409                                })
6410                            });
6411                            if rng.lock().gen_bool(0.3) {
6412                                log::info!("{}: detaching code actions request", guest_username);
6413                                cx.update(|cx| code_actions.detach_and_log_err(cx));
6414                            } else {
6415                                code_actions.await?;
6416                            }
6417                        }
6418                        30..=39 if buffer.read_with(cx, |buffer, _| buffer.is_dirty()) => {
6419                            let (requested_version, save) = buffer.update(cx, |buffer, cx| {
6420                                log::info!(
6421                                    "{}: saving buffer {} ({:?})",
6422                                    guest_username,
6423                                    buffer.remote_id(),
6424                                    buffer.file().unwrap().full_path(cx)
6425                                );
6426                                (buffer.version(), buffer.save(cx))
6427                            });
6428                            let save = cx.background().spawn(async move {
6429                                let (saved_version, _) = save
6430                                    .await
6431                                    .map_err(|err| anyhow!("save request failed: {:?}", err))?;
6432                                assert!(saved_version.observed_all(&requested_version));
6433                                Ok::<_, anyhow::Error>(())
6434                            });
6435                            if rng.lock().gen_bool(0.3) {
6436                                log::info!("{}: detaching save request", guest_username);
6437                                cx.update(|cx| save.detach_and_log_err(cx));
6438                            } else {
6439                                save.await?;
6440                            }
6441                        }
6442                        40..=44 => {
6443                            let prepare_rename = project.update(cx, |project, cx| {
6444                                log::info!(
6445                                    "{}: preparing rename for buffer {} ({:?})",
6446                                    guest_username,
6447                                    buffer.read(cx).remote_id(),
6448                                    buffer.read(cx).file().unwrap().full_path(cx)
6449                                );
6450                                let offset = rng.lock().gen_range(0..=buffer.read(cx).len());
6451                                project.prepare_rename(buffer, offset, cx)
6452                            });
6453                            let prepare_rename = cx.background().spawn(async move {
6454                                prepare_rename.await.map_err(|err| {
6455                                    anyhow!("prepare rename request failed: {:?}", err)
6456                                })
6457                            });
6458                            if rng.lock().gen_bool(0.3) {
6459                                log::info!("{}: detaching prepare rename request", guest_username);
6460                                cx.update(|cx| prepare_rename.detach_and_log_err(cx));
6461                            } else {
6462                                prepare_rename.await?;
6463                            }
6464                        }
6465                        45..=49 => {
6466                            let definitions = project.update(cx, |project, cx| {
6467                                log::info!(
6468                                    "{}: requesting definitions for buffer {} ({:?})",
6469                                    guest_username,
6470                                    buffer.read(cx).remote_id(),
6471                                    buffer.read(cx).file().unwrap().full_path(cx)
6472                                );
6473                                let offset = rng.lock().gen_range(0..=buffer.read(cx).len());
6474                                project.definition(&buffer, offset, cx)
6475                            });
6476                            let definitions = cx.background().spawn(async move {
6477                                definitions
6478                                    .await
6479                                    .map_err(|err| anyhow!("definitions request failed: {:?}", err))
6480                            });
6481                            if rng.lock().gen_bool(0.3) {
6482                                log::info!("{}: detaching definitions request", guest_username);
6483                                cx.update(|cx| definitions.detach_and_log_err(cx));
6484                            } else {
6485                                client
6486                                    .buffers
6487                                    .extend(definitions.await?.into_iter().map(|loc| loc.buffer));
6488                            }
6489                        }
6490                        50..=54 => {
6491                            let highlights = project.update(cx, |project, cx| {
6492                                log::info!(
6493                                    "{}: requesting highlights for buffer {} ({:?})",
6494                                    guest_username,
6495                                    buffer.read(cx).remote_id(),
6496                                    buffer.read(cx).file().unwrap().full_path(cx)
6497                                );
6498                                let offset = rng.lock().gen_range(0..=buffer.read(cx).len());
6499                                project.document_highlights(&buffer, offset, cx)
6500                            });
6501                            let highlights = cx.background().spawn(async move {
6502                                highlights
6503                                    .await
6504                                    .map_err(|err| anyhow!("highlights request failed: {:?}", err))
6505                            });
6506                            if rng.lock().gen_bool(0.3) {
6507                                log::info!("{}: detaching highlights request", guest_username);
6508                                cx.update(|cx| highlights.detach_and_log_err(cx));
6509                            } else {
6510                                highlights.await?;
6511                            }
6512                        }
6513                        55..=59 => {
6514                            let search = project.update(cx, |project, cx| {
6515                                let query = rng.lock().gen_range('a'..='z');
6516                                log::info!("{}: project-wide search {:?}", guest_username, query);
6517                                project.search(SearchQuery::text(query, false, false), cx)
6518                            });
6519                            let search = cx.background().spawn(async move {
6520                                search
6521                                    .await
6522                                    .map_err(|err| anyhow!("search request failed: {:?}", err))
6523                            });
6524                            if rng.lock().gen_bool(0.3) {
6525                                log::info!("{}: detaching search request", guest_username);
6526                                cx.update(|cx| search.detach_and_log_err(cx));
6527                            } else {
6528                                client.buffers.extend(search.await?.into_keys());
6529                            }
6530                        }
6531                        60..=69 => {
6532                            let worktree = project
6533                                .read_with(cx, |project, cx| {
6534                                    project
6535                                        .worktrees(&cx)
6536                                        .filter(|worktree| {
6537                                            let worktree = worktree.read(cx);
6538                                            worktree.is_visible()
6539                                                && worktree.entries(false).any(|e| e.is_file())
6540                                                && worktree
6541                                                    .root_entry()
6542                                                    .map_or(false, |e| e.is_dir())
6543                                        })
6544                                        .choose(&mut *rng.lock())
6545                                })
6546                                .unwrap();
6547                            let (worktree_id, worktree_root_name) = worktree
6548                                .read_with(cx, |worktree, _| {
6549                                    (worktree.id(), worktree.root_name().to_string())
6550                                });
6551
6552                            let mut new_name = String::new();
6553                            for _ in 0..10 {
6554                                let letter = rng.lock().gen_range('a'..='z');
6555                                new_name.push(letter);
6556                            }
6557                            let mut new_path = PathBuf::new();
6558                            new_path.push(new_name);
6559                            new_path.set_extension("rs");
6560                            log::info!(
6561                                "{}: creating {:?} in worktree {} ({})",
6562                                guest_username,
6563                                new_path,
6564                                worktree_id,
6565                                worktree_root_name,
6566                            );
6567                            project
6568                                .update(cx, |project, cx| {
6569                                    project.create_entry((worktree_id, new_path), false, cx)
6570                                })
6571                                .unwrap()
6572                                .await?;
6573                        }
6574                        _ => {
6575                            buffer.update(cx, |buffer, cx| {
6576                                log::info!(
6577                                    "{}: updating buffer {} ({:?})",
6578                                    guest_username,
6579                                    buffer.remote_id(),
6580                                    buffer.file().unwrap().full_path(cx)
6581                                );
6582                                if rng.lock().gen_bool(0.7) {
6583                                    buffer.randomly_edit(&mut *rng.lock(), 5, cx);
6584                                } else {
6585                                    buffer.randomly_undo_redo(&mut *rng.lock(), cx);
6586                                }
6587                            });
6588                        }
6589                    }
6590                    cx.background().simulate_random_delay().await;
6591                }
6592                Ok(())
6593            }
6594
6595            let result = simulate_guest_internal(
6596                &mut self,
6597                &guest_username,
6598                project.clone(),
6599                op_start_signal,
6600                rng,
6601                &mut cx,
6602            )
6603            .await;
6604            log::info!("{}: done", guest_username);
6605
6606            self.project = Some(project);
6607            (self, cx, result.err())
6608        }
6609    }
6610
6611    impl Drop for TestClient {
6612        fn drop(&mut self) {
6613            self.client.tear_down();
6614        }
6615    }
6616
6617    impl Executor for Arc<gpui::executor::Background> {
6618        type Sleep = gpui::executor::Timer;
6619
6620        fn spawn_detached<F: 'static + Send + Future<Output = ()>>(&self, future: F) {
6621            self.spawn(future).detach();
6622        }
6623
6624        fn sleep(&self, duration: Duration) -> Self::Sleep {
6625            self.as_ref().timer(duration)
6626        }
6627    }
6628
6629    fn channel_messages(channel: &Channel) -> Vec<(String, String, bool)> {
6630        channel
6631            .messages()
6632            .cursor::<()>()
6633            .map(|m| {
6634                (
6635                    m.sender.github_login.clone(),
6636                    m.body.clone(),
6637                    m.is_pending(),
6638                )
6639            })
6640            .collect()
6641    }
6642
6643    struct EmptyView;
6644
6645    impl gpui::Entity for EmptyView {
6646        type Event = ();
6647    }
6648
6649    impl gpui::View for EmptyView {
6650        fn ui_name() -> &'static str {
6651            "empty view"
6652        }
6653
6654        fn render(&mut self, _: &mut gpui::RenderContext<Self>) -> gpui::ElementBox {
6655            gpui::Element::boxed(gpui::elements::Empty)
6656        }
6657    }
6658}