rpc.rs

   1mod store;
   2
   3use crate::{
   4    auth,
   5    db::{ChannelId, MessageId, UserId},
   6    AppState, Result,
   7};
   8use anyhow::anyhow;
   9use async_tungstenite::tungstenite::{
  10    protocol::CloseFrame as TungsteniteCloseFrame, Message as TungsteniteMessage,
  11};
  12use axum::{
  13    body::Body,
  14    extract::{
  15        ws::{CloseFrame as AxumCloseFrame, Message as AxumMessage},
  16        ConnectInfo, WebSocketUpgrade,
  17    },
  18    headers::{Header, HeaderName},
  19    http::StatusCode,
  20    middleware,
  21    response::{IntoResponse, Response},
  22    routing::get,
  23    Extension, Router, TypedHeader,
  24};
  25use collections::{HashMap, HashSet};
  26use futures::{channel::mpsc, future::BoxFuture, FutureExt, SinkExt, StreamExt, TryStreamExt};
  27use lazy_static::lazy_static;
  28use rpc::{
  29    proto::{self, AnyTypedEnvelope, EntityMessage, EnvelopedMessage, RequestMessage},
  30    Connection, ConnectionId, Peer, TypedEnvelope,
  31};
  32use std::{
  33    any::TypeId,
  34    future::Future,
  35    marker::PhantomData,
  36    net::SocketAddr,
  37    ops::{Deref, DerefMut},
  38    rc::Rc,
  39    sync::Arc,
  40    time::Duration,
  41};
  42use store::{Store, Worktree};
  43use time::OffsetDateTime;
  44use tokio::{
  45    sync::{RwLock, RwLockReadGuard, RwLockWriteGuard},
  46    time::Sleep,
  47};
  48use tower::ServiceBuilder;
  49use tracing::{info_span, instrument, Instrument};
  50
/// Type-erased message handler, as stored in `Server::handlers`.
///
/// A handler receives the server and a boxed, dynamically typed envelope, and returns a boxed
/// future that resolves to `()` once handling has finished.
type MessageHandler =
    Box<dyn Send + Sync + Fn(Arc<Server>, Box<dyn AnyTypedEnvelope>) -> BoxFuture<'static, ()>>;
  53
/// RPC server state shared (behind an `Arc`) by every connection handler.
pub struct Server {
    // Peer used to add/disconnect connections and to send, respond to, and forward messages.
    peer: Arc<Peer>,
    // Store guarded by an async read/write lock.
    store: RwLock<Store>,
    // Application state; the handlers in this file use it to reach the database.
    app_state: Arc<AppState>,
    // Handler table keyed by the `TypeId` of the message type each handler accepts.
    handlers: HashMap<TypeId, MessageHandler>,
    // Optional channel that receives a `()` each time a message handler finishes
    // (see `handle_connection`).
    notifications: Option<mpsc::UnboundedSender<()>>,
}
  61
/// Abstraction over the async runtime facilities that `Server::handle_connection` needs.
pub trait Executor: Send + Clone {
    /// Future returned by [`Executor::sleep`].
    type Sleep: Send + Future;
    /// Hands `future` to the executor without returning a handle to it.
    fn spawn_detached<F: 'static + Send + Future<Output = ()>>(&self, future: F);
    /// Returns a timer future for `duration`.
    // NOTE(review): presumably resolves once `duration` has elapsed; implementations are not
    // visible in this part of the file.
    fn sleep(&self, duration: Duration) -> Self::Sleep;
}
  67
/// Unit-struct executor handle.
// NOTE(review): presumably the production `Executor` implementation; its `impl` is not visible
// in this part of the file.
#[derive(Clone)]
pub struct RealExecutor;
  70
// NOTE(review): presumably the page size used when listing channel messages; the usage is not
// visible in this part of the file.
const MESSAGE_COUNT_PER_PAGE: usize = 100;
// NOTE(review): presumably the upper bound on a channel message body; unit (bytes vs. chars)
// to be confirmed at the usage site.
const MAX_MESSAGE_LEN: usize = 1024;
  73
/// Wrapper around a read guard on the `Store` lock.
struct StoreReadGuard<'a> {
    // The underlying tokio read guard.
    guard: RwLockReadGuard<'a, Store>,
    // `Rc<()>` is `!Send`, so this marker makes the whole wrapper `!Send`.
    // NOTE(review): presumably this stops the guard from being kept alive across an `.await`
    // inside a future that must be `Send` — confirm against the `state()` call sites.
    _not_send: PhantomData<Rc<()>>,
}
  78
/// Wrapper around a write guard on the `Store` lock.
struct StoreWriteGuard<'a> {
    // The underlying tokio write guard.
    guard: RwLockWriteGuard<'a, Store>,
    // `Rc<()>` is `!Send`, so this marker makes the whole wrapper `!Send`.
    // NOTE(review): presumably this stops the guard from being kept alive across an `.await`
    // inside a future that must be `Send` — confirm against the `state_mut()` call sites.
    _not_send: PhantomData<Rc<()>>,
}
  83
  84impl Server {
  85    pub fn new(
  86        app_state: Arc<AppState>,
  87        notifications: Option<mpsc::UnboundedSender<()>>,
  88    ) -> Arc<Self> {
  89        let mut server = Self {
  90            peer: Peer::new(),
  91            app_state,
  92            store: Default::default(),
  93            handlers: Default::default(),
  94            notifications,
  95        };
  96
  97        server
  98            .add_request_handler(Server::ping)
  99            .add_request_handler(Server::register_project)
 100            .add_message_handler(Server::unregister_project)
 101            .add_request_handler(Server::share_project)
 102            .add_message_handler(Server::unshare_project)
 103            .add_sync_request_handler(Server::join_project)
 104            .add_message_handler(Server::leave_project)
 105            .add_request_handler(Server::register_worktree)
 106            .add_message_handler(Server::unregister_worktree)
 107            .add_request_handler(Server::update_worktree)
 108            .add_message_handler(Server::start_language_server)
 109            .add_message_handler(Server::update_language_server)
 110            .add_message_handler(Server::update_diagnostic_summary)
 111            .add_request_handler(Server::forward_project_request::<proto::GetDefinition>)
 112            .add_request_handler(Server::forward_project_request::<proto::GetReferences>)
 113            .add_request_handler(Server::forward_project_request::<proto::SearchProject>)
 114            .add_request_handler(Server::forward_project_request::<proto::GetDocumentHighlights>)
 115            .add_request_handler(Server::forward_project_request::<proto::GetProjectSymbols>)
 116            .add_request_handler(Server::forward_project_request::<proto::OpenBufferForSymbol>)
 117            .add_request_handler(Server::forward_project_request::<proto::OpenBufferById>)
 118            .add_request_handler(Server::forward_project_request::<proto::OpenBufferByPath>)
 119            .add_request_handler(Server::forward_project_request::<proto::GetCompletions>)
 120            .add_request_handler(
 121                Server::forward_project_request::<proto::ApplyCompletionAdditionalEdits>,
 122            )
 123            .add_request_handler(Server::forward_project_request::<proto::GetCodeActions>)
 124            .add_request_handler(Server::forward_project_request::<proto::ApplyCodeAction>)
 125            .add_request_handler(Server::forward_project_request::<proto::PrepareRename>)
 126            .add_request_handler(Server::forward_project_request::<proto::PerformRename>)
 127            .add_request_handler(Server::forward_project_request::<proto::ReloadBuffers>)
 128            .add_request_handler(Server::forward_project_request::<proto::FormatBuffers>)
 129            .add_request_handler(Server::update_buffer)
 130            .add_message_handler(Server::update_buffer_file)
 131            .add_message_handler(Server::buffer_reloaded)
 132            .add_message_handler(Server::buffer_saved)
 133            .add_request_handler(Server::save_buffer)
 134            .add_request_handler(Server::get_channels)
 135            .add_request_handler(Server::get_users)
 136            .add_request_handler(Server::join_channel)
 137            .add_message_handler(Server::leave_channel)
 138            .add_request_handler(Server::send_channel_message)
 139            .add_request_handler(Server::follow)
 140            .add_message_handler(Server::unfollow)
 141            .add_message_handler(Server::update_followers)
 142            .add_request_handler(Server::get_channel_messages);
 143
 144        Arc::new(server)
 145    }
 146
 147    fn add_message_handler<F, Fut, M>(&mut self, handler: F) -> &mut Self
 148    where
 149        F: 'static + Send + Sync + Fn(Arc<Self>, TypedEnvelope<M>) -> Fut,
 150        Fut: 'static + Send + Future<Output = Result<()>>,
 151        M: EnvelopedMessage,
 152    {
 153        let prev_handler = self.handlers.insert(
 154            TypeId::of::<M>(),
 155            Box::new(move |server, envelope| {
 156                let envelope = envelope.into_any().downcast::<TypedEnvelope<M>>().unwrap();
 157                let span = info_span!(
 158                    "handle message",
 159                    payload_type = envelope.payload_type_name()
 160                );
 161                let future = (handler)(server, *envelope);
 162                async move {
 163                    if let Err(error) = future.await {
 164                        tracing::error!(%error, "error handling message");
 165                    }
 166                }
 167                .instrument(span)
 168                .boxed()
 169            }),
 170        );
 171        if prev_handler.is_some() {
 172            panic!("registered a handler for the same message twice");
 173        }
 174        self
 175    }
 176
 177    fn add_request_handler<F, Fut, M>(&mut self, handler: F) -> &mut Self
 178    where
 179        F: 'static + Send + Sync + Fn(Arc<Self>, TypedEnvelope<M>) -> Fut,
 180        Fut: 'static + Send + Future<Output = Result<M::Response>>,
 181        M: RequestMessage,
 182    {
 183        self.add_message_handler(move |server, envelope| {
 184            let receipt = envelope.receipt();
 185            let response = (handler)(server.clone(), envelope);
 186            async move {
 187                match response.await {
 188                    Ok(response) => {
 189                        server.peer.respond(receipt, response)?;
 190                        Ok(())
 191                    }
 192                    Err(error) => {
 193                        server.peer.respond_with_error(
 194                            receipt,
 195                            proto::Error {
 196                                message: error.to_string(),
 197                            },
 198                        )?;
 199                        Err(error)
 200                    }
 201                }
 202            }
 203        })
 204    }
 205
 206    /// Handle a request while holding a lock to the store. This is useful when we're registering
 207    /// a connection but we want to respond on the connection before anybody else can send on it.
 208    fn add_sync_request_handler<F, M>(&mut self, handler: F) -> &mut Self
 209    where
 210        F: 'static
 211            + Send
 212            + Sync
 213            + Fn(Arc<Self>, &mut Store, TypedEnvelope<M>) -> Result<M::Response>,
 214        M: RequestMessage,
 215    {
 216        let handler = Arc::new(handler);
 217        self.add_message_handler(move |server, envelope| {
 218            let receipt = envelope.receipt();
 219            let handler = handler.clone();
 220            async move {
 221                let mut store = server.state_mut().await;
 222                let response = (handler)(server.clone(), &mut *store, envelope);
 223                match response {
 224                    Ok(response) => {
 225                        server.peer.respond(receipt, response)?;
 226                        Ok(())
 227                    }
 228                    Err(error) => {
 229                        server.peer.respond_with_error(
 230                            receipt,
 231                            proto::Error {
 232                                message: error.to_string(),
 233                            },
 234                        )?;
 235                        Err(error)
 236                    }
 237                }
 238            }
 239        })
 240    }
 241
    /// Returns a future that drives one client connection from registration to sign-out.
    ///
    /// The future:
    /// 1. adds `connection` to the peer, using `executor.sleep` to build the timer futures the
    ///    peer asks for;
    /// 2. sends the new connection id on `send_connection_id`, if one was provided (a send
    ///    failure is ignored);
    /// 3. records the connection for `user_id` in the store and updates that user's contacts;
    /// 4. loops, dispatching each incoming message to the handler registered for its payload
    ///    type, until the I/O future completes or the incoming stream ends;
    /// 5. signs the connection out, logging any error.
    ///
    /// The whole future runs inside a "handle connection" tracing span.
    pub fn handle_connection<E: Executor>(
        self: &Arc<Self>,
        connection: Connection,
        address: String,
        user_id: UserId,
        mut send_connection_id: Option<mpsc::Sender<ConnectionId>>,
        executor: E,
    ) -> impl Future<Output = ()> {
        // `mut` because `sign_out` takes `&mut Arc<Self>`.
        let mut this = self.clone();
        let span = info_span!("handle connection", %user_id, %address);
        async move {
            let (connection_id, handle_io, mut incoming_rx) = this
                .peer
                .add_connection(connection, {
                    let executor = executor.clone();
                    move |duration| {
                        let timer = executor.sleep(duration);
                        // Discard the timer's output so the returned future resolves to `()`.
                        async move {
                            timer.await;
                        }
                    }
                })
                .await;

            tracing::info!(%user_id, %connection_id, %address, "connection opened");

            if let Some(send_connection_id) = send_connection_id.as_mut() {
                let _ = send_connection_id.send(connection_id).await;
            }

            // Scoped so the store write guard is dropped before the message loop starts.
            {
                let mut state = this.state_mut().await;
                state.add_connection(connection_id, user_id);
                this.update_contacts_for_users(&*state, &[user_id]);
            }

            let handle_io = handle_io.fuse();
            futures::pin_mut!(handle_io);
            loop {
                let next_message = incoming_rx.next().fuse();
                futures::pin_mut!(next_message);
                // `select_biased!` polls its arms in the order written, so completion of the
                // I/O future is checked before the next incoming message.
                futures::select_biased! {
                    result = handle_io => {
                        if let Err(error) = result {
                            tracing::error!(%error, "error handling I/O");
                        }
                        break;
                    }
                    message = next_message => {
                        if let Some(message) = message {
                            let type_name = message.payload_type_name();
                            let span = tracing::info_span!("receive message", %user_id, %connection_id, %address, type_name);
                            async {
                                if let Some(handler) = this.handlers.get(&message.payload_type_id()) {
                                    let notifications = this.notifications.clone();
                                    let is_background = message.is_background();
                                    let handle_message = (handler)(this.clone(), message);
                                    // Wrap the handler so a `()` is sent on `notifications`
                                    // once it finishes; a failed send is ignored.
                                    let handle_message = async move {
                                        handle_message.await;
                                        if let Some(mut notifications) = notifications {
                                            let _ = notifications.send(()).await;
                                        }
                                    };
                                    // Background messages are spawned on the executor; all
                                    // others are awaited here before the next loop iteration.
                                    if is_background {
                                        executor.spawn_detached(handle_message);
                                    } else {
                                        handle_message.await;
                                    }
                                } else {
                                    tracing::error!("no message handler");
                                }
                            }.instrument(span).await;
                        } else {
                            // The incoming stream yielded `None`.
                            tracing::info!(%user_id, %connection_id, %address, "connection closed");
                            break;
                        }
                    }
                }
            }

            if let Err(error) = this.sign_out(connection_id).await {
                tracing::error!(%error, "error signing out");
            }
        }.instrument(span)
    }
 327
 328    async fn sign_out(self: &mut Arc<Self>, connection_id: ConnectionId) -> Result<()> {
 329        self.peer.disconnect(connection_id);
 330        let mut state = self.state_mut().await;
 331        let removed_connection = state.remove_connection(connection_id)?;
 332
 333        for (project_id, project) in removed_connection.hosted_projects {
 334            if let Some(share) = project.share {
 335                broadcast(
 336                    connection_id,
 337                    share.guests.keys().copied().collect(),
 338                    |conn_id| {
 339                        self.peer
 340                            .send(conn_id, proto::UnshareProject { project_id })
 341                    },
 342                );
 343            }
 344        }
 345
 346        for (project_id, peer_ids) in removed_connection.guest_project_ids {
 347            broadcast(connection_id, peer_ids, |conn_id| {
 348                self.peer.send(
 349                    conn_id,
 350                    proto::RemoveProjectCollaborator {
 351                        project_id,
 352                        peer_id: connection_id.0,
 353                    },
 354                )
 355            });
 356        }
 357
 358        self.update_contacts_for_users(&*state, removed_connection.contact_ids.iter());
 359        Ok(())
 360    }
 361
 362    async fn ping(self: Arc<Server>, _: TypedEnvelope<proto::Ping>) -> Result<proto::Ack> {
 363        Ok(proto::Ack {})
 364    }
 365
 366    async fn register_project(
 367        self: Arc<Server>,
 368        request: TypedEnvelope<proto::RegisterProject>,
 369    ) -> Result<proto::RegisterProjectResponse> {
 370        let project_id = {
 371            let mut state = self.state_mut().await;
 372            let user_id = state.user_id_for_connection(request.sender_id)?;
 373            state.register_project(request.sender_id, user_id)
 374        };
 375        Ok(proto::RegisterProjectResponse { project_id })
 376    }
 377
 378    async fn unregister_project(
 379        self: Arc<Server>,
 380        request: TypedEnvelope<proto::UnregisterProject>,
 381    ) -> Result<()> {
 382        let mut state = self.state_mut().await;
 383        let project = state.unregister_project(request.payload.project_id, request.sender_id)?;
 384        self.update_contacts_for_users(&*state, &project.authorized_user_ids());
 385        Ok(())
 386    }
 387
 388    async fn share_project(
 389        self: Arc<Server>,
 390        request: TypedEnvelope<proto::ShareProject>,
 391    ) -> Result<proto::Ack> {
 392        let mut state = self.state_mut().await;
 393        let project = state.share_project(request.payload.project_id, request.sender_id)?;
 394        self.update_contacts_for_users(&mut *state, &project.authorized_user_ids);
 395        Ok(proto::Ack {})
 396    }
 397
 398    async fn unshare_project(
 399        self: Arc<Server>,
 400        request: TypedEnvelope<proto::UnshareProject>,
 401    ) -> Result<()> {
 402        let project_id = request.payload.project_id;
 403        let mut state = self.state_mut().await;
 404        let project = state.unshare_project(project_id, request.sender_id)?;
 405        broadcast(request.sender_id, project.connection_ids, |conn_id| {
 406            self.peer
 407                .send(conn_id, proto::UnshareProject { project_id })
 408        });
 409        self.update_contacts_for_users(&mut *state, &project.authorized_user_ids);
 410        Ok(())
 411    }
 412
 413    fn join_project(
 414        self: Arc<Server>,
 415        state: &mut Store,
 416        request: TypedEnvelope<proto::JoinProject>,
 417    ) -> Result<proto::JoinProjectResponse> {
 418        let project_id = request.payload.project_id;
 419
 420        let user_id = state.user_id_for_connection(request.sender_id)?;
 421        let (response, connection_ids, contact_user_ids) = state
 422            .join_project(request.sender_id, user_id, project_id)
 423            .and_then(|joined| {
 424                let share = joined.project.share()?;
 425                let peer_count = share.guests.len();
 426                let mut collaborators = Vec::with_capacity(peer_count);
 427                collaborators.push(proto::Collaborator {
 428                    peer_id: joined.project.host_connection_id.0,
 429                    replica_id: 0,
 430                    user_id: joined.project.host_user_id.to_proto(),
 431                });
 432                let worktrees = share
 433                    .worktrees
 434                    .iter()
 435                    .filter_map(|(id, shared_worktree)| {
 436                        let worktree = joined.project.worktrees.get(&id)?;
 437                        Some(proto::Worktree {
 438                            id: *id,
 439                            root_name: worktree.root_name.clone(),
 440                            entries: shared_worktree.entries.values().cloned().collect(),
 441                            diagnostic_summaries: shared_worktree
 442                                .diagnostic_summaries
 443                                .values()
 444                                .cloned()
 445                                .collect(),
 446                            visible: worktree.visible,
 447                        })
 448                    })
 449                    .collect();
 450                for (peer_conn_id, (peer_replica_id, peer_user_id)) in &share.guests {
 451                    if *peer_conn_id != request.sender_id {
 452                        collaborators.push(proto::Collaborator {
 453                            peer_id: peer_conn_id.0,
 454                            replica_id: *peer_replica_id as u32,
 455                            user_id: peer_user_id.to_proto(),
 456                        });
 457                    }
 458                }
 459                let response = proto::JoinProjectResponse {
 460                    worktrees,
 461                    replica_id: joined.replica_id as u32,
 462                    collaborators,
 463                    language_servers: joined.project.language_servers.clone(),
 464                };
 465                let connection_ids = joined.project.connection_ids();
 466                let contact_user_ids = joined.project.authorized_user_ids();
 467                Ok((response, connection_ids, contact_user_ids))
 468            })?;
 469
 470        broadcast(request.sender_id, connection_ids, |conn_id| {
 471            self.peer.send(
 472                conn_id,
 473                proto::AddProjectCollaborator {
 474                    project_id,
 475                    collaborator: Some(proto::Collaborator {
 476                        peer_id: request.sender_id.0,
 477                        replica_id: response.replica_id,
 478                        user_id: user_id.to_proto(),
 479                    }),
 480                },
 481            )
 482        });
 483        self.update_contacts_for_users(state, &contact_user_ids);
 484        Ok(response)
 485    }
 486
 487    async fn leave_project(
 488        self: Arc<Server>,
 489        request: TypedEnvelope<proto::LeaveProject>,
 490    ) -> Result<()> {
 491        let sender_id = request.sender_id;
 492        let project_id = request.payload.project_id;
 493        let mut state = self.state_mut().await;
 494        let worktree = state.leave_project(sender_id, project_id)?;
 495        broadcast(sender_id, worktree.connection_ids, |conn_id| {
 496            self.peer.send(
 497                conn_id,
 498                proto::RemoveProjectCollaborator {
 499                    project_id,
 500                    peer_id: sender_id.0,
 501                },
 502            )
 503        });
 504        self.update_contacts_for_users(&*state, &worktree.authorized_user_ids);
 505        Ok(())
 506    }
 507
 508    async fn register_worktree(
 509        self: Arc<Server>,
 510        request: TypedEnvelope<proto::RegisterWorktree>,
 511    ) -> Result<proto::Ack> {
 512        let mut contact_user_ids = HashSet::default();
 513        for github_login in &request.payload.authorized_logins {
 514            let contact_user_id = self.app_state.db.create_user(github_login, false).await?;
 515            contact_user_ids.insert(contact_user_id);
 516        }
 517
 518        let mut state = self.state_mut().await;
 519        let host_user_id = state.user_id_for_connection(request.sender_id)?;
 520        contact_user_ids.insert(host_user_id);
 521
 522        let contact_user_ids = contact_user_ids.into_iter().collect::<Vec<_>>();
 523        let guest_connection_ids = state
 524            .read_project(request.payload.project_id, request.sender_id)?
 525            .guest_connection_ids();
 526        state.register_worktree(
 527            request.payload.project_id,
 528            request.payload.worktree_id,
 529            request.sender_id,
 530            Worktree {
 531                authorized_user_ids: contact_user_ids.clone(),
 532                root_name: request.payload.root_name.clone(),
 533                visible: request.payload.visible,
 534            },
 535        )?;
 536
 537        broadcast(request.sender_id, guest_connection_ids, |connection_id| {
 538            self.peer
 539                .forward_send(request.sender_id, connection_id, request.payload.clone())
 540        });
 541        self.update_contacts_for_users(&*state, &contact_user_ids);
 542        Ok(proto::Ack {})
 543    }
 544
 545    async fn unregister_worktree(
 546        self: Arc<Server>,
 547        request: TypedEnvelope<proto::UnregisterWorktree>,
 548    ) -> Result<()> {
 549        let project_id = request.payload.project_id;
 550        let worktree_id = request.payload.worktree_id;
 551        let mut state = self.state_mut().await;
 552        let (worktree, guest_connection_ids) =
 553            state.unregister_worktree(project_id, worktree_id, request.sender_id)?;
 554        broadcast(request.sender_id, guest_connection_ids, |conn_id| {
 555            self.peer.send(
 556                conn_id,
 557                proto::UnregisterWorktree {
 558                    project_id,
 559                    worktree_id,
 560                },
 561            )
 562        });
 563        self.update_contacts_for_users(&*state, &worktree.authorized_user_ids);
 564        Ok(())
 565    }
 566
 567    async fn update_worktree(
 568        self: Arc<Server>,
 569        request: TypedEnvelope<proto::UpdateWorktree>,
 570    ) -> Result<proto::Ack> {
 571        let connection_ids = self.state_mut().await.update_worktree(
 572            request.sender_id,
 573            request.payload.project_id,
 574            request.payload.worktree_id,
 575            &request.payload.removed_entries,
 576            &request.payload.updated_entries,
 577        )?;
 578
 579        broadcast(request.sender_id, connection_ids, |connection_id| {
 580            self.peer
 581                .forward_send(request.sender_id, connection_id, request.payload.clone())
 582        });
 583
 584        Ok(proto::Ack {})
 585    }
 586
 587    async fn update_diagnostic_summary(
 588        self: Arc<Server>,
 589        request: TypedEnvelope<proto::UpdateDiagnosticSummary>,
 590    ) -> Result<()> {
 591        let summary = request
 592            .payload
 593            .summary
 594            .clone()
 595            .ok_or_else(|| anyhow!("invalid summary"))?;
 596        let receiver_ids = self.state_mut().await.update_diagnostic_summary(
 597            request.payload.project_id,
 598            request.payload.worktree_id,
 599            request.sender_id,
 600            summary,
 601        )?;
 602
 603        broadcast(request.sender_id, receiver_ids, |connection_id| {
 604            self.peer
 605                .forward_send(request.sender_id, connection_id, request.payload.clone())
 606        });
 607        Ok(())
 608    }
 609
 610    async fn start_language_server(
 611        self: Arc<Server>,
 612        request: TypedEnvelope<proto::StartLanguageServer>,
 613    ) -> Result<()> {
 614        let receiver_ids = self.state_mut().await.start_language_server(
 615            request.payload.project_id,
 616            request.sender_id,
 617            request
 618                .payload
 619                .server
 620                .clone()
 621                .ok_or_else(|| anyhow!("invalid language server"))?,
 622        )?;
 623        broadcast(request.sender_id, receiver_ids, |connection_id| {
 624            self.peer
 625                .forward_send(request.sender_id, connection_id, request.payload.clone())
 626        });
 627        Ok(())
 628    }
 629
 630    async fn update_language_server(
 631        self: Arc<Server>,
 632        request: TypedEnvelope<proto::UpdateLanguageServer>,
 633    ) -> Result<()> {
 634        let receiver_ids = self
 635            .state()
 636            .await
 637            .project_connection_ids(request.payload.project_id, request.sender_id)?;
 638        broadcast(request.sender_id, receiver_ids, |connection_id| {
 639            self.peer
 640                .forward_send(request.sender_id, connection_id, request.payload.clone())
 641        });
 642        Ok(())
 643    }
 644
 645    async fn forward_project_request<T>(
 646        self: Arc<Server>,
 647        request: TypedEnvelope<T>,
 648    ) -> Result<T::Response>
 649    where
 650        T: EntityMessage + RequestMessage,
 651    {
 652        let host_connection_id = self
 653            .state()
 654            .await
 655            .read_project(request.payload.remote_entity_id(), request.sender_id)?
 656            .host_connection_id;
 657        Ok(self
 658            .peer
 659            .forward_request(request.sender_id, host_connection_id, request.payload)
 660            .await?)
 661    }
 662
 663    async fn save_buffer(
 664        self: Arc<Server>,
 665        request: TypedEnvelope<proto::SaveBuffer>,
 666    ) -> Result<proto::BufferSaved> {
 667        let host = self
 668            .state()
 669            .await
 670            .read_project(request.payload.project_id, request.sender_id)?
 671            .host_connection_id;
 672        let response = self
 673            .peer
 674            .forward_request(request.sender_id, host, request.payload.clone())
 675            .await?;
 676
 677        let mut guests = self
 678            .state()
 679            .await
 680            .read_project(request.payload.project_id, request.sender_id)?
 681            .connection_ids();
 682        guests.retain(|guest_connection_id| *guest_connection_id != request.sender_id);
 683        broadcast(host, guests, |conn_id| {
 684            self.peer.forward_send(host, conn_id, response.clone())
 685        });
 686
 687        Ok(response)
 688    }
 689
 690    async fn update_buffer(
 691        self: Arc<Server>,
 692        request: TypedEnvelope<proto::UpdateBuffer>,
 693    ) -> Result<proto::Ack> {
 694        let receiver_ids = self
 695            .state()
 696            .await
 697            .project_connection_ids(request.payload.project_id, request.sender_id)?;
 698        broadcast(request.sender_id, receiver_ids, |connection_id| {
 699            self.peer
 700                .forward_send(request.sender_id, connection_id, request.payload.clone())
 701        });
 702        Ok(proto::Ack {})
 703    }
 704
 705    async fn update_buffer_file(
 706        self: Arc<Server>,
 707        request: TypedEnvelope<proto::UpdateBufferFile>,
 708    ) -> Result<()> {
 709        let receiver_ids = self
 710            .state()
 711            .await
 712            .project_connection_ids(request.payload.project_id, request.sender_id)?;
 713        broadcast(request.sender_id, receiver_ids, |connection_id| {
 714            self.peer
 715                .forward_send(request.sender_id, connection_id, request.payload.clone())
 716        });
 717        Ok(())
 718    }
 719
 720    async fn buffer_reloaded(
 721        self: Arc<Server>,
 722        request: TypedEnvelope<proto::BufferReloaded>,
 723    ) -> Result<()> {
 724        let receiver_ids = self
 725            .state()
 726            .await
 727            .project_connection_ids(request.payload.project_id, request.sender_id)?;
 728        broadcast(request.sender_id, receiver_ids, |connection_id| {
 729            self.peer
 730                .forward_send(request.sender_id, connection_id, request.payload.clone())
 731        });
 732        Ok(())
 733    }
 734
 735    async fn buffer_saved(
 736        self: Arc<Server>,
 737        request: TypedEnvelope<proto::BufferSaved>,
 738    ) -> Result<()> {
 739        let receiver_ids = self
 740            .state()
 741            .await
 742            .project_connection_ids(request.payload.project_id, request.sender_id)?;
 743        broadcast(request.sender_id, receiver_ids, |connection_id| {
 744            self.peer
 745                .forward_send(request.sender_id, connection_id, request.payload.clone())
 746        });
 747        Ok(())
 748    }
 749
 750    async fn follow(
 751        self: Arc<Self>,
 752        request: TypedEnvelope<proto::Follow>,
 753    ) -> Result<proto::FollowResponse> {
 754        let leader_id = ConnectionId(request.payload.leader_id);
 755        let follower_id = request.sender_id;
 756        if !self
 757            .state()
 758            .await
 759            .project_connection_ids(request.payload.project_id, follower_id)?
 760            .contains(&leader_id)
 761        {
 762            Err(anyhow!("no such peer"))?;
 763        }
 764        let mut response = self
 765            .peer
 766            .forward_request(request.sender_id, leader_id, request.payload)
 767            .await?;
 768        response
 769            .views
 770            .retain(|view| view.leader_id != Some(follower_id.0));
 771        Ok(response)
 772    }
 773
 774    async fn unfollow(self: Arc<Self>, request: TypedEnvelope<proto::Unfollow>) -> Result<()> {
 775        let leader_id = ConnectionId(request.payload.leader_id);
 776        if !self
 777            .state()
 778            .await
 779            .project_connection_ids(request.payload.project_id, request.sender_id)?
 780            .contains(&leader_id)
 781        {
 782            Err(anyhow!("no such peer"))?;
 783        }
 784        self.peer
 785            .forward_send(request.sender_id, leader_id, request.payload)?;
 786        Ok(())
 787    }
 788
 789    async fn update_followers(
 790        self: Arc<Self>,
 791        request: TypedEnvelope<proto::UpdateFollowers>,
 792    ) -> Result<()> {
 793        let connection_ids = self
 794            .state()
 795            .await
 796            .project_connection_ids(request.payload.project_id, request.sender_id)?;
 797        let leader_id = request
 798            .payload
 799            .variant
 800            .as_ref()
 801            .and_then(|variant| match variant {
 802                proto::update_followers::Variant::CreateView(payload) => payload.leader_id,
 803                proto::update_followers::Variant::UpdateView(payload) => payload.leader_id,
 804                proto::update_followers::Variant::UpdateActiveView(payload) => payload.leader_id,
 805            });
 806        for follower_id in &request.payload.follower_ids {
 807            let follower_id = ConnectionId(*follower_id);
 808            if connection_ids.contains(&follower_id) && Some(follower_id.0) != leader_id {
 809                self.peer
 810                    .forward_send(request.sender_id, follower_id, request.payload.clone())?;
 811            }
 812        }
 813        Ok(())
 814    }
 815
 816    async fn get_channels(
 817        self: Arc<Server>,
 818        request: TypedEnvelope<proto::GetChannels>,
 819    ) -> Result<proto::GetChannelsResponse> {
 820        let user_id = self
 821            .state()
 822            .await
 823            .user_id_for_connection(request.sender_id)?;
 824        let channels = self.app_state.db.get_accessible_channels(user_id).await?;
 825        Ok(proto::GetChannelsResponse {
 826            channels: channels
 827                .into_iter()
 828                .map(|chan| proto::Channel {
 829                    id: chan.id.to_proto(),
 830                    name: chan.name,
 831                })
 832                .collect(),
 833        })
 834    }
 835
 836    async fn get_users(
 837        self: Arc<Server>,
 838        request: TypedEnvelope<proto::GetUsers>,
 839    ) -> Result<proto::GetUsersResponse> {
 840        let user_ids = request
 841            .payload
 842            .user_ids
 843            .into_iter()
 844            .map(UserId::from_proto)
 845            .collect();
 846        let users = self
 847            .app_state
 848            .db
 849            .get_users_by_ids(user_ids)
 850            .await?
 851            .into_iter()
 852            .map(|user| proto::User {
 853                id: user.id.to_proto(),
 854                avatar_url: format!("https://github.com/{}.png?size=128", user.github_login),
 855                github_login: user.github_login,
 856            })
 857            .collect();
 858        Ok(proto::GetUsersResponse { users })
 859    }
 860
 861    #[instrument(skip(self, state, user_ids))]
 862    fn update_contacts_for_users<'a>(
 863        self: &Arc<Self>,
 864        state: &Store,
 865        user_ids: impl IntoIterator<Item = &'a UserId>,
 866    ) {
 867        for user_id in user_ids {
 868            let contacts = state.contacts_for_user(*user_id);
 869            for connection_id in state.connection_ids_for_user(*user_id) {
 870                self.peer
 871                    .send(
 872                        connection_id,
 873                        proto::UpdateContacts {
 874                            contacts: contacts.clone(),
 875                        },
 876                    )
 877                    .trace_err();
 878            }
 879        }
 880    }
 881
 882    async fn join_channel(
 883        self: Arc<Self>,
 884        request: TypedEnvelope<proto::JoinChannel>,
 885    ) -> Result<proto::JoinChannelResponse> {
 886        let user_id = self
 887            .state()
 888            .await
 889            .user_id_for_connection(request.sender_id)?;
 890        let channel_id = ChannelId::from_proto(request.payload.channel_id);
 891        if !self
 892            .app_state
 893            .db
 894            .can_user_access_channel(user_id, channel_id)
 895            .await?
 896        {
 897            Err(anyhow!("access denied"))?;
 898        }
 899
 900        self.state_mut()
 901            .await
 902            .join_channel(request.sender_id, channel_id);
 903        let messages = self
 904            .app_state
 905            .db
 906            .get_channel_messages(channel_id, MESSAGE_COUNT_PER_PAGE, None)
 907            .await?
 908            .into_iter()
 909            .map(|msg| proto::ChannelMessage {
 910                id: msg.id.to_proto(),
 911                body: msg.body,
 912                timestamp: msg.sent_at.unix_timestamp() as u64,
 913                sender_id: msg.sender_id.to_proto(),
 914                nonce: Some(msg.nonce.as_u128().into()),
 915            })
 916            .collect::<Vec<_>>();
 917        Ok(proto::JoinChannelResponse {
 918            done: messages.len() < MESSAGE_COUNT_PER_PAGE,
 919            messages,
 920        })
 921    }
 922
 923    async fn leave_channel(
 924        self: Arc<Self>,
 925        request: TypedEnvelope<proto::LeaveChannel>,
 926    ) -> Result<()> {
 927        let user_id = self
 928            .state()
 929            .await
 930            .user_id_for_connection(request.sender_id)?;
 931        let channel_id = ChannelId::from_proto(request.payload.channel_id);
 932        if !self
 933            .app_state
 934            .db
 935            .can_user_access_channel(user_id, channel_id)
 936            .await?
 937        {
 938            Err(anyhow!("access denied"))?;
 939        }
 940
 941        self.state_mut()
 942            .await
 943            .leave_channel(request.sender_id, channel_id);
 944
 945        Ok(())
 946    }
 947
 948    async fn send_channel_message(
 949        self: Arc<Self>,
 950        request: TypedEnvelope<proto::SendChannelMessage>,
 951    ) -> Result<proto::SendChannelMessageResponse> {
 952        let channel_id = ChannelId::from_proto(request.payload.channel_id);
 953        let user_id;
 954        let connection_ids;
 955        {
 956            let state = self.state().await;
 957            user_id = state.user_id_for_connection(request.sender_id)?;
 958            connection_ids = state.channel_connection_ids(channel_id)?;
 959        }
 960
 961        // Validate the message body.
 962        let body = request.payload.body.trim().to_string();
 963        if body.len() > MAX_MESSAGE_LEN {
 964            return Err(anyhow!("message is too long"))?;
 965        }
 966        if body.is_empty() {
 967            return Err(anyhow!("message can't be blank"))?;
 968        }
 969
 970        let timestamp = OffsetDateTime::now_utc();
 971        let nonce = request
 972            .payload
 973            .nonce
 974            .ok_or_else(|| anyhow!("nonce can't be blank"))?;
 975
 976        let message_id = self
 977            .app_state
 978            .db
 979            .create_channel_message(channel_id, user_id, &body, timestamp, nonce.clone().into())
 980            .await?
 981            .to_proto();
 982        let message = proto::ChannelMessage {
 983            sender_id: user_id.to_proto(),
 984            id: message_id,
 985            body,
 986            timestamp: timestamp.unix_timestamp() as u64,
 987            nonce: Some(nonce),
 988        };
 989        broadcast(request.sender_id, connection_ids, |conn_id| {
 990            self.peer.send(
 991                conn_id,
 992                proto::ChannelMessageSent {
 993                    channel_id: channel_id.to_proto(),
 994                    message: Some(message.clone()),
 995                },
 996            )
 997        });
 998        Ok(proto::SendChannelMessageResponse {
 999            message: Some(message),
1000        })
1001    }
1002
1003    async fn get_channel_messages(
1004        self: Arc<Self>,
1005        request: TypedEnvelope<proto::GetChannelMessages>,
1006    ) -> Result<proto::GetChannelMessagesResponse> {
1007        let user_id = self
1008            .state()
1009            .await
1010            .user_id_for_connection(request.sender_id)?;
1011        let channel_id = ChannelId::from_proto(request.payload.channel_id);
1012        if !self
1013            .app_state
1014            .db
1015            .can_user_access_channel(user_id, channel_id)
1016            .await?
1017        {
1018            Err(anyhow!("access denied"))?;
1019        }
1020
1021        let messages = self
1022            .app_state
1023            .db
1024            .get_channel_messages(
1025                channel_id,
1026                MESSAGE_COUNT_PER_PAGE,
1027                Some(MessageId::from_proto(request.payload.before_message_id)),
1028            )
1029            .await?
1030            .into_iter()
1031            .map(|msg| proto::ChannelMessage {
1032                id: msg.id.to_proto(),
1033                body: msg.body,
1034                timestamp: msg.sent_at.unix_timestamp() as u64,
1035                sender_id: msg.sender_id.to_proto(),
1036                nonce: Some(msg.nonce.as_u128().into()),
1037            })
1038            .collect::<Vec<_>>();
1039
1040        Ok(proto::GetChannelMessagesResponse {
1041            done: messages.len() < MESSAGE_COUNT_PER_PAGE,
1042            messages,
1043        })
1044    }
1045
    /// Acquires a read lock on the server's `Store` and wraps it in a `StoreReadGuard`.
    ///
    /// In test builds the task yields once before and once after taking the lock.
    async fn state<'a>(self: &'a Arc<Self>) -> StoreReadGuard<'a> {
        #[cfg(test)]
        tokio::task::yield_now().await;
        let guard = self.store.read().await;
        #[cfg(test)]
        tokio::task::yield_now().await;
        StoreReadGuard {
            guard,
            // NOTE(review): presumably a marker that makes the guard `!Send` so it cannot be
            // held across an await in a `Send` future; confirm against the struct definition.
            _not_send: PhantomData,
        }
    }
1057
    /// Acquires the write lock on the server's `Store` and wraps it in a `StoreWriteGuard`.
    ///
    /// In test builds the task yields once before and once after taking the lock.
    async fn state_mut<'a>(self: &'a Arc<Self>) -> StoreWriteGuard<'a> {
        #[cfg(test)]
        tokio::task::yield_now().await;
        let guard = self.store.write().await;
        #[cfg(test)]
        tokio::task::yield_now().await;
        StoreWriteGuard {
            guard,
            // NOTE(review): presumably a marker that makes the guard `!Send` so it cannot be
            // held across an await in a `Send` future; confirm against the struct definition.
            _not_send: PhantomData,
        }
    }
1069}
1070
1071impl<'a> Deref for StoreReadGuard<'a> {
1072    type Target = Store;
1073
1074    fn deref(&self) -> &Self::Target {
1075        &*self.guard
1076    }
1077}
1078
1079impl<'a> Deref for StoreWriteGuard<'a> {
1080    type Target = Store;
1081
1082    fn deref(&self) -> &Self::Target {
1083        &*self.guard
1084    }
1085}
1086
1087impl<'a> DerefMut for StoreWriteGuard<'a> {
1088    fn deref_mut(&mut self) -> &mut Self::Target {
1089        &mut *self.guard
1090    }
1091}
1092
1093impl<'a> Drop for StoreWriteGuard<'a> {
1094    fn drop(&mut self) {
1095        #[cfg(test)]
1096        self.check_invariants();
1097
1098        let metrics = self.metrics();
1099        tracing::info!(
1100            connections = metrics.connections,
1101            registered_projects = metrics.registered_projects,
1102            shared_projects = metrics.shared_projects,
1103            collaborators_per_project = metrics.collaborators_per_project,
1104            "metrics"
1105        );
1106    }
1107}
1108
1109impl Executor for RealExecutor {
1110    type Sleep = Sleep;
1111
1112    fn spawn_detached<F: 'static + Send + Future<Output = ()>>(&self, future: F) {
1113        tokio::task::spawn(future);
1114    }
1115
1116    fn sleep(&self, duration: Duration) -> Self::Sleep {
1117        tokio::time::sleep(duration)
1118    }
1119}
1120
1121#[instrument(skip(f))]
1122fn broadcast<F>(sender_id: ConnectionId, receiver_ids: Vec<ConnectionId>, mut f: F)
1123where
1124    F: FnMut(ConnectionId) -> anyhow::Result<()>,
1125{
1126    for receiver_id in receiver_ids {
1127        if receiver_id != sender_id {
1128            f(receiver_id).trace_err();
1129        }
1130    }
1131}
1132
// Name of the HTTP header that carries the protocol version; it is returned by
// `ProtocolVersion`'s `Header::name`. Built lazily because `HeaderName::from_static`
// is called at runtime here.
lazy_static! {
    static ref ZED_PROTOCOL_VERSION: HeaderName = HeaderName::from_static("x-zed-protocol-version");
}
1136
1137pub struct ProtocolVersion(u32);
1138
1139impl Header for ProtocolVersion {
1140    fn name() -> &'static HeaderName {
1141        &ZED_PROTOCOL_VERSION
1142    }
1143
1144    fn decode<'i, I>(values: &mut I) -> Result<Self, axum::headers::Error>
1145    where
1146        Self: Sized,
1147        I: Iterator<Item = &'i axum::http::HeaderValue>,
1148    {
1149        let version = values
1150            .next()
1151            .ok_or_else(|| axum::headers::Error::invalid())?
1152            .to_str()
1153            .map_err(|_| axum::headers::Error::invalid())?
1154            .parse()
1155            .map_err(|_| axum::headers::Error::invalid())?;
1156        Ok(Self(version))
1157    }
1158
1159    fn encode<E: Extend<axum::http::HeaderValue>>(&self, values: &mut E) {
1160        values.extend([self.0.to_string().parse().unwrap()]);
1161    }
1162}
1163
1164pub fn routes(app_state: Arc<AppState>) -> Router<Body> {
1165    let server = Server::new(app_state.clone(), None);
1166    Router::new()
1167        .route("/rpc", get(handle_websocket_request))
1168        .layer(
1169            ServiceBuilder::new()
1170                .layer(Extension(app_state))
1171                .layer(middleware::from_fn(auth::validate_header))
1172                .layer(Extension(server)),
1173        )
1174}
1175
1176pub async fn handle_websocket_request(
1177    TypedHeader(ProtocolVersion(protocol_version)): TypedHeader<ProtocolVersion>,
1178    ConnectInfo(socket_address): ConnectInfo<SocketAddr>,
1179    Extension(server): Extension<Arc<Server>>,
1180    Extension(user_id): Extension<UserId>,
1181    ws: WebSocketUpgrade,
1182) -> Response {
1183    if protocol_version != rpc::PROTOCOL_VERSION {
1184        return (
1185            StatusCode::UPGRADE_REQUIRED,
1186            "client must be upgraded".to_string(),
1187        )
1188            .into_response();
1189    }
1190    let socket_address = socket_address.to_string();
1191    ws.on_upgrade(move |socket| {
1192        let socket = socket
1193            .map_ok(to_tungstenite_message)
1194            .err_into()
1195            .with(|message| async move { Ok(to_axum_message(message)) });
1196        let connection = Connection::new(Box::pin(socket));
1197        server.handle_connection(connection, socket_address, user_id, None, RealExecutor)
1198    })
1199}
1200
1201fn to_axum_message(message: TungsteniteMessage) -> AxumMessage {
1202    match message {
1203        TungsteniteMessage::Text(payload) => AxumMessage::Text(payload),
1204        TungsteniteMessage::Binary(payload) => AxumMessage::Binary(payload),
1205        TungsteniteMessage::Ping(payload) => AxumMessage::Ping(payload),
1206        TungsteniteMessage::Pong(payload) => AxumMessage::Pong(payload),
1207        TungsteniteMessage::Close(frame) => AxumMessage::Close(frame.map(|frame| AxumCloseFrame {
1208            code: frame.code.into(),
1209            reason: frame.reason,
1210        })),
1211    }
1212}
1213
1214fn to_tungstenite_message(message: AxumMessage) -> TungsteniteMessage {
1215    match message {
1216        AxumMessage::Text(payload) => TungsteniteMessage::Text(payload),
1217        AxumMessage::Binary(payload) => TungsteniteMessage::Binary(payload),
1218        AxumMessage::Ping(payload) => TungsteniteMessage::Ping(payload),
1219        AxumMessage::Pong(payload) => TungsteniteMessage::Pong(payload),
1220        AxumMessage::Close(frame) => {
1221            TungsteniteMessage::Close(frame.map(|frame| TungsteniteCloseFrame {
1222                code: frame.code.into(),
1223                reason: frame.reason,
1224            }))
1225        }
1226    }
1227}
1228
/// Extension trait for turning a fallible value into an `Option`.
pub trait ResultExt {
    /// The success type produced when no error occurred.
    type Ok;

    /// Consumes `self` and returns the success value, if any. The name suggests that
    /// implementations trace the error case before returning `None`.
    fn trace_err(self) -> Option<Self::Ok>;
}
1234
1235impl<T, E> ResultExt for Result<T, E>
1236where
1237    E: std::fmt::Debug,
1238{
1239    type Ok = T;
1240
1241    fn trace_err(self) -> Option<T> {
1242        match self {
1243            Ok(value) => Some(value),
1244            Err(error) => {
1245                tracing::error!("{:?}", error);
1246                None
1247            }
1248        }
1249    }
1250}
1251
1252#[cfg(test)]
1253mod tests {
1254    use super::*;
1255    use crate::{
1256        db::{tests::TestDb, UserId},
1257        AppState,
1258    };
1259    use ::rpc::Peer;
1260    use client::{
1261        self, test::FakeHttpClient, Channel, ChannelDetails, ChannelList, Client, Credentials,
1262        EstablishConnectionError, UserStore, RECEIVE_TIMEOUT,
1263    };
1264    use collections::BTreeMap;
1265    use editor::{
1266        self, ConfirmCodeAction, ConfirmCompletion, ConfirmRename, Editor, Input, Redo, Rename,
1267        ToOffset, ToggleCodeActions, Undo,
1268    };
1269    use gpui::{
1270        executor::{self, Deterministic},
1271        geometry::vector::vec2f,
1272        ModelHandle, TestAppContext, ViewHandle,
1273    };
1274    use language::{
1275        range_to_lsp, tree_sitter_rust, Diagnostic, DiagnosticEntry, FakeLspAdapter, Language,
1276        LanguageConfig, LanguageRegistry, OffsetRangeExt, Point, Rope,
1277    };
1278    use lsp::{self, FakeLanguageServer};
1279    use parking_lot::Mutex;
1280    use project::{
1281        fs::{FakeFs, Fs as _},
1282        search::SearchQuery,
1283        worktree::WorktreeHandle,
1284        DiagnosticSummary, Project, ProjectPath, WorktreeId,
1285    };
1286    use rand::prelude::*;
1287    use rpc::PeerId;
1288    use serde_json::json;
1289    use settings::Settings;
1290    use sqlx::types::time::OffsetDateTime;
1291    use std::{
1292        env,
1293        ops::Deref,
1294        path::{Path, PathBuf},
1295        rc::Rc,
1296        sync::{
1297            atomic::{AtomicBool, Ordering::SeqCst},
1298            Arc,
1299        },
1300        time::Duration,
1301    };
1302    use theme::ThemeRegistry;
1303    use workspace::{Item, SplitDirection, ToggleFollow, Workspace, WorkspaceParams};
1304
1305    #[cfg(test)]
1306    #[ctor::ctor]
1307    fn init_logger() {
1308        if std::env::var("RUST_LOG").is_ok() {
1309            env_logger::init();
1310        }
1311    }
1312
    /// Exercises the basic host/guest flow: client B joins a project shared by client A,
    /// each side sees the other as a collaborator, an edit typed by B shows up in A's copy
    /// of the buffer, and dropping B's project removes B from A's collaborators.
    #[gpui::test(iterations = 10)]
    async fn test_share_project(cx_a: &mut TestAppContext, cx_b: &mut TestAppContext) {
        let (window_b, _) = cx_b.add_window(|_| EmptyView);
        let lang_registry = Arc::new(LanguageRegistry::test());
        let fs = FakeFs::new(cx_a.background());
        cx_a.foreground().forbid_parking();

        // Connect to a server as 2 clients.
        let mut server = TestServer::start(cx_a.foreground(), cx_a.background()).await;
        let client_a = server.create_client(cx_a, "user_a").await;
        let client_b = server.create_client(cx_b, "user_b").await;

        // Share a project as client A
        fs.insert_tree(
            "/a",
            json!({
                ".zed.toml": r#"collaborators = ["user_b"]"#,
                "a.txt": "a-contents",
                "b.txt": "b-contents",
            }),
        )
        .await;
        let project_a = cx_a.update(|cx| {
            Project::local(
                client_a.clone(),
                client_a.user_store.clone(),
                lang_registry.clone(),
                fs.clone(),
                cx,
            )
        });
        let (worktree_a, _) = project_a
            .update(cx_a, |p, cx| {
                p.find_or_create_local_worktree("/a", true, cx)
            })
            .await
            .unwrap();
        let worktree_id = worktree_a.read_with(cx_a, |tree, _| tree.id());
        // Wait for the initial scan of the worktree before sharing it.
        worktree_a
            .read_with(cx_a, |tree, _| tree.as_local().unwrap().scan_complete())
            .await;
        let project_id = project_a.update(cx_a, |p, _| p.next_remote_id()).await;
        project_a.update(cx_a, |p, cx| p.share(cx)).await.unwrap();

        // Join that project as client B
        let project_b = Project::remote(
            project_id,
            client_b.clone(),
            client_b.user_store.clone(),
            lang_registry.clone(),
            fs.clone(),
            &mut cx_b.to_async(),
        )
        .await
        .unwrap();

        // B must see A as a collaborator; remember B's replica id to check A's view of B below.
        let replica_id_b = project_b.read_with(cx_b, |project, _| {
            assert_eq!(
                project
                    .collaborators()
                    .get(&client_a.peer_id)
                    .unwrap()
                    .user
                    .github_login,
                "user_a"
            );
            project.replica_id()
        });
        // Wait until A sees B as a collaborator with the matching replica id and login.
        project_a
            .condition(&cx_a, |tree, _| {
                tree.collaborators()
                    .get(&client_b.peer_id)
                    .map_or(false, |collaborator| {
                        collaborator.replica_id == replica_id_b
                            && collaborator.user.github_login == "user_b"
                    })
            })
            .await;

        // Open the same file as client B and client A.
        let buffer_b = project_b
            .update(cx_b, |p, cx| p.open_buffer((worktree_id, "b.txt"), cx))
            .await
            .unwrap();
        buffer_b.read_with(cx_b, |buf, _| assert_eq!(buf.text(), "b-contents"));
        project_a.read_with(cx_a, |project, cx| {
            assert!(project.has_open_buffer((worktree_id, "b.txt"), cx))
        });
        let buffer_a = project_a
            .update(cx_a, |p, cx| p.open_buffer((worktree_id, "b.txt"), cx))
            .await
            .unwrap();

        let editor_b = cx_b.add_view(window_b, |cx| Editor::for_buffer(buffer_b, None, cx));

        // TODO
        // // Create a selection set as client B and see that selection set as client A.
        // buffer_a
        //     .condition(&cx_a, |buffer, _| buffer.selection_sets().count() == 1)
        //     .await;

        // Edit the buffer as client B and see that edit as client A.
        editor_b.update(cx_b, |editor, cx| {
            editor.handle_input(&Input("ok, ".into()), cx)
        });
        buffer_a
            .condition(&cx_a, |buffer, _| buffer.text() == "ok, b-contents")
            .await;

        // TODO
        // // Remove the selection set as client B, see those selections disappear as client A.
        cx_b.update(move |_| drop(editor_b));
        // buffer_a
        //     .condition(&cx_a, |buffer, _| buffer.selection_sets().count() == 0)
        //     .await;

        // Dropping the client B's project removes client B from client A's collaborators.
        cx_b.update(move |_| drop(project_b));
        project_a
            .condition(&cx_a, |project, _| project.collaborators().is_empty())
            .await;
    }
1435
    /// Checks that unsharing a project makes the guest's copy read-only and marks the host's
    /// worktree as no longer shared, and that the same project can then be shared again and
    /// joined by a guest.
    #[gpui::test(iterations = 10)]
    async fn test_unshare_project(cx_a: &mut TestAppContext, cx_b: &mut TestAppContext) {
        let lang_registry = Arc::new(LanguageRegistry::test());
        let fs = FakeFs::new(cx_a.background());
        cx_a.foreground().forbid_parking();

        // Connect to a server as 2 clients.
        let mut server = TestServer::start(cx_a.foreground(), cx_a.background()).await;
        let client_a = server.create_client(cx_a, "user_a").await;
        let client_b = server.create_client(cx_b, "user_b").await;

        // Share a project as client A
        fs.insert_tree(
            "/a",
            json!({
                ".zed.toml": r#"collaborators = ["user_b"]"#,
                "a.txt": "a-contents",
                "b.txt": "b-contents",
            }),
        )
        .await;
        let project_a = cx_a.update(|cx| {
            Project::local(
                client_a.clone(),
                client_a.user_store.clone(),
                lang_registry.clone(),
                fs.clone(),
                cx,
            )
        });
        let (worktree_a, _) = project_a
            .update(cx_a, |p, cx| {
                p.find_or_create_local_worktree("/a", true, cx)
            })
            .await
            .unwrap();
        // Wait for the initial scan of the worktree before sharing it.
        worktree_a
            .read_with(cx_a, |tree, _| tree.as_local().unwrap().scan_complete())
            .await;
        let project_id = project_a.update(cx_a, |p, _| p.next_remote_id()).await;
        let worktree_id = worktree_a.read_with(cx_a, |tree, _| tree.id());
        project_a.update(cx_a, |p, cx| p.share(cx)).await.unwrap();
        assert!(worktree_a.read_with(cx_a, |tree, _| tree.as_local().unwrap().is_shared()));

        // Join that project as client B
        let project_b = Project::remote(
            project_id,
            client_b.clone(),
            client_b.user_store.clone(),
            lang_registry.clone(),
            fs.clone(),
            &mut cx_b.to_async(),
        )
        .await
        .unwrap();
        project_b
            .update(cx_b, |p, cx| p.open_buffer((worktree_id, "a.txt"), cx))
            .await
            .unwrap();

        // Unshare the project as client A
        project_a.update(cx_a, |project, cx| project.unshare(cx));
        project_b
            .condition(cx_b, |project, _| project.is_read_only())
            .await;
        assert!(worktree_a.read_with(cx_a, |tree, _| !tree.as_local().unwrap().is_shared()));
        cx_b.update(|_| {
            drop(project_b);
        });

        // Share the project again and ensure guests can still join.
        project_a
            .update(cx_a, |project, cx| project.share(cx))
            .await
            .unwrap();
        assert!(worktree_a.read_with(cx_a, |tree, _| tree.as_local().unwrap().is_shared()));

        // The second join reuses the project id obtained before the first share.
        let project_b2 = Project::remote(
            project_id,
            client_b.clone(),
            client_b.user_store.clone(),
            lang_registry.clone(),
            fs.clone(),
            &mut cx_b.to_async(),
        )
        .await
        .unwrap();
        project_b2
            .update(cx_b, |p, cx| p.open_buffer((worktree_id, "a.txt"), cx))
            .await
            .unwrap();
    }
1528
    /// Checks what happens when the host's connection is dropped: the host loses its
    /// collaborators and its project is no longer shared, the guest's copy becomes read-only,
    /// and the host can share the project again (under a newly awaited project id) for a
    /// guest to join.
    #[gpui::test(iterations = 10)]
    async fn test_host_disconnect(cx_a: &mut TestAppContext, cx_b: &mut TestAppContext) {
        let lang_registry = Arc::new(LanguageRegistry::test());
        let fs = FakeFs::new(cx_a.background());
        cx_a.foreground().forbid_parking();

        // Connect to a server as 2 clients.
        let mut server = TestServer::start(cx_a.foreground(), cx_a.background()).await;
        let client_a = server.create_client(cx_a, "user_a").await;
        let client_b = server.create_client(cx_b, "user_b").await;

        // Share a project as client A
        fs.insert_tree(
            "/a",
            json!({
                ".zed.toml": r#"collaborators = ["user_b"]"#,
                "a.txt": "a-contents",
                "b.txt": "b-contents",
            }),
        )
        .await;
        let project_a = cx_a.update(|cx| {
            Project::local(
                client_a.clone(),
                client_a.user_store.clone(),
                lang_registry.clone(),
                fs.clone(),
                cx,
            )
        });
        let (worktree_a, _) = project_a
            .update(cx_a, |p, cx| {
                p.find_or_create_local_worktree("/a", true, cx)
            })
            .await
            .unwrap();
        // Wait for the initial scan of the worktree before sharing it.
        worktree_a
            .read_with(cx_a, |tree, _| tree.as_local().unwrap().scan_complete())
            .await;
        let project_id = project_a.update(cx_a, |p, _| p.next_remote_id()).await;
        let worktree_id = worktree_a.read_with(cx_a, |tree, _| tree.id());
        project_a.update(cx_a, |p, cx| p.share(cx)).await.unwrap();
        assert!(worktree_a.read_with(cx_a, |tree, _| tree.as_local().unwrap().is_shared()));

        // Join that project as client B
        let project_b = Project::remote(
            project_id,
            client_b.clone(),
            client_b.user_store.clone(),
            lang_registry.clone(),
            fs.clone(),
            &mut cx_b.to_async(),
        )
        .await
        .unwrap();
        project_b
            .update(cx_b, |p, cx| p.open_buffer((worktree_id, "a.txt"), cx))
            .await
            .unwrap();

        // Drop client A's connection. Collaborators should disappear and the project should not be shown as shared.
        server.disconnect_client(client_a.current_user_id(cx_a));
        // Advance the fake clock by the receive timeout after the disconnect.
        cx_a.foreground().advance_clock(rpc::RECEIVE_TIMEOUT);
        project_a
            .condition(cx_a, |project, _| project.collaborators().is_empty())
            .await;
        project_a.read_with(cx_a, |project, _| assert!(!project.is_shared()));
        project_b
            .condition(cx_b, |project, _| project.is_read_only())
            .await;
        assert!(worktree_a.read_with(cx_a, |tree, _| !tree.as_local().unwrap().is_shared()));
        cx_b.update(|_| {
            drop(project_b);
        });

        // Await reconnection
        let project_id = project_a.update(cx_a, |p, _| p.next_remote_id()).await;

        // Share the project again and ensure guests can still join.
        project_a
            .update(cx_a, |project, cx| project.share(cx))
            .await
            .unwrap();
        assert!(worktree_a.read_with(cx_a, |tree, _| tree.as_local().unwrap().is_shared()));

        let project_b2 = Project::remote(
            project_id,
            client_b.clone(),
            client_b.user_store.clone(),
            lang_registry.clone(),
            fs.clone(),
            &mut cx_b.to_async(),
        )
        .await
        .unwrap();
        project_b2
            .update(cx_b, |p, cx| p.open_buffer((worktree_id, "a.txt"), cx))
            .await
            .unwrap();
    }
1629
1630    #[gpui::test(iterations = 10)]
1631    async fn test_propagate_saves_and_fs_changes(
1632        cx_a: &mut TestAppContext,
1633        cx_b: &mut TestAppContext,
1634        cx_c: &mut TestAppContext,
1635    ) {
1636        let lang_registry = Arc::new(LanguageRegistry::test());
1637        let fs = FakeFs::new(cx_a.background());
1638        cx_a.foreground().forbid_parking();
1639
1640        // Connect to a server as 3 clients.
1641        let mut server = TestServer::start(cx_a.foreground(), cx_a.background()).await;
1642        let client_a = server.create_client(cx_a, "user_a").await;
1643        let client_b = server.create_client(cx_b, "user_b").await;
1644        let client_c = server.create_client(cx_c, "user_c").await;
1645
1646        // Share a worktree as client A.
1647        fs.insert_tree(
1648            "/a",
1649            json!({
1650                ".zed.toml": r#"collaborators = ["user_b", "user_c"]"#,
1651                "file1": "",
1652                "file2": ""
1653            }),
1654        )
1655        .await;
1656        let project_a = cx_a.update(|cx| {
1657            Project::local(
1658                client_a.clone(),
1659                client_a.user_store.clone(),
1660                lang_registry.clone(),
1661                fs.clone(),
1662                cx,
1663            )
1664        });
1665        let (worktree_a, _) = project_a
1666            .update(cx_a, |p, cx| {
1667                p.find_or_create_local_worktree("/a", true, cx)
1668            })
1669            .await
1670            .unwrap();
1671        worktree_a
1672            .read_with(cx_a, |tree, _| tree.as_local().unwrap().scan_complete())
1673            .await;
1674        let project_id = project_a.update(cx_a, |p, _| p.next_remote_id()).await;
1675        let worktree_id = worktree_a.read_with(cx_a, |tree, _| tree.id());
1676        project_a.update(cx_a, |p, cx| p.share(cx)).await.unwrap();
1677
1678        // Join that worktree as clients B and C.
1679        let project_b = Project::remote(
1680            project_id,
1681            client_b.clone(),
1682            client_b.user_store.clone(),
1683            lang_registry.clone(),
1684            fs.clone(),
1685            &mut cx_b.to_async(),
1686        )
1687        .await
1688        .unwrap();
1689        let project_c = Project::remote(
1690            project_id,
1691            client_c.clone(),
1692            client_c.user_store.clone(),
1693            lang_registry.clone(),
1694            fs.clone(),
1695            &mut cx_c.to_async(),
1696        )
1697        .await
1698        .unwrap();
1699        let worktree_b = project_b.read_with(cx_b, |p, cx| p.worktrees(cx).next().unwrap());
1700        let worktree_c = project_c.read_with(cx_c, |p, cx| p.worktrees(cx).next().unwrap());
1701
1702        // Open and edit a buffer as both guests B and C.
1703        let buffer_b = project_b
1704            .update(cx_b, |p, cx| p.open_buffer((worktree_id, "file1"), cx))
1705            .await
1706            .unwrap();
1707        let buffer_c = project_c
1708            .update(cx_c, |p, cx| p.open_buffer((worktree_id, "file1"), cx))
1709            .await
1710            .unwrap();
1711        buffer_b.update(cx_b, |buf, cx| buf.edit([0..0], "i-am-b, ", cx));
1712        buffer_c.update(cx_c, |buf, cx| buf.edit([0..0], "i-am-c, ", cx));
1713
1714        // Open and edit that buffer as the host.
1715        let buffer_a = project_a
1716            .update(cx_a, |p, cx| p.open_buffer((worktree_id, "file1"), cx))
1717            .await
1718            .unwrap();
1719
1720        buffer_a
1721            .condition(cx_a, |buf, _| buf.text() == "i-am-c, i-am-b, ")
1722            .await;
1723        buffer_a.update(cx_a, |buf, cx| {
1724            buf.edit([buf.len()..buf.len()], "i-am-a", cx)
1725        });
1726
1727        // Wait for edits to propagate
1728        buffer_a
1729            .condition(cx_a, |buf, _| buf.text() == "i-am-c, i-am-b, i-am-a")
1730            .await;
1731        buffer_b
1732            .condition(cx_b, |buf, _| buf.text() == "i-am-c, i-am-b, i-am-a")
1733            .await;
1734        buffer_c
1735            .condition(cx_c, |buf, _| buf.text() == "i-am-c, i-am-b, i-am-a")
1736            .await;
1737
1738        // Edit the buffer as the host and concurrently save as guest B.
1739        let save_b = buffer_b.update(cx_b, |buf, cx| buf.save(cx));
1740        buffer_a.update(cx_a, |buf, cx| buf.edit([0..0], "hi-a, ", cx));
1741        save_b.await.unwrap();
1742        assert_eq!(
1743            fs.load("/a/file1".as_ref()).await.unwrap(),
1744            "hi-a, i-am-c, i-am-b, i-am-a"
1745        );
1746        buffer_a.read_with(cx_a, |buf, _| assert!(!buf.is_dirty()));
1747        buffer_b.read_with(cx_b, |buf, _| assert!(!buf.is_dirty()));
1748        buffer_c.condition(cx_c, |buf, _| !buf.is_dirty()).await;
1749
1750        worktree_a.flush_fs_events(cx_a).await;
1751
1752        // Make changes on host's file system, see those changes on guest worktrees.
1753        fs.rename(
1754            "/a/file1".as_ref(),
1755            "/a/file1-renamed".as_ref(),
1756            Default::default(),
1757        )
1758        .await
1759        .unwrap();
1760
1761        fs.rename("/a/file2".as_ref(), "/a/file3".as_ref(), Default::default())
1762            .await
1763            .unwrap();
1764        fs.insert_file(Path::new("/a/file4"), "4".into()).await;
1765
1766        worktree_a
1767            .condition(&cx_a, |tree, _| {
1768                tree.paths()
1769                    .map(|p| p.to_string_lossy())
1770                    .collect::<Vec<_>>()
1771                    == [".zed.toml", "file1-renamed", "file3", "file4"]
1772            })
1773            .await;
1774        worktree_b
1775            .condition(&cx_b, |tree, _| {
1776                tree.paths()
1777                    .map(|p| p.to_string_lossy())
1778                    .collect::<Vec<_>>()
1779                    == [".zed.toml", "file1-renamed", "file3", "file4"]
1780            })
1781            .await;
1782        worktree_c
1783            .condition(&cx_c, |tree, _| {
1784                tree.paths()
1785                    .map(|p| p.to_string_lossy())
1786                    .collect::<Vec<_>>()
1787                    == [".zed.toml", "file1-renamed", "file3", "file4"]
1788            })
1789            .await;
1790
1791        // Ensure buffer files are updated as well.
1792        buffer_a
1793            .condition(&cx_a, |buf, _| {
1794                buf.file().unwrap().path().to_str() == Some("file1-renamed")
1795            })
1796            .await;
1797        buffer_b
1798            .condition(&cx_b, |buf, _| {
1799                buf.file().unwrap().path().to_str() == Some("file1-renamed")
1800            })
1801            .await;
1802        buffer_c
1803            .condition(&cx_c, |buf, _| {
1804                buf.file().unwrap().path().to_str() == Some("file1-renamed")
1805            })
1806            .await;
1807    }
1808
1809    #[gpui::test(iterations = 10)]
1810    async fn test_buffer_conflict_after_save(cx_a: &mut TestAppContext, cx_b: &mut TestAppContext) {
1811        cx_a.foreground().forbid_parking();
1812        let lang_registry = Arc::new(LanguageRegistry::test());
1813        let fs = FakeFs::new(cx_a.background());
1814
1815        // Connect to a server as 2 clients.
1816        let mut server = TestServer::start(cx_a.foreground(), cx_a.background()).await;
1817        let client_a = server.create_client(cx_a, "user_a").await;
1818        let client_b = server.create_client(cx_b, "user_b").await;
1819
1820        // Share a project as client A
1821        fs.insert_tree(
1822            "/dir",
1823            json!({
1824                ".zed.toml": r#"collaborators = ["user_b", "user_c"]"#,
1825                "a.txt": "a-contents",
1826            }),
1827        )
1828        .await;
1829
1830        let project_a = cx_a.update(|cx| {
1831            Project::local(
1832                client_a.clone(),
1833                client_a.user_store.clone(),
1834                lang_registry.clone(),
1835                fs.clone(),
1836                cx,
1837            )
1838        });
1839        let (worktree_a, _) = project_a
1840            .update(cx_a, |p, cx| {
1841                p.find_or_create_local_worktree("/dir", true, cx)
1842            })
1843            .await
1844            .unwrap();
1845        worktree_a
1846            .read_with(cx_a, |tree, _| tree.as_local().unwrap().scan_complete())
1847            .await;
1848        let project_id = project_a.update(cx_a, |p, _| p.next_remote_id()).await;
1849        let worktree_id = worktree_a.read_with(cx_a, |tree, _| tree.id());
1850        project_a.update(cx_a, |p, cx| p.share(cx)).await.unwrap();
1851
1852        // Join that project as client B
1853        let project_b = Project::remote(
1854            project_id,
1855            client_b.clone(),
1856            client_b.user_store.clone(),
1857            lang_registry.clone(),
1858            fs.clone(),
1859            &mut cx_b.to_async(),
1860        )
1861        .await
1862        .unwrap();
1863
1864        // Open a buffer as client B
1865        let buffer_b = project_b
1866            .update(cx_b, |p, cx| p.open_buffer((worktree_id, "a.txt"), cx))
1867            .await
1868            .unwrap();
1869
1870        buffer_b.update(cx_b, |buf, cx| buf.edit([0..0], "world ", cx));
1871        buffer_b.read_with(cx_b, |buf, _| {
1872            assert!(buf.is_dirty());
1873            assert!(!buf.has_conflict());
1874        });
1875
1876        buffer_b.update(cx_b, |buf, cx| buf.save(cx)).await.unwrap();
1877        buffer_b
1878            .condition(&cx_b, |buffer_b, _| !buffer_b.is_dirty())
1879            .await;
1880        buffer_b.read_with(cx_b, |buf, _| {
1881            assert!(!buf.has_conflict());
1882        });
1883
1884        buffer_b.update(cx_b, |buf, cx| buf.edit([0..0], "hello ", cx));
1885        buffer_b.read_with(cx_b, |buf, _| {
1886            assert!(buf.is_dirty());
1887            assert!(!buf.has_conflict());
1888        });
1889    }
1890
1891    #[gpui::test(iterations = 10)]
1892    async fn test_buffer_reloading(cx_a: &mut TestAppContext, cx_b: &mut TestAppContext) {
1893        cx_a.foreground().forbid_parking();
1894        let lang_registry = Arc::new(LanguageRegistry::test());
1895        let fs = FakeFs::new(cx_a.background());
1896
1897        // Connect to a server as 2 clients.
1898        let mut server = TestServer::start(cx_a.foreground(), cx_a.background()).await;
1899        let client_a = server.create_client(cx_a, "user_a").await;
1900        let client_b = server.create_client(cx_b, "user_b").await;
1901
1902        // Share a project as client A
1903        fs.insert_tree(
1904            "/dir",
1905            json!({
1906                ".zed.toml": r#"collaborators = ["user_b", "user_c"]"#,
1907                "a.txt": "a-contents",
1908            }),
1909        )
1910        .await;
1911
1912        let project_a = cx_a.update(|cx| {
1913            Project::local(
1914                client_a.clone(),
1915                client_a.user_store.clone(),
1916                lang_registry.clone(),
1917                fs.clone(),
1918                cx,
1919            )
1920        });
1921        let (worktree_a, _) = project_a
1922            .update(cx_a, |p, cx| {
1923                p.find_or_create_local_worktree("/dir", true, cx)
1924            })
1925            .await
1926            .unwrap();
1927        worktree_a
1928            .read_with(cx_a, |tree, _| tree.as_local().unwrap().scan_complete())
1929            .await;
1930        let project_id = project_a.update(cx_a, |p, _| p.next_remote_id()).await;
1931        let worktree_id = worktree_a.read_with(cx_a, |tree, _| tree.id());
1932        project_a.update(cx_a, |p, cx| p.share(cx)).await.unwrap();
1933
1934        // Join that project as client B
1935        let project_b = Project::remote(
1936            project_id,
1937            client_b.clone(),
1938            client_b.user_store.clone(),
1939            lang_registry.clone(),
1940            fs.clone(),
1941            &mut cx_b.to_async(),
1942        )
1943        .await
1944        .unwrap();
1945        let _worktree_b = project_b.update(cx_b, |p, cx| p.worktrees(cx).next().unwrap());
1946
1947        // Open a buffer as client B
1948        let buffer_b = project_b
1949            .update(cx_b, |p, cx| p.open_buffer((worktree_id, "a.txt"), cx))
1950            .await
1951            .unwrap();
1952        buffer_b.read_with(cx_b, |buf, _| {
1953            assert!(!buf.is_dirty());
1954            assert!(!buf.has_conflict());
1955        });
1956
1957        fs.save(Path::new("/dir/a.txt"), &"new contents".into())
1958            .await
1959            .unwrap();
1960        buffer_b
1961            .condition(&cx_b, |buf, _| {
1962                buf.text() == "new contents" && !buf.is_dirty()
1963            })
1964            .await;
1965        buffer_b.read_with(cx_b, |buf, _| {
1966            assert!(!buf.has_conflict());
1967        });
1968    }
1969
1970    #[gpui::test(iterations = 10)]
1971    async fn test_editing_while_guest_opens_buffer(
1972        cx_a: &mut TestAppContext,
1973        cx_b: &mut TestAppContext,
1974    ) {
1975        cx_a.foreground().forbid_parking();
1976        let lang_registry = Arc::new(LanguageRegistry::test());
1977        let fs = FakeFs::new(cx_a.background());
1978
1979        // Connect to a server as 2 clients.
1980        let mut server = TestServer::start(cx_a.foreground(), cx_a.background()).await;
1981        let client_a = server.create_client(cx_a, "user_a").await;
1982        let client_b = server.create_client(cx_b, "user_b").await;
1983
1984        // Share a project as client A
1985        fs.insert_tree(
1986            "/dir",
1987            json!({
1988                ".zed.toml": r#"collaborators = ["user_b"]"#,
1989                "a.txt": "a-contents",
1990            }),
1991        )
1992        .await;
1993        let project_a = cx_a.update(|cx| {
1994            Project::local(
1995                client_a.clone(),
1996                client_a.user_store.clone(),
1997                lang_registry.clone(),
1998                fs.clone(),
1999                cx,
2000            )
2001        });
2002        let (worktree_a, _) = project_a
2003            .update(cx_a, |p, cx| {
2004                p.find_or_create_local_worktree("/dir", true, cx)
2005            })
2006            .await
2007            .unwrap();
2008        worktree_a
2009            .read_with(cx_a, |tree, _| tree.as_local().unwrap().scan_complete())
2010            .await;
2011        let project_id = project_a.update(cx_a, |p, _| p.next_remote_id()).await;
2012        let worktree_id = worktree_a.read_with(cx_a, |tree, _| tree.id());
2013        project_a.update(cx_a, |p, cx| p.share(cx)).await.unwrap();
2014
2015        // Join that project as client B
2016        let project_b = Project::remote(
2017            project_id,
2018            client_b.clone(),
2019            client_b.user_store.clone(),
2020            lang_registry.clone(),
2021            fs.clone(),
2022            &mut cx_b.to_async(),
2023        )
2024        .await
2025        .unwrap();
2026
2027        // Open a buffer as client A
2028        let buffer_a = project_a
2029            .update(cx_a, |p, cx| p.open_buffer((worktree_id, "a.txt"), cx))
2030            .await
2031            .unwrap();
2032
2033        // Start opening the same buffer as client B
2034        let buffer_b = cx_b
2035            .background()
2036            .spawn(project_b.update(cx_b, |p, cx| p.open_buffer((worktree_id, "a.txt"), cx)));
2037
2038        // Edit the buffer as client A while client B is still opening it.
2039        cx_b.background().simulate_random_delay().await;
2040        buffer_a.update(cx_a, |buf, cx| buf.edit([0..0], "X", cx));
2041        cx_b.background().simulate_random_delay().await;
2042        buffer_a.update(cx_a, |buf, cx| buf.edit([1..1], "Y", cx));
2043
2044        let text = buffer_a.read_with(cx_a, |buf, _| buf.text());
2045        let buffer_b = buffer_b.await.unwrap();
2046        buffer_b.condition(&cx_b, |buf, _| buf.text() == text).await;
2047    }
2048
2049    #[gpui::test(iterations = 10)]
2050    async fn test_leaving_worktree_while_opening_buffer(
2051        cx_a: &mut TestAppContext,
2052        cx_b: &mut TestAppContext,
2053    ) {
2054        cx_a.foreground().forbid_parking();
2055        let lang_registry = Arc::new(LanguageRegistry::test());
2056        let fs = FakeFs::new(cx_a.background());
2057
2058        // Connect to a server as 2 clients.
2059        let mut server = TestServer::start(cx_a.foreground(), cx_a.background()).await;
2060        let client_a = server.create_client(cx_a, "user_a").await;
2061        let client_b = server.create_client(cx_b, "user_b").await;
2062
2063        // Share a project as client A
2064        fs.insert_tree(
2065            "/dir",
2066            json!({
2067                ".zed.toml": r#"collaborators = ["user_b"]"#,
2068                "a.txt": "a-contents",
2069            }),
2070        )
2071        .await;
2072        let project_a = cx_a.update(|cx| {
2073            Project::local(
2074                client_a.clone(),
2075                client_a.user_store.clone(),
2076                lang_registry.clone(),
2077                fs.clone(),
2078                cx,
2079            )
2080        });
2081        let (worktree_a, _) = project_a
2082            .update(cx_a, |p, cx| {
2083                p.find_or_create_local_worktree("/dir", true, cx)
2084            })
2085            .await
2086            .unwrap();
2087        worktree_a
2088            .read_with(cx_a, |tree, _| tree.as_local().unwrap().scan_complete())
2089            .await;
2090        let project_id = project_a.update(cx_a, |p, _| p.next_remote_id()).await;
2091        let worktree_id = worktree_a.read_with(cx_a, |tree, _| tree.id());
2092        project_a.update(cx_a, |p, cx| p.share(cx)).await.unwrap();
2093
2094        // Join that project as client B
2095        let project_b = Project::remote(
2096            project_id,
2097            client_b.clone(),
2098            client_b.user_store.clone(),
2099            lang_registry.clone(),
2100            fs.clone(),
2101            &mut cx_b.to_async(),
2102        )
2103        .await
2104        .unwrap();
2105
2106        // See that a guest has joined as client A.
2107        project_a
2108            .condition(&cx_a, |p, _| p.collaborators().len() == 1)
2109            .await;
2110
2111        // Begin opening a buffer as client B, but leave the project before the open completes.
2112        let buffer_b = cx_b
2113            .background()
2114            .spawn(project_b.update(cx_b, |p, cx| p.open_buffer((worktree_id, "a.txt"), cx)));
2115        cx_b.update(|_| drop(project_b));
2116        drop(buffer_b);
2117
2118        // See that the guest has left.
2119        project_a
2120            .condition(&cx_a, |p, _| p.collaborators().len() == 0)
2121            .await;
2122    }
2123
2124    #[gpui::test(iterations = 10)]
2125    async fn test_leaving_project(cx_a: &mut TestAppContext, cx_b: &mut TestAppContext) {
2126        cx_a.foreground().forbid_parking();
2127        let lang_registry = Arc::new(LanguageRegistry::test());
2128        let fs = FakeFs::new(cx_a.background());
2129
2130        // Connect to a server as 2 clients.
2131        let mut server = TestServer::start(cx_a.foreground(), cx_a.background()).await;
2132        let client_a = server.create_client(cx_a, "user_a").await;
2133        let client_b = server.create_client(cx_b, "user_b").await;
2134
2135        // Share a project as client A
2136        fs.insert_tree(
2137            "/a",
2138            json!({
2139                ".zed.toml": r#"collaborators = ["user_b"]"#,
2140                "a.txt": "a-contents",
2141                "b.txt": "b-contents",
2142            }),
2143        )
2144        .await;
2145        let project_a = cx_a.update(|cx| {
2146            Project::local(
2147                client_a.clone(),
2148                client_a.user_store.clone(),
2149                lang_registry.clone(),
2150                fs.clone(),
2151                cx,
2152            )
2153        });
2154        let (worktree_a, _) = project_a
2155            .update(cx_a, |p, cx| {
2156                p.find_or_create_local_worktree("/a", true, cx)
2157            })
2158            .await
2159            .unwrap();
2160        worktree_a
2161            .read_with(cx_a, |tree, _| tree.as_local().unwrap().scan_complete())
2162            .await;
2163        let project_id = project_a
2164            .update(cx_a, |project, _| project.next_remote_id())
2165            .await;
2166        project_a
2167            .update(cx_a, |project, cx| project.share(cx))
2168            .await
2169            .unwrap();
2170
2171        // Join that project as client B
2172        let _project_b = Project::remote(
2173            project_id,
2174            client_b.clone(),
2175            client_b.user_store.clone(),
2176            lang_registry.clone(),
2177            fs.clone(),
2178            &mut cx_b.to_async(),
2179        )
2180        .await
2181        .unwrap();
2182
2183        // Client A sees that a guest has joined.
2184        project_a
2185            .condition(cx_a, |p, _| p.collaborators().len() == 1)
2186            .await;
2187
2188        // Drop client B's connection and ensure client A observes client B leaving the project.
2189        client_b.disconnect(&cx_b.to_async()).unwrap();
2190        project_a
2191            .condition(cx_a, |p, _| p.collaborators().len() == 0)
2192            .await;
2193
2194        // Rejoin the project as client B
2195        let _project_b = Project::remote(
2196            project_id,
2197            client_b.clone(),
2198            client_b.user_store.clone(),
2199            lang_registry.clone(),
2200            fs.clone(),
2201            &mut cx_b.to_async(),
2202        )
2203        .await
2204        .unwrap();
2205
2206        // Client A sees that a guest has re-joined.
2207        project_a
2208            .condition(cx_a, |p, _| p.collaborators().len() == 1)
2209            .await;
2210
2211        // Simulate connection loss for client B and ensure client A observes client B leaving the project.
2212        client_b.wait_for_current_user(cx_b).await;
2213        server.disconnect_client(client_b.current_user_id(cx_b));
2214        cx_a.foreground().advance_clock(Duration::from_secs(3));
2215        project_a
2216            .condition(cx_a, |p, _| p.collaborators().len() == 0)
2217            .await;
2218    }
2219
2220    #[gpui::test(iterations = 10)]
2221    async fn test_collaborating_with_diagnostics(
2222        cx_a: &mut TestAppContext,
2223        cx_b: &mut TestAppContext,
2224    ) {
2225        cx_a.foreground().forbid_parking();
2226        let lang_registry = Arc::new(LanguageRegistry::test());
2227        let fs = FakeFs::new(cx_a.background());
2228
2229        // Set up a fake language server.
2230        let mut language = Language::new(
2231            LanguageConfig {
2232                name: "Rust".into(),
2233                path_suffixes: vec!["rs".to_string()],
2234                ..Default::default()
2235            },
2236            Some(tree_sitter_rust::language()),
2237        );
2238        let mut fake_language_servers = language.set_fake_lsp_adapter(Default::default());
2239        lang_registry.add(Arc::new(language));
2240
2241        // Connect to a server as 2 clients.
2242        let mut server = TestServer::start(cx_a.foreground(), cx_a.background()).await;
2243        let client_a = server.create_client(cx_a, "user_a").await;
2244        let client_b = server.create_client(cx_b, "user_b").await;
2245
2246        // Share a project as client A
2247        fs.insert_tree(
2248            "/a",
2249            json!({
2250                ".zed.toml": r#"collaborators = ["user_b"]"#,
2251                "a.rs": "let one = two",
2252                "other.rs": "",
2253            }),
2254        )
2255        .await;
2256        let project_a = cx_a.update(|cx| {
2257            Project::local(
2258                client_a.clone(),
2259                client_a.user_store.clone(),
2260                lang_registry.clone(),
2261                fs.clone(),
2262                cx,
2263            )
2264        });
2265        let (worktree_a, _) = project_a
2266            .update(cx_a, |p, cx| {
2267                p.find_or_create_local_worktree("/a", true, cx)
2268            })
2269            .await
2270            .unwrap();
2271        worktree_a
2272            .read_with(cx_a, |tree, _| tree.as_local().unwrap().scan_complete())
2273            .await;
2274        let project_id = project_a.update(cx_a, |p, _| p.next_remote_id()).await;
2275        let worktree_id = worktree_a.read_with(cx_a, |tree, _| tree.id());
2276        project_a.update(cx_a, |p, cx| p.share(cx)).await.unwrap();
2277
2278        // Cause the language server to start.
2279        let _ = cx_a
2280            .background()
2281            .spawn(project_a.update(cx_a, |project, cx| {
2282                project.open_buffer(
2283                    ProjectPath {
2284                        worktree_id,
2285                        path: Path::new("other.rs").into(),
2286                    },
2287                    cx,
2288                )
2289            }))
2290            .await
2291            .unwrap();
2292
2293        // Simulate a language server reporting errors for a file.
2294        let mut fake_language_server = fake_language_servers.next().await.unwrap();
2295        fake_language_server
2296            .receive_notification::<lsp::notification::DidOpenTextDocument>()
2297            .await;
2298        fake_language_server.notify::<lsp::notification::PublishDiagnostics>(
2299            lsp::PublishDiagnosticsParams {
2300                uri: lsp::Url::from_file_path("/a/a.rs").unwrap(),
2301                version: None,
2302                diagnostics: vec![lsp::Diagnostic {
2303                    severity: Some(lsp::DiagnosticSeverity::ERROR),
2304                    range: lsp::Range::new(lsp::Position::new(0, 4), lsp::Position::new(0, 7)),
2305                    message: "message 1".to_string(),
2306                    ..Default::default()
2307                }],
2308            },
2309        );
2310
2311        // Wait for server to see the diagnostics update.
2312        server
2313            .condition(|store| {
2314                let worktree = store
2315                    .project(project_id)
2316                    .unwrap()
2317                    .share
2318                    .as_ref()
2319                    .unwrap()
2320                    .worktrees
2321                    .get(&worktree_id.to_proto())
2322                    .unwrap();
2323
2324                !worktree.diagnostic_summaries.is_empty()
2325            })
2326            .await;
2327
2328        // Join the worktree as client B.
2329        let project_b = Project::remote(
2330            project_id,
2331            client_b.clone(),
2332            client_b.user_store.clone(),
2333            lang_registry.clone(),
2334            fs.clone(),
2335            &mut cx_b.to_async(),
2336        )
2337        .await
2338        .unwrap();
2339
2340        project_b.read_with(cx_b, |project, cx| {
2341            assert_eq!(
2342                project.diagnostic_summaries(cx).collect::<Vec<_>>(),
2343                &[(
2344                    ProjectPath {
2345                        worktree_id,
2346                        path: Arc::from(Path::new("a.rs")),
2347                    },
2348                    DiagnosticSummary {
2349                        error_count: 1,
2350                        warning_count: 0,
2351                        ..Default::default()
2352                    },
2353                )]
2354            )
2355        });
2356
2357        // Simulate a language server reporting more errors for a file.
2358        fake_language_server.notify::<lsp::notification::PublishDiagnostics>(
2359            lsp::PublishDiagnosticsParams {
2360                uri: lsp::Url::from_file_path("/a/a.rs").unwrap(),
2361                version: None,
2362                diagnostics: vec![
2363                    lsp::Diagnostic {
2364                        severity: Some(lsp::DiagnosticSeverity::ERROR),
2365                        range: lsp::Range::new(lsp::Position::new(0, 4), lsp::Position::new(0, 7)),
2366                        message: "message 1".to_string(),
2367                        ..Default::default()
2368                    },
2369                    lsp::Diagnostic {
2370                        severity: Some(lsp::DiagnosticSeverity::WARNING),
2371                        range: lsp::Range::new(
2372                            lsp::Position::new(0, 10),
2373                            lsp::Position::new(0, 13),
2374                        ),
2375                        message: "message 2".to_string(),
2376                        ..Default::default()
2377                    },
2378                ],
2379            },
2380        );
2381
2382        // Client b gets the updated summaries
2383        project_b
2384            .condition(&cx_b, |project, cx| {
2385                project.diagnostic_summaries(cx).collect::<Vec<_>>()
2386                    == &[(
2387                        ProjectPath {
2388                            worktree_id,
2389                            path: Arc::from(Path::new("a.rs")),
2390                        },
2391                        DiagnosticSummary {
2392                            error_count: 1,
2393                            warning_count: 1,
2394                            ..Default::default()
2395                        },
2396                    )]
2397            })
2398            .await;
2399
2400        // Open the file with the errors on client B. They should be present.
2401        let buffer_b = cx_b
2402            .background()
2403            .spawn(project_b.update(cx_b, |p, cx| p.open_buffer((worktree_id, "a.rs"), cx)))
2404            .await
2405            .unwrap();
2406
2407        buffer_b.read_with(cx_b, |buffer, _| {
2408            assert_eq!(
2409                buffer
2410                    .snapshot()
2411                    .diagnostics_in_range::<_, Point>(0..buffer.len(), false)
2412                    .map(|entry| entry)
2413                    .collect::<Vec<_>>(),
2414                &[
2415                    DiagnosticEntry {
2416                        range: Point::new(0, 4)..Point::new(0, 7),
2417                        diagnostic: Diagnostic {
2418                            group_id: 0,
2419                            message: "message 1".to_string(),
2420                            severity: lsp::DiagnosticSeverity::ERROR,
2421                            is_primary: true,
2422                            ..Default::default()
2423                        }
2424                    },
2425                    DiagnosticEntry {
2426                        range: Point::new(0, 10)..Point::new(0, 13),
2427                        diagnostic: Diagnostic {
2428                            group_id: 1,
2429                            severity: lsp::DiagnosticSeverity::WARNING,
2430                            message: "message 2".to_string(),
2431                            is_primary: true,
2432                            ..Default::default()
2433                        }
2434                    }
2435                ]
2436            );
2437        });
2438
2439        // Simulate a language server reporting no errors for a file.
2440        fake_language_server.notify::<lsp::notification::PublishDiagnostics>(
2441            lsp::PublishDiagnosticsParams {
2442                uri: lsp::Url::from_file_path("/a/a.rs").unwrap(),
2443                version: None,
2444                diagnostics: vec![],
2445            },
2446        );
2447        project_a
2448            .condition(cx_a, |project, cx| {
2449                project.diagnostic_summaries(cx).collect::<Vec<_>>() == &[]
2450            })
2451            .await;
2452        project_b
2453            .condition(cx_b, |project, cx| {
2454                project.diagnostic_summaries(cx).collect::<Vec<_>>() == &[]
2455            })
2456            .await;
2457    }
2458
2459    #[gpui::test(iterations = 10)]
2460    async fn test_collaborating_with_completion(
2461        cx_a: &mut TestAppContext,
2462        cx_b: &mut TestAppContext,
2463    ) {
2464        cx_a.foreground().forbid_parking();
2465        let lang_registry = Arc::new(LanguageRegistry::test());
2466        let fs = FakeFs::new(cx_a.background());
2467
2468        // Set up a fake language server.
2469        let mut language = Language::new(
2470            LanguageConfig {
2471                name: "Rust".into(),
2472                path_suffixes: vec!["rs".to_string()],
2473                ..Default::default()
2474            },
2475            Some(tree_sitter_rust::language()),
2476        );
2477        let mut fake_language_servers = language.set_fake_lsp_adapter(FakeLspAdapter {
2478            capabilities: lsp::ServerCapabilities {
2479                completion_provider: Some(lsp::CompletionOptions {
2480                    trigger_characters: Some(vec![".".to_string()]),
2481                    ..Default::default()
2482                }),
2483                ..Default::default()
2484            },
2485            ..Default::default()
2486        });
2487        lang_registry.add(Arc::new(language));
2488
2489        // Connect to a server as 2 clients.
2490        let mut server = TestServer::start(cx_a.foreground(), cx_a.background()).await;
2491        let client_a = server.create_client(cx_a, "user_a").await;
2492        let client_b = server.create_client(cx_b, "user_b").await;
2493
2494        // Share a project as client A
2495        fs.insert_tree(
2496            "/a",
2497            json!({
2498                ".zed.toml": r#"collaborators = ["user_b"]"#,
2499                "main.rs": "fn main() { a }",
2500                "other.rs": "",
2501            }),
2502        )
2503        .await;
2504        let project_a = cx_a.update(|cx| {
2505            Project::local(
2506                client_a.clone(),
2507                client_a.user_store.clone(),
2508                lang_registry.clone(),
2509                fs.clone(),
2510                cx,
2511            )
2512        });
2513        let (worktree_a, _) = project_a
2514            .update(cx_a, |p, cx| {
2515                p.find_or_create_local_worktree("/a", true, cx)
2516            })
2517            .await
2518            .unwrap();
2519        worktree_a
2520            .read_with(cx_a, |tree, _| tree.as_local().unwrap().scan_complete())
2521            .await;
2522        let project_id = project_a.update(cx_a, |p, _| p.next_remote_id()).await;
2523        let worktree_id = worktree_a.read_with(cx_a, |tree, _| tree.id());
2524        project_a.update(cx_a, |p, cx| p.share(cx)).await.unwrap();
2525
2526        // Join the worktree as client B.
2527        let project_b = Project::remote(
2528            project_id,
2529            client_b.clone(),
2530            client_b.user_store.clone(),
2531            lang_registry.clone(),
2532            fs.clone(),
2533            &mut cx_b.to_async(),
2534        )
2535        .await
2536        .unwrap();
2537
2538        // Open a file in an editor as the guest.
2539        let buffer_b = project_b
2540            .update(cx_b, |p, cx| p.open_buffer((worktree_id, "main.rs"), cx))
2541            .await
2542            .unwrap();
2543        let (window_b, _) = cx_b.add_window(|_| EmptyView);
2544        let editor_b = cx_b.add_view(window_b, |cx| {
2545            Editor::for_buffer(buffer_b.clone(), Some(project_b.clone()), cx)
2546        });
2547
2548        let fake_language_server = fake_language_servers.next().await.unwrap();
2549        buffer_b
2550            .condition(&cx_b, |buffer, _| !buffer.completion_triggers().is_empty())
2551            .await;
2552
2553        // Type a completion trigger character as the guest.
2554        editor_b.update(cx_b, |editor, cx| {
2555            editor.select_ranges([13..13], None, cx);
2556            editor.handle_input(&Input(".".into()), cx);
2557            cx.focus(&editor_b);
2558        });
2559
2560        // Receive a completion request as the host's language server.
2561        // Return some completions from the host's language server.
2562        cx_a.foreground().start_waiting();
2563        fake_language_server
2564            .handle_request::<lsp::request::Completion, _, _>(|params, _| async move {
2565                assert_eq!(
2566                    params.text_document_position.text_document.uri,
2567                    lsp::Url::from_file_path("/a/main.rs").unwrap(),
2568                );
2569                assert_eq!(
2570                    params.text_document_position.position,
2571                    lsp::Position::new(0, 14),
2572                );
2573
2574                Ok(Some(lsp::CompletionResponse::Array(vec![
2575                    lsp::CompletionItem {
2576                        label: "first_method(…)".into(),
2577                        detail: Some("fn(&mut self, B) -> C".into()),
2578                        text_edit: Some(lsp::CompletionTextEdit::Edit(lsp::TextEdit {
2579                            new_text: "first_method($1)".to_string(),
2580                            range: lsp::Range::new(
2581                                lsp::Position::new(0, 14),
2582                                lsp::Position::new(0, 14),
2583                            ),
2584                        })),
2585                        insert_text_format: Some(lsp::InsertTextFormat::SNIPPET),
2586                        ..Default::default()
2587                    },
2588                    lsp::CompletionItem {
2589                        label: "second_method(…)".into(),
2590                        detail: Some("fn(&mut self, C) -> D<E>".into()),
2591                        text_edit: Some(lsp::CompletionTextEdit::Edit(lsp::TextEdit {
2592                            new_text: "second_method()".to_string(),
2593                            range: lsp::Range::new(
2594                                lsp::Position::new(0, 14),
2595                                lsp::Position::new(0, 14),
2596                            ),
2597                        })),
2598                        insert_text_format: Some(lsp::InsertTextFormat::SNIPPET),
2599                        ..Default::default()
2600                    },
2601                ])))
2602            })
2603            .next()
2604            .await
2605            .unwrap();
2606        cx_a.foreground().finish_waiting();
2607
2608        // Open the buffer on the host.
2609        let buffer_a = project_a
2610            .update(cx_a, |p, cx| p.open_buffer((worktree_id, "main.rs"), cx))
2611            .await
2612            .unwrap();
2613        buffer_a
2614            .condition(&cx_a, |buffer, _| buffer.text() == "fn main() { a. }")
2615            .await;
2616
2617        // Confirm a completion on the guest.
2618        editor_b
2619            .condition(&cx_b, |editor, _| editor.context_menu_visible())
2620            .await;
2621        editor_b.update(cx_b, |editor, cx| {
2622            editor.confirm_completion(&ConfirmCompletion { item_ix: Some(0) }, cx);
2623            assert_eq!(editor.text(cx), "fn main() { a.first_method() }");
2624        });
2625
2626        // Return a resolved completion from the host's language server.
2627        // The resolved completion has an additional text edit.
2628        fake_language_server.handle_request::<lsp::request::ResolveCompletionItem, _, _>(
2629            |params, _| async move {
2630                assert_eq!(params.label, "first_method(…)");
2631                Ok(lsp::CompletionItem {
2632                    label: "first_method(…)".into(),
2633                    detail: Some("fn(&mut self, B) -> C".into()),
2634                    text_edit: Some(lsp::CompletionTextEdit::Edit(lsp::TextEdit {
2635                        new_text: "first_method($1)".to_string(),
2636                        range: lsp::Range::new(
2637                            lsp::Position::new(0, 14),
2638                            lsp::Position::new(0, 14),
2639                        ),
2640                    })),
2641                    additional_text_edits: Some(vec![lsp::TextEdit {
2642                        new_text: "use d::SomeTrait;\n".to_string(),
2643                        range: lsp::Range::new(lsp::Position::new(0, 0), lsp::Position::new(0, 0)),
2644                    }]),
2645                    insert_text_format: Some(lsp::InsertTextFormat::SNIPPET),
2646                    ..Default::default()
2647                })
2648            },
2649        );
2650
2651        // The additional edit is applied.
2652        buffer_a
2653            .condition(&cx_a, |buffer, _| {
2654                buffer.text() == "use d::SomeTrait;\nfn main() { a.first_method() }"
2655            })
2656            .await;
2657        buffer_b
2658            .condition(&cx_b, |buffer, _| {
2659                buffer.text() == "use d::SomeTrait;\nfn main() { a.first_method() }"
2660            })
2661            .await;
2662    }
2663
2664    #[gpui::test(iterations = 10)]
2665    async fn test_reloading_buffer_manually(cx_a: &mut TestAppContext, cx_b: &mut TestAppContext) {
2666        cx_a.foreground().forbid_parking();
2667        let lang_registry = Arc::new(LanguageRegistry::test());
2668        let fs = FakeFs::new(cx_a.background());
2669
2670        // Connect to a server as 2 clients.
2671        let mut server = TestServer::start(cx_a.foreground(), cx_a.background()).await;
2672        let client_a = server.create_client(cx_a, "user_a").await;
2673        let client_b = server.create_client(cx_b, "user_b").await;
2674
2675        // Share a project as client A
2676        fs.insert_tree(
2677            "/a",
2678            json!({
2679                ".zed.toml": r#"collaborators = ["user_b"]"#,
2680                "a.rs": "let one = 1;",
2681            }),
2682        )
2683        .await;
2684        let project_a = cx_a.update(|cx| {
2685            Project::local(
2686                client_a.clone(),
2687                client_a.user_store.clone(),
2688                lang_registry.clone(),
2689                fs.clone(),
2690                cx,
2691            )
2692        });
2693        let (worktree_a, _) = project_a
2694            .update(cx_a, |p, cx| {
2695                p.find_or_create_local_worktree("/a", true, cx)
2696            })
2697            .await
2698            .unwrap();
2699        worktree_a
2700            .read_with(cx_a, |tree, _| tree.as_local().unwrap().scan_complete())
2701            .await;
2702        let project_id = project_a.update(cx_a, |p, _| p.next_remote_id()).await;
2703        let worktree_id = worktree_a.read_with(cx_a, |tree, _| tree.id());
2704        project_a.update(cx_a, |p, cx| p.share(cx)).await.unwrap();
2705        let buffer_a = project_a
2706            .update(cx_a, |p, cx| p.open_buffer((worktree_id, "a.rs"), cx))
2707            .await
2708            .unwrap();
2709
2710        // Join the worktree as client B.
2711        let project_b = Project::remote(
2712            project_id,
2713            client_b.clone(),
2714            client_b.user_store.clone(),
2715            lang_registry.clone(),
2716            fs.clone(),
2717            &mut cx_b.to_async(),
2718        )
2719        .await
2720        .unwrap();
2721
2722        let buffer_b = cx_b
2723            .background()
2724            .spawn(project_b.update(cx_b, |p, cx| p.open_buffer((worktree_id, "a.rs"), cx)))
2725            .await
2726            .unwrap();
2727        buffer_b.update(cx_b, |buffer, cx| {
2728            buffer.edit([4..7], "six", cx);
2729            buffer.edit([10..11], "6", cx);
2730            assert_eq!(buffer.text(), "let six = 6;");
2731            assert!(buffer.is_dirty());
2732            assert!(!buffer.has_conflict());
2733        });
2734        buffer_a
2735            .condition(cx_a, |buffer, _| buffer.text() == "let six = 6;")
2736            .await;
2737
2738        fs.save(Path::new("/a/a.rs"), &Rope::from("let seven = 7;"))
2739            .await
2740            .unwrap();
2741        buffer_a
2742            .condition(cx_a, |buffer, _| buffer.has_conflict())
2743            .await;
2744        buffer_b
2745            .condition(cx_b, |buffer, _| buffer.has_conflict())
2746            .await;
2747
2748        project_b
2749            .update(cx_b, |project, cx| {
2750                project.reload_buffers(HashSet::from_iter([buffer_b.clone()]), true, cx)
2751            })
2752            .await
2753            .unwrap();
2754        buffer_a.read_with(cx_a, |buffer, _| {
2755            assert_eq!(buffer.text(), "let seven = 7;");
2756            assert!(!buffer.is_dirty());
2757            assert!(!buffer.has_conflict());
2758        });
2759        buffer_b.read_with(cx_b, |buffer, _| {
2760            assert_eq!(buffer.text(), "let seven = 7;");
2761            assert!(!buffer.is_dirty());
2762            assert!(!buffer.has_conflict());
2763        });
2764
2765        buffer_a.update(cx_a, |buffer, cx| {
2766            // Undoing on the host is a no-op when the reload was initiated by the guest.
2767            buffer.undo(cx);
2768            assert_eq!(buffer.text(), "let seven = 7;");
2769            assert!(!buffer.is_dirty());
2770            assert!(!buffer.has_conflict());
2771        });
2772        buffer_b.update(cx_b, |buffer, cx| {
2773            // Undoing on the guest rolls back the buffer to before it was reloaded but the conflict gets cleared.
2774            buffer.undo(cx);
2775            assert_eq!(buffer.text(), "let six = 6;");
2776            assert!(buffer.is_dirty());
2777            assert!(!buffer.has_conflict());
2778        });
2779    }
2780
2781    #[gpui::test(iterations = 10)]
2782    async fn test_formatting_buffer(cx_a: &mut TestAppContext, cx_b: &mut TestAppContext) {
2783        cx_a.foreground().forbid_parking();
2784        let lang_registry = Arc::new(LanguageRegistry::test());
2785        let fs = FakeFs::new(cx_a.background());
2786
2787        // Set up a fake language server.
2788        let mut language = Language::new(
2789            LanguageConfig {
2790                name: "Rust".into(),
2791                path_suffixes: vec!["rs".to_string()],
2792                ..Default::default()
2793            },
2794            Some(tree_sitter_rust::language()),
2795        );
2796        let mut fake_language_servers = language.set_fake_lsp_adapter(Default::default());
2797        lang_registry.add(Arc::new(language));
2798
2799        // Connect to a server as 2 clients.
2800        let mut server = TestServer::start(cx_a.foreground(), cx_a.background()).await;
2801        let client_a = server.create_client(cx_a, "user_a").await;
2802        let client_b = server.create_client(cx_b, "user_b").await;
2803
2804        // Share a project as client A
2805        fs.insert_tree(
2806            "/a",
2807            json!({
2808                ".zed.toml": r#"collaborators = ["user_b"]"#,
2809                "a.rs": "let one = two",
2810            }),
2811        )
2812        .await;
2813        let project_a = cx_a.update(|cx| {
2814            Project::local(
2815                client_a.clone(),
2816                client_a.user_store.clone(),
2817                lang_registry.clone(),
2818                fs.clone(),
2819                cx,
2820            )
2821        });
2822        let (worktree_a, _) = project_a
2823            .update(cx_a, |p, cx| {
2824                p.find_or_create_local_worktree("/a", true, cx)
2825            })
2826            .await
2827            .unwrap();
2828        worktree_a
2829            .read_with(cx_a, |tree, _| tree.as_local().unwrap().scan_complete())
2830            .await;
2831        let project_id = project_a.update(cx_a, |p, _| p.next_remote_id()).await;
2832        let worktree_id = worktree_a.read_with(cx_a, |tree, _| tree.id());
2833        project_a.update(cx_a, |p, cx| p.share(cx)).await.unwrap();
2834
2835        // Join the worktree as client B.
2836        let project_b = Project::remote(
2837            project_id,
2838            client_b.clone(),
2839            client_b.user_store.clone(),
2840            lang_registry.clone(),
2841            fs.clone(),
2842            &mut cx_b.to_async(),
2843        )
2844        .await
2845        .unwrap();
2846
2847        let buffer_b = cx_b
2848            .background()
2849            .spawn(project_b.update(cx_b, |p, cx| p.open_buffer((worktree_id, "a.rs"), cx)))
2850            .await
2851            .unwrap();
2852
2853        let fake_language_server = fake_language_servers.next().await.unwrap();
2854        fake_language_server.handle_request::<lsp::request::Formatting, _, _>(|_, _| async move {
2855            Ok(Some(vec![
2856                lsp::TextEdit {
2857                    range: lsp::Range::new(lsp::Position::new(0, 4), lsp::Position::new(0, 4)),
2858                    new_text: "h".to_string(),
2859                },
2860                lsp::TextEdit {
2861                    range: lsp::Range::new(lsp::Position::new(0, 7), lsp::Position::new(0, 7)),
2862                    new_text: "y".to_string(),
2863                },
2864            ]))
2865        });
2866
2867        project_b
2868            .update(cx_b, |project, cx| {
2869                project.format(HashSet::from_iter([buffer_b.clone()]), true, cx)
2870            })
2871            .await
2872            .unwrap();
2873        assert_eq!(
2874            buffer_b.read_with(cx_b, |buffer, _| buffer.text()),
2875            "let honey = two"
2876        );
2877    }
2878
2879    #[gpui::test(iterations = 10)]
2880    async fn test_definition(cx_a: &mut TestAppContext, cx_b: &mut TestAppContext) {
2881        cx_a.foreground().forbid_parking();
2882        let lang_registry = Arc::new(LanguageRegistry::test());
2883        let fs = FakeFs::new(cx_a.background());
2884        fs.insert_tree(
2885            "/root-1",
2886            json!({
2887                ".zed.toml": r#"collaborators = ["user_b"]"#,
2888                "a.rs": "const ONE: usize = b::TWO + b::THREE;",
2889            }),
2890        )
2891        .await;
2892        fs.insert_tree(
2893            "/root-2",
2894            json!({
2895                "b.rs": "const TWO: usize = 2;\nconst THREE: usize = 3;",
2896            }),
2897        )
2898        .await;
2899
2900        // Set up a fake language server.
2901        let mut language = Language::new(
2902            LanguageConfig {
2903                name: "Rust".into(),
2904                path_suffixes: vec!["rs".to_string()],
2905                ..Default::default()
2906            },
2907            Some(tree_sitter_rust::language()),
2908        );
2909        let mut fake_language_servers = language.set_fake_lsp_adapter(Default::default());
2910        lang_registry.add(Arc::new(language));
2911
2912        // Connect to a server as 2 clients.
2913        let mut server = TestServer::start(cx_a.foreground(), cx_a.background()).await;
2914        let client_a = server.create_client(cx_a, "user_a").await;
2915        let client_b = server.create_client(cx_b, "user_b").await;
2916
2917        // Share a project as client A
2918        let project_a = cx_a.update(|cx| {
2919            Project::local(
2920                client_a.clone(),
2921                client_a.user_store.clone(),
2922                lang_registry.clone(),
2923                fs.clone(),
2924                cx,
2925            )
2926        });
2927        let (worktree_a, _) = project_a
2928            .update(cx_a, |p, cx| {
2929                p.find_or_create_local_worktree("/root-1", true, cx)
2930            })
2931            .await
2932            .unwrap();
2933        worktree_a
2934            .read_with(cx_a, |tree, _| tree.as_local().unwrap().scan_complete())
2935            .await;
2936        let project_id = project_a.update(cx_a, |p, _| p.next_remote_id()).await;
2937        let worktree_id = worktree_a.read_with(cx_a, |tree, _| tree.id());
2938        project_a.update(cx_a, |p, cx| p.share(cx)).await.unwrap();
2939
2940        // Join the worktree as client B.
2941        let project_b = Project::remote(
2942            project_id,
2943            client_b.clone(),
2944            client_b.user_store.clone(),
2945            lang_registry.clone(),
2946            fs.clone(),
2947            &mut cx_b.to_async(),
2948        )
2949        .await
2950        .unwrap();
2951
2952        // Open the file on client B.
2953        let buffer_b = cx_b
2954            .background()
2955            .spawn(project_b.update(cx_b, |p, cx| p.open_buffer((worktree_id, "a.rs"), cx)))
2956            .await
2957            .unwrap();
2958
2959        // Request the definition of a symbol as the guest.
2960        let fake_language_server = fake_language_servers.next().await.unwrap();
2961        fake_language_server.handle_request::<lsp::request::GotoDefinition, _, _>(
2962            |_, _| async move {
2963                Ok(Some(lsp::GotoDefinitionResponse::Scalar(
2964                    lsp::Location::new(
2965                        lsp::Url::from_file_path("/root-2/b.rs").unwrap(),
2966                        lsp::Range::new(lsp::Position::new(0, 6), lsp::Position::new(0, 9)),
2967                    ),
2968                )))
2969            },
2970        );
2971
2972        let definitions_1 = project_b
2973            .update(cx_b, |p, cx| p.definition(&buffer_b, 23, cx))
2974            .await
2975            .unwrap();
2976        cx_b.read(|cx| {
2977            assert_eq!(definitions_1.len(), 1);
2978            assert_eq!(project_b.read(cx).worktrees(cx).count(), 2);
2979            let target_buffer = definitions_1[0].buffer.read(cx);
2980            assert_eq!(
2981                target_buffer.text(),
2982                "const TWO: usize = 2;\nconst THREE: usize = 3;"
2983            );
2984            assert_eq!(
2985                definitions_1[0].range.to_point(target_buffer),
2986                Point::new(0, 6)..Point::new(0, 9)
2987            );
2988        });
2989
2990        // Try getting more definitions for the same buffer, ensuring the buffer gets reused from
2991        // the previous call to `definition`.
2992        fake_language_server.handle_request::<lsp::request::GotoDefinition, _, _>(
2993            |_, _| async move {
2994                Ok(Some(lsp::GotoDefinitionResponse::Scalar(
2995                    lsp::Location::new(
2996                        lsp::Url::from_file_path("/root-2/b.rs").unwrap(),
2997                        lsp::Range::new(lsp::Position::new(1, 6), lsp::Position::new(1, 11)),
2998                    ),
2999                )))
3000            },
3001        );
3002
3003        let definitions_2 = project_b
3004            .update(cx_b, |p, cx| p.definition(&buffer_b, 33, cx))
3005            .await
3006            .unwrap();
3007        cx_b.read(|cx| {
3008            assert_eq!(definitions_2.len(), 1);
3009            assert_eq!(project_b.read(cx).worktrees(cx).count(), 2);
3010            let target_buffer = definitions_2[0].buffer.read(cx);
3011            assert_eq!(
3012                target_buffer.text(),
3013                "const TWO: usize = 2;\nconst THREE: usize = 3;"
3014            );
3015            assert_eq!(
3016                definitions_2[0].range.to_point(target_buffer),
3017                Point::new(1, 6)..Point::new(1, 11)
3018            );
3019        });
3020        assert_eq!(definitions_1[0].buffer, definitions_2[0].buffer);
3021    }
3022
3023    #[gpui::test(iterations = 10)]
3024    async fn test_references(cx_a: &mut TestAppContext, cx_b: &mut TestAppContext) {
3025        cx_a.foreground().forbid_parking();
3026        let lang_registry = Arc::new(LanguageRegistry::test());
3027        let fs = FakeFs::new(cx_a.background());
3028        fs.insert_tree(
3029            "/root-1",
3030            json!({
3031                ".zed.toml": r#"collaborators = ["user_b"]"#,
3032                "one.rs": "const ONE: usize = 1;",
3033                "two.rs": "const TWO: usize = one::ONE + one::ONE;",
3034            }),
3035        )
3036        .await;
3037        fs.insert_tree(
3038            "/root-2",
3039            json!({
3040                "three.rs": "const THREE: usize = two::TWO + one::ONE;",
3041            }),
3042        )
3043        .await;
3044
3045        // Set up a fake language server.
3046        let mut language = Language::new(
3047            LanguageConfig {
3048                name: "Rust".into(),
3049                path_suffixes: vec!["rs".to_string()],
3050                ..Default::default()
3051            },
3052            Some(tree_sitter_rust::language()),
3053        );
3054        let mut fake_language_servers = language.set_fake_lsp_adapter(Default::default());
3055        lang_registry.add(Arc::new(language));
3056
3057        // Connect to a server as 2 clients.
3058        let mut server = TestServer::start(cx_a.foreground(), cx_a.background()).await;
3059        let client_a = server.create_client(cx_a, "user_a").await;
3060        let client_b = server.create_client(cx_b, "user_b").await;
3061
3062        // Share a project as client A
3063        let project_a = cx_a.update(|cx| {
3064            Project::local(
3065                client_a.clone(),
3066                client_a.user_store.clone(),
3067                lang_registry.clone(),
3068                fs.clone(),
3069                cx,
3070            )
3071        });
3072        let (worktree_a, _) = project_a
3073            .update(cx_a, |p, cx| {
3074                p.find_or_create_local_worktree("/root-1", true, cx)
3075            })
3076            .await
3077            .unwrap();
3078        worktree_a
3079            .read_with(cx_a, |tree, _| tree.as_local().unwrap().scan_complete())
3080            .await;
3081        let project_id = project_a.update(cx_a, |p, _| p.next_remote_id()).await;
3082        let worktree_id = worktree_a.read_with(cx_a, |tree, _| tree.id());
3083        project_a.update(cx_a, |p, cx| p.share(cx)).await.unwrap();
3084
3085        // Join the worktree as client B.
3086        let project_b = Project::remote(
3087            project_id,
3088            client_b.clone(),
3089            client_b.user_store.clone(),
3090            lang_registry.clone(),
3091            fs.clone(),
3092            &mut cx_b.to_async(),
3093        )
3094        .await
3095        .unwrap();
3096
3097        // Open the file on client B.
3098        let buffer_b = cx_b
3099            .background()
3100            .spawn(project_b.update(cx_b, |p, cx| p.open_buffer((worktree_id, "one.rs"), cx)))
3101            .await
3102            .unwrap();
3103
3104        // Request references to a symbol as the guest.
3105        let fake_language_server = fake_language_servers.next().await.unwrap();
3106        fake_language_server.handle_request::<lsp::request::References, _, _>(
3107            |params, _| async move {
3108                assert_eq!(
3109                    params.text_document_position.text_document.uri.as_str(),
3110                    "file:///root-1/one.rs"
3111                );
3112                Ok(Some(vec![
3113                    lsp::Location {
3114                        uri: lsp::Url::from_file_path("/root-1/two.rs").unwrap(),
3115                        range: lsp::Range::new(
3116                            lsp::Position::new(0, 24),
3117                            lsp::Position::new(0, 27),
3118                        ),
3119                    },
3120                    lsp::Location {
3121                        uri: lsp::Url::from_file_path("/root-1/two.rs").unwrap(),
3122                        range: lsp::Range::new(
3123                            lsp::Position::new(0, 35),
3124                            lsp::Position::new(0, 38),
3125                        ),
3126                    },
3127                    lsp::Location {
3128                        uri: lsp::Url::from_file_path("/root-2/three.rs").unwrap(),
3129                        range: lsp::Range::new(
3130                            lsp::Position::new(0, 37),
3131                            lsp::Position::new(0, 40),
3132                        ),
3133                    },
3134                ]))
3135            },
3136        );
3137
3138        let references = project_b
3139            .update(cx_b, |p, cx| p.references(&buffer_b, 7, cx))
3140            .await
3141            .unwrap();
3142        cx_b.read(|cx| {
3143            assert_eq!(references.len(), 3);
3144            assert_eq!(project_b.read(cx).worktrees(cx).count(), 2);
3145
3146            let two_buffer = references[0].buffer.read(cx);
3147            let three_buffer = references[2].buffer.read(cx);
3148            assert_eq!(
3149                two_buffer.file().unwrap().path().as_ref(),
3150                Path::new("two.rs")
3151            );
3152            assert_eq!(references[1].buffer, references[0].buffer);
3153            assert_eq!(
3154                three_buffer.file().unwrap().full_path(cx),
3155                Path::new("three.rs")
3156            );
3157
3158            assert_eq!(references[0].range.to_offset(&two_buffer), 24..27);
3159            assert_eq!(references[1].range.to_offset(&two_buffer), 35..38);
3160            assert_eq!(references[2].range.to_offset(&three_buffer), 37..40);
3161        });
3162    }
3163
3164    #[gpui::test(iterations = 10)]
3165    async fn test_project_search(cx_a: &mut TestAppContext, cx_b: &mut TestAppContext) {
3166        cx_a.foreground().forbid_parking();
3167        let lang_registry = Arc::new(LanguageRegistry::test());
3168        let fs = FakeFs::new(cx_a.background());
3169        fs.insert_tree(
3170            "/root-1",
3171            json!({
3172                ".zed.toml": r#"collaborators = ["user_b"]"#,
3173                "a": "hello world",
3174                "b": "goodnight moon",
3175                "c": "a world of goo",
3176                "d": "world champion of clown world",
3177            }),
3178        )
3179        .await;
3180        fs.insert_tree(
3181            "/root-2",
3182            json!({
3183                "e": "disney world is fun",
3184            }),
3185        )
3186        .await;
3187
3188        // Connect to a server as 2 clients.
3189        let mut server = TestServer::start(cx_a.foreground(), cx_a.background()).await;
3190        let client_a = server.create_client(cx_a, "user_a").await;
3191        let client_b = server.create_client(cx_b, "user_b").await;
3192
3193        // Share a project as client A
3194        let project_a = cx_a.update(|cx| {
3195            Project::local(
3196                client_a.clone(),
3197                client_a.user_store.clone(),
3198                lang_registry.clone(),
3199                fs.clone(),
3200                cx,
3201            )
3202        });
3203        let project_id = project_a.update(cx_a, |p, _| p.next_remote_id()).await;
3204
3205        let (worktree_1, _) = project_a
3206            .update(cx_a, |p, cx| {
3207                p.find_or_create_local_worktree("/root-1", true, cx)
3208            })
3209            .await
3210            .unwrap();
3211        worktree_1
3212            .read_with(cx_a, |tree, _| tree.as_local().unwrap().scan_complete())
3213            .await;
3214        let (worktree_2, _) = project_a
3215            .update(cx_a, |p, cx| {
3216                p.find_or_create_local_worktree("/root-2", true, cx)
3217            })
3218            .await
3219            .unwrap();
3220        worktree_2
3221            .read_with(cx_a, |tree, _| tree.as_local().unwrap().scan_complete())
3222            .await;
3223
3224        project_a.update(cx_a, |p, cx| p.share(cx)).await.unwrap();
3225
3226        // Join the worktree as client B.
3227        let project_b = Project::remote(
3228            project_id,
3229            client_b.clone(),
3230            client_b.user_store.clone(),
3231            lang_registry.clone(),
3232            fs.clone(),
3233            &mut cx_b.to_async(),
3234        )
3235        .await
3236        .unwrap();
3237
3238        let results = project_b
3239            .update(cx_b, |project, cx| {
3240                project.search(SearchQuery::text("world", false, false), cx)
3241            })
3242            .await
3243            .unwrap();
3244
3245        let mut ranges_by_path = results
3246            .into_iter()
3247            .map(|(buffer, ranges)| {
3248                buffer.read_with(cx_b, |buffer, cx| {
3249                    let path = buffer.file().unwrap().full_path(cx);
3250                    let offset_ranges = ranges
3251                        .into_iter()
3252                        .map(|range| range.to_offset(buffer))
3253                        .collect::<Vec<_>>();
3254                    (path, offset_ranges)
3255                })
3256            })
3257            .collect::<Vec<_>>();
3258        ranges_by_path.sort_by_key(|(path, _)| path.clone());
3259
3260        assert_eq!(
3261            ranges_by_path,
3262            &[
3263                (PathBuf::from("root-1/a"), vec![6..11]),
3264                (PathBuf::from("root-1/c"), vec![2..7]),
3265                (PathBuf::from("root-1/d"), vec![0..5, 24..29]),
3266                (PathBuf::from("root-2/e"), vec![7..12]),
3267            ]
3268        );
3269    }
3270
3271    #[gpui::test(iterations = 10)]
3272    async fn test_document_highlights(cx_a: &mut TestAppContext, cx_b: &mut TestAppContext) {
3273        cx_a.foreground().forbid_parking();
3274        let lang_registry = Arc::new(LanguageRegistry::test());
3275        let fs = FakeFs::new(cx_a.background());
3276        fs.insert_tree(
3277            "/root-1",
3278            json!({
3279                ".zed.toml": r#"collaborators = ["user_b"]"#,
3280                "main.rs": "fn double(number: i32) -> i32 { number + number }",
3281            }),
3282        )
3283        .await;
3284
3285        // Set up a fake language server.
3286        let mut language = Language::new(
3287            LanguageConfig {
3288                name: "Rust".into(),
3289                path_suffixes: vec!["rs".to_string()],
3290                ..Default::default()
3291            },
3292            Some(tree_sitter_rust::language()),
3293        );
3294        let mut fake_language_servers = language.set_fake_lsp_adapter(Default::default());
3295        lang_registry.add(Arc::new(language));
3296
3297        // Connect to a server as 2 clients.
3298        let mut server = TestServer::start(cx_a.foreground(), cx_a.background()).await;
3299        let client_a = server.create_client(cx_a, "user_a").await;
3300        let client_b = server.create_client(cx_b, "user_b").await;
3301
3302        // Share a project as client A
3303        let project_a = cx_a.update(|cx| {
3304            Project::local(
3305                client_a.clone(),
3306                client_a.user_store.clone(),
3307                lang_registry.clone(),
3308                fs.clone(),
3309                cx,
3310            )
3311        });
3312        let (worktree_a, _) = project_a
3313            .update(cx_a, |p, cx| {
3314                p.find_or_create_local_worktree("/root-1", true, cx)
3315            })
3316            .await
3317            .unwrap();
3318        worktree_a
3319            .read_with(cx_a, |tree, _| tree.as_local().unwrap().scan_complete())
3320            .await;
3321        let project_id = project_a.update(cx_a, |p, _| p.next_remote_id()).await;
3322        let worktree_id = worktree_a.read_with(cx_a, |tree, _| tree.id());
3323        project_a.update(cx_a, |p, cx| p.share(cx)).await.unwrap();
3324
3325        // Join the worktree as client B.
3326        let project_b = Project::remote(
3327            project_id,
3328            client_b.clone(),
3329            client_b.user_store.clone(),
3330            lang_registry.clone(),
3331            fs.clone(),
3332            &mut cx_b.to_async(),
3333        )
3334        .await
3335        .unwrap();
3336
3337        // Open the file on client B.
3338        let buffer_b = cx_b
3339            .background()
3340            .spawn(project_b.update(cx_b, |p, cx| p.open_buffer((worktree_id, "main.rs"), cx)))
3341            .await
3342            .unwrap();
3343
3344        // Request document highlights as the guest.
3345        let fake_language_server = fake_language_servers.next().await.unwrap();
3346        fake_language_server.handle_request::<lsp::request::DocumentHighlightRequest, _, _>(
3347            |params, _| async move {
3348                assert_eq!(
3349                    params
3350                        .text_document_position_params
3351                        .text_document
3352                        .uri
3353                        .as_str(),
3354                    "file:///root-1/main.rs"
3355                );
3356                assert_eq!(
3357                    params.text_document_position_params.position,
3358                    lsp::Position::new(0, 34)
3359                );
3360                Ok(Some(vec![
3361                    lsp::DocumentHighlight {
3362                        kind: Some(lsp::DocumentHighlightKind::WRITE),
3363                        range: lsp::Range::new(
3364                            lsp::Position::new(0, 10),
3365                            lsp::Position::new(0, 16),
3366                        ),
3367                    },
3368                    lsp::DocumentHighlight {
3369                        kind: Some(lsp::DocumentHighlightKind::READ),
3370                        range: lsp::Range::new(
3371                            lsp::Position::new(0, 32),
3372                            lsp::Position::new(0, 38),
3373                        ),
3374                    },
3375                    lsp::DocumentHighlight {
3376                        kind: Some(lsp::DocumentHighlightKind::READ),
3377                        range: lsp::Range::new(
3378                            lsp::Position::new(0, 41),
3379                            lsp::Position::new(0, 47),
3380                        ),
3381                    },
3382                ]))
3383            },
3384        );
3385
3386        let highlights = project_b
3387            .update(cx_b, |p, cx| p.document_highlights(&buffer_b, 34, cx))
3388            .await
3389            .unwrap();
3390        buffer_b.read_with(cx_b, |buffer, _| {
3391            let snapshot = buffer.snapshot();
3392
3393            let highlights = highlights
3394                .into_iter()
3395                .map(|highlight| (highlight.kind, highlight.range.to_offset(&snapshot)))
3396                .collect::<Vec<_>>();
3397            assert_eq!(
3398                highlights,
3399                &[
3400                    (lsp::DocumentHighlightKind::WRITE, 10..16),
3401                    (lsp::DocumentHighlightKind::READ, 32..38),
3402                    (lsp::DocumentHighlightKind::READ, 41..47)
3403                ]
3404            )
3405        });
3406    }
3407
3408    #[gpui::test(iterations = 10)]
3409    async fn test_project_symbols(cx_a: &mut TestAppContext, cx_b: &mut TestAppContext) {
3410        cx_a.foreground().forbid_parking();
3411        let lang_registry = Arc::new(LanguageRegistry::test());
3412        let fs = FakeFs::new(cx_a.background());
3413        fs.insert_tree(
3414            "/code",
3415            json!({
3416                "crate-1": {
3417                    ".zed.toml": r#"collaborators = ["user_b"]"#,
3418                    "one.rs": "const ONE: usize = 1;",
3419                },
3420                "crate-2": {
3421                    "two.rs": "const TWO: usize = 2; const THREE: usize = 3;",
3422                },
3423                "private": {
3424                    "passwords.txt": "the-password",
3425                }
3426            }),
3427        )
3428        .await;
3429
3430        // Set up a fake language server.
3431        let mut language = Language::new(
3432            LanguageConfig {
3433                name: "Rust".into(),
3434                path_suffixes: vec!["rs".to_string()],
3435                ..Default::default()
3436            },
3437            Some(tree_sitter_rust::language()),
3438        );
3439        let mut fake_language_servers = language.set_fake_lsp_adapter(Default::default());
3440        lang_registry.add(Arc::new(language));
3441
3442        // Connect to a server as 2 clients.
3443        let mut server = TestServer::start(cx_a.foreground(), cx_a.background()).await;
3444        let client_a = server.create_client(cx_a, "user_a").await;
3445        let client_b = server.create_client(cx_b, "user_b").await;
3446
3447        // Share a project as client A
3448        let project_a = cx_a.update(|cx| {
3449            Project::local(
3450                client_a.clone(),
3451                client_a.user_store.clone(),
3452                lang_registry.clone(),
3453                fs.clone(),
3454                cx,
3455            )
3456        });
3457        let (worktree_a, _) = project_a
3458            .update(cx_a, |p, cx| {
3459                p.find_or_create_local_worktree("/code/crate-1", true, cx)
3460            })
3461            .await
3462            .unwrap();
3463        worktree_a
3464            .read_with(cx_a, |tree, _| tree.as_local().unwrap().scan_complete())
3465            .await;
3466        let project_id = project_a.update(cx_a, |p, _| p.next_remote_id()).await;
3467        let worktree_id = worktree_a.read_with(cx_a, |tree, _| tree.id());
3468        project_a.update(cx_a, |p, cx| p.share(cx)).await.unwrap();
3469
3470        // Join the worktree as client B.
3471        let project_b = Project::remote(
3472            project_id,
3473            client_b.clone(),
3474            client_b.user_store.clone(),
3475            lang_registry.clone(),
3476            fs.clone(),
3477            &mut cx_b.to_async(),
3478        )
3479        .await
3480        .unwrap();
3481
3482        // Cause the language server to start.
3483        let _buffer = cx_b
3484            .background()
3485            .spawn(project_b.update(cx_b, |p, cx| p.open_buffer((worktree_id, "one.rs"), cx)))
3486            .await
3487            .unwrap();
3488
3489        let fake_language_server = fake_language_servers.next().await.unwrap();
3490        fake_language_server.handle_request::<lsp::request::WorkspaceSymbol, _, _>(
3491            |_, _| async move {
3492                #[allow(deprecated)]
3493                Ok(Some(vec![lsp::SymbolInformation {
3494                    name: "TWO".into(),
3495                    location: lsp::Location {
3496                        uri: lsp::Url::from_file_path("/code/crate-2/two.rs").unwrap(),
3497                        range: lsp::Range::new(lsp::Position::new(0, 6), lsp::Position::new(0, 9)),
3498                    },
3499                    kind: lsp::SymbolKind::CONSTANT,
3500                    tags: None,
3501                    container_name: None,
3502                    deprecated: None,
3503                }]))
3504            },
3505        );
3506
3507        // Request the definition of a symbol as the guest.
3508        let symbols = project_b
3509            .update(cx_b, |p, cx| p.symbols("two", cx))
3510            .await
3511            .unwrap();
3512        assert_eq!(symbols.len(), 1);
3513        assert_eq!(symbols[0].name, "TWO");
3514
3515        // Open one of the returned symbols.
3516        let buffer_b_2 = project_b
3517            .update(cx_b, |project, cx| {
3518                project.open_buffer_for_symbol(&symbols[0], cx)
3519            })
3520            .await
3521            .unwrap();
3522        buffer_b_2.read_with(cx_b, |buffer, _| {
3523            assert_eq!(
3524                buffer.file().unwrap().path().as_ref(),
3525                Path::new("../crate-2/two.rs")
3526            );
3527        });
3528
3529        // Attempt to craft a symbol and violate host's privacy by opening an arbitrary file.
3530        let mut fake_symbol = symbols[0].clone();
3531        fake_symbol.path = Path::new("/code/secrets").into();
3532        let error = project_b
3533            .update(cx_b, |project, cx| {
3534                project.open_buffer_for_symbol(&fake_symbol, cx)
3535            })
3536            .await
3537            .unwrap_err();
3538        assert!(error.to_string().contains("invalid symbol signature"));
3539    }
3540
3541    #[gpui::test(iterations = 10)]
3542    async fn test_open_buffer_while_getting_definition_pointing_to_it(
3543        cx_a: &mut TestAppContext,
3544        cx_b: &mut TestAppContext,
3545        mut rng: StdRng,
3546    ) {
3547        cx_a.foreground().forbid_parking();
3548        let lang_registry = Arc::new(LanguageRegistry::test());
3549        let fs = FakeFs::new(cx_a.background());
3550        fs.insert_tree(
3551            "/root",
3552            json!({
3553                ".zed.toml": r#"collaborators = ["user_b"]"#,
3554                "a.rs": "const ONE: usize = b::TWO;",
3555                "b.rs": "const TWO: usize = 2",
3556            }),
3557        )
3558        .await;
3559
3560        // Set up a fake language server.
3561        let mut language = Language::new(
3562            LanguageConfig {
3563                name: "Rust".into(),
3564                path_suffixes: vec!["rs".to_string()],
3565                ..Default::default()
3566            },
3567            Some(tree_sitter_rust::language()),
3568        );
3569        let mut fake_language_servers = language.set_fake_lsp_adapter(Default::default());
3570        lang_registry.add(Arc::new(language));
3571
3572        // Connect to a server as 2 clients.
3573        let mut server = TestServer::start(cx_a.foreground(), cx_a.background()).await;
3574        let client_a = server.create_client(cx_a, "user_a").await;
3575        let client_b = server.create_client(cx_b, "user_b").await;
3576
3577        // Share a project as client A
3578        let project_a = cx_a.update(|cx| {
3579            Project::local(
3580                client_a.clone(),
3581                client_a.user_store.clone(),
3582                lang_registry.clone(),
3583                fs.clone(),
3584                cx,
3585            )
3586        });
3587
3588        let (worktree_a, _) = project_a
3589            .update(cx_a, |p, cx| {
3590                p.find_or_create_local_worktree("/root", true, cx)
3591            })
3592            .await
3593            .unwrap();
3594        worktree_a
3595            .read_with(cx_a, |tree, _| tree.as_local().unwrap().scan_complete())
3596            .await;
3597        let project_id = project_a.update(cx_a, |p, _| p.next_remote_id()).await;
3598        let worktree_id = worktree_a.read_with(cx_a, |tree, _| tree.id());
3599        project_a.update(cx_a, |p, cx| p.share(cx)).await.unwrap();
3600
3601        // Join the worktree as client B.
3602        let project_b = Project::remote(
3603            project_id,
3604            client_b.clone(),
3605            client_b.user_store.clone(),
3606            lang_registry.clone(),
3607            fs.clone(),
3608            &mut cx_b.to_async(),
3609        )
3610        .await
3611        .unwrap();
3612
3613        let buffer_b1 = cx_b
3614            .background()
3615            .spawn(project_b.update(cx_b, |p, cx| p.open_buffer((worktree_id, "a.rs"), cx)))
3616            .await
3617            .unwrap();
3618
3619        let fake_language_server = fake_language_servers.next().await.unwrap();
3620        fake_language_server.handle_request::<lsp::request::GotoDefinition, _, _>(
3621            |_, _| async move {
3622                Ok(Some(lsp::GotoDefinitionResponse::Scalar(
3623                    lsp::Location::new(
3624                        lsp::Url::from_file_path("/root/b.rs").unwrap(),
3625                        lsp::Range::new(lsp::Position::new(0, 6), lsp::Position::new(0, 9)),
3626                    ),
3627                )))
3628            },
3629        );
3630
3631        let definitions;
3632        let buffer_b2;
3633        if rng.gen() {
3634            definitions = project_b.update(cx_b, |p, cx| p.definition(&buffer_b1, 23, cx));
3635            buffer_b2 = project_b.update(cx_b, |p, cx| p.open_buffer((worktree_id, "b.rs"), cx));
3636        } else {
3637            buffer_b2 = project_b.update(cx_b, |p, cx| p.open_buffer((worktree_id, "b.rs"), cx));
3638            definitions = project_b.update(cx_b, |p, cx| p.definition(&buffer_b1, 23, cx));
3639        }
3640
3641        let buffer_b2 = buffer_b2.await.unwrap();
3642        let definitions = definitions.await.unwrap();
3643        assert_eq!(definitions.len(), 1);
3644        assert_eq!(definitions[0].buffer, buffer_b2);
3645    }
3646
3647    #[gpui::test(iterations = 10)]
3648    async fn test_collaborating_with_code_actions(
3649        cx_a: &mut TestAppContext,
3650        cx_b: &mut TestAppContext,
3651    ) {
3652        cx_a.foreground().forbid_parking();
3653        let lang_registry = Arc::new(LanguageRegistry::test());
3654        let fs = FakeFs::new(cx_a.background());
3655        cx_b.update(|cx| editor::init(cx));
3656
3657        // Set up a fake language server.
3658        let mut language = Language::new(
3659            LanguageConfig {
3660                name: "Rust".into(),
3661                path_suffixes: vec!["rs".to_string()],
3662                ..Default::default()
3663            },
3664            Some(tree_sitter_rust::language()),
3665        );
3666        let mut fake_language_servers = language.set_fake_lsp_adapter(Default::default());
3667        lang_registry.add(Arc::new(language));
3668
3669        // Connect to a server as 2 clients.
3670        let mut server = TestServer::start(cx_a.foreground(), cx_a.background()).await;
3671        let client_a = server.create_client(cx_a, "user_a").await;
3672        let client_b = server.create_client(cx_b, "user_b").await;
3673
3674        // Share a project as client A
3675        fs.insert_tree(
3676            "/a",
3677            json!({
3678                ".zed.toml": r#"collaborators = ["user_b"]"#,
3679                "main.rs": "mod other;\nfn main() { let foo = other::foo(); }",
3680                "other.rs": "pub fn foo() -> usize { 4 }",
3681            }),
3682        )
3683        .await;
3684        let project_a = cx_a.update(|cx| {
3685            Project::local(
3686                client_a.clone(),
3687                client_a.user_store.clone(),
3688                lang_registry.clone(),
3689                fs.clone(),
3690                cx,
3691            )
3692        });
3693        let (worktree_a, _) = project_a
3694            .update(cx_a, |p, cx| {
3695                p.find_or_create_local_worktree("/a", true, cx)
3696            })
3697            .await
3698            .unwrap();
3699        worktree_a
3700            .read_with(cx_a, |tree, _| tree.as_local().unwrap().scan_complete())
3701            .await;
3702        let project_id = project_a.update(cx_a, |p, _| p.next_remote_id()).await;
3703        let worktree_id = worktree_a.read_with(cx_a, |tree, _| tree.id());
3704        project_a.update(cx_a, |p, cx| p.share(cx)).await.unwrap();
3705
3706        // Join the worktree as client B.
3707        let project_b = Project::remote(
3708            project_id,
3709            client_b.clone(),
3710            client_b.user_store.clone(),
3711            lang_registry.clone(),
3712            fs.clone(),
3713            &mut cx_b.to_async(),
3714        )
3715        .await
3716        .unwrap();
3717        let mut params = cx_b.update(WorkspaceParams::test);
3718        params.languages = lang_registry.clone();
3719        params.client = client_b.client.clone();
3720        params.user_store = client_b.user_store.clone();
3721        params.project = project_b;
3722
3723        let (_window_b, workspace_b) = cx_b.add_window(|cx| Workspace::new(&params, cx));
3724        let editor_b = workspace_b
3725            .update(cx_b, |workspace, cx| {
3726                workspace.open_path((worktree_id, "main.rs"), cx)
3727            })
3728            .await
3729            .unwrap()
3730            .downcast::<Editor>()
3731            .unwrap();
3732
3733        let mut fake_language_server = fake_language_servers.next().await.unwrap();
3734        fake_language_server
3735            .handle_request::<lsp::request::CodeActionRequest, _, _>(|params, _| async move {
3736                assert_eq!(
3737                    params.text_document.uri,
3738                    lsp::Url::from_file_path("/a/main.rs").unwrap(),
3739                );
3740                assert_eq!(params.range.start, lsp::Position::new(0, 0));
3741                assert_eq!(params.range.end, lsp::Position::new(0, 0));
3742                Ok(None)
3743            })
3744            .next()
3745            .await;
3746
3747        // Move cursor to a location that contains code actions.
3748        editor_b.update(cx_b, |editor, cx| {
3749            editor.select_ranges([Point::new(1, 31)..Point::new(1, 31)], None, cx);
3750            cx.focus(&editor_b);
3751        });
3752
3753        fake_language_server
3754            .handle_request::<lsp::request::CodeActionRequest, _, _>(|params, _| async move {
3755                assert_eq!(
3756                    params.text_document.uri,
3757                    lsp::Url::from_file_path("/a/main.rs").unwrap(),
3758                );
3759                assert_eq!(params.range.start, lsp::Position::new(1, 31));
3760                assert_eq!(params.range.end, lsp::Position::new(1, 31));
3761
3762                Ok(Some(vec![lsp::CodeActionOrCommand::CodeAction(
3763                    lsp::CodeAction {
3764                        title: "Inline into all callers".to_string(),
3765                        edit: Some(lsp::WorkspaceEdit {
3766                            changes: Some(
3767                                [
3768                                    (
3769                                        lsp::Url::from_file_path("/a/main.rs").unwrap(),
3770                                        vec![lsp::TextEdit::new(
3771                                            lsp::Range::new(
3772                                                lsp::Position::new(1, 22),
3773                                                lsp::Position::new(1, 34),
3774                                            ),
3775                                            "4".to_string(),
3776                                        )],
3777                                    ),
3778                                    (
3779                                        lsp::Url::from_file_path("/a/other.rs").unwrap(),
3780                                        vec![lsp::TextEdit::new(
3781                                            lsp::Range::new(
3782                                                lsp::Position::new(0, 0),
3783                                                lsp::Position::new(0, 27),
3784                                            ),
3785                                            "".to_string(),
3786                                        )],
3787                                    ),
3788                                ]
3789                                .into_iter()
3790                                .collect(),
3791                            ),
3792                            ..Default::default()
3793                        }),
3794                        data: Some(json!({
3795                            "codeActionParams": {
3796                                "range": {
3797                                    "start": {"line": 1, "column": 31},
3798                                    "end": {"line": 1, "column": 31},
3799                                }
3800                            }
3801                        })),
3802                        ..Default::default()
3803                    },
3804                )]))
3805            })
3806            .next()
3807            .await;
3808
3809        // Toggle code actions and wait for them to display.
3810        editor_b.update(cx_b, |editor, cx| {
3811            editor.toggle_code_actions(
3812                &ToggleCodeActions {
3813                    deployed_from_indicator: false,
3814                },
3815                cx,
3816            );
3817        });
3818        editor_b
3819            .condition(&cx_b, |editor, _| editor.context_menu_visible())
3820            .await;
3821
3822        fake_language_server.remove_request_handler::<lsp::request::CodeActionRequest>();
3823
3824        // Confirming the code action will trigger a resolve request.
3825        let confirm_action = workspace_b
3826            .update(cx_b, |workspace, cx| {
3827                Editor::confirm_code_action(workspace, &ConfirmCodeAction { item_ix: Some(0) }, cx)
3828            })
3829            .unwrap();
3830        fake_language_server.handle_request::<lsp::request::CodeActionResolveRequest, _, _>(
3831            |_, _| async move {
3832                Ok(lsp::CodeAction {
3833                    title: "Inline into all callers".to_string(),
3834                    edit: Some(lsp::WorkspaceEdit {
3835                        changes: Some(
3836                            [
3837                                (
3838                                    lsp::Url::from_file_path("/a/main.rs").unwrap(),
3839                                    vec![lsp::TextEdit::new(
3840                                        lsp::Range::new(
3841                                            lsp::Position::new(1, 22),
3842                                            lsp::Position::new(1, 34),
3843                                        ),
3844                                        "4".to_string(),
3845                                    )],
3846                                ),
3847                                (
3848                                    lsp::Url::from_file_path("/a/other.rs").unwrap(),
3849                                    vec![lsp::TextEdit::new(
3850                                        lsp::Range::new(
3851                                            lsp::Position::new(0, 0),
3852                                            lsp::Position::new(0, 27),
3853                                        ),
3854                                        "".to_string(),
3855                                    )],
3856                                ),
3857                            ]
3858                            .into_iter()
3859                            .collect(),
3860                        ),
3861                        ..Default::default()
3862                    }),
3863                    ..Default::default()
3864                })
3865            },
3866        );
3867
3868        // After the action is confirmed, an editor containing both modified files is opened.
3869        confirm_action.await.unwrap();
3870        let code_action_editor = workspace_b.read_with(cx_b, |workspace, cx| {
3871            workspace
3872                .active_item(cx)
3873                .unwrap()
3874                .downcast::<Editor>()
3875                .unwrap()
3876        });
3877        code_action_editor.update(cx_b, |editor, cx| {
3878            assert_eq!(editor.text(cx), "\nmod other;\nfn main() { let foo = 4; }");
3879            editor.undo(&Undo, cx);
3880            assert_eq!(
3881                editor.text(cx),
3882                "pub fn foo() -> usize { 4 }\nmod other;\nfn main() { let foo = other::foo(); }"
3883            );
3884            editor.redo(&Redo, cx);
3885            assert_eq!(editor.text(cx), "\nmod other;\nfn main() { let foo = 4; }");
3886        });
3887    }
3888
3889    #[gpui::test(iterations = 10)]
3890    async fn test_collaborating_with_renames(cx_a: &mut TestAppContext, cx_b: &mut TestAppContext) {
3891        cx_a.foreground().forbid_parking();
3892        let lang_registry = Arc::new(LanguageRegistry::test());
3893        let fs = FakeFs::new(cx_a.background());
3894        cx_b.update(|cx| editor::init(cx));
3895
3896        // Set up a fake language server.
3897        let mut language = Language::new(
3898            LanguageConfig {
3899                name: "Rust".into(),
3900                path_suffixes: vec!["rs".to_string()],
3901                ..Default::default()
3902            },
3903            Some(tree_sitter_rust::language()),
3904        );
3905        let mut fake_language_servers = language.set_fake_lsp_adapter(FakeLspAdapter {
3906            capabilities: lsp::ServerCapabilities {
3907                rename_provider: Some(lsp::OneOf::Right(lsp::RenameOptions {
3908                    prepare_provider: Some(true),
3909                    work_done_progress_options: Default::default(),
3910                })),
3911                ..Default::default()
3912            },
3913            ..Default::default()
3914        });
3915        lang_registry.add(Arc::new(language));
3916
3917        // Connect to a server as 2 clients.
3918        let mut server = TestServer::start(cx_a.foreground(), cx_a.background()).await;
3919        let client_a = server.create_client(cx_a, "user_a").await;
3920        let client_b = server.create_client(cx_b, "user_b").await;
3921
3922        // Share a project as client A
3923        fs.insert_tree(
3924            "/dir",
3925            json!({
3926                ".zed.toml": r#"collaborators = ["user_b"]"#,
3927                "one.rs": "const ONE: usize = 1;",
3928                "two.rs": "const TWO: usize = one::ONE + one::ONE;"
3929            }),
3930        )
3931        .await;
3932        let project_a = cx_a.update(|cx| {
3933            Project::local(
3934                client_a.clone(),
3935                client_a.user_store.clone(),
3936                lang_registry.clone(),
3937                fs.clone(),
3938                cx,
3939            )
3940        });
3941        let (worktree_a, _) = project_a
3942            .update(cx_a, |p, cx| {
3943                p.find_or_create_local_worktree("/dir", true, cx)
3944            })
3945            .await
3946            .unwrap();
3947        worktree_a
3948            .read_with(cx_a, |tree, _| tree.as_local().unwrap().scan_complete())
3949            .await;
3950        let project_id = project_a.update(cx_a, |p, _| p.next_remote_id()).await;
3951        let worktree_id = worktree_a.read_with(cx_a, |tree, _| tree.id());
3952        project_a.update(cx_a, |p, cx| p.share(cx)).await.unwrap();
3953
3954        // Join the worktree as client B.
3955        let project_b = Project::remote(
3956            project_id,
3957            client_b.clone(),
3958            client_b.user_store.clone(),
3959            lang_registry.clone(),
3960            fs.clone(),
3961            &mut cx_b.to_async(),
3962        )
3963        .await
3964        .unwrap();
3965        let mut params = cx_b.update(WorkspaceParams::test);
3966        params.languages = lang_registry.clone();
3967        params.client = client_b.client.clone();
3968        params.user_store = client_b.user_store.clone();
3969        params.project = project_b;
3970
3971        let (_window_b, workspace_b) = cx_b.add_window(|cx| Workspace::new(&params, cx));
3972        let editor_b = workspace_b
3973            .update(cx_b, |workspace, cx| {
3974                workspace.open_path((worktree_id, "one.rs"), cx)
3975            })
3976            .await
3977            .unwrap()
3978            .downcast::<Editor>()
3979            .unwrap();
3980        let fake_language_server = fake_language_servers.next().await.unwrap();
3981
3982        // Move cursor to a location that can be renamed.
3983        let prepare_rename = editor_b.update(cx_b, |editor, cx| {
3984            editor.select_ranges([7..7], None, cx);
3985            editor.rename(&Rename, cx).unwrap()
3986        });
3987
3988        fake_language_server
3989            .handle_request::<lsp::request::PrepareRenameRequest, _, _>(|params, _| async move {
3990                assert_eq!(params.text_document.uri.as_str(), "file:///dir/one.rs");
3991                assert_eq!(params.position, lsp::Position::new(0, 7));
3992                Ok(Some(lsp::PrepareRenameResponse::Range(lsp::Range::new(
3993                    lsp::Position::new(0, 6),
3994                    lsp::Position::new(0, 9),
3995                ))))
3996            })
3997            .next()
3998            .await
3999            .unwrap();
4000        prepare_rename.await.unwrap();
4001        editor_b.update(cx_b, |editor, cx| {
4002            let rename = editor.pending_rename().unwrap();
4003            let buffer = editor.buffer().read(cx).snapshot(cx);
4004            assert_eq!(
4005                rename.range.start.to_offset(&buffer)..rename.range.end.to_offset(&buffer),
4006                6..9
4007            );
4008            rename.editor.update(cx, |rename_editor, cx| {
4009                rename_editor.buffer().update(cx, |rename_buffer, cx| {
4010                    rename_buffer.edit([0..3], "THREE", cx);
4011                });
4012            });
4013        });
4014
4015        let confirm_rename = workspace_b.update(cx_b, |workspace, cx| {
4016            Editor::confirm_rename(workspace, &ConfirmRename, cx).unwrap()
4017        });
4018        fake_language_server
4019            .handle_request::<lsp::request::Rename, _, _>(|params, _| async move {
4020                assert_eq!(
4021                    params.text_document_position.text_document.uri.as_str(),
4022                    "file:///dir/one.rs"
4023                );
4024                assert_eq!(
4025                    params.text_document_position.position,
4026                    lsp::Position::new(0, 6)
4027                );
4028                assert_eq!(params.new_name, "THREE");
4029                Ok(Some(lsp::WorkspaceEdit {
4030                    changes: Some(
4031                        [
4032                            (
4033                                lsp::Url::from_file_path("/dir/one.rs").unwrap(),
4034                                vec![lsp::TextEdit::new(
4035                                    lsp::Range::new(
4036                                        lsp::Position::new(0, 6),
4037                                        lsp::Position::new(0, 9),
4038                                    ),
4039                                    "THREE".to_string(),
4040                                )],
4041                            ),
4042                            (
4043                                lsp::Url::from_file_path("/dir/two.rs").unwrap(),
4044                                vec![
4045                                    lsp::TextEdit::new(
4046                                        lsp::Range::new(
4047                                            lsp::Position::new(0, 24),
4048                                            lsp::Position::new(0, 27),
4049                                        ),
4050                                        "THREE".to_string(),
4051                                    ),
4052                                    lsp::TextEdit::new(
4053                                        lsp::Range::new(
4054                                            lsp::Position::new(0, 35),
4055                                            lsp::Position::new(0, 38),
4056                                        ),
4057                                        "THREE".to_string(),
4058                                    ),
4059                                ],
4060                            ),
4061                        ]
4062                        .into_iter()
4063                        .collect(),
4064                    ),
4065                    ..Default::default()
4066                }))
4067            })
4068            .next()
4069            .await
4070            .unwrap();
4071        confirm_rename.await.unwrap();
4072
4073        let rename_editor = workspace_b.read_with(cx_b, |workspace, cx| {
4074            workspace
4075                .active_item(cx)
4076                .unwrap()
4077                .downcast::<Editor>()
4078                .unwrap()
4079        });
4080        rename_editor.update(cx_b, |editor, cx| {
4081            assert_eq!(
4082                editor.text(cx),
4083                "const TWO: usize = one::THREE + one::THREE;\nconst THREE: usize = 1;"
4084            );
4085            editor.undo(&Undo, cx);
4086            assert_eq!(
4087                editor.text(cx),
4088                "const TWO: usize = one::ONE + one::ONE;\nconst ONE: usize = 1;"
4089            );
4090            editor.redo(&Redo, cx);
4091            assert_eq!(
4092                editor.text(cx),
4093                "const TWO: usize = one::THREE + one::THREE;\nconst THREE: usize = 1;"
4094            );
4095        });
4096
4097        // Ensure temporary rename edits cannot be undone/redone.
4098        editor_b.update(cx_b, |editor, cx| {
4099            editor.undo(&Undo, cx);
4100            assert_eq!(editor.text(cx), "const ONE: usize = 1;");
4101            editor.undo(&Undo, cx);
4102            assert_eq!(editor.text(cx), "const ONE: usize = 1;");
4103            editor.redo(&Redo, cx);
4104            assert_eq!(editor.text(cx), "const THREE: usize = 1;");
4105        })
4106    }
4107
4108    #[gpui::test(iterations = 10)]
4109    async fn test_basic_chat(cx_a: &mut TestAppContext, cx_b: &mut TestAppContext) {
4110        cx_a.foreground().forbid_parking();
4111
4112        // Connect to a server as 2 clients.
4113        let mut server = TestServer::start(cx_a.foreground(), cx_a.background()).await;
4114        let client_a = server.create_client(cx_a, "user_a").await;
4115        let client_b = server.create_client(cx_b, "user_b").await;
4116
4117        // Create an org that includes these 2 users.
4118        let db = &server.app_state.db;
4119        let org_id = db.create_org("Test Org", "test-org").await.unwrap();
4120        db.add_org_member(org_id, client_a.current_user_id(&cx_a), false)
4121            .await
4122            .unwrap();
4123        db.add_org_member(org_id, client_b.current_user_id(&cx_b), false)
4124            .await
4125            .unwrap();
4126
4127        // Create a channel that includes all the users.
4128        let channel_id = db.create_org_channel(org_id, "test-channel").await.unwrap();
4129        db.add_channel_member(channel_id, client_a.current_user_id(&cx_a), false)
4130            .await
4131            .unwrap();
4132        db.add_channel_member(channel_id, client_b.current_user_id(&cx_b), false)
4133            .await
4134            .unwrap();
4135        db.create_channel_message(
4136            channel_id,
4137            client_b.current_user_id(&cx_b),
4138            "hello A, it's B.",
4139            OffsetDateTime::now_utc(),
4140            1,
4141        )
4142        .await
4143        .unwrap();
4144
4145        let channels_a = cx_a
4146            .add_model(|cx| ChannelList::new(client_a.user_store.clone(), client_a.clone(), cx));
4147        channels_a
4148            .condition(cx_a, |list, _| list.available_channels().is_some())
4149            .await;
4150        channels_a.read_with(cx_a, |list, _| {
4151            assert_eq!(
4152                list.available_channels().unwrap(),
4153                &[ChannelDetails {
4154                    id: channel_id.to_proto(),
4155                    name: "test-channel".to_string()
4156                }]
4157            )
4158        });
4159        let channel_a = channels_a.update(cx_a, |this, cx| {
4160            this.get_channel(channel_id.to_proto(), cx).unwrap()
4161        });
4162        channel_a.read_with(cx_a, |channel, _| assert!(channel.messages().is_empty()));
4163        channel_a
4164            .condition(&cx_a, |channel, _| {
4165                channel_messages(channel)
4166                    == [("user_b".to_string(), "hello A, it's B.".to_string(), false)]
4167            })
4168            .await;
4169
4170        let channels_b = cx_b
4171            .add_model(|cx| ChannelList::new(client_b.user_store.clone(), client_b.clone(), cx));
4172        channels_b
4173            .condition(cx_b, |list, _| list.available_channels().is_some())
4174            .await;
4175        channels_b.read_with(cx_b, |list, _| {
4176            assert_eq!(
4177                list.available_channels().unwrap(),
4178                &[ChannelDetails {
4179                    id: channel_id.to_proto(),
4180                    name: "test-channel".to_string()
4181                }]
4182            )
4183        });
4184
4185        let channel_b = channels_b.update(cx_b, |this, cx| {
4186            this.get_channel(channel_id.to_proto(), cx).unwrap()
4187        });
4188        channel_b.read_with(cx_b, |channel, _| assert!(channel.messages().is_empty()));
4189        channel_b
4190            .condition(&cx_b, |channel, _| {
4191                channel_messages(channel)
4192                    == [("user_b".to_string(), "hello A, it's B.".to_string(), false)]
4193            })
4194            .await;
4195
4196        channel_a
4197            .update(cx_a, |channel, cx| {
4198                channel
4199                    .send_message("oh, hi B.".to_string(), cx)
4200                    .unwrap()
4201                    .detach();
4202                let task = channel.send_message("sup".to_string(), cx).unwrap();
4203                assert_eq!(
4204                    channel_messages(channel),
4205                    &[
4206                        ("user_b".to_string(), "hello A, it's B.".to_string(), false),
4207                        ("user_a".to_string(), "oh, hi B.".to_string(), true),
4208                        ("user_a".to_string(), "sup".to_string(), true)
4209                    ]
4210                );
4211                task
4212            })
4213            .await
4214            .unwrap();
4215
4216        channel_b
4217            .condition(&cx_b, |channel, _| {
4218                channel_messages(channel)
4219                    == [
4220                        ("user_b".to_string(), "hello A, it's B.".to_string(), false),
4221                        ("user_a".to_string(), "oh, hi B.".to_string(), false),
4222                        ("user_a".to_string(), "sup".to_string(), false),
4223                    ]
4224            })
4225            .await;
4226
4227        assert_eq!(
4228            server
4229                .state()
4230                .await
4231                .channel(channel_id)
4232                .unwrap()
4233                .connection_ids
4234                .len(),
4235            2
4236        );
4237        cx_b.update(|_| drop(channel_b));
4238        server
4239            .condition(|state| state.channel(channel_id).unwrap().connection_ids.len() == 1)
4240            .await;
4241
4242        cx_a.update(|_| drop(channel_a));
4243        server
4244            .condition(|state| state.channel(channel_id).is_none())
4245            .await;
4246    }
4247
4248    #[gpui::test(iterations = 10)]
4249    async fn test_chat_message_validation(cx_a: &mut TestAppContext) {
4250        cx_a.foreground().forbid_parking();
4251
4252        let mut server = TestServer::start(cx_a.foreground(), cx_a.background()).await;
4253        let client_a = server.create_client(cx_a, "user_a").await;
4254
4255        let db = &server.app_state.db;
4256        let org_id = db.create_org("Test Org", "test-org").await.unwrap();
4257        let channel_id = db.create_org_channel(org_id, "test-channel").await.unwrap();
4258        db.add_org_member(org_id, client_a.current_user_id(&cx_a), false)
4259            .await
4260            .unwrap();
4261        db.add_channel_member(channel_id, client_a.current_user_id(&cx_a), false)
4262            .await
4263            .unwrap();
4264
4265        let channels_a = cx_a
4266            .add_model(|cx| ChannelList::new(client_a.user_store.clone(), client_a.clone(), cx));
4267        channels_a
4268            .condition(cx_a, |list, _| list.available_channels().is_some())
4269            .await;
4270        let channel_a = channels_a.update(cx_a, |this, cx| {
4271            this.get_channel(channel_id.to_proto(), cx).unwrap()
4272        });
4273
4274        // Messages aren't allowed to be too long.
4275        channel_a
4276            .update(cx_a, |channel, cx| {
4277                let long_body = "this is long.\n".repeat(1024);
4278                channel.send_message(long_body, cx).unwrap()
4279            })
4280            .await
4281            .unwrap_err();
4282
4283        // Messages aren't allowed to be blank.
4284        channel_a.update(cx_a, |channel, cx| {
4285            channel.send_message(String::new(), cx).unwrap_err()
4286        });
4287
4288        // Leading and trailing whitespace are trimmed.
4289        channel_a
4290            .update(cx_a, |channel, cx| {
4291                channel
4292                    .send_message("\n surrounded by whitespace  \n".to_string(), cx)
4293                    .unwrap()
4294            })
4295            .await
4296            .unwrap();
4297        assert_eq!(
4298            db.get_channel_messages(channel_id, 10, None)
4299                .await
4300                .unwrap()
4301                .iter()
4302                .map(|m| &m.body)
4303                .collect::<Vec<_>>(),
4304            &["surrounded by whitespace"]
4305        );
4306    }
4307
4308    #[gpui::test(iterations = 10)]
4309    async fn test_chat_reconnection(cx_a: &mut TestAppContext, cx_b: &mut TestAppContext) {
4310        cx_a.foreground().forbid_parking();
4311
4312        // Connect to a server as 2 clients.
4313        let mut server = TestServer::start(cx_a.foreground(), cx_a.background()).await;
4314        let client_a = server.create_client(cx_a, "user_a").await;
4315        let client_b = server.create_client(cx_b, "user_b").await;
4316        let mut status_b = client_b.status();
4317
4318        // Create an org that includes these 2 users.
4319        let db = &server.app_state.db;
4320        let org_id = db.create_org("Test Org", "test-org").await.unwrap();
4321        db.add_org_member(org_id, client_a.current_user_id(&cx_a), false)
4322            .await
4323            .unwrap();
4324        db.add_org_member(org_id, client_b.current_user_id(&cx_b), false)
4325            .await
4326            .unwrap();
4327
4328        // Create a channel that includes all the users.
4329        let channel_id = db.create_org_channel(org_id, "test-channel").await.unwrap();
4330        db.add_channel_member(channel_id, client_a.current_user_id(&cx_a), false)
4331            .await
4332            .unwrap();
4333        db.add_channel_member(channel_id, client_b.current_user_id(&cx_b), false)
4334            .await
4335            .unwrap();
4336        db.create_channel_message(
4337            channel_id,
4338            client_b.current_user_id(&cx_b),
4339            "hello A, it's B.",
4340            OffsetDateTime::now_utc(),
4341            2,
4342        )
4343        .await
4344        .unwrap();
4345
4346        let channels_a = cx_a
4347            .add_model(|cx| ChannelList::new(client_a.user_store.clone(), client_a.clone(), cx));
4348        channels_a
4349            .condition(cx_a, |list, _| list.available_channels().is_some())
4350            .await;
4351
4352        channels_a.read_with(cx_a, |list, _| {
4353            assert_eq!(
4354                list.available_channels().unwrap(),
4355                &[ChannelDetails {
4356                    id: channel_id.to_proto(),
4357                    name: "test-channel".to_string()
4358                }]
4359            )
4360        });
4361        let channel_a = channels_a.update(cx_a, |this, cx| {
4362            this.get_channel(channel_id.to_proto(), cx).unwrap()
4363        });
4364        channel_a.read_with(cx_a, |channel, _| assert!(channel.messages().is_empty()));
4365        channel_a
4366            .condition(&cx_a, |channel, _| {
4367                channel_messages(channel)
4368                    == [("user_b".to_string(), "hello A, it's B.".to_string(), false)]
4369            })
4370            .await;
4371
4372        let channels_b = cx_b
4373            .add_model(|cx| ChannelList::new(client_b.user_store.clone(), client_b.clone(), cx));
4374        channels_b
4375            .condition(cx_b, |list, _| list.available_channels().is_some())
4376            .await;
4377        channels_b.read_with(cx_b, |list, _| {
4378            assert_eq!(
4379                list.available_channels().unwrap(),
4380                &[ChannelDetails {
4381                    id: channel_id.to_proto(),
4382                    name: "test-channel".to_string()
4383                }]
4384            )
4385        });
4386
4387        let channel_b = channels_b.update(cx_b, |this, cx| {
4388            this.get_channel(channel_id.to_proto(), cx).unwrap()
4389        });
4390        channel_b.read_with(cx_b, |channel, _| assert!(channel.messages().is_empty()));
4391        channel_b
4392            .condition(&cx_b, |channel, _| {
4393                channel_messages(channel)
4394                    == [("user_b".to_string(), "hello A, it's B.".to_string(), false)]
4395            })
4396            .await;
4397
4398        // Disconnect client B, ensuring we can still access its cached channel data.
4399        server.forbid_connections();
4400        server.disconnect_client(client_b.current_user_id(&cx_b));
4401        cx_b.foreground().advance_clock(Duration::from_secs(3));
4402        while !matches!(
4403            status_b.next().await,
4404            Some(client::Status::ReconnectionError { .. })
4405        ) {}
4406
4407        channels_b.read_with(cx_b, |channels, _| {
4408            assert_eq!(
4409                channels.available_channels().unwrap(),
4410                [ChannelDetails {
4411                    id: channel_id.to_proto(),
4412                    name: "test-channel".to_string()
4413                }]
4414            )
4415        });
4416        channel_b.read_with(cx_b, |channel, _| {
4417            assert_eq!(
4418                channel_messages(channel),
4419                [("user_b".to_string(), "hello A, it's B.".to_string(), false)]
4420            )
4421        });
4422
4423        // Send a message from client B while it is disconnected.
4424        channel_b
4425            .update(cx_b, |channel, cx| {
4426                let task = channel
4427                    .send_message("can you see this?".to_string(), cx)
4428                    .unwrap();
4429                assert_eq!(
4430                    channel_messages(channel),
4431                    &[
4432                        ("user_b".to_string(), "hello A, it's B.".to_string(), false),
4433                        ("user_b".to_string(), "can you see this?".to_string(), true)
4434                    ]
4435                );
4436                task
4437            })
4438            .await
4439            .unwrap_err();
4440
4441        // Send a message from client A while B is disconnected.
4442        channel_a
4443            .update(cx_a, |channel, cx| {
4444                channel
4445                    .send_message("oh, hi B.".to_string(), cx)
4446                    .unwrap()
4447                    .detach();
4448                let task = channel.send_message("sup".to_string(), cx).unwrap();
4449                assert_eq!(
4450                    channel_messages(channel),
4451                    &[
4452                        ("user_b".to_string(), "hello A, it's B.".to_string(), false),
4453                        ("user_a".to_string(), "oh, hi B.".to_string(), true),
4454                        ("user_a".to_string(), "sup".to_string(), true)
4455                    ]
4456                );
4457                task
4458            })
4459            .await
4460            .unwrap();
4461
4462        // Give client B a chance to reconnect.
4463        server.allow_connections();
4464        cx_b.foreground().advance_clock(Duration::from_secs(10));
4465
4466        // Verify that B sees the new messages upon reconnection, as well as the message client B
4467        // sent while offline.
4468        channel_b
4469            .condition(&cx_b, |channel, _| {
4470                channel_messages(channel)
4471                    == [
4472                        ("user_b".to_string(), "hello A, it's B.".to_string(), false),
4473                        ("user_a".to_string(), "oh, hi B.".to_string(), false),
4474                        ("user_a".to_string(), "sup".to_string(), false),
4475                        ("user_b".to_string(), "can you see this?".to_string(), false),
4476                    ]
4477            })
4478            .await;
4479
4480        // Ensure client A and B can communicate normally after reconnection.
4481        channel_a
4482            .update(cx_a, |channel, cx| {
4483                channel.send_message("you online?".to_string(), cx).unwrap()
4484            })
4485            .await
4486            .unwrap();
4487        channel_b
4488            .condition(&cx_b, |channel, _| {
4489                channel_messages(channel)
4490                    == [
4491                        ("user_b".to_string(), "hello A, it's B.".to_string(), false),
4492                        ("user_a".to_string(), "oh, hi B.".to_string(), false),
4493                        ("user_a".to_string(), "sup".to_string(), false),
4494                        ("user_b".to_string(), "can you see this?".to_string(), false),
4495                        ("user_a".to_string(), "you online?".to_string(), false),
4496                    ]
4497            })
4498            .await;
4499
4500        channel_b
4501            .update(cx_b, |channel, cx| {
4502                channel.send_message("yep".to_string(), cx).unwrap()
4503            })
4504            .await
4505            .unwrap();
4506        channel_a
4507            .condition(&cx_a, |channel, _| {
4508                channel_messages(channel)
4509                    == [
4510                        ("user_b".to_string(), "hello A, it's B.".to_string(), false),
4511                        ("user_a".to_string(), "oh, hi B.".to_string(), false),
4512                        ("user_a".to_string(), "sup".to_string(), false),
4513                        ("user_b".to_string(), "can you see this?".to_string(), false),
4514                        ("user_a".to_string(), "you online?".to_string(), false),
4515                        ("user_b".to_string(), "yep".to_string(), false),
4516                    ]
4517            })
4518            .await;
4519    }
4520
4521    #[gpui::test(iterations = 10)]
4522    async fn test_contacts(
4523        cx_a: &mut TestAppContext,
4524        cx_b: &mut TestAppContext,
4525        cx_c: &mut TestAppContext,
4526    ) {
4527        cx_a.foreground().forbid_parking();
4528        let lang_registry = Arc::new(LanguageRegistry::test());
4529        let fs = FakeFs::new(cx_a.background());
4530
4531        // Connect to a server as 3 clients.
4532        let mut server = TestServer::start(cx_a.foreground(), cx_a.background()).await;
4533        let client_a = server.create_client(cx_a, "user_a").await;
4534        let client_b = server.create_client(cx_b, "user_b").await;
4535        let client_c = server.create_client(cx_c, "user_c").await;
4536
4537        // Share a worktree as client A.
4538        fs.insert_tree(
4539            "/a",
4540            json!({
4541                ".zed.toml": r#"collaborators = ["user_b", "user_c"]"#,
4542            }),
4543        )
4544        .await;
4545
4546        let project_a = cx_a.update(|cx| {
4547            Project::local(
4548                client_a.clone(),
4549                client_a.user_store.clone(),
4550                lang_registry.clone(),
4551                fs.clone(),
4552                cx,
4553            )
4554        });
4555        let (worktree_a, _) = project_a
4556            .update(cx_a, |p, cx| {
4557                p.find_or_create_local_worktree("/a", true, cx)
4558            })
4559            .await
4560            .unwrap();
4561        worktree_a
4562            .read_with(cx_a, |tree, _| tree.as_local().unwrap().scan_complete())
4563            .await;
4564
4565        client_a
4566            .user_store
4567            .condition(&cx_a, |user_store, _| {
4568                contacts(user_store) == vec![("user_a", vec![("a", false, vec![])])]
4569            })
4570            .await;
4571        client_b
4572            .user_store
4573            .condition(&cx_b, |user_store, _| {
4574                contacts(user_store) == vec![("user_a", vec![("a", false, vec![])])]
4575            })
4576            .await;
4577        client_c
4578            .user_store
4579            .condition(&cx_c, |user_store, _| {
4580                contacts(user_store) == vec![("user_a", vec![("a", false, vec![])])]
4581            })
4582            .await;
4583
4584        let project_id = project_a
4585            .update(cx_a, |project, _| project.next_remote_id())
4586            .await;
4587        project_a
4588            .update(cx_a, |project, cx| project.share(cx))
4589            .await
4590            .unwrap();
4591        client_a
4592            .user_store
4593            .condition(&cx_a, |user_store, _| {
4594                contacts(user_store) == vec![("user_a", vec![("a", true, vec![])])]
4595            })
4596            .await;
4597        client_b
4598            .user_store
4599            .condition(&cx_b, |user_store, _| {
4600                contacts(user_store) == vec![("user_a", vec![("a", true, vec![])])]
4601            })
4602            .await;
4603        client_c
4604            .user_store
4605            .condition(&cx_c, |user_store, _| {
4606                contacts(user_store) == vec![("user_a", vec![("a", true, vec![])])]
4607            })
4608            .await;
4609
4610        let _project_b = Project::remote(
4611            project_id,
4612            client_b.clone(),
4613            client_b.user_store.clone(),
4614            lang_registry.clone(),
4615            fs.clone(),
4616            &mut cx_b.to_async(),
4617        )
4618        .await
4619        .unwrap();
4620
4621        client_a
4622            .user_store
4623            .condition(&cx_a, |user_store, _| {
4624                contacts(user_store) == vec![("user_a", vec![("a", true, vec!["user_b"])])]
4625            })
4626            .await;
4627        client_b
4628            .user_store
4629            .condition(&cx_b, |user_store, _| {
4630                contacts(user_store) == vec![("user_a", vec![("a", true, vec!["user_b"])])]
4631            })
4632            .await;
4633        client_c
4634            .user_store
4635            .condition(&cx_c, |user_store, _| {
4636                contacts(user_store) == vec![("user_a", vec![("a", true, vec!["user_b"])])]
4637            })
4638            .await;
4639
4640        project_a
4641            .condition(&cx_a, |project, _| {
4642                project.collaborators().contains_key(&client_b.peer_id)
4643            })
4644            .await;
4645
4646        cx_a.update(move |_| drop(project_a));
4647        client_a
4648            .user_store
4649            .condition(&cx_a, |user_store, _| contacts(user_store) == vec![])
4650            .await;
4651        client_b
4652            .user_store
4653            .condition(&cx_b, |user_store, _| contacts(user_store) == vec![])
4654            .await;
4655        client_c
4656            .user_store
4657            .condition(&cx_c, |user_store, _| contacts(user_store) == vec![])
4658            .await;
4659
4660        fn contacts(user_store: &UserStore) -> Vec<(&str, Vec<(&str, bool, Vec<&str>)>)> {
4661            user_store
4662                .contacts()
4663                .iter()
4664                .map(|contact| {
4665                    let worktrees = contact
4666                        .projects
4667                        .iter()
4668                        .map(|p| {
4669                            (
4670                                p.worktree_root_names[0].as_str(),
4671                                p.is_shared,
4672                                p.guests.iter().map(|p| p.github_login.as_str()).collect(),
4673                            )
4674                        })
4675                        .collect();
4676                    (contact.user.github_login.as_str(), worktrees)
4677                })
4678                .collect()
4679        }
4680    }
4681
4682    #[gpui::test(iterations = 10)]
4683    async fn test_following(cx_a: &mut TestAppContext, cx_b: &mut TestAppContext) {
4684        cx_a.foreground().forbid_parking();
4685        let fs = FakeFs::new(cx_a.background());
4686
4687        // 2 clients connect to a server.
4688        let mut server = TestServer::start(cx_a.foreground(), cx_a.background()).await;
4689        let mut client_a = server.create_client(cx_a, "user_a").await;
4690        let mut client_b = server.create_client(cx_b, "user_b").await;
4691        cx_a.update(editor::init);
4692        cx_b.update(editor::init);
4693
4694        // Client A shares a project.
4695        fs.insert_tree(
4696            "/a",
4697            json!({
4698                ".zed.toml": r#"collaborators = ["user_b"]"#,
4699                "1.txt": "one",
4700                "2.txt": "two",
4701                "3.txt": "three",
4702            }),
4703        )
4704        .await;
4705        let (project_a, worktree_id) = client_a.build_local_project(fs.clone(), "/a", cx_a).await;
4706        project_a
4707            .update(cx_a, |project, cx| project.share(cx))
4708            .await
4709            .unwrap();
4710
4711        // Client B joins the project.
4712        let project_b = client_b
4713            .build_remote_project(
4714                project_a
4715                    .read_with(cx_a, |project, _| project.remote_id())
4716                    .unwrap(),
4717                cx_b,
4718            )
4719            .await;
4720
4721        // Client A opens some editors.
4722        let workspace_a = client_a.build_workspace(&project_a, cx_a);
4723        let pane_a = workspace_a.read_with(cx_a, |workspace, _| workspace.active_pane().clone());
4724        let editor_a1 = workspace_a
4725            .update(cx_a, |workspace, cx| {
4726                workspace.open_path((worktree_id, "1.txt"), cx)
4727            })
4728            .await
4729            .unwrap()
4730            .downcast::<Editor>()
4731            .unwrap();
4732        let editor_a2 = workspace_a
4733            .update(cx_a, |workspace, cx| {
4734                workspace.open_path((worktree_id, "2.txt"), cx)
4735            })
4736            .await
4737            .unwrap()
4738            .downcast::<Editor>()
4739            .unwrap();
4740
4741        // Client B opens an editor.
4742        let workspace_b = client_b.build_workspace(&project_b, cx_b);
4743        let editor_b1 = workspace_b
4744            .update(cx_b, |workspace, cx| {
4745                workspace.open_path((worktree_id, "1.txt"), cx)
4746            })
4747            .await
4748            .unwrap()
4749            .downcast::<Editor>()
4750            .unwrap();
4751
4752        let client_a_id = project_b.read_with(cx_b, |project, _| {
4753            project.collaborators().values().next().unwrap().peer_id
4754        });
4755        let client_b_id = project_a.read_with(cx_a, |project, _| {
4756            project.collaborators().values().next().unwrap().peer_id
4757        });
4758
4759        // When client B starts following client A, all visible view states are replicated to client B.
4760        editor_a1.update(cx_a, |editor, cx| editor.select_ranges([0..1], None, cx));
4761        editor_a2.update(cx_a, |editor, cx| editor.select_ranges([2..3], None, cx));
4762        workspace_b
4763            .update(cx_b, |workspace, cx| {
4764                workspace
4765                    .toggle_follow(&ToggleFollow(client_a_id), cx)
4766                    .unwrap()
4767            })
4768            .await
4769            .unwrap();
4770        let editor_b2 = workspace_b.read_with(cx_b, |workspace, cx| {
4771            workspace
4772                .active_item(cx)
4773                .unwrap()
4774                .downcast::<Editor>()
4775                .unwrap()
4776        });
4777        assert!(cx_b.read(|cx| editor_b2.is_focused(cx)));
4778        assert_eq!(
4779            editor_b2.read_with(cx_b, |editor, cx| editor.project_path(cx)),
4780            Some((worktree_id, "2.txt").into())
4781        );
4782        assert_eq!(
4783            editor_b2.read_with(cx_b, |editor, cx| editor.selected_ranges(cx)),
4784            vec![2..3]
4785        );
4786        assert_eq!(
4787            editor_b1.read_with(cx_b, |editor, cx| editor.selected_ranges(cx)),
4788            vec![0..1]
4789        );
4790
4791        // When client A activates a different editor, client B does so as well.
4792        workspace_a.update(cx_a, |workspace, cx| {
4793            workspace.activate_item(&editor_a1, cx)
4794        });
4795        workspace_b
4796            .condition(cx_b, |workspace, cx| {
4797                workspace.active_item(cx).unwrap().id() == editor_b1.id()
4798            })
4799            .await;
4800
4801        // When client A navigates back and forth, client B does so as well.
4802        workspace_a
4803            .update(cx_a, |workspace, cx| {
4804                workspace::Pane::go_back(workspace, None, cx)
4805            })
4806            .await;
4807        workspace_b
4808            .condition(cx_b, |workspace, cx| {
4809                workspace.active_item(cx).unwrap().id() == editor_b2.id()
4810            })
4811            .await;
4812
4813        workspace_a
4814            .update(cx_a, |workspace, cx| {
4815                workspace::Pane::go_forward(workspace, None, cx)
4816            })
4817            .await;
4818        workspace_b
4819            .condition(cx_b, |workspace, cx| {
4820                workspace.active_item(cx).unwrap().id() == editor_b1.id()
4821            })
4822            .await;
4823
4824        // Changes to client A's editor are reflected on client B.
4825        editor_a1.update(cx_a, |editor, cx| {
4826            editor.select_ranges([1..1, 2..2], None, cx);
4827        });
4828        editor_b1
4829            .condition(cx_b, |editor, cx| {
4830                editor.selected_ranges(cx) == vec![1..1, 2..2]
4831            })
4832            .await;
4833
4834        editor_a1.update(cx_a, |editor, cx| editor.set_text("TWO", cx));
4835        editor_b1
4836            .condition(cx_b, |editor, cx| editor.text(cx) == "TWO")
4837            .await;
4838
4839        editor_a1.update(cx_a, |editor, cx| {
4840            editor.select_ranges([3..3], None, cx);
4841            editor.set_scroll_position(vec2f(0., 100.), cx);
4842        });
4843        editor_b1
4844            .condition(cx_b, |editor, cx| editor.selected_ranges(cx) == vec![3..3])
4845            .await;
4846
4847        // After unfollowing, client B stops receiving updates from client A.
4848        workspace_b.update(cx_b, |workspace, cx| {
4849            workspace.unfollow(&workspace.active_pane().clone(), cx)
4850        });
4851        workspace_a.update(cx_a, |workspace, cx| {
4852            workspace.activate_item(&editor_a2, cx)
4853        });
4854        cx_a.foreground().run_until_parked();
4855        assert_eq!(
4856            workspace_b.read_with(cx_b, |workspace, cx| workspace
4857                .active_item(cx)
4858                .unwrap()
4859                .id()),
4860            editor_b1.id()
4861        );
4862
4863        // Client A starts following client B.
4864        workspace_a
4865            .update(cx_a, |workspace, cx| {
4866                workspace
4867                    .toggle_follow(&ToggleFollow(client_b_id), cx)
4868                    .unwrap()
4869            })
4870            .await
4871            .unwrap();
4872        assert_eq!(
4873            workspace_a.read_with(cx_a, |workspace, _| workspace.leader_for_pane(&pane_a)),
4874            Some(client_b_id)
4875        );
4876        assert_eq!(
4877            workspace_a.read_with(cx_a, |workspace, cx| workspace
4878                .active_item(cx)
4879                .unwrap()
4880                .id()),
4881            editor_a1.id()
4882        );
4883
4884        // Following interrupts when client B disconnects.
4885        client_b.disconnect(&cx_b.to_async()).unwrap();
4886        cx_a.foreground().run_until_parked();
4887        assert_eq!(
4888            workspace_a.read_with(cx_a, |workspace, _| workspace.leader_for_pane(&pane_a)),
4889            None
4890        );
4891    }
4892
4893    #[gpui::test(iterations = 10)]
4894    async fn test_peers_following_each_other(cx_a: &mut TestAppContext, cx_b: &mut TestAppContext) {
4895        cx_a.foreground().forbid_parking();
4896        let fs = FakeFs::new(cx_a.background());
4897
4898        // 2 clients connect to a server.
4899        let mut server = TestServer::start(cx_a.foreground(), cx_a.background()).await;
4900        let mut client_a = server.create_client(cx_a, "user_a").await;
4901        let mut client_b = server.create_client(cx_b, "user_b").await;
4902        cx_a.update(editor::init);
4903        cx_b.update(editor::init);
4904
4905        // Client A shares a project.
4906        fs.insert_tree(
4907            "/a",
4908            json!({
4909                ".zed.toml": r#"collaborators = ["user_b"]"#,
4910                "1.txt": "one",
4911                "2.txt": "two",
4912                "3.txt": "three",
4913                "4.txt": "four",
4914            }),
4915        )
4916        .await;
4917        let (project_a, worktree_id) = client_a.build_local_project(fs.clone(), "/a", cx_a).await;
4918        project_a
4919            .update(cx_a, |project, cx| project.share(cx))
4920            .await
4921            .unwrap();
4922
4923        // Client B joins the project.
4924        let project_b = client_b
4925            .build_remote_project(
4926                project_a
4927                    .read_with(cx_a, |project, _| project.remote_id())
4928                    .unwrap(),
4929                cx_b,
4930            )
4931            .await;
4932
4933        // Client A opens some editors.
4934        let workspace_a = client_a.build_workspace(&project_a, cx_a);
4935        let pane_a1 = workspace_a.read_with(cx_a, |workspace, _| workspace.active_pane().clone());
4936        let _editor_a1 = workspace_a
4937            .update(cx_a, |workspace, cx| {
4938                workspace.open_path((worktree_id, "1.txt"), cx)
4939            })
4940            .await
4941            .unwrap()
4942            .downcast::<Editor>()
4943            .unwrap();
4944
4945        // Client B opens an editor.
4946        let workspace_b = client_b.build_workspace(&project_b, cx_b);
4947        let pane_b1 = workspace_b.read_with(cx_b, |workspace, _| workspace.active_pane().clone());
4948        let _editor_b1 = workspace_b
4949            .update(cx_b, |workspace, cx| {
4950                workspace.open_path((worktree_id, "2.txt"), cx)
4951            })
4952            .await
4953            .unwrap()
4954            .downcast::<Editor>()
4955            .unwrap();
4956
4957        // Clients A and B follow each other in split panes
4958        workspace_a
4959            .update(cx_a, |workspace, cx| {
4960                workspace.split_pane(workspace.active_pane().clone(), SplitDirection::Right, cx);
4961                assert_ne!(*workspace.active_pane(), pane_a1);
4962                let leader_id = *project_a.read(cx).collaborators().keys().next().unwrap();
4963                workspace
4964                    .toggle_follow(&workspace::ToggleFollow(leader_id), cx)
4965                    .unwrap()
4966            })
4967            .await
4968            .unwrap();
4969        workspace_b
4970            .update(cx_b, |workspace, cx| {
4971                workspace.split_pane(workspace.active_pane().clone(), SplitDirection::Right, cx);
4972                assert_ne!(*workspace.active_pane(), pane_b1);
4973                let leader_id = *project_b.read(cx).collaborators().keys().next().unwrap();
4974                workspace
4975                    .toggle_follow(&workspace::ToggleFollow(leader_id), cx)
4976                    .unwrap()
4977            })
4978            .await
4979            .unwrap();
4980
4981        workspace_a
4982            .update(cx_a, |workspace, cx| {
4983                workspace.activate_next_pane(cx);
4984                assert_eq!(*workspace.active_pane(), pane_a1);
4985                workspace.open_path((worktree_id, "3.txt"), cx)
4986            })
4987            .await
4988            .unwrap();
4989        workspace_b
4990            .update(cx_b, |workspace, cx| {
4991                workspace.activate_next_pane(cx);
4992                assert_eq!(*workspace.active_pane(), pane_b1);
4993                workspace.open_path((worktree_id, "4.txt"), cx)
4994            })
4995            .await
4996            .unwrap();
4997        cx_a.foreground().run_until_parked();
4998
4999        // Ensure leader updates don't change the active pane of followers
5000        workspace_a.read_with(cx_a, |workspace, _| {
5001            assert_eq!(*workspace.active_pane(), pane_a1);
5002        });
5003        workspace_b.read_with(cx_b, |workspace, _| {
5004            assert_eq!(*workspace.active_pane(), pane_b1);
5005        });
5006
5007        // Ensure peers following each other doesn't cause an infinite loop.
5008        assert_eq!(
5009            workspace_a.read_with(cx_a, |workspace, cx| workspace
5010                .active_item(cx)
5011                .unwrap()
5012                .project_path(cx)),
5013            Some((worktree_id, "3.txt").into())
5014        );
5015        workspace_a.update(cx_a, |workspace, cx| {
5016            assert_eq!(
5017                workspace.active_item(cx).unwrap().project_path(cx),
5018                Some((worktree_id, "3.txt").into())
5019            );
5020            workspace.activate_next_pane(cx);
5021            assert_eq!(
5022                workspace.active_item(cx).unwrap().project_path(cx),
5023                Some((worktree_id, "4.txt").into())
5024            );
5025        });
5026        workspace_b.update(cx_b, |workspace, cx| {
5027            assert_eq!(
5028                workspace.active_item(cx).unwrap().project_path(cx),
5029                Some((worktree_id, "4.txt").into())
5030            );
5031            workspace.activate_next_pane(cx);
5032            assert_eq!(
5033                workspace.active_item(cx).unwrap().project_path(cx),
5034                Some((worktree_id, "3.txt").into())
5035            );
5036        });
5037    }
5038
5039    #[gpui::test(iterations = 10)]
5040    async fn test_auto_unfollowing(cx_a: &mut TestAppContext, cx_b: &mut TestAppContext) {
5041        cx_a.foreground().forbid_parking();
5042        let fs = FakeFs::new(cx_a.background());
5043
5044        // 2 clients connect to a server.
5045        let mut server = TestServer::start(cx_a.foreground(), cx_a.background()).await;
5046        let mut client_a = server.create_client(cx_a, "user_a").await;
5047        let mut client_b = server.create_client(cx_b, "user_b").await;
5048        cx_a.update(editor::init);
5049        cx_b.update(editor::init);
5050
5051        // Client A shares a project.
5052        fs.insert_tree(
5053            "/a",
5054            json!({
5055                ".zed.toml": r#"collaborators = ["user_b"]"#,
5056                "1.txt": "one",
5057                "2.txt": "two",
5058                "3.txt": "three",
5059            }),
5060        )
5061        .await;
5062        let (project_a, worktree_id) = client_a.build_local_project(fs.clone(), "/a", cx_a).await;
5063        project_a
5064            .update(cx_a, |project, cx| project.share(cx))
5065            .await
5066            .unwrap();
5067
5068        // Client B joins the project.
5069        let project_b = client_b
5070            .build_remote_project(
5071                project_a
5072                    .read_with(cx_a, |project, _| project.remote_id())
5073                    .unwrap(),
5074                cx_b,
5075            )
5076            .await;
5077
5078        // Client A opens some editors.
5079        let workspace_a = client_a.build_workspace(&project_a, cx_a);
5080        let _editor_a1 = workspace_a
5081            .update(cx_a, |workspace, cx| {
5082                workspace.open_path((worktree_id, "1.txt"), cx)
5083            })
5084            .await
5085            .unwrap()
5086            .downcast::<Editor>()
5087            .unwrap();
5088
5089        // Client B starts following client A.
5090        let workspace_b = client_b.build_workspace(&project_b, cx_b);
5091        let pane_b = workspace_b.read_with(cx_b, |workspace, _| workspace.active_pane().clone());
5092        let leader_id = project_b.read_with(cx_b, |project, _| {
5093            project.collaborators().values().next().unwrap().peer_id
5094        });
5095        workspace_b
5096            .update(cx_b, |workspace, cx| {
5097                workspace
5098                    .toggle_follow(&ToggleFollow(leader_id), cx)
5099                    .unwrap()
5100            })
5101            .await
5102            .unwrap();
5103        assert_eq!(
5104            workspace_b.read_with(cx_b, |workspace, _| workspace.leader_for_pane(&pane_b)),
5105            Some(leader_id)
5106        );
5107        let editor_b2 = workspace_b.read_with(cx_b, |workspace, cx| {
5108            workspace
5109                .active_item(cx)
5110                .unwrap()
5111                .downcast::<Editor>()
5112                .unwrap()
5113        });
5114
5115        // When client B moves, it automatically stops following client A.
5116        editor_b2.update(cx_b, |editor, cx| editor.move_right(&editor::MoveRight, cx));
5117        assert_eq!(
5118            workspace_b.read_with(cx_b, |workspace, _| workspace.leader_for_pane(&pane_b)),
5119            None
5120        );
5121
5122        workspace_b
5123            .update(cx_b, |workspace, cx| {
5124                workspace
5125                    .toggle_follow(&ToggleFollow(leader_id), cx)
5126                    .unwrap()
5127            })
5128            .await
5129            .unwrap();
5130        assert_eq!(
5131            workspace_b.read_with(cx_b, |workspace, _| workspace.leader_for_pane(&pane_b)),
5132            Some(leader_id)
5133        );
5134
5135        // When client B edits, it automatically stops following client A.
5136        editor_b2.update(cx_b, |editor, cx| editor.insert("X", cx));
5137        assert_eq!(
5138            workspace_b.read_with(cx_b, |workspace, _| workspace.leader_for_pane(&pane_b)),
5139            None
5140        );
5141
5142        workspace_b
5143            .update(cx_b, |workspace, cx| {
5144                workspace
5145                    .toggle_follow(&ToggleFollow(leader_id), cx)
5146                    .unwrap()
5147            })
5148            .await
5149            .unwrap();
5150        assert_eq!(
5151            workspace_b.read_with(cx_b, |workspace, _| workspace.leader_for_pane(&pane_b)),
5152            Some(leader_id)
5153        );
5154
5155        // When client B scrolls, it automatically stops following client A.
5156        editor_b2.update(cx_b, |editor, cx| {
5157            editor.set_scroll_position(vec2f(0., 3.), cx)
5158        });
5159        assert_eq!(
5160            workspace_b.read_with(cx_b, |workspace, _| workspace.leader_for_pane(&pane_b)),
5161            None
5162        );
5163
5164        workspace_b
5165            .update(cx_b, |workspace, cx| {
5166                workspace
5167                    .toggle_follow(&ToggleFollow(leader_id), cx)
5168                    .unwrap()
5169            })
5170            .await
5171            .unwrap();
5172        assert_eq!(
5173            workspace_b.read_with(cx_b, |workspace, _| workspace.leader_for_pane(&pane_b)),
5174            Some(leader_id)
5175        );
5176
5177        // When client B activates a different pane, it continues following client A in the original pane.
5178        workspace_b.update(cx_b, |workspace, cx| {
5179            workspace.split_pane(pane_b.clone(), SplitDirection::Right, cx)
5180        });
5181        assert_eq!(
5182            workspace_b.read_with(cx_b, |workspace, _| workspace.leader_for_pane(&pane_b)),
5183            Some(leader_id)
5184        );
5185
5186        workspace_b.update(cx_b, |workspace, cx| workspace.activate_next_pane(cx));
5187        assert_eq!(
5188            workspace_b.read_with(cx_b, |workspace, _| workspace.leader_for_pane(&pane_b)),
5189            Some(leader_id)
5190        );
5191
5192        // When client B activates a different item in the original pane, it automatically stops following client A.
5193        workspace_b
5194            .update(cx_b, |workspace, cx| {
5195                workspace.open_path((worktree_id, "2.txt"), cx)
5196            })
5197            .await
5198            .unwrap();
5199        assert_eq!(
5200            workspace_b.read_with(cx_b, |workspace, _| workspace.leader_for_pane(&pane_b)),
5201            None
5202        );
5203    }
5204
5205    #[gpui::test(iterations = 100)]
5206    async fn test_random_collaboration(
5207        cx: &mut TestAppContext,
5208        deterministic: Arc<Deterministic>,
5209        rng: StdRng,
5210    ) {
5211        cx.foreground().forbid_parking();
5212        let max_peers = env::var("MAX_PEERS")
5213            .map(|i| i.parse().expect("invalid `MAX_PEERS` variable"))
5214            .unwrap_or(5);
5215        assert!(max_peers <= 5);
5216
5217        let max_operations = env::var("OPERATIONS")
5218            .map(|i| i.parse().expect("invalid `OPERATIONS` variable"))
5219            .unwrap_or(10);
5220
5221        let rng = Arc::new(Mutex::new(rng));
5222
5223        let guest_lang_registry = Arc::new(LanguageRegistry::test());
5224        let host_language_registry = Arc::new(LanguageRegistry::test());
5225
5226        let fs = FakeFs::new(cx.background());
5227        fs.insert_tree(
5228            "/_collab",
5229            json!({
5230                ".zed.toml": r#"collaborators = ["guest-1", "guest-2", "guest-3", "guest-4"]"#
5231            }),
5232        )
5233        .await;
5234
5235        let mut server = TestServer::start(cx.foreground(), cx.background()).await;
5236        let mut clients = Vec::new();
5237        let mut user_ids = Vec::new();
5238        let mut op_start_signals = Vec::new();
5239        let files = Arc::new(Mutex::new(Vec::new()));
5240
5241        let mut next_entity_id = 100000;
5242        let mut host_cx = TestAppContext::new(
5243            cx.foreground_platform(),
5244            cx.platform(),
5245            deterministic.build_foreground(next_entity_id),
5246            deterministic.build_background(),
5247            cx.font_cache(),
5248            cx.leak_detector(),
5249            next_entity_id,
5250        );
5251        let host = server.create_client(&mut host_cx, "host").await;
5252        let host_project = host_cx.update(|cx| {
5253            Project::local(
5254                host.client.clone(),
5255                host.user_store.clone(),
5256                host_language_registry.clone(),
5257                fs.clone(),
5258                cx,
5259            )
5260        });
5261        let host_project_id = host_project
5262            .update(&mut host_cx, |p, _| p.next_remote_id())
5263            .await;
5264
5265        let (collab_worktree, _) = host_project
5266            .update(&mut host_cx, |project, cx| {
5267                project.find_or_create_local_worktree("/_collab", true, cx)
5268            })
5269            .await
5270            .unwrap();
5271        collab_worktree
5272            .read_with(&host_cx, |tree, _| tree.as_local().unwrap().scan_complete())
5273            .await;
5274        host_project
5275            .update(&mut host_cx, |project, cx| project.share(cx))
5276            .await
5277            .unwrap();
5278
5279        // Set up fake language servers.
5280        let mut language = Language::new(
5281            LanguageConfig {
5282                name: "Rust".into(),
5283                path_suffixes: vec!["rs".to_string()],
5284                ..Default::default()
5285            },
5286            None,
5287        );
5288        let _fake_servers = language.set_fake_lsp_adapter(FakeLspAdapter {
5289            name: "the-fake-language-server",
5290            capabilities: lsp::LanguageServer::full_capabilities(),
5291            initializer: Some(Box::new({
5292                let rng = rng.clone();
5293                let files = files.clone();
5294                let project = host_project.downgrade();
5295                move |fake_server: &mut FakeLanguageServer| {
5296                    fake_server.handle_request::<lsp::request::Completion, _, _>(
5297                        |_, _| async move {
5298                            Ok(Some(lsp::CompletionResponse::Array(vec![
5299                                lsp::CompletionItem {
5300                                    text_edit: Some(lsp::CompletionTextEdit::Edit(lsp::TextEdit {
5301                                        range: lsp::Range::new(
5302                                            lsp::Position::new(0, 0),
5303                                            lsp::Position::new(0, 0),
5304                                        ),
5305                                        new_text: "the-new-text".to_string(),
5306                                    })),
5307                                    ..Default::default()
5308                                },
5309                            ])))
5310                        },
5311                    );
5312
5313                    fake_server.handle_request::<lsp::request::CodeActionRequest, _, _>(
5314                        |_, _| async move {
5315                            Ok(Some(vec![lsp::CodeActionOrCommand::CodeAction(
5316                                lsp::CodeAction {
5317                                    title: "the-code-action".to_string(),
5318                                    ..Default::default()
5319                                },
5320                            )]))
5321                        },
5322                    );
5323
5324                    fake_server.handle_request::<lsp::request::PrepareRenameRequest, _, _>(
5325                        |params, _| async move {
5326                            Ok(Some(lsp::PrepareRenameResponse::Range(lsp::Range::new(
5327                                params.position,
5328                                params.position,
5329                            ))))
5330                        },
5331                    );
5332
5333                    fake_server.handle_request::<lsp::request::GotoDefinition, _, _>({
5334                        let files = files.clone();
5335                        let rng = rng.clone();
5336                        move |_, _| {
5337                            let files = files.clone();
5338                            let rng = rng.clone();
5339                            async move {
5340                                let files = files.lock();
5341                                let mut rng = rng.lock();
5342                                let count = rng.gen_range::<usize, _>(1..3);
5343                                let files = (0..count)
5344                                    .map(|_| files.choose(&mut *rng).unwrap())
5345                                    .collect::<Vec<_>>();
5346                                log::info!("LSP: Returning definitions in files {:?}", &files);
5347                                Ok(Some(lsp::GotoDefinitionResponse::Array(
5348                                    files
5349                                        .into_iter()
5350                                        .map(|file| lsp::Location {
5351                                            uri: lsp::Url::from_file_path(file).unwrap(),
5352                                            range: Default::default(),
5353                                        })
5354                                        .collect(),
5355                                )))
5356                            }
5357                        }
5358                    });
5359
5360                    fake_server.handle_request::<lsp::request::DocumentHighlightRequest, _, _>({
5361                        let rng = rng.clone();
5362                        let project = project.clone();
5363                        move |params, mut cx| {
5364                            let highlights = if let Some(project) = project.upgrade(&cx) {
5365                                project.update(&mut cx, |project, cx| {
5366                                    let path = params
5367                                        .text_document_position_params
5368                                        .text_document
5369                                        .uri
5370                                        .to_file_path()
5371                                        .unwrap();
5372                                    let (worktree, relative_path) =
5373                                        project.find_local_worktree(&path, cx)?;
5374                                    let project_path =
5375                                        ProjectPath::from((worktree.read(cx).id(), relative_path));
5376                                    let buffer =
5377                                        project.get_open_buffer(&project_path, cx)?.read(cx);
5378
5379                                    let mut highlights = Vec::new();
5380                                    let highlight_count = rng.lock().gen_range(1..=5);
5381                                    let mut prev_end = 0;
5382                                    for _ in 0..highlight_count {
5383                                        let range =
5384                                            buffer.random_byte_range(prev_end, &mut *rng.lock());
5385
5386                                        highlights.push(lsp::DocumentHighlight {
5387                                            range: range_to_lsp(range.to_point_utf16(buffer)),
5388                                            kind: Some(lsp::DocumentHighlightKind::READ),
5389                                        });
5390                                        prev_end = range.end;
5391                                    }
5392                                    Some(highlights)
5393                                })
5394                            } else {
5395                                None
5396                            };
5397                            async move { Ok(highlights) }
5398                        }
5399                    });
5400                }
5401            })),
5402            ..Default::default()
5403        });
5404        host_language_registry.add(Arc::new(language));
5405
5406        let op_start_signal = futures::channel::mpsc::unbounded();
5407        user_ids.push(host.current_user_id(&host_cx));
5408        op_start_signals.push(op_start_signal.0);
5409        clients.push(host_cx.foreground().spawn(host.simulate_host(
5410            host_project,
5411            files,
5412            op_start_signal.1,
5413            rng.clone(),
5414            host_cx,
5415        )));
5416
5417        let disconnect_host_at = if rng.lock().gen_bool(0.2) {
5418            rng.lock().gen_range(0..max_operations)
5419        } else {
5420            max_operations
5421        };
5422        let mut available_guests = vec![
5423            "guest-1".to_string(),
5424            "guest-2".to_string(),
5425            "guest-3".to_string(),
5426            "guest-4".to_string(),
5427        ];
5428        let mut operations = 0;
5429        while operations < max_operations {
5430            if operations == disconnect_host_at {
5431                server.disconnect_client(user_ids[0]);
5432                cx.foreground().advance_clock(RECEIVE_TIMEOUT);
5433                drop(op_start_signals);
5434                let mut clients = futures::future::join_all(clients).await;
5435                cx.foreground().run_until_parked();
5436
5437                let (host, mut host_cx, host_err) = clients.remove(0);
5438                if let Some(host_err) = host_err {
5439                    log::error!("host error - {}", host_err);
5440                }
5441                host.project
5442                    .as_ref()
5443                    .unwrap()
5444                    .read_with(&host_cx, |project, _| assert!(!project.is_shared()));
5445                for (guest, mut guest_cx, guest_err) in clients {
5446                    if let Some(guest_err) = guest_err {
5447                        log::error!("{} error - {}", guest.username, guest_err);
5448                    }
5449                    let contacts = server
5450                        .store
5451                        .read()
5452                        .await
5453                        .contacts_for_user(guest.current_user_id(&guest_cx));
5454                    assert!(!contacts
5455                        .iter()
5456                        .flat_map(|contact| &contact.projects)
5457                        .any(|project| project.id == host_project_id));
5458                    guest
5459                        .project
5460                        .as_ref()
5461                        .unwrap()
5462                        .read_with(&guest_cx, |project, _| assert!(project.is_read_only()));
5463                    guest_cx.update(|_| drop(guest));
5464                }
5465                host_cx.update(|_| drop(host));
5466
5467                return;
5468            }
5469
5470            let distribution = rng.lock().gen_range(0..100);
5471            match distribution {
5472                0..=19 if !available_guests.is_empty() => {
5473                    let guest_ix = rng.lock().gen_range(0..available_guests.len());
5474                    let guest_username = available_guests.remove(guest_ix);
5475                    log::info!("Adding new connection for {}", guest_username);
5476                    next_entity_id += 100000;
5477                    let mut guest_cx = TestAppContext::new(
5478                        cx.foreground_platform(),
5479                        cx.platform(),
5480                        deterministic.build_foreground(next_entity_id),
5481                        deterministic.build_background(),
5482                        cx.font_cache(),
5483                        cx.leak_detector(),
5484                        next_entity_id,
5485                    );
5486                    let guest = server.create_client(&mut guest_cx, &guest_username).await;
5487                    let guest_project = Project::remote(
5488                        host_project_id,
5489                        guest.client.clone(),
5490                        guest.user_store.clone(),
5491                        guest_lang_registry.clone(),
5492                        FakeFs::new(cx.background()),
5493                        &mut guest_cx.to_async(),
5494                    )
5495                    .await
5496                    .unwrap();
5497                    let op_start_signal = futures::channel::mpsc::unbounded();
5498                    user_ids.push(guest.current_user_id(&guest_cx));
5499                    op_start_signals.push(op_start_signal.0);
5500                    clients.push(guest_cx.foreground().spawn(guest.simulate_guest(
5501                        guest_username.clone(),
5502                        guest_project,
5503                        op_start_signal.1,
5504                        rng.clone(),
5505                        guest_cx,
5506                    )));
5507
5508                    log::info!("Added connection for {}", guest_username);
5509                    operations += 1;
5510                }
5511                20..=29 if clients.len() > 1 => {
5512                    log::info!("Removing guest");
5513                    let guest_ix = rng.lock().gen_range(1..clients.len());
5514                    let removed_guest_id = user_ids.remove(guest_ix);
5515                    let guest = clients.remove(guest_ix);
5516                    op_start_signals.remove(guest_ix);
5517                    server.disconnect_client(removed_guest_id);
5518                    cx.foreground().advance_clock(RECEIVE_TIMEOUT);
5519                    let (guest, mut guest_cx, guest_err) = guest.await;
5520                    if let Some(guest_err) = guest_err {
5521                        log::error!("{} error - {}", guest.username, guest_err);
5522                    }
5523                    guest
5524                        .project
5525                        .as_ref()
5526                        .unwrap()
5527                        .read_with(&guest_cx, |project, _| assert!(project.is_read_only()));
5528                    for user_id in &user_ids {
5529                        for contact in server.store.read().await.contacts_for_user(*user_id) {
5530                            assert_ne!(
5531                                contact.user_id, removed_guest_id.0 as u64,
5532                                "removed guest is still a contact of another peer"
5533                            );
5534                            for project in contact.projects {
5535                                for project_guest_id in project.guests {
5536                                    assert_ne!(
5537                                        project_guest_id, removed_guest_id.0 as u64,
5538                                        "removed guest appears as still participating on a project"
5539                                    );
5540                                }
5541                            }
5542                        }
5543                    }
5544
5545                    log::info!("{} removed", guest.username);
5546                    available_guests.push(guest.username.clone());
5547                    guest_cx.update(|_| drop(guest));
5548
5549                    operations += 1;
5550                }
5551                _ => {
5552                    while operations < max_operations && rng.lock().gen_bool(0.7) {
5553                        op_start_signals
5554                            .choose(&mut *rng.lock())
5555                            .unwrap()
5556                            .unbounded_send(())
5557                            .unwrap();
5558                        operations += 1;
5559                    }
5560
5561                    if rng.lock().gen_bool(0.8) {
5562                        cx.foreground().run_until_parked();
5563                    }
5564                }
5565            }
5566        }
5567
5568        drop(op_start_signals);
5569        let mut clients = futures::future::join_all(clients).await;
5570        cx.foreground().run_until_parked();
5571
5572        let (host_client, mut host_cx, host_err) = clients.remove(0);
5573        if let Some(host_err) = host_err {
5574            panic!("host error - {}", host_err);
5575        }
5576        let host_project = host_client.project.as_ref().unwrap();
5577        let host_worktree_snapshots = host_project.read_with(&host_cx, |project, cx| {
5578            project
5579                .worktrees(cx)
5580                .map(|worktree| {
5581                    let snapshot = worktree.read(cx).snapshot();
5582                    (snapshot.id(), snapshot)
5583                })
5584                .collect::<BTreeMap<_, _>>()
5585        });
5586
5587        host_client
5588            .project
5589            .as_ref()
5590            .unwrap()
5591            .read_with(&host_cx, |project, cx| project.check_invariants(cx));
5592
5593        for (guest_client, mut guest_cx, guest_err) in clients.into_iter() {
5594            if let Some(guest_err) = guest_err {
5595                panic!("{} error - {}", guest_client.username, guest_err);
5596            }
5597            let worktree_snapshots =
5598                guest_client
5599                    .project
5600                    .as_ref()
5601                    .unwrap()
5602                    .read_with(&guest_cx, |project, cx| {
5603                        project
5604                            .worktrees(cx)
5605                            .map(|worktree| {
5606                                let worktree = worktree.read(cx);
5607                                (worktree.id(), worktree.snapshot())
5608                            })
5609                            .collect::<BTreeMap<_, _>>()
5610                    });
5611
5612            assert_eq!(
5613                worktree_snapshots.keys().collect::<Vec<_>>(),
5614                host_worktree_snapshots.keys().collect::<Vec<_>>(),
5615                "{} has different worktrees than the host",
5616                guest_client.username
5617            );
5618            for (id, host_snapshot) in &host_worktree_snapshots {
5619                let guest_snapshot = &worktree_snapshots[id];
5620                assert_eq!(
5621                    guest_snapshot.root_name(),
5622                    host_snapshot.root_name(),
5623                    "{} has different root name than the host for worktree {}",
5624                    guest_client.username,
5625                    id
5626                );
5627                assert_eq!(
5628                    guest_snapshot.entries(false).collect::<Vec<_>>(),
5629                    host_snapshot.entries(false).collect::<Vec<_>>(),
5630                    "{} has different snapshot than the host for worktree {}",
5631                    guest_client.username,
5632                    id
5633                );
5634            }
5635
5636            guest_client
5637                .project
5638                .as_ref()
5639                .unwrap()
5640                .read_with(&guest_cx, |project, cx| project.check_invariants(cx));
5641
5642            for guest_buffer in &guest_client.buffers {
5643                let buffer_id = guest_buffer.read_with(&guest_cx, |buffer, _| buffer.remote_id());
5644                let host_buffer = host_project.read_with(&host_cx, |project, cx| {
5645                    project.buffer_for_id(buffer_id, cx).expect(&format!(
5646                        "host does not have buffer for guest:{}, peer:{}, id:{}",
5647                        guest_client.username, guest_client.peer_id, buffer_id
5648                    ))
5649                });
5650                let path = host_buffer
5651                    .read_with(&host_cx, |buffer, cx| buffer.file().unwrap().full_path(cx));
5652
5653                assert_eq!(
5654                    guest_buffer.read_with(&guest_cx, |buffer, _| buffer.deferred_ops_len()),
5655                    0,
5656                    "{}, buffer {}, path {:?} has deferred operations",
5657                    guest_client.username,
5658                    buffer_id,
5659                    path,
5660                );
5661                assert_eq!(
5662                    guest_buffer.read_with(&guest_cx, |buffer, _| buffer.text()),
5663                    host_buffer.read_with(&host_cx, |buffer, _| buffer.text()),
5664                    "{}, buffer {}, path {:?}, differs from the host's buffer",
5665                    guest_client.username,
5666                    buffer_id,
5667                    path
5668                );
5669            }
5670
5671            guest_cx.update(|_| drop(guest_client));
5672        }
5673
5674        host_cx.update(|_| drop(host_client));
5675    }
5676
5677    struct TestServer {
5678        peer: Arc<Peer>,
5679        app_state: Arc<AppState>,
5680        server: Arc<Server>,
5681        foreground: Rc<executor::Foreground>,
5682        notifications: mpsc::UnboundedReceiver<()>,
5683        connection_killers: Arc<Mutex<HashMap<UserId, Arc<AtomicBool>>>>,
5684        forbid_connections: Arc<AtomicBool>,
5685        _test_db: TestDb,
5686    }
5687
5688    impl TestServer {
5689        async fn start(
5690            foreground: Rc<executor::Foreground>,
5691            background: Arc<executor::Background>,
5692        ) -> Self {
5693            let test_db = TestDb::fake(background);
5694            let app_state = Self::build_app_state(&test_db).await;
5695            let peer = Peer::new();
5696            let notifications = mpsc::unbounded();
5697            let server = Server::new(app_state.clone(), Some(notifications.0));
5698            Self {
5699                peer,
5700                app_state,
5701                server,
5702                foreground,
5703                notifications: notifications.1,
5704                connection_killers: Default::default(),
5705                forbid_connections: Default::default(),
5706                _test_db: test_db,
5707            }
5708        }
5709
5710        async fn create_client(&mut self, cx: &mut TestAppContext, name: &str) -> TestClient {
5711            cx.update(|cx| {
5712                let settings = Settings::test(cx);
5713                cx.set_global(settings);
5714            });
5715
5716            let http = FakeHttpClient::with_404_response();
5717            let user_id = self.app_state.db.create_user(name, false).await.unwrap();
5718            let client_name = name.to_string();
5719            let mut client = Client::new(http.clone());
5720            let server = self.server.clone();
5721            let connection_killers = self.connection_killers.clone();
5722            let forbid_connections = self.forbid_connections.clone();
5723            let (connection_id_tx, mut connection_id_rx) = mpsc::channel(16);
5724
5725            Arc::get_mut(&mut client)
5726                .unwrap()
5727                .override_authenticate(move |cx| {
5728                    cx.spawn(|_| async move {
5729                        let access_token = "the-token".to_string();
5730                        Ok(Credentials {
5731                            user_id: user_id.0 as u64,
5732                            access_token,
5733                        })
5734                    })
5735                })
5736                .override_establish_connection(move |credentials, cx| {
5737                    assert_eq!(credentials.user_id, user_id.0 as u64);
5738                    assert_eq!(credentials.access_token, "the-token");
5739
5740                    let server = server.clone();
5741                    let connection_killers = connection_killers.clone();
5742                    let forbid_connections = forbid_connections.clone();
5743                    let client_name = client_name.clone();
5744                    let connection_id_tx = connection_id_tx.clone();
5745                    cx.spawn(move |cx| async move {
5746                        if forbid_connections.load(SeqCst) {
5747                            Err(EstablishConnectionError::other(anyhow!(
5748                                "server is forbidding connections"
5749                            )))
5750                        } else {
5751                            let (client_conn, server_conn, killed) =
5752                                Connection::in_memory(cx.background());
5753                            connection_killers.lock().insert(user_id, killed);
5754                            cx.background()
5755                                .spawn(server.handle_connection(
5756                                    server_conn,
5757                                    client_name,
5758                                    user_id,
5759                                    Some(connection_id_tx),
5760                                    cx.background(),
5761                                ))
5762                                .detach();
5763                            Ok(client_conn)
5764                        }
5765                    })
5766                });
5767
5768            client
5769                .authenticate_and_connect(false, &cx.to_async())
5770                .await
5771                .unwrap();
5772
5773            Channel::init(&client);
5774            Project::init(&client);
5775            cx.update(|cx| {
5776                workspace::init(&client, cx);
5777            });
5778
5779            let peer_id = PeerId(connection_id_rx.next().await.unwrap().0);
5780            let user_store = cx.add_model(|cx| UserStore::new(client.clone(), http, cx));
5781
5782            let client = TestClient {
5783                client,
5784                peer_id,
5785                username: name.to_string(),
5786                user_store,
5787                language_registry: Arc::new(LanguageRegistry::test()),
5788                project: Default::default(),
5789                buffers: Default::default(),
5790            };
5791            client.wait_for_current_user(cx).await;
5792            client
5793        }
5794
5795        fn disconnect_client(&self, user_id: UserId) {
5796            self.connection_killers
5797                .lock()
5798                .remove(&user_id)
5799                .unwrap()
5800                .store(true, SeqCst);
5801        }
5802
5803        fn forbid_connections(&self) {
5804            self.forbid_connections.store(true, SeqCst);
5805        }
5806
5807        fn allow_connections(&self) {
5808            self.forbid_connections.store(false, SeqCst);
5809        }
5810
5811        async fn build_app_state(test_db: &TestDb) -> Arc<AppState> {
5812            Arc::new(AppState {
5813                db: test_db.db().clone(),
5814                api_token: Default::default(),
5815            })
5816        }
5817
5818        async fn state<'a>(&'a self) -> RwLockReadGuard<'a, Store> {
5819            self.server.store.read().await
5820        }
5821
5822        async fn condition<F>(&mut self, mut predicate: F)
5823        where
5824            F: FnMut(&Store) -> bool,
5825        {
5826            assert!(
5827                self.foreground.parking_forbidden(),
5828                "you must call forbid_parking to use server conditions so we don't block indefinitely"
5829            );
5830            while !(predicate)(&*self.server.store.read().await) {
5831                self.foreground.start_waiting();
5832                self.notifications.next().await;
5833                self.foreground.finish_waiting();
5834            }
5835        }
5836    }
5837
5838    impl Deref for TestServer {
5839        type Target = Server;
5840
5841        fn deref(&self) -> &Self::Target {
5842            &self.server
5843        }
5844    }
5845
5846    impl Drop for TestServer {
5847        fn drop(&mut self) {
5848            self.peer.reset();
5849        }
5850    }
5851
5852    struct TestClient {
5853        client: Arc<Client>,
5854        username: String,
5855        pub peer_id: PeerId,
5856        pub user_store: ModelHandle<UserStore>,
5857        language_registry: Arc<LanguageRegistry>,
5858        project: Option<ModelHandle<Project>>,
5859        buffers: HashSet<ModelHandle<language::Buffer>>,
5860    }
5861
5862    impl Deref for TestClient {
5863        type Target = Arc<Client>;
5864
5865        fn deref(&self) -> &Self::Target {
5866            &self.client
5867        }
5868    }
5869
5870    impl TestClient {
5871        pub fn current_user_id(&self, cx: &TestAppContext) -> UserId {
5872            UserId::from_proto(
5873                self.user_store
5874                    .read_with(cx, |user_store, _| user_store.current_user().unwrap().id),
5875            )
5876        }
5877
5878        async fn wait_for_current_user(&self, cx: &TestAppContext) {
5879            let mut authed_user = self
5880                .user_store
5881                .read_with(cx, |user_store, _| user_store.watch_current_user());
5882            while authed_user.next().await.unwrap().is_none() {}
5883        }
5884
5885        async fn build_local_project(
5886            &mut self,
5887            fs: Arc<FakeFs>,
5888            root_path: impl AsRef<Path>,
5889            cx: &mut TestAppContext,
5890        ) -> (ModelHandle<Project>, WorktreeId) {
5891            let project = cx.update(|cx| {
5892                Project::local(
5893                    self.client.clone(),
5894                    self.user_store.clone(),
5895                    self.language_registry.clone(),
5896                    fs,
5897                    cx,
5898                )
5899            });
5900            self.project = Some(project.clone());
5901            let (worktree, _) = project
5902                .update(cx, |p, cx| {
5903                    p.find_or_create_local_worktree(root_path, true, cx)
5904                })
5905                .await
5906                .unwrap();
5907            worktree
5908                .read_with(cx, |tree, _| tree.as_local().unwrap().scan_complete())
5909                .await;
5910            project
5911                .update(cx, |project, _| project.next_remote_id())
5912                .await;
5913            (project, worktree.read_with(cx, |tree, _| tree.id()))
5914        }
5915
5916        async fn build_remote_project(
5917            &mut self,
5918            project_id: u64,
5919            cx: &mut TestAppContext,
5920        ) -> ModelHandle<Project> {
5921            let project = Project::remote(
5922                project_id,
5923                self.client.clone(),
5924                self.user_store.clone(),
5925                self.language_registry.clone(),
5926                FakeFs::new(cx.background()),
5927                &mut cx.to_async(),
5928            )
5929            .await
5930            .unwrap();
5931            self.project = Some(project.clone());
5932            project
5933        }
5934
5935        fn build_workspace(
5936            &self,
5937            project: &ModelHandle<Project>,
5938            cx: &mut TestAppContext,
5939        ) -> ViewHandle<Workspace> {
5940            let (window_id, _) = cx.add_window(|_| EmptyView);
5941            cx.add_view(window_id, |cx| {
5942                let fs = project.read(cx).fs().clone();
5943                Workspace::new(
5944                    &WorkspaceParams {
5945                        fs,
5946                        project: project.clone(),
5947                        user_store: self.user_store.clone(),
5948                        languages: self.language_registry.clone(),
5949                        themes: ThemeRegistry::new((), cx.font_cache().clone()),
5950                        channel_list: cx.add_model(|cx| {
5951                            ChannelList::new(self.user_store.clone(), self.client.clone(), cx)
5952                        }),
5953                        client: self.client.clone(),
5954                    },
5955                    cx,
5956                )
5957            })
5958        }
5959
5960        async fn simulate_host(
5961            mut self,
5962            project: ModelHandle<Project>,
5963            files: Arc<Mutex<Vec<PathBuf>>>,
5964            op_start_signal: futures::channel::mpsc::UnboundedReceiver<()>,
5965            rng: Arc<Mutex<StdRng>>,
5966            mut cx: TestAppContext,
5967        ) -> (Self, TestAppContext, Option<anyhow::Error>) {
5968            async fn simulate_host_internal(
5969                client: &mut TestClient,
5970                project: ModelHandle<Project>,
5971                files: Arc<Mutex<Vec<PathBuf>>>,
5972                mut op_start_signal: futures::channel::mpsc::UnboundedReceiver<()>,
5973                rng: Arc<Mutex<StdRng>>,
5974                cx: &mut TestAppContext,
5975            ) -> anyhow::Result<()> {
5976                let fs = project.read_with(cx, |project, _| project.fs().clone());
5977
5978                while op_start_signal.next().await.is_some() {
5979                    let distribution = rng.lock().gen_range::<usize, _>(0..100);
5980                    match distribution {
5981                        0..=20 if !files.lock().is_empty() => {
5982                            let path = files.lock().choose(&mut *rng.lock()).unwrap().clone();
5983                            let mut path = path.as_path();
5984                            while let Some(parent_path) = path.parent() {
5985                                path = parent_path;
5986                                if rng.lock().gen() {
5987                                    break;
5988                                }
5989                            }
5990
5991                            log::info!("Host: find/create local worktree {:?}", path);
5992                            let find_or_create_worktree = project.update(cx, |project, cx| {
5993                                project.find_or_create_local_worktree(path, true, cx)
5994                            });
5995                            if rng.lock().gen() {
5996                                cx.background().spawn(find_or_create_worktree).detach();
5997                            } else {
5998                                find_or_create_worktree.await?;
5999                            }
6000                        }
6001                        10..=80 if !files.lock().is_empty() => {
6002                            let buffer = if client.buffers.is_empty() || rng.lock().gen() {
6003                                let file = files.lock().choose(&mut *rng.lock()).unwrap().clone();
6004                                let (worktree, path) = project
6005                                    .update(cx, |project, cx| {
6006                                        project.find_or_create_local_worktree(
6007                                            file.clone(),
6008                                            true,
6009                                            cx,
6010                                        )
6011                                    })
6012                                    .await?;
6013                                let project_path =
6014                                    worktree.read_with(cx, |worktree, _| (worktree.id(), path));
6015                                log::info!(
6016                                    "Host: opening path {:?}, worktree {}, relative_path {:?}",
6017                                    file,
6018                                    project_path.0,
6019                                    project_path.1
6020                                );
6021                                let buffer = project
6022                                    .update(cx, |project, cx| project.open_buffer(project_path, cx))
6023                                    .await
6024                                    .unwrap();
6025                                client.buffers.insert(buffer.clone());
6026                                buffer
6027                            } else {
6028                                client
6029                                    .buffers
6030                                    .iter()
6031                                    .choose(&mut *rng.lock())
6032                                    .unwrap()
6033                                    .clone()
6034                            };
6035
6036                            if rng.lock().gen_bool(0.1) {
6037                                cx.update(|cx| {
6038                                    log::info!(
6039                                        "Host: dropping buffer {:?}",
6040                                        buffer.read(cx).file().unwrap().full_path(cx)
6041                                    );
6042                                    client.buffers.remove(&buffer);
6043                                    drop(buffer);
6044                                });
6045                            } else {
6046                                buffer.update(cx, |buffer, cx| {
6047                                    log::info!(
6048                                        "Host: updating buffer {:?} ({})",
6049                                        buffer.file().unwrap().full_path(cx),
6050                                        buffer.remote_id()
6051                                    );
6052
6053                                    if rng.lock().gen_bool(0.7) {
6054                                        buffer.randomly_edit(&mut *rng.lock(), 5, cx);
6055                                    } else {
6056                                        buffer.randomly_undo_redo(&mut *rng.lock(), cx);
6057                                    }
6058                                });
6059                            }
6060                        }
6061                        _ => loop {
6062                            let path_component_count = rng.lock().gen_range::<usize, _>(1..=5);
6063                            let mut path = PathBuf::new();
6064                            path.push("/");
6065                            for _ in 0..path_component_count {
6066                                let letter = rng.lock().gen_range(b'a'..=b'z');
6067                                path.push(std::str::from_utf8(&[letter]).unwrap());
6068                            }
6069                            path.set_extension("rs");
6070                            let parent_path = path.parent().unwrap();
6071
6072                            log::info!("Host: creating file {:?}", path,);
6073
6074                            if fs.create_dir(&parent_path).await.is_ok()
6075                                && fs.create_file(&path, Default::default()).await.is_ok()
6076                            {
6077                                files.lock().push(path);
6078                                break;
6079                            } else {
6080                                log::info!("Host: cannot create file");
6081                            }
6082                        },
6083                    }
6084
6085                    cx.background().simulate_random_delay().await;
6086                }
6087
6088                Ok(())
6089            }
6090
6091            let result = simulate_host_internal(
6092                &mut self,
6093                project.clone(),
6094                files,
6095                op_start_signal,
6096                rng,
6097                &mut cx,
6098            )
6099            .await;
6100            log::info!("Host done");
6101            self.project = Some(project);
6102            (self, cx, result.err())
6103        }
6104
6105        pub async fn simulate_guest(
6106            mut self,
6107            guest_username: String,
6108            project: ModelHandle<Project>,
6109            op_start_signal: futures::channel::mpsc::UnboundedReceiver<()>,
6110            rng: Arc<Mutex<StdRng>>,
6111            mut cx: TestAppContext,
6112        ) -> (Self, TestAppContext, Option<anyhow::Error>) {
6113            async fn simulate_guest_internal(
6114                client: &mut TestClient,
6115                guest_username: &str,
6116                project: ModelHandle<Project>,
6117                mut op_start_signal: futures::channel::mpsc::UnboundedReceiver<()>,
6118                rng: Arc<Mutex<StdRng>>,
6119                cx: &mut TestAppContext,
6120            ) -> anyhow::Result<()> {
6121                while op_start_signal.next().await.is_some() {
6122                    let buffer = if client.buffers.is_empty() || rng.lock().gen() {
6123                        let worktree = if let Some(worktree) =
6124                            project.read_with(cx, |project, cx| {
6125                                project
6126                                    .worktrees(&cx)
6127                                    .filter(|worktree| {
6128                                        let worktree = worktree.read(cx);
6129                                        worktree.is_visible()
6130                                            && worktree.entries(false).any(|e| e.is_file())
6131                                    })
6132                                    .choose(&mut *rng.lock())
6133                            }) {
6134                            worktree
6135                        } else {
6136                            cx.background().simulate_random_delay().await;
6137                            continue;
6138                        };
6139
6140                        let (worktree_root_name, project_path) =
6141                            worktree.read_with(cx, |worktree, _| {
6142                                let entry = worktree
6143                                    .entries(false)
6144                                    .filter(|e| e.is_file())
6145                                    .choose(&mut *rng.lock())
6146                                    .unwrap();
6147                                (
6148                                    worktree.root_name().to_string(),
6149                                    (worktree.id(), entry.path.clone()),
6150                                )
6151                            });
6152                        log::info!(
6153                            "{}: opening path {:?} in worktree {} ({})",
6154                            guest_username,
6155                            project_path.1,
6156                            project_path.0,
6157                            worktree_root_name,
6158                        );
6159                        let buffer = project
6160                            .update(cx, |project, cx| {
6161                                project.open_buffer(project_path.clone(), cx)
6162                            })
6163                            .await?;
6164                        log::info!(
6165                            "{}: opened path {:?} in worktree {} ({}) with buffer id {}",
6166                            guest_username,
6167                            project_path.1,
6168                            project_path.0,
6169                            worktree_root_name,
6170                            buffer.read_with(cx, |buffer, _| buffer.remote_id())
6171                        );
6172                        client.buffers.insert(buffer.clone());
6173                        buffer
6174                    } else {
6175                        client
6176                            .buffers
6177                            .iter()
6178                            .choose(&mut *rng.lock())
6179                            .unwrap()
6180                            .clone()
6181                    };
6182
6183                    let choice = rng.lock().gen_range(0..100);
6184                    match choice {
6185                        0..=9 => {
6186                            cx.update(|cx| {
6187                                log::info!(
6188                                    "{}: dropping buffer {:?}",
6189                                    guest_username,
6190                                    buffer.read(cx).file().unwrap().full_path(cx)
6191                                );
6192                                client.buffers.remove(&buffer);
6193                                drop(buffer);
6194                            });
6195                        }
6196                        10..=19 => {
6197                            let completions = project.update(cx, |project, cx| {
6198                                log::info!(
6199                                    "{}: requesting completions for buffer {} ({:?})",
6200                                    guest_username,
6201                                    buffer.read(cx).remote_id(),
6202                                    buffer.read(cx).file().unwrap().full_path(cx)
6203                                );
6204                                let offset = rng.lock().gen_range(0..=buffer.read(cx).len());
6205                                project.completions(&buffer, offset, cx)
6206                            });
6207                            let completions = cx.background().spawn(async move {
6208                                completions
6209                                    .await
6210                                    .map_err(|err| anyhow!("completions request failed: {:?}", err))
6211                            });
6212                            if rng.lock().gen_bool(0.3) {
6213                                log::info!("{}: detaching completions request", guest_username);
6214                                cx.update(|cx| completions.detach_and_log_err(cx));
6215                            } else {
6216                                completions.await?;
6217                            }
6218                        }
6219                        20..=29 => {
6220                            let code_actions = project.update(cx, |project, cx| {
6221                                log::info!(
6222                                    "{}: requesting code actions for buffer {} ({:?})",
6223                                    guest_username,
6224                                    buffer.read(cx).remote_id(),
6225                                    buffer.read(cx).file().unwrap().full_path(cx)
6226                                );
6227                                let range = buffer.read(cx).random_byte_range(0, &mut *rng.lock());
6228                                project.code_actions(&buffer, range, cx)
6229                            });
6230                            let code_actions = cx.background().spawn(async move {
6231                                code_actions.await.map_err(|err| {
6232                                    anyhow!("code actions request failed: {:?}", err)
6233                                })
6234                            });
6235                            if rng.lock().gen_bool(0.3) {
6236                                log::info!("{}: detaching code actions request", guest_username);
6237                                cx.update(|cx| code_actions.detach_and_log_err(cx));
6238                            } else {
6239                                code_actions.await?;
6240                            }
6241                        }
6242                        30..=39 if buffer.read_with(cx, |buffer, _| buffer.is_dirty()) => {
6243                            let (requested_version, save) = buffer.update(cx, |buffer, cx| {
6244                                log::info!(
6245                                    "{}: saving buffer {} ({:?})",
6246                                    guest_username,
6247                                    buffer.remote_id(),
6248                                    buffer.file().unwrap().full_path(cx)
6249                                );
6250                                (buffer.version(), buffer.save(cx))
6251                            });
6252                            let save = cx.background().spawn(async move {
6253                                let (saved_version, _) = save
6254                                    .await
6255                                    .map_err(|err| anyhow!("save request failed: {:?}", err))?;
6256                                assert!(saved_version.observed_all(&requested_version));
6257                                Ok::<_, anyhow::Error>(())
6258                            });
6259                            if rng.lock().gen_bool(0.3) {
6260                                log::info!("{}: detaching save request", guest_username);
6261                                cx.update(|cx| save.detach_and_log_err(cx));
6262                            } else {
6263                                save.await?;
6264                            }
6265                        }
6266                        40..=44 => {
6267                            let prepare_rename = project.update(cx, |project, cx| {
6268                                log::info!(
6269                                    "{}: preparing rename for buffer {} ({:?})",
6270                                    guest_username,
6271                                    buffer.read(cx).remote_id(),
6272                                    buffer.read(cx).file().unwrap().full_path(cx)
6273                                );
6274                                let offset = rng.lock().gen_range(0..=buffer.read(cx).len());
6275                                project.prepare_rename(buffer, offset, cx)
6276                            });
6277                            let prepare_rename = cx.background().spawn(async move {
6278                                prepare_rename.await.map_err(|err| {
6279                                    anyhow!("prepare rename request failed: {:?}", err)
6280                                })
6281                            });
6282                            if rng.lock().gen_bool(0.3) {
6283                                log::info!("{}: detaching prepare rename request", guest_username);
6284                                cx.update(|cx| prepare_rename.detach_and_log_err(cx));
6285                            } else {
6286                                prepare_rename.await?;
6287                            }
6288                        }
6289                        45..=49 => {
6290                            let definitions = project.update(cx, |project, cx| {
6291                                log::info!(
6292                                    "{}: requesting definitions for buffer {} ({:?})",
6293                                    guest_username,
6294                                    buffer.read(cx).remote_id(),
6295                                    buffer.read(cx).file().unwrap().full_path(cx)
6296                                );
6297                                let offset = rng.lock().gen_range(0..=buffer.read(cx).len());
6298                                project.definition(&buffer, offset, cx)
6299                            });
6300                            let definitions = cx.background().spawn(async move {
6301                                definitions
6302                                    .await
6303                                    .map_err(|err| anyhow!("definitions request failed: {:?}", err))
6304                            });
6305                            if rng.lock().gen_bool(0.3) {
6306                                log::info!("{}: detaching definitions request", guest_username);
6307                                cx.update(|cx| definitions.detach_and_log_err(cx));
6308                            } else {
6309                                client
6310                                    .buffers
6311                                    .extend(definitions.await?.into_iter().map(|loc| loc.buffer));
6312                            }
6313                        }
6314                        50..=54 => {
6315                            let highlights = project.update(cx, |project, cx| {
6316                                log::info!(
6317                                    "{}: requesting highlights for buffer {} ({:?})",
6318                                    guest_username,
6319                                    buffer.read(cx).remote_id(),
6320                                    buffer.read(cx).file().unwrap().full_path(cx)
6321                                );
6322                                let offset = rng.lock().gen_range(0..=buffer.read(cx).len());
6323                                project.document_highlights(&buffer, offset, cx)
6324                            });
6325                            let highlights = cx.background().spawn(async move {
6326                                highlights
6327                                    .await
6328                                    .map_err(|err| anyhow!("highlights request failed: {:?}", err))
6329                            });
6330                            if rng.lock().gen_bool(0.3) {
6331                                log::info!("{}: detaching highlights request", guest_username);
6332                                cx.update(|cx| highlights.detach_and_log_err(cx));
6333                            } else {
6334                                highlights.await?;
6335                            }
6336                        }
6337                        55..=59 => {
6338                            let search = project.update(cx, |project, cx| {
6339                                let query = rng.lock().gen_range('a'..='z');
6340                                log::info!("{}: project-wide search {:?}", guest_username, query);
6341                                project.search(SearchQuery::text(query, false, false), cx)
6342                            });
6343                            let search = cx.background().spawn(async move {
6344                                search
6345                                    .await
6346                                    .map_err(|err| anyhow!("search request failed: {:?}", err))
6347                            });
6348                            if rng.lock().gen_bool(0.3) {
6349                                log::info!("{}: detaching search request", guest_username);
6350                                cx.update(|cx| search.detach_and_log_err(cx));
6351                            } else {
6352                                client.buffers.extend(search.await?.into_keys());
6353                            }
6354                        }
6355                        _ => {
6356                            buffer.update(cx, |buffer, cx| {
6357                                log::info!(
6358                                    "{}: updating buffer {} ({:?})",
6359                                    guest_username,
6360                                    buffer.remote_id(),
6361                                    buffer.file().unwrap().full_path(cx)
6362                                );
6363                                if rng.lock().gen_bool(0.7) {
6364                                    buffer.randomly_edit(&mut *rng.lock(), 5, cx);
6365                                } else {
6366                                    buffer.randomly_undo_redo(&mut *rng.lock(), cx);
6367                                }
6368                            });
6369                        }
6370                    }
6371                    cx.background().simulate_random_delay().await;
6372                }
6373                Ok(())
6374            }
6375
6376            let result = simulate_guest_internal(
6377                &mut self,
6378                &guest_username,
6379                project.clone(),
6380                op_start_signal,
6381                rng,
6382                &mut cx,
6383            )
6384            .await;
6385            log::info!("{}: done", guest_username);
6386
6387            self.project = Some(project);
6388            (self, cx, result.err())
6389        }
6390    }
6391
6392    impl Drop for TestClient {
6393        fn drop(&mut self) {
6394            self.client.tear_down();
6395        }
6396    }
6397
6398    impl Executor for Arc<gpui::executor::Background> {
6399        type Sleep = gpui::executor::Timer;
6400
6401        fn spawn_detached<F: 'static + Send + Future<Output = ()>>(&self, future: F) {
6402            self.spawn(future).detach();
6403        }
6404
6405        fn sleep(&self, duration: Duration) -> Self::Sleep {
6406            self.as_ref().timer(duration)
6407        }
6408    }
6409
6410    fn channel_messages(channel: &Channel) -> Vec<(String, String, bool)> {
6411        channel
6412            .messages()
6413            .cursor::<()>()
6414            .map(|m| {
6415                (
6416                    m.sender.github_login.clone(),
6417                    m.body.clone(),
6418                    m.is_pending(),
6419                )
6420            })
6421            .collect()
6422    }
6423
6424    struct EmptyView;
6425
6426    impl gpui::Entity for EmptyView {
6427        type Event = ();
6428    }
6429
6430    impl gpui::View for EmptyView {
6431        fn ui_name() -> &'static str {
6432            "empty view"
6433        }
6434
6435        fn render(&mut self, _: &mut gpui::RenderContext<Self>) -> gpui::ElementBox {
6436            gpui::Element::boxed(gpui::elements::Empty)
6437        }
6438    }
6439}