rpc.rs

   1mod store;
   2
   3use crate::{
   4    auth,
   5    db::{ChannelId, MessageId, UserId},
   6    AppState, Result,
   7};
   8use anyhow::anyhow;
   9use async_tungstenite::tungstenite::{
  10    protocol::CloseFrame as TungsteniteCloseFrame, Message as TungsteniteMessage,
  11};
  12use axum::{
  13    body::Body,
  14    extract::{
  15        ws::{CloseFrame as AxumCloseFrame, Message as AxumMessage},
  16        ConnectInfo, WebSocketUpgrade,
  17    },
  18    headers::{Header, HeaderName},
  19    http::StatusCode,
  20    middleware,
  21    response::{IntoResponse, Response},
  22    routing::get,
  23    Extension, Router, TypedHeader,
  24};
  25use collections::{HashMap, HashSet};
  26use futures::{channel::mpsc, future::BoxFuture, FutureExt, SinkExt, StreamExt, TryStreamExt};
  27use lazy_static::lazy_static;
  28use rpc::{
  29    proto::{self, AnyTypedEnvelope, EntityMessage, EnvelopedMessage, RequestMessage},
  30    Connection, ConnectionId, Peer, TypedEnvelope,
  31};
  32use std::{
  33    any::TypeId,
  34    future::Future,
  35    marker::PhantomData,
  36    net::SocketAddr,
  37    ops::{Deref, DerefMut},
  38    rc::Rc,
  39    sync::Arc,
  40    time::Duration,
  41};
  42use store::{Store, Worktree};
  43use time::OffsetDateTime;
  44use tokio::{
  45    sync::{RwLock, RwLockReadGuard, RwLockWriteGuard},
  46    time::Sleep,
  47};
  48use tower::ServiceBuilder;
  49use tracing::{info_span, instrument, Instrument};
  50
  51type MessageHandler =
  52    Box<dyn Send + Sync + Fn(Arc<Server>, Box<dyn AnyTypedEnvelope>) -> BoxFuture<'static, ()>>;
  53
  54pub struct Server {
  55    peer: Arc<Peer>,
  56    store: RwLock<Store>,
  57    app_state: Arc<AppState>,
  58    handlers: HashMap<TypeId, MessageHandler>,
  59    notifications: Option<mpsc::UnboundedSender<()>>,
  60}
  61
  62pub trait Executor: Send + Clone {
  63    type Sleep: Send + Future;
  64    fn spawn_detached<F: 'static + Send + Future<Output = ()>>(&self, future: F);
  65    fn sleep(&self, duration: Duration) -> Self::Sleep;
  66}
  67
  68#[derive(Clone)]
  69pub struct RealExecutor;
  70
  71const MESSAGE_COUNT_PER_PAGE: usize = 100;
  72const MAX_MESSAGE_LEN: usize = 1024;
  73
  74struct StoreReadGuard<'a> {
  75    guard: RwLockReadGuard<'a, Store>,
  76    _not_send: PhantomData<Rc<()>>,
  77}
  78
  79struct StoreWriteGuard<'a> {
  80    guard: RwLockWriteGuard<'a, Store>,
  81    _not_send: PhantomData<Rc<()>>,
  82}
  83
  84impl Server {
  85    pub fn new(
  86        app_state: Arc<AppState>,
  87        notifications: Option<mpsc::UnboundedSender<()>>,
  88    ) -> Arc<Self> {
  89        let mut server = Self {
  90            peer: Peer::new(),
  91            app_state,
  92            store: Default::default(),
  93            handlers: Default::default(),
  94            notifications,
  95        };
  96
  97        server
  98            .add_request_handler(Server::ping)
  99            .add_request_handler(Server::register_project)
 100            .add_message_handler(Server::unregister_project)
 101            .add_request_handler(Server::share_project)
 102            .add_message_handler(Server::unshare_project)
 103            .add_sync_request_handler(Server::join_project)
 104            .add_message_handler(Server::leave_project)
 105            .add_request_handler(Server::register_worktree)
 106            .add_message_handler(Server::unregister_worktree)
 107            .add_request_handler(Server::update_worktree)
 108            .add_message_handler(Server::start_language_server)
 109            .add_message_handler(Server::update_language_server)
 110            .add_message_handler(Server::update_diagnostic_summary)
 111            .add_request_handler(Server::forward_project_request::<proto::GetDefinition>)
 112            .add_request_handler(Server::forward_project_request::<proto::GetReferences>)
 113            .add_request_handler(Server::forward_project_request::<proto::SearchProject>)
 114            .add_request_handler(Server::forward_project_request::<proto::GetDocumentHighlights>)
 115            .add_request_handler(Server::forward_project_request::<proto::GetProjectSymbols>)
 116            .add_request_handler(Server::forward_project_request::<proto::OpenBufferForSymbol>)
 117            .add_request_handler(Server::forward_project_request::<proto::OpenBufferById>)
 118            .add_request_handler(Server::forward_project_request::<proto::OpenBufferByPath>)
 119            .add_request_handler(Server::forward_project_request::<proto::GetCompletions>)
 120            .add_request_handler(
 121                Server::forward_project_request::<proto::ApplyCompletionAdditionalEdits>,
 122            )
 123            .add_request_handler(Server::forward_project_request::<proto::GetCodeActions>)
 124            .add_request_handler(Server::forward_project_request::<proto::ApplyCodeAction>)
 125            .add_request_handler(Server::forward_project_request::<proto::PrepareRename>)
 126            .add_request_handler(Server::forward_project_request::<proto::PerformRename>)
 127            .add_request_handler(Server::forward_project_request::<proto::ReloadBuffers>)
 128            .add_request_handler(Server::forward_project_request::<proto::FormatBuffers>)
 129            .add_request_handler(Server::update_buffer)
 130            .add_message_handler(Server::update_buffer_file)
 131            .add_message_handler(Server::buffer_reloaded)
 132            .add_message_handler(Server::buffer_saved)
 133            .add_request_handler(Server::save_buffer)
 134            .add_request_handler(Server::get_channels)
 135            .add_request_handler(Server::get_users)
 136            .add_request_handler(Server::join_channel)
 137            .add_message_handler(Server::leave_channel)
 138            .add_request_handler(Server::send_channel_message)
 139            .add_request_handler(Server::follow)
 140            .add_message_handler(Server::unfollow)
 141            .add_message_handler(Server::update_followers)
 142            .add_request_handler(Server::get_channel_messages);
 143
 144        Arc::new(server)
 145    }
 146
 147    fn add_message_handler<F, Fut, M>(&mut self, handler: F) -> &mut Self
 148    where
 149        F: 'static + Send + Sync + Fn(Arc<Self>, TypedEnvelope<M>) -> Fut,
 150        Fut: 'static + Send + Future<Output = Result<()>>,
 151        M: EnvelopedMessage,
 152    {
 153        let prev_handler = self.handlers.insert(
 154            TypeId::of::<M>(),
 155            Box::new(move |server, envelope| {
 156                let envelope = envelope.into_any().downcast::<TypedEnvelope<M>>().unwrap();
 157                let span = info_span!(
 158                    "handle message",
 159                    payload_type = envelope.payload_type_name()
 160                );
 161                let future = (handler)(server, *envelope);
 162                async move {
 163                    if let Err(error) = future.await {
 164                        tracing::error!(%error, "error handling message");
 165                    }
 166                }
 167                .instrument(span)
 168                .boxed()
 169            }),
 170        );
 171        if prev_handler.is_some() {
 172            panic!("registered a handler for the same message twice");
 173        }
 174        self
 175    }
 176
 177    fn add_request_handler<F, Fut, M>(&mut self, handler: F) -> &mut Self
 178    where
 179        F: 'static + Send + Sync + Fn(Arc<Self>, TypedEnvelope<M>) -> Fut,
 180        Fut: 'static + Send + Future<Output = Result<M::Response>>,
 181        M: RequestMessage,
 182    {
 183        self.add_message_handler(move |server, envelope| {
 184            let receipt = envelope.receipt();
 185            let response = (handler)(server.clone(), envelope);
 186            async move {
 187                match response.await {
 188                    Ok(response) => {
 189                        server.peer.respond(receipt, response)?;
 190                        Ok(())
 191                    }
 192                    Err(error) => {
 193                        server.peer.respond_with_error(
 194                            receipt,
 195                            proto::Error {
 196                                message: error.to_string(),
 197                            },
 198                        )?;
 199                        Err(error)
 200                    }
 201                }
 202            }
 203        })
 204    }
 205
 206    /// Handle a request while holding a lock to the store. This is useful when we're registering
 207    /// a connection but we want to respond on the connection before anybody else can send on it.
 208    fn add_sync_request_handler<F, M>(&mut self, handler: F) -> &mut Self
 209    where
 210        F: 'static
 211            + Send
 212            + Sync
 213            + Fn(Arc<Self>, &mut Store, TypedEnvelope<M>) -> Result<M::Response>,
 214        M: RequestMessage,
 215    {
 216        let handler = Arc::new(handler);
 217        self.add_message_handler(move |server, envelope| {
 218            let receipt = envelope.receipt();
 219            let handler = handler.clone();
 220            async move {
 221                let mut store = server.state_mut().await;
 222                let response = (handler)(server.clone(), &mut *store, envelope);
 223                match response {
 224                    Ok(response) => {
 225                        server.peer.respond(receipt, response)?;
 226                        Ok(())
 227                    }
 228                    Err(error) => {
 229                        server.peer.respond_with_error(
 230                            receipt,
 231                            proto::Error {
 232                                message: error.to_string(),
 233                            },
 234                        )?;
 235                        Err(error)
 236                    }
 237                }
 238            }
 239        })
 240    }
 241
 242    pub fn handle_connection<E: Executor>(
 243        self: &Arc<Self>,
 244        connection: Connection,
 245        address: String,
 246        user_id: UserId,
 247        mut send_connection_id: Option<mpsc::Sender<ConnectionId>>,
 248        executor: E,
 249    ) -> impl Future<Output = ()> {
 250        let mut this = self.clone();
 251        let span = info_span!("handle connection", %user_id, %address);
 252        async move {
 253            let (connection_id, handle_io, mut incoming_rx) = this
 254                .peer
 255                .add_connection(connection, {
 256                    let executor = executor.clone();
 257                    move |duration| {
 258                        let timer = executor.sleep(duration);
 259                        async move {
 260                            timer.await;
 261                        }
 262                    }
 263                })
 264                .await;
 265
 266            tracing::info!(%user_id, %connection_id, %address, "connection opened");
 267
 268            if let Some(send_connection_id) = send_connection_id.as_mut() {
 269                let _ = send_connection_id.send(connection_id).await;
 270            }
 271
 272            {
 273                let mut state = this.state_mut().await;
 274                state.add_connection(connection_id, user_id);
 275                this.update_contacts_for_users(&*state, &[user_id]);
 276            }
 277
 278            let handle_io = handle_io.fuse();
 279            futures::pin_mut!(handle_io);
 280            loop {
 281                let next_message = incoming_rx.next().fuse();
 282                futures::pin_mut!(next_message);
 283                futures::select_biased! {
 284                    result = handle_io => {
 285                        if let Err(error) = result {
 286                            tracing::error!(%error, "error handling I/O");
 287                        }
 288                        break;
 289                    }
 290                    message = next_message => {
 291                        if let Some(message) = message {
 292                            let type_name = message.payload_type_name();
 293                            let span = tracing::info_span!("receive message", %user_id, %connection_id, %address, type_name);
 294                            async {
 295                                if let Some(handler) = this.handlers.get(&message.payload_type_id()) {
 296                                    let notifications = this.notifications.clone();
 297                                    let is_background = message.is_background();
 298                                    let handle_message = (handler)(this.clone(), message);
 299                                    let handle_message = async move {
 300                                        handle_message.await;
 301                                        if let Some(mut notifications) = notifications {
 302                                            let _ = notifications.send(()).await;
 303                                        }
 304                                    };
 305                                    if is_background {
 306                                        executor.spawn_detached(handle_message);
 307                                    } else {
 308                                        handle_message.await;
 309                                    }
 310                                } else {
 311                                    tracing::error!("no message handler");
 312                                }
 313                            }.instrument(span).await;
 314                        } else {
 315                            tracing::info!(%user_id, %connection_id, %address, "connection closed");
 316                            break;
 317                        }
 318                    }
 319                }
 320            }
 321
 322            if let Err(error) = this.sign_out(connection_id).await {
 323                tracing::error!(%error, "error signing out");
 324            }
 325        }.instrument(span)
 326    }
 327
 328    async fn sign_out(self: &mut Arc<Self>, connection_id: ConnectionId) -> Result<()> {
 329        self.peer.disconnect(connection_id);
 330        let mut state = self.state_mut().await;
 331        let removed_connection = state.remove_connection(connection_id)?;
 332
 333        for (project_id, project) in removed_connection.hosted_projects {
 334            if let Some(share) = project.share {
 335                broadcast(
 336                    connection_id,
 337                    share.guests.keys().copied().collect(),
 338                    |conn_id| {
 339                        self.peer
 340                            .send(conn_id, proto::UnshareProject { project_id })
 341                    },
 342                );
 343            }
 344        }
 345
 346        for (project_id, peer_ids) in removed_connection.guest_project_ids {
 347            broadcast(connection_id, peer_ids, |conn_id| {
 348                self.peer.send(
 349                    conn_id,
 350                    proto::RemoveProjectCollaborator {
 351                        project_id,
 352                        peer_id: connection_id.0,
 353                    },
 354                )
 355            });
 356        }
 357
 358        self.update_contacts_for_users(&*state, removed_connection.contact_ids.iter());
 359        Ok(())
 360    }
 361
 362    async fn ping(self: Arc<Server>, _: TypedEnvelope<proto::Ping>) -> Result<proto::Ack> {
 363        Ok(proto::Ack {})
 364    }
 365
 366    async fn register_project(
 367        self: Arc<Server>,
 368        request: TypedEnvelope<proto::RegisterProject>,
 369    ) -> Result<proto::RegisterProjectResponse> {
 370        let project_id = {
 371            let mut state = self.state_mut().await;
 372            let user_id = state.user_id_for_connection(request.sender_id)?;
 373            state.register_project(request.sender_id, user_id)
 374        };
 375        Ok(proto::RegisterProjectResponse { project_id })
 376    }
 377
 378    async fn unregister_project(
 379        self: Arc<Server>,
 380        request: TypedEnvelope<proto::UnregisterProject>,
 381    ) -> Result<()> {
 382        let mut state = self.state_mut().await;
 383        let project = state.unregister_project(request.payload.project_id, request.sender_id)?;
 384        self.update_contacts_for_users(&*state, &project.authorized_user_ids());
 385        Ok(())
 386    }
 387
 388    async fn share_project(
 389        self: Arc<Server>,
 390        request: TypedEnvelope<proto::ShareProject>,
 391    ) -> Result<proto::Ack> {
 392        let mut state = self.state_mut().await;
 393        let project = state.share_project(request.payload.project_id, request.sender_id)?;
 394        self.update_contacts_for_users(&mut *state, &project.authorized_user_ids);
 395        Ok(proto::Ack {})
 396    }
 397
 398    async fn unshare_project(
 399        self: Arc<Server>,
 400        request: TypedEnvelope<proto::UnshareProject>,
 401    ) -> Result<()> {
 402        let project_id = request.payload.project_id;
 403        let mut state = self.state_mut().await;
 404        let project = state.unshare_project(project_id, request.sender_id)?;
 405        broadcast(request.sender_id, project.connection_ids, |conn_id| {
 406            self.peer
 407                .send(conn_id, proto::UnshareProject { project_id })
 408        });
 409        self.update_contacts_for_users(&mut *state, &project.authorized_user_ids);
 410        Ok(())
 411    }
 412
 413    fn join_project(
 414        self: Arc<Server>,
 415        state: &mut Store,
 416        request: TypedEnvelope<proto::JoinProject>,
 417    ) -> Result<proto::JoinProjectResponse> {
 418        let project_id = request.payload.project_id;
 419
 420        let user_id = state.user_id_for_connection(request.sender_id)?;
 421        let (response, connection_ids, contact_user_ids) = state
 422            .join_project(request.sender_id, user_id, project_id)
 423            .and_then(|joined| {
 424                let share = joined.project.share()?;
 425                let peer_count = share.guests.len();
 426                let mut collaborators = Vec::with_capacity(peer_count);
 427                collaborators.push(proto::Collaborator {
 428                    peer_id: joined.project.host_connection_id.0,
 429                    replica_id: 0,
 430                    user_id: joined.project.host_user_id.to_proto(),
 431                });
 432                let worktrees = share
 433                    .worktrees
 434                    .iter()
 435                    .filter_map(|(id, shared_worktree)| {
 436                        let worktree = joined.project.worktrees.get(&id)?;
 437                        Some(proto::Worktree {
 438                            id: *id,
 439                            root_name: worktree.root_name.clone(),
 440                            entries: shared_worktree.entries.values().cloned().collect(),
 441                            diagnostic_summaries: shared_worktree
 442                                .diagnostic_summaries
 443                                .values()
 444                                .cloned()
 445                                .collect(),
 446                            visible: worktree.visible,
 447                        })
 448                    })
 449                    .collect();
 450                for (peer_conn_id, (peer_replica_id, peer_user_id)) in &share.guests {
 451                    if *peer_conn_id != request.sender_id {
 452                        collaborators.push(proto::Collaborator {
 453                            peer_id: peer_conn_id.0,
 454                            replica_id: *peer_replica_id as u32,
 455                            user_id: peer_user_id.to_proto(),
 456                        });
 457                    }
 458                }
 459                let response = proto::JoinProjectResponse {
 460                    worktrees,
 461                    replica_id: joined.replica_id as u32,
 462                    collaborators,
 463                    language_servers: joined.project.language_servers.clone(),
 464                };
 465                let connection_ids = joined.project.connection_ids();
 466                let contact_user_ids = joined.project.authorized_user_ids();
 467                Ok((response, connection_ids, contact_user_ids))
 468            })?;
 469
 470        broadcast(request.sender_id, connection_ids, |conn_id| {
 471            self.peer.send(
 472                conn_id,
 473                proto::AddProjectCollaborator {
 474                    project_id,
 475                    collaborator: Some(proto::Collaborator {
 476                        peer_id: request.sender_id.0,
 477                        replica_id: response.replica_id,
 478                        user_id: user_id.to_proto(),
 479                    }),
 480                },
 481            )
 482        });
 483        self.update_contacts_for_users(state, &contact_user_ids);
 484        Ok(response)
 485    }
 486
 487    async fn leave_project(
 488        self: Arc<Server>,
 489        request: TypedEnvelope<proto::LeaveProject>,
 490    ) -> Result<()> {
 491        let sender_id = request.sender_id;
 492        let project_id = request.payload.project_id;
 493        let mut state = self.state_mut().await;
 494        let worktree = state.leave_project(sender_id, project_id)?;
 495        broadcast(sender_id, worktree.connection_ids, |conn_id| {
 496            self.peer.send(
 497                conn_id,
 498                proto::RemoveProjectCollaborator {
 499                    project_id,
 500                    peer_id: sender_id.0,
 501                },
 502            )
 503        });
 504        self.update_contacts_for_users(&*state, &worktree.authorized_user_ids);
 505        Ok(())
 506    }
 507
 508    async fn register_worktree(
 509        self: Arc<Server>,
 510        request: TypedEnvelope<proto::RegisterWorktree>,
 511    ) -> Result<proto::Ack> {
 512        let mut contact_user_ids = HashSet::default();
 513        for github_login in &request.payload.authorized_logins {
 514            let contact_user_id = self.app_state.db.create_user(github_login, false).await?;
 515            contact_user_ids.insert(contact_user_id);
 516        }
 517
 518        let mut state = self.state_mut().await;
 519        let host_user_id = state.user_id_for_connection(request.sender_id)?;
 520        contact_user_ids.insert(host_user_id);
 521
 522        let contact_user_ids = contact_user_ids.into_iter().collect::<Vec<_>>();
 523        let guest_connection_ids = state
 524            .read_project(request.payload.project_id, request.sender_id)?
 525            .guest_connection_ids();
 526        state.register_worktree(
 527            request.payload.project_id,
 528            request.payload.worktree_id,
 529            request.sender_id,
 530            Worktree {
 531                authorized_user_ids: contact_user_ids.clone(),
 532                root_name: request.payload.root_name.clone(),
 533                visible: request.payload.visible,
 534            },
 535        )?;
 536
 537        broadcast(request.sender_id, guest_connection_ids, |connection_id| {
 538            self.peer
 539                .forward_send(request.sender_id, connection_id, request.payload.clone())
 540        });
 541        self.update_contacts_for_users(&*state, &contact_user_ids);
 542        Ok(proto::Ack {})
 543    }
 544
 545    async fn unregister_worktree(
 546        self: Arc<Server>,
 547        request: TypedEnvelope<proto::UnregisterWorktree>,
 548    ) -> Result<()> {
 549        let project_id = request.payload.project_id;
 550        let worktree_id = request.payload.worktree_id;
 551        let mut state = self.state_mut().await;
 552        let (worktree, guest_connection_ids) =
 553            state.unregister_worktree(project_id, worktree_id, request.sender_id)?;
 554        broadcast(request.sender_id, guest_connection_ids, |conn_id| {
 555            self.peer.send(
 556                conn_id,
 557                proto::UnregisterWorktree {
 558                    project_id,
 559                    worktree_id,
 560                },
 561            )
 562        });
 563        self.update_contacts_for_users(&*state, &worktree.authorized_user_ids);
 564        Ok(())
 565    }
 566
 567    async fn update_worktree(
 568        self: Arc<Server>,
 569        request: TypedEnvelope<proto::UpdateWorktree>,
 570    ) -> Result<proto::Ack> {
 571        let connection_ids = self.state_mut().await.update_worktree(
 572            request.sender_id,
 573            request.payload.project_id,
 574            request.payload.worktree_id,
 575            &request.payload.removed_entries,
 576            &request.payload.updated_entries,
 577        )?;
 578
 579        broadcast(request.sender_id, connection_ids, |connection_id| {
 580            self.peer
 581                .forward_send(request.sender_id, connection_id, request.payload.clone())
 582        });
 583
 584        Ok(proto::Ack {})
 585    }
 586
 587    async fn update_diagnostic_summary(
 588        self: Arc<Server>,
 589        request: TypedEnvelope<proto::UpdateDiagnosticSummary>,
 590    ) -> Result<()> {
 591        let summary = request
 592            .payload
 593            .summary
 594            .clone()
 595            .ok_or_else(|| anyhow!("invalid summary"))?;
 596        let receiver_ids = self.state_mut().await.update_diagnostic_summary(
 597            request.payload.project_id,
 598            request.payload.worktree_id,
 599            request.sender_id,
 600            summary,
 601        )?;
 602
 603        broadcast(request.sender_id, receiver_ids, |connection_id| {
 604            self.peer
 605                .forward_send(request.sender_id, connection_id, request.payload.clone())
 606        });
 607        Ok(())
 608    }
 609
 610    async fn start_language_server(
 611        self: Arc<Server>,
 612        request: TypedEnvelope<proto::StartLanguageServer>,
 613    ) -> Result<()> {
 614        let receiver_ids = self.state_mut().await.start_language_server(
 615            request.payload.project_id,
 616            request.sender_id,
 617            request
 618                .payload
 619                .server
 620                .clone()
 621                .ok_or_else(|| anyhow!("invalid language server"))?,
 622        )?;
 623        broadcast(request.sender_id, receiver_ids, |connection_id| {
 624            self.peer
 625                .forward_send(request.sender_id, connection_id, request.payload.clone())
 626        });
 627        Ok(())
 628    }
 629
 630    async fn update_language_server(
 631        self: Arc<Server>,
 632        request: TypedEnvelope<proto::UpdateLanguageServer>,
 633    ) -> Result<()> {
 634        let receiver_ids = self
 635            .state()
 636            .await
 637            .project_connection_ids(request.payload.project_id, request.sender_id)?;
 638        broadcast(request.sender_id, receiver_ids, |connection_id| {
 639            self.peer
 640                .forward_send(request.sender_id, connection_id, request.payload.clone())
 641        });
 642        Ok(())
 643    }
 644
 645    async fn forward_project_request<T>(
 646        self: Arc<Server>,
 647        request: TypedEnvelope<T>,
 648    ) -> Result<T::Response>
 649    where
 650        T: EntityMessage + RequestMessage,
 651    {
 652        let host_connection_id = self
 653            .state()
 654            .await
 655            .read_project(request.payload.remote_entity_id(), request.sender_id)?
 656            .host_connection_id;
 657        Ok(self
 658            .peer
 659            .forward_request(request.sender_id, host_connection_id, request.payload)
 660            .await?)
 661    }
 662
 663    async fn save_buffer(
 664        self: Arc<Server>,
 665        request: TypedEnvelope<proto::SaveBuffer>,
 666    ) -> Result<proto::BufferSaved> {
 667        let host = self
 668            .state()
 669            .await
 670            .read_project(request.payload.project_id, request.sender_id)?
 671            .host_connection_id;
 672        let response = self
 673            .peer
 674            .forward_request(request.sender_id, host, request.payload.clone())
 675            .await?;
 676
 677        let mut guests = self
 678            .state()
 679            .await
 680            .read_project(request.payload.project_id, request.sender_id)?
 681            .connection_ids();
 682        guests.retain(|guest_connection_id| *guest_connection_id != request.sender_id);
 683        broadcast(host, guests, |conn_id| {
 684            self.peer.forward_send(host, conn_id, response.clone())
 685        });
 686
 687        Ok(response)
 688    }
 689
 690    async fn update_buffer(
 691        self: Arc<Server>,
 692        request: TypedEnvelope<proto::UpdateBuffer>,
 693    ) -> Result<proto::Ack> {
 694        let receiver_ids = self
 695            .state()
 696            .await
 697            .project_connection_ids(request.payload.project_id, request.sender_id)?;
 698        broadcast(request.sender_id, receiver_ids, |connection_id| {
 699            self.peer
 700                .forward_send(request.sender_id, connection_id, request.payload.clone())
 701        });
 702        Ok(proto::Ack {})
 703    }
 704
 705    async fn update_buffer_file(
 706        self: Arc<Server>,
 707        request: TypedEnvelope<proto::UpdateBufferFile>,
 708    ) -> Result<()> {
 709        let receiver_ids = self
 710            .state()
 711            .await
 712            .project_connection_ids(request.payload.project_id, request.sender_id)?;
 713        broadcast(request.sender_id, receiver_ids, |connection_id| {
 714            self.peer
 715                .forward_send(request.sender_id, connection_id, request.payload.clone())
 716        });
 717        Ok(())
 718    }
 719
 720    async fn buffer_reloaded(
 721        self: Arc<Server>,
 722        request: TypedEnvelope<proto::BufferReloaded>,
 723    ) -> Result<()> {
 724        let receiver_ids = self
 725            .state()
 726            .await
 727            .project_connection_ids(request.payload.project_id, request.sender_id)?;
 728        broadcast(request.sender_id, receiver_ids, |connection_id| {
 729            self.peer
 730                .forward_send(request.sender_id, connection_id, request.payload.clone())
 731        });
 732        Ok(())
 733    }
 734
 735    async fn buffer_saved(
 736        self: Arc<Server>,
 737        request: TypedEnvelope<proto::BufferSaved>,
 738    ) -> Result<()> {
 739        let receiver_ids = self
 740            .state()
 741            .await
 742            .project_connection_ids(request.payload.project_id, request.sender_id)?;
 743        broadcast(request.sender_id, receiver_ids, |connection_id| {
 744            self.peer
 745                .forward_send(request.sender_id, connection_id, request.payload.clone())
 746        });
 747        Ok(())
 748    }
 749
 750    async fn follow(
 751        self: Arc<Self>,
 752        request: TypedEnvelope<proto::Follow>,
 753    ) -> Result<proto::FollowResponse> {
 754        let leader_id = ConnectionId(request.payload.leader_id);
 755        let follower_id = request.sender_id;
 756        if !self
 757            .state()
 758            .await
 759            .project_connection_ids(request.payload.project_id, follower_id)?
 760            .contains(&leader_id)
 761        {
 762            Err(anyhow!("no such peer"))?;
 763        }
 764        let mut response = self
 765            .peer
 766            .forward_request(request.sender_id, leader_id, request.payload)
 767            .await?;
 768        response
 769            .views
 770            .retain(|view| view.leader_id != Some(follower_id.0));
 771        Ok(response)
 772    }
 773
 774    async fn unfollow(self: Arc<Self>, request: TypedEnvelope<proto::Unfollow>) -> Result<()> {
 775        let leader_id = ConnectionId(request.payload.leader_id);
 776        if !self
 777            .state()
 778            .await
 779            .project_connection_ids(request.payload.project_id, request.sender_id)?
 780            .contains(&leader_id)
 781        {
 782            Err(anyhow!("no such peer"))?;
 783        }
 784        self.peer
 785            .forward_send(request.sender_id, leader_id, request.payload)?;
 786        Ok(())
 787    }
 788
 789    async fn update_followers(
 790        self: Arc<Self>,
 791        request: TypedEnvelope<proto::UpdateFollowers>,
 792    ) -> Result<()> {
 793        let connection_ids = self
 794            .state()
 795            .await
 796            .project_connection_ids(request.payload.project_id, request.sender_id)?;
 797        let leader_id = request
 798            .payload
 799            .variant
 800            .as_ref()
 801            .and_then(|variant| match variant {
 802                proto::update_followers::Variant::CreateView(payload) => payload.leader_id,
 803                proto::update_followers::Variant::UpdateView(payload) => payload.leader_id,
 804                proto::update_followers::Variant::UpdateActiveView(payload) => payload.leader_id,
 805            });
 806        for follower_id in &request.payload.follower_ids {
 807            let follower_id = ConnectionId(*follower_id);
 808            if connection_ids.contains(&follower_id) && Some(follower_id.0) != leader_id {
 809                self.peer
 810                    .forward_send(request.sender_id, follower_id, request.payload.clone())?;
 811            }
 812        }
 813        Ok(())
 814    }
 815
 816    async fn get_channels(
 817        self: Arc<Server>,
 818        request: TypedEnvelope<proto::GetChannels>,
 819    ) -> Result<proto::GetChannelsResponse> {
 820        let user_id = self
 821            .state()
 822            .await
 823            .user_id_for_connection(request.sender_id)?;
 824        let channels = self.app_state.db.get_accessible_channels(user_id).await?;
 825        Ok(proto::GetChannelsResponse {
 826            channels: channels
 827                .into_iter()
 828                .map(|chan| proto::Channel {
 829                    id: chan.id.to_proto(),
 830                    name: chan.name,
 831                })
 832                .collect(),
 833        })
 834    }
 835
 836    async fn get_users(
 837        self: Arc<Server>,
 838        request: TypedEnvelope<proto::GetUsers>,
 839    ) -> Result<proto::GetUsersResponse> {
 840        let user_ids = request
 841            .payload
 842            .user_ids
 843            .into_iter()
 844            .map(UserId::from_proto)
 845            .collect();
 846        let users = self
 847            .app_state
 848            .db
 849            .get_users_by_ids(user_ids)
 850            .await?
 851            .into_iter()
 852            .map(|user| proto::User {
 853                id: user.id.to_proto(),
 854                avatar_url: format!("https://github.com/{}.png?size=128", user.github_login),
 855                github_login: user.github_login,
 856            })
 857            .collect();
 858        Ok(proto::GetUsersResponse { users })
 859    }
 860
 861    #[instrument(skip(self, state, user_ids))]
 862    fn update_contacts_for_users<'a>(
 863        self: &Arc<Self>,
 864        state: &Store,
 865        user_ids: impl IntoIterator<Item = &'a UserId>,
 866    ) {
 867        for user_id in user_ids {
 868            let contacts = state.contacts_for_user(*user_id);
 869            for connection_id in state.connection_ids_for_user(*user_id) {
 870                self.peer
 871                    .send(
 872                        connection_id,
 873                        proto::UpdateContacts {
 874                            contacts: contacts.clone(),
 875                        },
 876                    )
 877                    .trace_err();
 878            }
 879        }
 880    }
 881
 882    async fn join_channel(
 883        self: Arc<Self>,
 884        request: TypedEnvelope<proto::JoinChannel>,
 885    ) -> Result<proto::JoinChannelResponse> {
 886        let user_id = self
 887            .state()
 888            .await
 889            .user_id_for_connection(request.sender_id)?;
 890        let channel_id = ChannelId::from_proto(request.payload.channel_id);
 891        if !self
 892            .app_state
 893            .db
 894            .can_user_access_channel(user_id, channel_id)
 895            .await?
 896        {
 897            Err(anyhow!("access denied"))?;
 898        }
 899
 900        self.state_mut()
 901            .await
 902            .join_channel(request.sender_id, channel_id);
 903        let messages = self
 904            .app_state
 905            .db
 906            .get_channel_messages(channel_id, MESSAGE_COUNT_PER_PAGE, None)
 907            .await?
 908            .into_iter()
 909            .map(|msg| proto::ChannelMessage {
 910                id: msg.id.to_proto(),
 911                body: msg.body,
 912                timestamp: msg.sent_at.unix_timestamp() as u64,
 913                sender_id: msg.sender_id.to_proto(),
 914                nonce: Some(msg.nonce.as_u128().into()),
 915            })
 916            .collect::<Vec<_>>();
 917        Ok(proto::JoinChannelResponse {
 918            done: messages.len() < MESSAGE_COUNT_PER_PAGE,
 919            messages,
 920        })
 921    }
 922
 923    async fn leave_channel(
 924        self: Arc<Self>,
 925        request: TypedEnvelope<proto::LeaveChannel>,
 926    ) -> Result<()> {
 927        let user_id = self
 928            .state()
 929            .await
 930            .user_id_for_connection(request.sender_id)?;
 931        let channel_id = ChannelId::from_proto(request.payload.channel_id);
 932        if !self
 933            .app_state
 934            .db
 935            .can_user_access_channel(user_id, channel_id)
 936            .await?
 937        {
 938            Err(anyhow!("access denied"))?;
 939        }
 940
 941        self.state_mut()
 942            .await
 943            .leave_channel(request.sender_id, channel_id);
 944
 945        Ok(())
 946    }
 947
 948    async fn send_channel_message(
 949        self: Arc<Self>,
 950        request: TypedEnvelope<proto::SendChannelMessage>,
 951    ) -> Result<proto::SendChannelMessageResponse> {
 952        let channel_id = ChannelId::from_proto(request.payload.channel_id);
 953        let user_id;
 954        let connection_ids;
 955        {
 956            let state = self.state().await;
 957            user_id = state.user_id_for_connection(request.sender_id)?;
 958            connection_ids = state.channel_connection_ids(channel_id)?;
 959        }
 960
 961        // Validate the message body.
 962        let body = request.payload.body.trim().to_string();
 963        if body.len() > MAX_MESSAGE_LEN {
 964            return Err(anyhow!("message is too long"))?;
 965        }
 966        if body.is_empty() {
 967            return Err(anyhow!("message can't be blank"))?;
 968        }
 969
 970        let timestamp = OffsetDateTime::now_utc();
 971        let nonce = request
 972            .payload
 973            .nonce
 974            .ok_or_else(|| anyhow!("nonce can't be blank"))?;
 975
 976        let message_id = self
 977            .app_state
 978            .db
 979            .create_channel_message(channel_id, user_id, &body, timestamp, nonce.clone().into())
 980            .await?
 981            .to_proto();
 982        let message = proto::ChannelMessage {
 983            sender_id: user_id.to_proto(),
 984            id: message_id,
 985            body,
 986            timestamp: timestamp.unix_timestamp() as u64,
 987            nonce: Some(nonce),
 988        };
 989        broadcast(request.sender_id, connection_ids, |conn_id| {
 990            self.peer.send(
 991                conn_id,
 992                proto::ChannelMessageSent {
 993                    channel_id: channel_id.to_proto(),
 994                    message: Some(message.clone()),
 995                },
 996            )
 997        });
 998        Ok(proto::SendChannelMessageResponse {
 999            message: Some(message),
1000        })
1001    }
1002
1003    async fn get_channel_messages(
1004        self: Arc<Self>,
1005        request: TypedEnvelope<proto::GetChannelMessages>,
1006    ) -> Result<proto::GetChannelMessagesResponse> {
1007        let user_id = self
1008            .state()
1009            .await
1010            .user_id_for_connection(request.sender_id)?;
1011        let channel_id = ChannelId::from_proto(request.payload.channel_id);
1012        if !self
1013            .app_state
1014            .db
1015            .can_user_access_channel(user_id, channel_id)
1016            .await?
1017        {
1018            Err(anyhow!("access denied"))?;
1019        }
1020
1021        let messages = self
1022            .app_state
1023            .db
1024            .get_channel_messages(
1025                channel_id,
1026                MESSAGE_COUNT_PER_PAGE,
1027                Some(MessageId::from_proto(request.payload.before_message_id)),
1028            )
1029            .await?
1030            .into_iter()
1031            .map(|msg| proto::ChannelMessage {
1032                id: msg.id.to_proto(),
1033                body: msg.body,
1034                timestamp: msg.sent_at.unix_timestamp() as u64,
1035                sender_id: msg.sender_id.to_proto(),
1036                nonce: Some(msg.nonce.as_u128().into()),
1037            })
1038            .collect::<Vec<_>>();
1039
1040        Ok(proto::GetChannelMessagesResponse {
1041            done: messages.len() < MESSAGE_COUNT_PER_PAGE,
1042            messages,
1043        })
1044    }
1045
1046    async fn state<'a>(self: &'a Arc<Self>) -> StoreReadGuard<'a> {
1047        #[cfg(test)]
1048        tokio::task::yield_now().await;
1049        let guard = self.store.read().await;
1050        #[cfg(test)]
1051        tokio::task::yield_now().await;
1052        StoreReadGuard {
1053            guard,
1054            _not_send: PhantomData,
1055        }
1056    }
1057
1058    async fn state_mut<'a>(self: &'a Arc<Self>) -> StoreWriteGuard<'a> {
1059        #[cfg(test)]
1060        tokio::task::yield_now().await;
1061        let guard = self.store.write().await;
1062        #[cfg(test)]
1063        tokio::task::yield_now().await;
1064        StoreWriteGuard {
1065            guard,
1066            _not_send: PhantomData,
1067        }
1068    }
1069}
1070
1071impl<'a> Deref for StoreReadGuard<'a> {
1072    type Target = Store;
1073
1074    fn deref(&self) -> &Self::Target {
1075        &*self.guard
1076    }
1077}
1078
1079impl<'a> Deref for StoreWriteGuard<'a> {
1080    type Target = Store;
1081
1082    fn deref(&self) -> &Self::Target {
1083        &*self.guard
1084    }
1085}
1086
1087impl<'a> DerefMut for StoreWriteGuard<'a> {
1088    fn deref_mut(&mut self) -> &mut Self::Target {
1089        &mut *self.guard
1090    }
1091}
1092
1093impl<'a> Drop for StoreWriteGuard<'a> {
1094    fn drop(&mut self) {
1095        #[cfg(test)]
1096        self.check_invariants();
1097
1098        let metrics = self.metrics();
1099        tracing::info!(
1100            connections = metrics.connections,
1101            registered_projects = metrics.registered_projects,
1102            shared_projects = metrics.shared_projects,
1103            "metrics"
1104        );
1105    }
1106}
1107
1108impl Executor for RealExecutor {
1109    type Sleep = Sleep;
1110
1111    fn spawn_detached<F: 'static + Send + Future<Output = ()>>(&self, future: F) {
1112        tokio::task::spawn(future);
1113    }
1114
1115    fn sleep(&self, duration: Duration) -> Self::Sleep {
1116        tokio::time::sleep(duration)
1117    }
1118}
1119
1120#[instrument(skip(f))]
1121fn broadcast<F>(sender_id: ConnectionId, receiver_ids: Vec<ConnectionId>, mut f: F)
1122where
1123    F: FnMut(ConnectionId) -> anyhow::Result<()>,
1124{
1125    for receiver_id in receiver_ids {
1126        if receiver_id != sender_id {
1127            f(receiver_id).trace_err();
1128        }
1129    }
1130}
1131
1132lazy_static! {
1133    static ref ZED_PROTOCOL_VERSION: HeaderName = HeaderName::from_static("x-zed-protocol-version");
1134}
1135
1136pub struct ProtocolVersion(u32);
1137
1138impl Header for ProtocolVersion {
1139    fn name() -> &'static HeaderName {
1140        &ZED_PROTOCOL_VERSION
1141    }
1142
1143    fn decode<'i, I>(values: &mut I) -> Result<Self, axum::headers::Error>
1144    where
1145        Self: Sized,
1146        I: Iterator<Item = &'i axum::http::HeaderValue>,
1147    {
1148        let version = values
1149            .next()
1150            .ok_or_else(|| axum::headers::Error::invalid())?
1151            .to_str()
1152            .map_err(|_| axum::headers::Error::invalid())?
1153            .parse()
1154            .map_err(|_| axum::headers::Error::invalid())?;
1155        Ok(Self(version))
1156    }
1157
1158    fn encode<E: Extend<axum::http::HeaderValue>>(&self, values: &mut E) {
1159        values.extend([self.0.to_string().parse().unwrap()]);
1160    }
1161}
1162
1163pub fn routes(app_state: Arc<AppState>) -> Router<Body> {
1164    let server = Server::new(app_state.clone(), None);
1165    Router::new()
1166        .route("/rpc", get(handle_websocket_request))
1167        .layer(
1168            ServiceBuilder::new()
1169                .layer(Extension(app_state))
1170                .layer(middleware::from_fn(auth::validate_header))
1171                .layer(Extension(server)),
1172        )
1173}
1174
1175pub async fn handle_websocket_request(
1176    TypedHeader(ProtocolVersion(protocol_version)): TypedHeader<ProtocolVersion>,
1177    ConnectInfo(socket_address): ConnectInfo<SocketAddr>,
1178    Extension(server): Extension<Arc<Server>>,
1179    Extension(user_id): Extension<UserId>,
1180    ws: WebSocketUpgrade,
1181) -> Response {
1182    if protocol_version != rpc::PROTOCOL_VERSION {
1183        return (
1184            StatusCode::UPGRADE_REQUIRED,
1185            "client must be upgraded".to_string(),
1186        )
1187            .into_response();
1188    }
1189    let socket_address = socket_address.to_string();
1190    ws.on_upgrade(move |socket| {
1191        let socket = socket
1192            .map_ok(to_tungstenite_message)
1193            .err_into()
1194            .with(|message| async move { Ok(to_axum_message(message)) });
1195        let connection = Connection::new(Box::pin(socket));
1196        server.handle_connection(connection, socket_address, user_id, None, RealExecutor)
1197    })
1198}
1199
1200fn to_axum_message(message: TungsteniteMessage) -> AxumMessage {
1201    match message {
1202        TungsteniteMessage::Text(payload) => AxumMessage::Text(payload),
1203        TungsteniteMessage::Binary(payload) => AxumMessage::Binary(payload),
1204        TungsteniteMessage::Ping(payload) => AxumMessage::Ping(payload),
1205        TungsteniteMessage::Pong(payload) => AxumMessage::Pong(payload),
1206        TungsteniteMessage::Close(frame) => AxumMessage::Close(frame.map(|frame| AxumCloseFrame {
1207            code: frame.code.into(),
1208            reason: frame.reason,
1209        })),
1210    }
1211}
1212
1213fn to_tungstenite_message(message: AxumMessage) -> TungsteniteMessage {
1214    match message {
1215        AxumMessage::Text(payload) => TungsteniteMessage::Text(payload),
1216        AxumMessage::Binary(payload) => TungsteniteMessage::Binary(payload),
1217        AxumMessage::Ping(payload) => TungsteniteMessage::Ping(payload),
1218        AxumMessage::Pong(payload) => TungsteniteMessage::Pong(payload),
1219        AxumMessage::Close(frame) => {
1220            TungsteniteMessage::Close(frame.map(|frame| TungsteniteCloseFrame {
1221                code: frame.code.into(),
1222                reason: frame.reason,
1223            }))
1224        }
1225    }
1226}
1227
1228pub trait ResultExt {
1229    type Ok;
1230
1231    fn trace_err(self) -> Option<Self::Ok>;
1232}
1233
1234impl<T, E> ResultExt for Result<T, E>
1235where
1236    E: std::fmt::Debug,
1237{
1238    type Ok = T;
1239
1240    fn trace_err(self) -> Option<T> {
1241        match self {
1242            Ok(value) => Some(value),
1243            Err(error) => {
1244                tracing::error!("{:?}", error);
1245                None
1246            }
1247        }
1248    }
1249}
1250
1251#[cfg(test)]
1252mod tests {
1253    use super::*;
1254    use crate::{
1255        db::{tests::TestDb, UserId},
1256        AppState,
1257    };
1258    use ::rpc::Peer;
1259    use client::{
1260        self, test::FakeHttpClient, Channel, ChannelDetails, ChannelList, Client, Credentials,
1261        EstablishConnectionError, UserStore, RECEIVE_TIMEOUT,
1262    };
1263    use collections::BTreeMap;
1264    use editor::{
1265        self, ConfirmCodeAction, ConfirmCompletion, ConfirmRename, Editor, Input, Redo, Rename,
1266        ToOffset, ToggleCodeActions, Undo,
1267    };
1268    use gpui::{
1269        executor::{self, Deterministic},
1270        geometry::vector::vec2f,
1271        ModelHandle, TestAppContext, ViewHandle,
1272    };
1273    use language::{
1274        range_to_lsp, tree_sitter_rust, Diagnostic, DiagnosticEntry, FakeLspAdapter, Language,
1275        LanguageConfig, LanguageRegistry, OffsetRangeExt, Point, Rope,
1276    };
1277    use lsp::{self, FakeLanguageServer};
1278    use parking_lot::Mutex;
1279    use project::{
1280        fs::{FakeFs, Fs as _},
1281        search::SearchQuery,
1282        worktree::WorktreeHandle,
1283        DiagnosticSummary, Project, ProjectPath, WorktreeId,
1284    };
1285    use rand::prelude::*;
1286    use rpc::PeerId;
1287    use serde_json::json;
1288    use settings::Settings;
1289    use sqlx::types::time::OffsetDateTime;
1290    use std::{
1291        env,
1292        ops::Deref,
1293        path::{Path, PathBuf},
1294        rc::Rc,
1295        sync::{
1296            atomic::{AtomicBool, Ordering::SeqCst},
1297            Arc,
1298        },
1299        time::Duration,
1300    };
1301    use theme::ThemeRegistry;
1302    use workspace::{Item, SplitDirection, ToggleFollow, Workspace, WorkspaceParams};
1303
1304    #[cfg(test)]
1305    #[ctor::ctor]
1306    fn init_logger() {
1307        if std::env::var("RUST_LOG").is_ok() {
1308            env_logger::init();
1309        }
1310    }
1311
1312    #[gpui::test(iterations = 10)]
1313    async fn test_share_project(cx_a: &mut TestAppContext, cx_b: &mut TestAppContext) {
1314        let (window_b, _) = cx_b.add_window(|_| EmptyView);
1315        let lang_registry = Arc::new(LanguageRegistry::test());
1316        let fs = FakeFs::new(cx_a.background());
1317        cx_a.foreground().forbid_parking();
1318
1319        // Connect to a server as 2 clients.
1320        let mut server = TestServer::start(cx_a.foreground(), cx_a.background()).await;
1321        let client_a = server.create_client(cx_a, "user_a").await;
1322        let client_b = server.create_client(cx_b, "user_b").await;
1323
1324        // Share a project as client A
1325        fs.insert_tree(
1326            "/a",
1327            json!({
1328                ".zed.toml": r#"collaborators = ["user_b"]"#,
1329                "a.txt": "a-contents",
1330                "b.txt": "b-contents",
1331            }),
1332        )
1333        .await;
1334        let project_a = cx_a.update(|cx| {
1335            Project::local(
1336                client_a.clone(),
1337                client_a.user_store.clone(),
1338                lang_registry.clone(),
1339                fs.clone(),
1340                cx,
1341            )
1342        });
1343        let (worktree_a, _) = project_a
1344            .update(cx_a, |p, cx| {
1345                p.find_or_create_local_worktree("/a", true, cx)
1346            })
1347            .await
1348            .unwrap();
1349        let worktree_id = worktree_a.read_with(cx_a, |tree, _| tree.id());
1350        worktree_a
1351            .read_with(cx_a, |tree, _| tree.as_local().unwrap().scan_complete())
1352            .await;
1353        let project_id = project_a.update(cx_a, |p, _| p.next_remote_id()).await;
1354        project_a.update(cx_a, |p, cx| p.share(cx)).await.unwrap();
1355
1356        // Join that project as client B
1357        let project_b = Project::remote(
1358            project_id,
1359            client_b.clone(),
1360            client_b.user_store.clone(),
1361            lang_registry.clone(),
1362            fs.clone(),
1363            &mut cx_b.to_async(),
1364        )
1365        .await
1366        .unwrap();
1367
1368        let replica_id_b = project_b.read_with(cx_b, |project, _| {
1369            assert_eq!(
1370                project
1371                    .collaborators()
1372                    .get(&client_a.peer_id)
1373                    .unwrap()
1374                    .user
1375                    .github_login,
1376                "user_a"
1377            );
1378            project.replica_id()
1379        });
1380        project_a
1381            .condition(&cx_a, |tree, _| {
1382                tree.collaborators()
1383                    .get(&client_b.peer_id)
1384                    .map_or(false, |collaborator| {
1385                        collaborator.replica_id == replica_id_b
1386                            && collaborator.user.github_login == "user_b"
1387                    })
1388            })
1389            .await;
1390
1391        // Open the same file as client B and client A.
1392        let buffer_b = project_b
1393            .update(cx_b, |p, cx| p.open_buffer((worktree_id, "b.txt"), cx))
1394            .await
1395            .unwrap();
1396        buffer_b.read_with(cx_b, |buf, _| assert_eq!(buf.text(), "b-contents"));
1397        project_a.read_with(cx_a, |project, cx| {
1398            assert!(project.has_open_buffer((worktree_id, "b.txt"), cx))
1399        });
1400        let buffer_a = project_a
1401            .update(cx_a, |p, cx| p.open_buffer((worktree_id, "b.txt"), cx))
1402            .await
1403            .unwrap();
1404
1405        let editor_b = cx_b.add_view(window_b, |cx| Editor::for_buffer(buffer_b, None, cx));
1406
1407        // TODO
1408        // // Create a selection set as client B and see that selection set as client A.
1409        // buffer_a
1410        //     .condition(&cx_a, |buffer, _| buffer.selection_sets().count() == 1)
1411        //     .await;
1412
1413        // Edit the buffer as client B and see that edit as client A.
1414        editor_b.update(cx_b, |editor, cx| {
1415            editor.handle_input(&Input("ok, ".into()), cx)
1416        });
1417        buffer_a
1418            .condition(&cx_a, |buffer, _| buffer.text() == "ok, b-contents")
1419            .await;
1420
1421        // TODO
1422        // // Remove the selection set as client B, see those selections disappear as client A.
1423        cx_b.update(move |_| drop(editor_b));
1424        // buffer_a
1425        //     .condition(&cx_a, |buffer, _| buffer.selection_sets().count() == 0)
1426        //     .await;
1427
1428        // Dropping the client B's project removes client B from client A's collaborators.
1429        cx_b.update(move |_| drop(project_b));
1430        project_a
1431            .condition(&cx_a, |project, _| project.collaborators().is_empty())
1432            .await;
1433    }
1434
1435    #[gpui::test(iterations = 10)]
1436    async fn test_unshare_project(cx_a: &mut TestAppContext, cx_b: &mut TestAppContext) {
1437        let lang_registry = Arc::new(LanguageRegistry::test());
1438        let fs = FakeFs::new(cx_a.background());
1439        cx_a.foreground().forbid_parking();
1440
1441        // Connect to a server as 2 clients.
1442        let mut server = TestServer::start(cx_a.foreground(), cx_a.background()).await;
1443        let client_a = server.create_client(cx_a, "user_a").await;
1444        let client_b = server.create_client(cx_b, "user_b").await;
1445
1446        // Share a project as client A
1447        fs.insert_tree(
1448            "/a",
1449            json!({
1450                ".zed.toml": r#"collaborators = ["user_b"]"#,
1451                "a.txt": "a-contents",
1452                "b.txt": "b-contents",
1453            }),
1454        )
1455        .await;
1456        let project_a = cx_a.update(|cx| {
1457            Project::local(
1458                client_a.clone(),
1459                client_a.user_store.clone(),
1460                lang_registry.clone(),
1461                fs.clone(),
1462                cx,
1463            )
1464        });
1465        let (worktree_a, _) = project_a
1466            .update(cx_a, |p, cx| {
1467                p.find_or_create_local_worktree("/a", true, cx)
1468            })
1469            .await
1470            .unwrap();
1471        worktree_a
1472            .read_with(cx_a, |tree, _| tree.as_local().unwrap().scan_complete())
1473            .await;
1474        let project_id = project_a.update(cx_a, |p, _| p.next_remote_id()).await;
1475        let worktree_id = worktree_a.read_with(cx_a, |tree, _| tree.id());
1476        project_a.update(cx_a, |p, cx| p.share(cx)).await.unwrap();
1477        assert!(worktree_a.read_with(cx_a, |tree, _| tree.as_local().unwrap().is_shared()));
1478
1479        // Join that project as client B
1480        let project_b = Project::remote(
1481            project_id,
1482            client_b.clone(),
1483            client_b.user_store.clone(),
1484            lang_registry.clone(),
1485            fs.clone(),
1486            &mut cx_b.to_async(),
1487        )
1488        .await
1489        .unwrap();
1490        project_b
1491            .update(cx_b, |p, cx| p.open_buffer((worktree_id, "a.txt"), cx))
1492            .await
1493            .unwrap();
1494
1495        // Unshare the project as client A
1496        project_a.update(cx_a, |project, cx| project.unshare(cx));
1497        project_b
1498            .condition(cx_b, |project, _| project.is_read_only())
1499            .await;
1500        assert!(worktree_a.read_with(cx_a, |tree, _| !tree.as_local().unwrap().is_shared()));
1501        cx_b.update(|_| {
1502            drop(project_b);
1503        });
1504
1505        // Share the project again and ensure guests can still join.
1506        project_a
1507            .update(cx_a, |project, cx| project.share(cx))
1508            .await
1509            .unwrap();
1510        assert!(worktree_a.read_with(cx_a, |tree, _| tree.as_local().unwrap().is_shared()));
1511
1512        let project_b2 = Project::remote(
1513            project_id,
1514            client_b.clone(),
1515            client_b.user_store.clone(),
1516            lang_registry.clone(),
1517            fs.clone(),
1518            &mut cx_b.to_async(),
1519        )
1520        .await
1521        .unwrap();
1522        project_b2
1523            .update(cx_b, |p, cx| p.open_buffer((worktree_id, "a.txt"), cx))
1524            .await
1525            .unwrap();
1526    }
1527
1528    #[gpui::test(iterations = 10)]
1529    async fn test_host_disconnect(cx_a: &mut TestAppContext, cx_b: &mut TestAppContext) {
1530        let lang_registry = Arc::new(LanguageRegistry::test());
1531        let fs = FakeFs::new(cx_a.background());
1532        cx_a.foreground().forbid_parking();
1533
1534        // Connect to a server as 2 clients.
1535        let mut server = TestServer::start(cx_a.foreground(), cx_a.background()).await;
1536        let client_a = server.create_client(cx_a, "user_a").await;
1537        let client_b = server.create_client(cx_b, "user_b").await;
1538
1539        // Share a project as client A
1540        fs.insert_tree(
1541            "/a",
1542            json!({
1543                ".zed.toml": r#"collaborators = ["user_b"]"#,
1544                "a.txt": "a-contents",
1545                "b.txt": "b-contents",
1546            }),
1547        )
1548        .await;
1549        let project_a = cx_a.update(|cx| {
1550            Project::local(
1551                client_a.clone(),
1552                client_a.user_store.clone(),
1553                lang_registry.clone(),
1554                fs.clone(),
1555                cx,
1556            )
1557        });
1558        let (worktree_a, _) = project_a
1559            .update(cx_a, |p, cx| {
1560                p.find_or_create_local_worktree("/a", true, cx)
1561            })
1562            .await
1563            .unwrap();
1564        worktree_a
1565            .read_with(cx_a, |tree, _| tree.as_local().unwrap().scan_complete())
1566            .await;
1567        let project_id = project_a.update(cx_a, |p, _| p.next_remote_id()).await;
1568        let worktree_id = worktree_a.read_with(cx_a, |tree, _| tree.id());
1569        project_a.update(cx_a, |p, cx| p.share(cx)).await.unwrap();
1570        assert!(worktree_a.read_with(cx_a, |tree, _| tree.as_local().unwrap().is_shared()));
1571
1572        // Join that project as client B
1573        let project_b = Project::remote(
1574            project_id,
1575            client_b.clone(),
1576            client_b.user_store.clone(),
1577            lang_registry.clone(),
1578            fs.clone(),
1579            &mut cx_b.to_async(),
1580        )
1581        .await
1582        .unwrap();
1583        project_b
1584            .update(cx_b, |p, cx| p.open_buffer((worktree_id, "a.txt"), cx))
1585            .await
1586            .unwrap();
1587
1588        // Drop client A's connection. Collaborators should disappear and the project should not be shown as shared.
1589        server.disconnect_client(client_a.current_user_id(cx_a));
1590        cx_a.foreground().advance_clock(rpc::RECEIVE_TIMEOUT);
1591        project_a
1592            .condition(cx_a, |project, _| project.collaborators().is_empty())
1593            .await;
1594        project_a.read_with(cx_a, |project, _| assert!(!project.is_shared()));
1595        project_b
1596            .condition(cx_b, |project, _| project.is_read_only())
1597            .await;
1598        assert!(worktree_a.read_with(cx_a, |tree, _| !tree.as_local().unwrap().is_shared()));
1599        cx_b.update(|_| {
1600            drop(project_b);
1601        });
1602
1603        // Await reconnection
1604        let project_id = project_a.update(cx_a, |p, _| p.next_remote_id()).await;
1605
1606        // Share the project again and ensure guests can still join.
1607        project_a
1608            .update(cx_a, |project, cx| project.share(cx))
1609            .await
1610            .unwrap();
1611        assert!(worktree_a.read_with(cx_a, |tree, _| tree.as_local().unwrap().is_shared()));
1612
1613        let project_b2 = Project::remote(
1614            project_id,
1615            client_b.clone(),
1616            client_b.user_store.clone(),
1617            lang_registry.clone(),
1618            fs.clone(),
1619            &mut cx_b.to_async(),
1620        )
1621        .await
1622        .unwrap();
1623        project_b2
1624            .update(cx_b, |p, cx| p.open_buffer((worktree_id, "a.txt"), cx))
1625            .await
1626            .unwrap();
1627    }
1628
1629    #[gpui::test(iterations = 10)]
1630    async fn test_propagate_saves_and_fs_changes(
1631        cx_a: &mut TestAppContext,
1632        cx_b: &mut TestAppContext,
1633        cx_c: &mut TestAppContext,
1634    ) {
1635        let lang_registry = Arc::new(LanguageRegistry::test());
1636        let fs = FakeFs::new(cx_a.background());
1637        cx_a.foreground().forbid_parking();
1638
1639        // Connect to a server as 3 clients.
1640        let mut server = TestServer::start(cx_a.foreground(), cx_a.background()).await;
1641        let client_a = server.create_client(cx_a, "user_a").await;
1642        let client_b = server.create_client(cx_b, "user_b").await;
1643        let client_c = server.create_client(cx_c, "user_c").await;
1644
1645        // Share a worktree as client A.
1646        fs.insert_tree(
1647            "/a",
1648            json!({
1649                ".zed.toml": r#"collaborators = ["user_b", "user_c"]"#,
1650                "file1": "",
1651                "file2": ""
1652            }),
1653        )
1654        .await;
1655        let project_a = cx_a.update(|cx| {
1656            Project::local(
1657                client_a.clone(),
1658                client_a.user_store.clone(),
1659                lang_registry.clone(),
1660                fs.clone(),
1661                cx,
1662            )
1663        });
1664        let (worktree_a, _) = project_a
1665            .update(cx_a, |p, cx| {
1666                p.find_or_create_local_worktree("/a", true, cx)
1667            })
1668            .await
1669            .unwrap();
1670        worktree_a
1671            .read_with(cx_a, |tree, _| tree.as_local().unwrap().scan_complete())
1672            .await;
1673        let project_id = project_a.update(cx_a, |p, _| p.next_remote_id()).await;
1674        let worktree_id = worktree_a.read_with(cx_a, |tree, _| tree.id());
1675        project_a.update(cx_a, |p, cx| p.share(cx)).await.unwrap();
1676
1677        // Join that worktree as clients B and C.
1678        let project_b = Project::remote(
1679            project_id,
1680            client_b.clone(),
1681            client_b.user_store.clone(),
1682            lang_registry.clone(),
1683            fs.clone(),
1684            &mut cx_b.to_async(),
1685        )
1686        .await
1687        .unwrap();
1688        let project_c = Project::remote(
1689            project_id,
1690            client_c.clone(),
1691            client_c.user_store.clone(),
1692            lang_registry.clone(),
1693            fs.clone(),
1694            &mut cx_c.to_async(),
1695        )
1696        .await
1697        .unwrap();
1698        let worktree_b = project_b.read_with(cx_b, |p, cx| p.worktrees(cx).next().unwrap());
1699        let worktree_c = project_c.read_with(cx_c, |p, cx| p.worktrees(cx).next().unwrap());
1700
1701        // Open and edit a buffer as both guests B and C.
1702        let buffer_b = project_b
1703            .update(cx_b, |p, cx| p.open_buffer((worktree_id, "file1"), cx))
1704            .await
1705            .unwrap();
1706        let buffer_c = project_c
1707            .update(cx_c, |p, cx| p.open_buffer((worktree_id, "file1"), cx))
1708            .await
1709            .unwrap();
1710        buffer_b.update(cx_b, |buf, cx| buf.edit([0..0], "i-am-b, ", cx));
1711        buffer_c.update(cx_c, |buf, cx| buf.edit([0..0], "i-am-c, ", cx));
1712
1713        // Open and edit that buffer as the host.
1714        let buffer_a = project_a
1715            .update(cx_a, |p, cx| p.open_buffer((worktree_id, "file1"), cx))
1716            .await
1717            .unwrap();
1718
1719        buffer_a
1720            .condition(cx_a, |buf, _| buf.text() == "i-am-c, i-am-b, ")
1721            .await;
1722        buffer_a.update(cx_a, |buf, cx| {
1723            buf.edit([buf.len()..buf.len()], "i-am-a", cx)
1724        });
1725
1726        // Wait for edits to propagate
1727        buffer_a
1728            .condition(cx_a, |buf, _| buf.text() == "i-am-c, i-am-b, i-am-a")
1729            .await;
1730        buffer_b
1731            .condition(cx_b, |buf, _| buf.text() == "i-am-c, i-am-b, i-am-a")
1732            .await;
1733        buffer_c
1734            .condition(cx_c, |buf, _| buf.text() == "i-am-c, i-am-b, i-am-a")
1735            .await;
1736
1737        // Edit the buffer as the host and concurrently save as guest B.
1738        let save_b = buffer_b.update(cx_b, |buf, cx| buf.save(cx));
1739        buffer_a.update(cx_a, |buf, cx| buf.edit([0..0], "hi-a, ", cx));
1740        save_b.await.unwrap();
1741        assert_eq!(
1742            fs.load("/a/file1".as_ref()).await.unwrap(),
1743            "hi-a, i-am-c, i-am-b, i-am-a"
1744        );
1745        buffer_a.read_with(cx_a, |buf, _| assert!(!buf.is_dirty()));
1746        buffer_b.read_with(cx_b, |buf, _| assert!(!buf.is_dirty()));
1747        buffer_c.condition(cx_c, |buf, _| !buf.is_dirty()).await;
1748
1749        worktree_a.flush_fs_events(cx_a).await;
1750
1751        // Make changes on host's file system, see those changes on guest worktrees.
1752        fs.rename(
1753            "/a/file1".as_ref(),
1754            "/a/file1-renamed".as_ref(),
1755            Default::default(),
1756        )
1757        .await
1758        .unwrap();
1759
1760        fs.rename("/a/file2".as_ref(), "/a/file3".as_ref(), Default::default())
1761            .await
1762            .unwrap();
1763        fs.insert_file(Path::new("/a/file4"), "4".into()).await;
1764
1765        worktree_a
1766            .condition(&cx_a, |tree, _| {
1767                tree.paths()
1768                    .map(|p| p.to_string_lossy())
1769                    .collect::<Vec<_>>()
1770                    == [".zed.toml", "file1-renamed", "file3", "file4"]
1771            })
1772            .await;
1773        worktree_b
1774            .condition(&cx_b, |tree, _| {
1775                tree.paths()
1776                    .map(|p| p.to_string_lossy())
1777                    .collect::<Vec<_>>()
1778                    == [".zed.toml", "file1-renamed", "file3", "file4"]
1779            })
1780            .await;
1781        worktree_c
1782            .condition(&cx_c, |tree, _| {
1783                tree.paths()
1784                    .map(|p| p.to_string_lossy())
1785                    .collect::<Vec<_>>()
1786                    == [".zed.toml", "file1-renamed", "file3", "file4"]
1787            })
1788            .await;
1789
1790        // Ensure buffer files are updated as well.
1791        buffer_a
1792            .condition(&cx_a, |buf, _| {
1793                buf.file().unwrap().path().to_str() == Some("file1-renamed")
1794            })
1795            .await;
1796        buffer_b
1797            .condition(&cx_b, |buf, _| {
1798                buf.file().unwrap().path().to_str() == Some("file1-renamed")
1799            })
1800            .await;
1801        buffer_c
1802            .condition(&cx_c, |buf, _| {
1803                buf.file().unwrap().path().to_str() == Some("file1-renamed")
1804            })
1805            .await;
1806    }
1807
1808    #[gpui::test(iterations = 10)]
1809    async fn test_buffer_conflict_after_save(cx_a: &mut TestAppContext, cx_b: &mut TestAppContext) {
1810        cx_a.foreground().forbid_parking();
1811        let lang_registry = Arc::new(LanguageRegistry::test());
1812        let fs = FakeFs::new(cx_a.background());
1813
1814        // Connect to a server as 2 clients.
1815        let mut server = TestServer::start(cx_a.foreground(), cx_a.background()).await;
1816        let client_a = server.create_client(cx_a, "user_a").await;
1817        let client_b = server.create_client(cx_b, "user_b").await;
1818
1819        // Share a project as client A
1820        fs.insert_tree(
1821            "/dir",
1822            json!({
1823                ".zed.toml": r#"collaborators = ["user_b", "user_c"]"#,
1824                "a.txt": "a-contents",
1825            }),
1826        )
1827        .await;
1828
1829        let project_a = cx_a.update(|cx| {
1830            Project::local(
1831                client_a.clone(),
1832                client_a.user_store.clone(),
1833                lang_registry.clone(),
1834                fs.clone(),
1835                cx,
1836            )
1837        });
1838        let (worktree_a, _) = project_a
1839            .update(cx_a, |p, cx| {
1840                p.find_or_create_local_worktree("/dir", true, cx)
1841            })
1842            .await
1843            .unwrap();
1844        worktree_a
1845            .read_with(cx_a, |tree, _| tree.as_local().unwrap().scan_complete())
1846            .await;
1847        let project_id = project_a.update(cx_a, |p, _| p.next_remote_id()).await;
1848        let worktree_id = worktree_a.read_with(cx_a, |tree, _| tree.id());
1849        project_a.update(cx_a, |p, cx| p.share(cx)).await.unwrap();
1850
1851        // Join that project as client B
1852        let project_b = Project::remote(
1853            project_id,
1854            client_b.clone(),
1855            client_b.user_store.clone(),
1856            lang_registry.clone(),
1857            fs.clone(),
1858            &mut cx_b.to_async(),
1859        )
1860        .await
1861        .unwrap();
1862
1863        // Open a buffer as client B
1864        let buffer_b = project_b
1865            .update(cx_b, |p, cx| p.open_buffer((worktree_id, "a.txt"), cx))
1866            .await
1867            .unwrap();
1868
1869        buffer_b.update(cx_b, |buf, cx| buf.edit([0..0], "world ", cx));
1870        buffer_b.read_with(cx_b, |buf, _| {
1871            assert!(buf.is_dirty());
1872            assert!(!buf.has_conflict());
1873        });
1874
1875        buffer_b.update(cx_b, |buf, cx| buf.save(cx)).await.unwrap();
1876        buffer_b
1877            .condition(&cx_b, |buffer_b, _| !buffer_b.is_dirty())
1878            .await;
1879        buffer_b.read_with(cx_b, |buf, _| {
1880            assert!(!buf.has_conflict());
1881        });
1882
1883        buffer_b.update(cx_b, |buf, cx| buf.edit([0..0], "hello ", cx));
1884        buffer_b.read_with(cx_b, |buf, _| {
1885            assert!(buf.is_dirty());
1886            assert!(!buf.has_conflict());
1887        });
1888    }
1889
1890    #[gpui::test(iterations = 10)]
1891    async fn test_buffer_reloading(cx_a: &mut TestAppContext, cx_b: &mut TestAppContext) {
1892        cx_a.foreground().forbid_parking();
1893        let lang_registry = Arc::new(LanguageRegistry::test());
1894        let fs = FakeFs::new(cx_a.background());
1895
1896        // Connect to a server as 2 clients.
1897        let mut server = TestServer::start(cx_a.foreground(), cx_a.background()).await;
1898        let client_a = server.create_client(cx_a, "user_a").await;
1899        let client_b = server.create_client(cx_b, "user_b").await;
1900
1901        // Share a project as client A
1902        fs.insert_tree(
1903            "/dir",
1904            json!({
1905                ".zed.toml": r#"collaborators = ["user_b", "user_c"]"#,
1906                "a.txt": "a-contents",
1907            }),
1908        )
1909        .await;
1910
1911        let project_a = cx_a.update(|cx| {
1912            Project::local(
1913                client_a.clone(),
1914                client_a.user_store.clone(),
1915                lang_registry.clone(),
1916                fs.clone(),
1917                cx,
1918            )
1919        });
1920        let (worktree_a, _) = project_a
1921            .update(cx_a, |p, cx| {
1922                p.find_or_create_local_worktree("/dir", true, cx)
1923            })
1924            .await
1925            .unwrap();
1926        worktree_a
1927            .read_with(cx_a, |tree, _| tree.as_local().unwrap().scan_complete())
1928            .await;
1929        let project_id = project_a.update(cx_a, |p, _| p.next_remote_id()).await;
1930        let worktree_id = worktree_a.read_with(cx_a, |tree, _| tree.id());
1931        project_a.update(cx_a, |p, cx| p.share(cx)).await.unwrap();
1932
1933        // Join that project as client B
1934        let project_b = Project::remote(
1935            project_id,
1936            client_b.clone(),
1937            client_b.user_store.clone(),
1938            lang_registry.clone(),
1939            fs.clone(),
1940            &mut cx_b.to_async(),
1941        )
1942        .await
1943        .unwrap();
1944        let _worktree_b = project_b.update(cx_b, |p, cx| p.worktrees(cx).next().unwrap());
1945
1946        // Open a buffer as client B
1947        let buffer_b = project_b
1948            .update(cx_b, |p, cx| p.open_buffer((worktree_id, "a.txt"), cx))
1949            .await
1950            .unwrap();
1951        buffer_b.read_with(cx_b, |buf, _| {
1952            assert!(!buf.is_dirty());
1953            assert!(!buf.has_conflict());
1954        });
1955
1956        fs.save(Path::new("/dir/a.txt"), &"new contents".into())
1957            .await
1958            .unwrap();
1959        buffer_b
1960            .condition(&cx_b, |buf, _| {
1961                buf.text() == "new contents" && !buf.is_dirty()
1962            })
1963            .await;
1964        buffer_b.read_with(cx_b, |buf, _| {
1965            assert!(!buf.has_conflict());
1966        });
1967    }
1968
1969    #[gpui::test(iterations = 10)]
1970    async fn test_editing_while_guest_opens_buffer(
1971        cx_a: &mut TestAppContext,
1972        cx_b: &mut TestAppContext,
1973    ) {
1974        cx_a.foreground().forbid_parking();
1975        let lang_registry = Arc::new(LanguageRegistry::test());
1976        let fs = FakeFs::new(cx_a.background());
1977
1978        // Connect to a server as 2 clients.
1979        let mut server = TestServer::start(cx_a.foreground(), cx_a.background()).await;
1980        let client_a = server.create_client(cx_a, "user_a").await;
1981        let client_b = server.create_client(cx_b, "user_b").await;
1982
1983        // Share a project as client A
1984        fs.insert_tree(
1985            "/dir",
1986            json!({
1987                ".zed.toml": r#"collaborators = ["user_b"]"#,
1988                "a.txt": "a-contents",
1989            }),
1990        )
1991        .await;
1992        let project_a = cx_a.update(|cx| {
1993            Project::local(
1994                client_a.clone(),
1995                client_a.user_store.clone(),
1996                lang_registry.clone(),
1997                fs.clone(),
1998                cx,
1999            )
2000        });
2001        let (worktree_a, _) = project_a
2002            .update(cx_a, |p, cx| {
2003                p.find_or_create_local_worktree("/dir", true, cx)
2004            })
2005            .await
2006            .unwrap();
2007        worktree_a
2008            .read_with(cx_a, |tree, _| tree.as_local().unwrap().scan_complete())
2009            .await;
2010        let project_id = project_a.update(cx_a, |p, _| p.next_remote_id()).await;
2011        let worktree_id = worktree_a.read_with(cx_a, |tree, _| tree.id());
2012        project_a.update(cx_a, |p, cx| p.share(cx)).await.unwrap();
2013
2014        // Join that project as client B
2015        let project_b = Project::remote(
2016            project_id,
2017            client_b.clone(),
2018            client_b.user_store.clone(),
2019            lang_registry.clone(),
2020            fs.clone(),
2021            &mut cx_b.to_async(),
2022        )
2023        .await
2024        .unwrap();
2025
2026        // Open a buffer as client A
2027        let buffer_a = project_a
2028            .update(cx_a, |p, cx| p.open_buffer((worktree_id, "a.txt"), cx))
2029            .await
2030            .unwrap();
2031
2032        // Start opening the same buffer as client B
2033        let buffer_b = cx_b
2034            .background()
2035            .spawn(project_b.update(cx_b, |p, cx| p.open_buffer((worktree_id, "a.txt"), cx)));
2036
2037        // Edit the buffer as client A while client B is still opening it.
2038        cx_b.background().simulate_random_delay().await;
2039        buffer_a.update(cx_a, |buf, cx| buf.edit([0..0], "X", cx));
2040        cx_b.background().simulate_random_delay().await;
2041        buffer_a.update(cx_a, |buf, cx| buf.edit([1..1], "Y", cx));
2042
2043        let text = buffer_a.read_with(cx_a, |buf, _| buf.text());
2044        let buffer_b = buffer_b.await.unwrap();
2045        buffer_b.condition(&cx_b, |buf, _| buf.text() == text).await;
2046    }
2047
2048    #[gpui::test(iterations = 10)]
2049    async fn test_leaving_worktree_while_opening_buffer(
2050        cx_a: &mut TestAppContext,
2051        cx_b: &mut TestAppContext,
2052    ) {
2053        cx_a.foreground().forbid_parking();
2054        let lang_registry = Arc::new(LanguageRegistry::test());
2055        let fs = FakeFs::new(cx_a.background());
2056
2057        // Connect to a server as 2 clients.
2058        let mut server = TestServer::start(cx_a.foreground(), cx_a.background()).await;
2059        let client_a = server.create_client(cx_a, "user_a").await;
2060        let client_b = server.create_client(cx_b, "user_b").await;
2061
2062        // Share a project as client A
2063        fs.insert_tree(
2064            "/dir",
2065            json!({
2066                ".zed.toml": r#"collaborators = ["user_b"]"#,
2067                "a.txt": "a-contents",
2068            }),
2069        )
2070        .await;
2071        let project_a = cx_a.update(|cx| {
2072            Project::local(
2073                client_a.clone(),
2074                client_a.user_store.clone(),
2075                lang_registry.clone(),
2076                fs.clone(),
2077                cx,
2078            )
2079        });
2080        let (worktree_a, _) = project_a
2081            .update(cx_a, |p, cx| {
2082                p.find_or_create_local_worktree("/dir", true, cx)
2083            })
2084            .await
2085            .unwrap();
2086        worktree_a
2087            .read_with(cx_a, |tree, _| tree.as_local().unwrap().scan_complete())
2088            .await;
2089        let project_id = project_a.update(cx_a, |p, _| p.next_remote_id()).await;
2090        let worktree_id = worktree_a.read_with(cx_a, |tree, _| tree.id());
2091        project_a.update(cx_a, |p, cx| p.share(cx)).await.unwrap();
2092
2093        // Join that project as client B
2094        let project_b = Project::remote(
2095            project_id,
2096            client_b.clone(),
2097            client_b.user_store.clone(),
2098            lang_registry.clone(),
2099            fs.clone(),
2100            &mut cx_b.to_async(),
2101        )
2102        .await
2103        .unwrap();
2104
2105        // See that a guest has joined as client A.
2106        project_a
2107            .condition(&cx_a, |p, _| p.collaborators().len() == 1)
2108            .await;
2109
2110        // Begin opening a buffer as client B, but leave the project before the open completes.
2111        let buffer_b = cx_b
2112            .background()
2113            .spawn(project_b.update(cx_b, |p, cx| p.open_buffer((worktree_id, "a.txt"), cx)));
2114        cx_b.update(|_| drop(project_b));
2115        drop(buffer_b);
2116
2117        // See that the guest has left.
2118        project_a
2119            .condition(&cx_a, |p, _| p.collaborators().len() == 0)
2120            .await;
2121    }
2122
2123    #[gpui::test(iterations = 10)]
2124    async fn test_leaving_project(cx_a: &mut TestAppContext, cx_b: &mut TestAppContext) {
2125        cx_a.foreground().forbid_parking();
2126        let lang_registry = Arc::new(LanguageRegistry::test());
2127        let fs = FakeFs::new(cx_a.background());
2128
2129        // Connect to a server as 2 clients.
2130        let mut server = TestServer::start(cx_a.foreground(), cx_a.background()).await;
2131        let client_a = server.create_client(cx_a, "user_a").await;
2132        let client_b = server.create_client(cx_b, "user_b").await;
2133
2134        // Share a project as client A
2135        fs.insert_tree(
2136            "/a",
2137            json!({
2138                ".zed.toml": r#"collaborators = ["user_b"]"#,
2139                "a.txt": "a-contents",
2140                "b.txt": "b-contents",
2141            }),
2142        )
2143        .await;
2144        let project_a = cx_a.update(|cx| {
2145            Project::local(
2146                client_a.clone(),
2147                client_a.user_store.clone(),
2148                lang_registry.clone(),
2149                fs.clone(),
2150                cx,
2151            )
2152        });
2153        let (worktree_a, _) = project_a
2154            .update(cx_a, |p, cx| {
2155                p.find_or_create_local_worktree("/a", true, cx)
2156            })
2157            .await
2158            .unwrap();
2159        worktree_a
2160            .read_with(cx_a, |tree, _| tree.as_local().unwrap().scan_complete())
2161            .await;
2162        let project_id = project_a
2163            .update(cx_a, |project, _| project.next_remote_id())
2164            .await;
2165        project_a
2166            .update(cx_a, |project, cx| project.share(cx))
2167            .await
2168            .unwrap();
2169
2170        // Join that project as client B
2171        let _project_b = Project::remote(
2172            project_id,
2173            client_b.clone(),
2174            client_b.user_store.clone(),
2175            lang_registry.clone(),
2176            fs.clone(),
2177            &mut cx_b.to_async(),
2178        )
2179        .await
2180        .unwrap();
2181
2182        // Client A sees that a guest has joined.
2183        project_a
2184            .condition(cx_a, |p, _| p.collaborators().len() == 1)
2185            .await;
2186
2187        // Drop client B's connection and ensure client A observes client B leaving the project.
2188        client_b.disconnect(&cx_b.to_async()).unwrap();
2189        project_a
2190            .condition(cx_a, |p, _| p.collaborators().len() == 0)
2191            .await;
2192
2193        // Rejoin the project as client B
2194        let _project_b = Project::remote(
2195            project_id,
2196            client_b.clone(),
2197            client_b.user_store.clone(),
2198            lang_registry.clone(),
2199            fs.clone(),
2200            &mut cx_b.to_async(),
2201        )
2202        .await
2203        .unwrap();
2204
2205        // Client A sees that a guest has re-joined.
2206        project_a
2207            .condition(cx_a, |p, _| p.collaborators().len() == 1)
2208            .await;
2209
2210        // Simulate connection loss for client B and ensure client A observes client B leaving the project.
2211        client_b.wait_for_current_user(cx_b).await;
2212        server.disconnect_client(client_b.current_user_id(cx_b));
2213        cx_a.foreground().advance_clock(Duration::from_secs(3));
2214        project_a
2215            .condition(cx_a, |p, _| p.collaborators().len() == 0)
2216            .await;
2217    }
2218
2219    #[gpui::test(iterations = 10)]
2220    async fn test_collaborating_with_diagnostics(
2221        cx_a: &mut TestAppContext,
2222        cx_b: &mut TestAppContext,
2223    ) {
2224        cx_a.foreground().forbid_parking();
2225        let lang_registry = Arc::new(LanguageRegistry::test());
2226        let fs = FakeFs::new(cx_a.background());
2227
2228        // Set up a fake language server.
2229        let mut language = Language::new(
2230            LanguageConfig {
2231                name: "Rust".into(),
2232                path_suffixes: vec!["rs".to_string()],
2233                ..Default::default()
2234            },
2235            Some(tree_sitter_rust::language()),
2236        );
2237        let mut fake_language_servers = language.set_fake_lsp_adapter(Default::default());
2238        lang_registry.add(Arc::new(language));
2239
2240        // Connect to a server as 2 clients.
2241        let mut server = TestServer::start(cx_a.foreground(), cx_a.background()).await;
2242        let client_a = server.create_client(cx_a, "user_a").await;
2243        let client_b = server.create_client(cx_b, "user_b").await;
2244
2245        // Share a project as client A
2246        fs.insert_tree(
2247            "/a",
2248            json!({
2249                ".zed.toml": r#"collaborators = ["user_b"]"#,
2250                "a.rs": "let one = two",
2251                "other.rs": "",
2252            }),
2253        )
2254        .await;
2255        let project_a = cx_a.update(|cx| {
2256            Project::local(
2257                client_a.clone(),
2258                client_a.user_store.clone(),
2259                lang_registry.clone(),
2260                fs.clone(),
2261                cx,
2262            )
2263        });
2264        let (worktree_a, _) = project_a
2265            .update(cx_a, |p, cx| {
2266                p.find_or_create_local_worktree("/a", true, cx)
2267            })
2268            .await
2269            .unwrap();
2270        worktree_a
2271            .read_with(cx_a, |tree, _| tree.as_local().unwrap().scan_complete())
2272            .await;
2273        let project_id = project_a.update(cx_a, |p, _| p.next_remote_id()).await;
2274        let worktree_id = worktree_a.read_with(cx_a, |tree, _| tree.id());
2275        project_a.update(cx_a, |p, cx| p.share(cx)).await.unwrap();
2276
2277        // Cause the language server to start.
2278        let _ = cx_a
2279            .background()
2280            .spawn(project_a.update(cx_a, |project, cx| {
2281                project.open_buffer(
2282                    ProjectPath {
2283                        worktree_id,
2284                        path: Path::new("other.rs").into(),
2285                    },
2286                    cx,
2287                )
2288            }))
2289            .await
2290            .unwrap();
2291
2292        // Simulate a language server reporting errors for a file.
2293        let mut fake_language_server = fake_language_servers.next().await.unwrap();
2294        fake_language_server
2295            .receive_notification::<lsp::notification::DidOpenTextDocument>()
2296            .await;
2297        fake_language_server.notify::<lsp::notification::PublishDiagnostics>(
2298            lsp::PublishDiagnosticsParams {
2299                uri: lsp::Url::from_file_path("/a/a.rs").unwrap(),
2300                version: None,
2301                diagnostics: vec![lsp::Diagnostic {
2302                    severity: Some(lsp::DiagnosticSeverity::ERROR),
2303                    range: lsp::Range::new(lsp::Position::new(0, 4), lsp::Position::new(0, 7)),
2304                    message: "message 1".to_string(),
2305                    ..Default::default()
2306                }],
2307            },
2308        );
2309
2310        // Wait for server to see the diagnostics update.
2311        server
2312            .condition(|store| {
2313                let worktree = store
2314                    .project(project_id)
2315                    .unwrap()
2316                    .share
2317                    .as_ref()
2318                    .unwrap()
2319                    .worktrees
2320                    .get(&worktree_id.to_proto())
2321                    .unwrap();
2322
2323                !worktree.diagnostic_summaries.is_empty()
2324            })
2325            .await;
2326
2327        // Join the worktree as client B.
2328        let project_b = Project::remote(
2329            project_id,
2330            client_b.clone(),
2331            client_b.user_store.clone(),
2332            lang_registry.clone(),
2333            fs.clone(),
2334            &mut cx_b.to_async(),
2335        )
2336        .await
2337        .unwrap();
2338
2339        project_b.read_with(cx_b, |project, cx| {
2340            assert_eq!(
2341                project.diagnostic_summaries(cx).collect::<Vec<_>>(),
2342                &[(
2343                    ProjectPath {
2344                        worktree_id,
2345                        path: Arc::from(Path::new("a.rs")),
2346                    },
2347                    DiagnosticSummary {
2348                        error_count: 1,
2349                        warning_count: 0,
2350                        ..Default::default()
2351                    },
2352                )]
2353            )
2354        });
2355
2356        // Simulate a language server reporting more errors for a file.
2357        fake_language_server.notify::<lsp::notification::PublishDiagnostics>(
2358            lsp::PublishDiagnosticsParams {
2359                uri: lsp::Url::from_file_path("/a/a.rs").unwrap(),
2360                version: None,
2361                diagnostics: vec![
2362                    lsp::Diagnostic {
2363                        severity: Some(lsp::DiagnosticSeverity::ERROR),
2364                        range: lsp::Range::new(lsp::Position::new(0, 4), lsp::Position::new(0, 7)),
2365                        message: "message 1".to_string(),
2366                        ..Default::default()
2367                    },
2368                    lsp::Diagnostic {
2369                        severity: Some(lsp::DiagnosticSeverity::WARNING),
2370                        range: lsp::Range::new(
2371                            lsp::Position::new(0, 10),
2372                            lsp::Position::new(0, 13),
2373                        ),
2374                        message: "message 2".to_string(),
2375                        ..Default::default()
2376                    },
2377                ],
2378            },
2379        );
2380
2381        // Client b gets the updated summaries
2382        project_b
2383            .condition(&cx_b, |project, cx| {
2384                project.diagnostic_summaries(cx).collect::<Vec<_>>()
2385                    == &[(
2386                        ProjectPath {
2387                            worktree_id,
2388                            path: Arc::from(Path::new("a.rs")),
2389                        },
2390                        DiagnosticSummary {
2391                            error_count: 1,
2392                            warning_count: 1,
2393                            ..Default::default()
2394                        },
2395                    )]
2396            })
2397            .await;
2398
2399        // Open the file with the errors on client B. They should be present.
2400        let buffer_b = cx_b
2401            .background()
2402            .spawn(project_b.update(cx_b, |p, cx| p.open_buffer((worktree_id, "a.rs"), cx)))
2403            .await
2404            .unwrap();
2405
2406        buffer_b.read_with(cx_b, |buffer, _| {
2407            assert_eq!(
2408                buffer
2409                    .snapshot()
2410                    .diagnostics_in_range::<_, Point>(0..buffer.len(), false)
2411                    .map(|entry| entry)
2412                    .collect::<Vec<_>>(),
2413                &[
2414                    DiagnosticEntry {
2415                        range: Point::new(0, 4)..Point::new(0, 7),
2416                        diagnostic: Diagnostic {
2417                            group_id: 0,
2418                            message: "message 1".to_string(),
2419                            severity: lsp::DiagnosticSeverity::ERROR,
2420                            is_primary: true,
2421                            ..Default::default()
2422                        }
2423                    },
2424                    DiagnosticEntry {
2425                        range: Point::new(0, 10)..Point::new(0, 13),
2426                        diagnostic: Diagnostic {
2427                            group_id: 1,
2428                            severity: lsp::DiagnosticSeverity::WARNING,
2429                            message: "message 2".to_string(),
2430                            is_primary: true,
2431                            ..Default::default()
2432                        }
2433                    }
2434                ]
2435            );
2436        });
2437
2438        // Simulate a language server reporting no errors for a file.
2439        fake_language_server.notify::<lsp::notification::PublishDiagnostics>(
2440            lsp::PublishDiagnosticsParams {
2441                uri: lsp::Url::from_file_path("/a/a.rs").unwrap(),
2442                version: None,
2443                diagnostics: vec![],
2444            },
2445        );
2446        project_a
2447            .condition(cx_a, |project, cx| {
2448                project.diagnostic_summaries(cx).collect::<Vec<_>>() == &[]
2449            })
2450            .await;
2451        project_b
2452            .condition(cx_b, |project, cx| {
2453                project.diagnostic_summaries(cx).collect::<Vec<_>>() == &[]
2454            })
2455            .await;
2456    }
2457
2458    #[gpui::test(iterations = 10)]
2459    async fn test_collaborating_with_completion(
2460        cx_a: &mut TestAppContext,
2461        cx_b: &mut TestAppContext,
2462    ) {
2463        cx_a.foreground().forbid_parking();
2464        let lang_registry = Arc::new(LanguageRegistry::test());
2465        let fs = FakeFs::new(cx_a.background());
2466
2467        // Set up a fake language server.
2468        let mut language = Language::new(
2469            LanguageConfig {
2470                name: "Rust".into(),
2471                path_suffixes: vec!["rs".to_string()],
2472                ..Default::default()
2473            },
2474            Some(tree_sitter_rust::language()),
2475        );
2476        let mut fake_language_servers = language.set_fake_lsp_adapter(FakeLspAdapter {
2477            capabilities: lsp::ServerCapabilities {
2478                completion_provider: Some(lsp::CompletionOptions {
2479                    trigger_characters: Some(vec![".".to_string()]),
2480                    ..Default::default()
2481                }),
2482                ..Default::default()
2483            },
2484            ..Default::default()
2485        });
2486        lang_registry.add(Arc::new(language));
2487
2488        // Connect to a server as 2 clients.
2489        let mut server = TestServer::start(cx_a.foreground(), cx_a.background()).await;
2490        let client_a = server.create_client(cx_a, "user_a").await;
2491        let client_b = server.create_client(cx_b, "user_b").await;
2492
2493        // Share a project as client A
2494        fs.insert_tree(
2495            "/a",
2496            json!({
2497                ".zed.toml": r#"collaborators = ["user_b"]"#,
2498                "main.rs": "fn main() { a }",
2499                "other.rs": "",
2500            }),
2501        )
2502        .await;
2503        let project_a = cx_a.update(|cx| {
2504            Project::local(
2505                client_a.clone(),
2506                client_a.user_store.clone(),
2507                lang_registry.clone(),
2508                fs.clone(),
2509                cx,
2510            )
2511        });
2512        let (worktree_a, _) = project_a
2513            .update(cx_a, |p, cx| {
2514                p.find_or_create_local_worktree("/a", true, cx)
2515            })
2516            .await
2517            .unwrap();
2518        worktree_a
2519            .read_with(cx_a, |tree, _| tree.as_local().unwrap().scan_complete())
2520            .await;
2521        let project_id = project_a.update(cx_a, |p, _| p.next_remote_id()).await;
2522        let worktree_id = worktree_a.read_with(cx_a, |tree, _| tree.id());
2523        project_a.update(cx_a, |p, cx| p.share(cx)).await.unwrap();
2524
2525        // Join the worktree as client B.
2526        let project_b = Project::remote(
2527            project_id,
2528            client_b.clone(),
2529            client_b.user_store.clone(),
2530            lang_registry.clone(),
2531            fs.clone(),
2532            &mut cx_b.to_async(),
2533        )
2534        .await
2535        .unwrap();
2536
2537        // Open a file in an editor as the guest.
2538        let buffer_b = project_b
2539            .update(cx_b, |p, cx| p.open_buffer((worktree_id, "main.rs"), cx))
2540            .await
2541            .unwrap();
2542        let (window_b, _) = cx_b.add_window(|_| EmptyView);
2543        let editor_b = cx_b.add_view(window_b, |cx| {
2544            Editor::for_buffer(buffer_b.clone(), Some(project_b.clone()), cx)
2545        });
2546
2547        let fake_language_server = fake_language_servers.next().await.unwrap();
2548        buffer_b
2549            .condition(&cx_b, |buffer, _| !buffer.completion_triggers().is_empty())
2550            .await;
2551
2552        // Type a completion trigger character as the guest.
2553        editor_b.update(cx_b, |editor, cx| {
2554            editor.select_ranges([13..13], None, cx);
2555            editor.handle_input(&Input(".".into()), cx);
2556            cx.focus(&editor_b);
2557        });
2558
2559        // Receive a completion request as the host's language server.
2560        // Return some completions from the host's language server.
2561        cx_a.foreground().start_waiting();
2562        fake_language_server
2563            .handle_request::<lsp::request::Completion, _, _>(|params, _| async move {
2564                assert_eq!(
2565                    params.text_document_position.text_document.uri,
2566                    lsp::Url::from_file_path("/a/main.rs").unwrap(),
2567                );
2568                assert_eq!(
2569                    params.text_document_position.position,
2570                    lsp::Position::new(0, 14),
2571                );
2572
2573                Ok(Some(lsp::CompletionResponse::Array(vec![
2574                    lsp::CompletionItem {
2575                        label: "first_method(…)".into(),
2576                        detail: Some("fn(&mut self, B) -> C".into()),
2577                        text_edit: Some(lsp::CompletionTextEdit::Edit(lsp::TextEdit {
2578                            new_text: "first_method($1)".to_string(),
2579                            range: lsp::Range::new(
2580                                lsp::Position::new(0, 14),
2581                                lsp::Position::new(0, 14),
2582                            ),
2583                        })),
2584                        insert_text_format: Some(lsp::InsertTextFormat::SNIPPET),
2585                        ..Default::default()
2586                    },
2587                    lsp::CompletionItem {
2588                        label: "second_method(…)".into(),
2589                        detail: Some("fn(&mut self, C) -> D<E>".into()),
2590                        text_edit: Some(lsp::CompletionTextEdit::Edit(lsp::TextEdit {
2591                            new_text: "second_method()".to_string(),
2592                            range: lsp::Range::new(
2593                                lsp::Position::new(0, 14),
2594                                lsp::Position::new(0, 14),
2595                            ),
2596                        })),
2597                        insert_text_format: Some(lsp::InsertTextFormat::SNIPPET),
2598                        ..Default::default()
2599                    },
2600                ])))
2601            })
2602            .next()
2603            .await
2604            .unwrap();
2605        cx_a.foreground().finish_waiting();
2606
2607        // Open the buffer on the host.
2608        let buffer_a = project_a
2609            .update(cx_a, |p, cx| p.open_buffer((worktree_id, "main.rs"), cx))
2610            .await
2611            .unwrap();
2612        buffer_a
2613            .condition(&cx_a, |buffer, _| buffer.text() == "fn main() { a. }")
2614            .await;
2615
2616        // Confirm a completion on the guest.
2617        editor_b
2618            .condition(&cx_b, |editor, _| editor.context_menu_visible())
2619            .await;
2620        editor_b.update(cx_b, |editor, cx| {
2621            editor.confirm_completion(&ConfirmCompletion { item_ix: Some(0) }, cx);
2622            assert_eq!(editor.text(cx), "fn main() { a.first_method() }");
2623        });
2624
2625        // Return a resolved completion from the host's language server.
2626        // The resolved completion has an additional text edit.
2627        fake_language_server.handle_request::<lsp::request::ResolveCompletionItem, _, _>(
2628            |params, _| async move {
2629                assert_eq!(params.label, "first_method(…)");
2630                Ok(lsp::CompletionItem {
2631                    label: "first_method(…)".into(),
2632                    detail: Some("fn(&mut self, B) -> C".into()),
2633                    text_edit: Some(lsp::CompletionTextEdit::Edit(lsp::TextEdit {
2634                        new_text: "first_method($1)".to_string(),
2635                        range: lsp::Range::new(
2636                            lsp::Position::new(0, 14),
2637                            lsp::Position::new(0, 14),
2638                        ),
2639                    })),
2640                    additional_text_edits: Some(vec![lsp::TextEdit {
2641                        new_text: "use d::SomeTrait;\n".to_string(),
2642                        range: lsp::Range::new(lsp::Position::new(0, 0), lsp::Position::new(0, 0)),
2643                    }]),
2644                    insert_text_format: Some(lsp::InsertTextFormat::SNIPPET),
2645                    ..Default::default()
2646                })
2647            },
2648        );
2649
2650        // The additional edit is applied.
2651        buffer_a
2652            .condition(&cx_a, |buffer, _| {
2653                buffer.text() == "use d::SomeTrait;\nfn main() { a.first_method() }"
2654            })
2655            .await;
2656        buffer_b
2657            .condition(&cx_b, |buffer, _| {
2658                buffer.text() == "use d::SomeTrait;\nfn main() { a.first_method() }"
2659            })
2660            .await;
2661    }
2662
2663    #[gpui::test(iterations = 10)]
2664    async fn test_reloading_buffer_manually(cx_a: &mut TestAppContext, cx_b: &mut TestAppContext) {
2665        cx_a.foreground().forbid_parking();
2666        let lang_registry = Arc::new(LanguageRegistry::test());
2667        let fs = FakeFs::new(cx_a.background());
2668
2669        // Connect to a server as 2 clients.
2670        let mut server = TestServer::start(cx_a.foreground(), cx_a.background()).await;
2671        let client_a = server.create_client(cx_a, "user_a").await;
2672        let client_b = server.create_client(cx_b, "user_b").await;
2673
2674        // Share a project as client A
2675        fs.insert_tree(
2676            "/a",
2677            json!({
2678                ".zed.toml": r#"collaborators = ["user_b"]"#,
2679                "a.rs": "let one = 1;",
2680            }),
2681        )
2682        .await;
2683        let project_a = cx_a.update(|cx| {
2684            Project::local(
2685                client_a.clone(),
2686                client_a.user_store.clone(),
2687                lang_registry.clone(),
2688                fs.clone(),
2689                cx,
2690            )
2691        });
2692        let (worktree_a, _) = project_a
2693            .update(cx_a, |p, cx| {
2694                p.find_or_create_local_worktree("/a", true, cx)
2695            })
2696            .await
2697            .unwrap();
2698        worktree_a
2699            .read_with(cx_a, |tree, _| tree.as_local().unwrap().scan_complete())
2700            .await;
2701        let project_id = project_a.update(cx_a, |p, _| p.next_remote_id()).await;
2702        let worktree_id = worktree_a.read_with(cx_a, |tree, _| tree.id());
2703        project_a.update(cx_a, |p, cx| p.share(cx)).await.unwrap();
2704        let buffer_a = project_a
2705            .update(cx_a, |p, cx| p.open_buffer((worktree_id, "a.rs"), cx))
2706            .await
2707            .unwrap();
2708
2709        // Join the worktree as client B.
2710        let project_b = Project::remote(
2711            project_id,
2712            client_b.clone(),
2713            client_b.user_store.clone(),
2714            lang_registry.clone(),
2715            fs.clone(),
2716            &mut cx_b.to_async(),
2717        )
2718        .await
2719        .unwrap();
2720
2721        let buffer_b = cx_b
2722            .background()
2723            .spawn(project_b.update(cx_b, |p, cx| p.open_buffer((worktree_id, "a.rs"), cx)))
2724            .await
2725            .unwrap();
2726        buffer_b.update(cx_b, |buffer, cx| {
2727            buffer.edit([4..7], "six", cx);
2728            buffer.edit([10..11], "6", cx);
2729            assert_eq!(buffer.text(), "let six = 6;");
2730            assert!(buffer.is_dirty());
2731            assert!(!buffer.has_conflict());
2732        });
2733        buffer_a
2734            .condition(cx_a, |buffer, _| buffer.text() == "let six = 6;")
2735            .await;
2736
2737        fs.save(Path::new("/a/a.rs"), &Rope::from("let seven = 7;"))
2738            .await
2739            .unwrap();
2740        buffer_a
2741            .condition(cx_a, |buffer, _| buffer.has_conflict())
2742            .await;
2743        buffer_b
2744            .condition(cx_b, |buffer, _| buffer.has_conflict())
2745            .await;
2746
2747        project_b
2748            .update(cx_b, |project, cx| {
2749                project.reload_buffers(HashSet::from_iter([buffer_b.clone()]), true, cx)
2750            })
2751            .await
2752            .unwrap();
2753        buffer_a.read_with(cx_a, |buffer, _| {
2754            assert_eq!(buffer.text(), "let seven = 7;");
2755            assert!(!buffer.is_dirty());
2756            assert!(!buffer.has_conflict());
2757        });
2758        buffer_b.read_with(cx_b, |buffer, _| {
2759            assert_eq!(buffer.text(), "let seven = 7;");
2760            assert!(!buffer.is_dirty());
2761            assert!(!buffer.has_conflict());
2762        });
2763
2764        buffer_a.update(cx_a, |buffer, cx| {
2765            // Undoing on the host is a no-op when the reload was initiated by the guest.
2766            buffer.undo(cx);
2767            assert_eq!(buffer.text(), "let seven = 7;");
2768            assert!(!buffer.is_dirty());
2769            assert!(!buffer.has_conflict());
2770        });
2771        buffer_b.update(cx_b, |buffer, cx| {
2772            // Undoing on the guest rolls back the buffer to before it was reloaded but the conflict gets cleared.
2773            buffer.undo(cx);
2774            assert_eq!(buffer.text(), "let six = 6;");
2775            assert!(buffer.is_dirty());
2776            assert!(!buffer.has_conflict());
2777        });
2778    }
2779
2780    #[gpui::test(iterations = 10)]
2781    async fn test_formatting_buffer(cx_a: &mut TestAppContext, cx_b: &mut TestAppContext) {
2782        cx_a.foreground().forbid_parking();
2783        let lang_registry = Arc::new(LanguageRegistry::test());
2784        let fs = FakeFs::new(cx_a.background());
2785
2786        // Set up a fake language server.
2787        let mut language = Language::new(
2788            LanguageConfig {
2789                name: "Rust".into(),
2790                path_suffixes: vec!["rs".to_string()],
2791                ..Default::default()
2792            },
2793            Some(tree_sitter_rust::language()),
2794        );
2795        let mut fake_language_servers = language.set_fake_lsp_adapter(Default::default());
2796        lang_registry.add(Arc::new(language));
2797
2798        // Connect to a server as 2 clients.
2799        let mut server = TestServer::start(cx_a.foreground(), cx_a.background()).await;
2800        let client_a = server.create_client(cx_a, "user_a").await;
2801        let client_b = server.create_client(cx_b, "user_b").await;
2802
2803        // Share a project as client A
2804        fs.insert_tree(
2805            "/a",
2806            json!({
2807                ".zed.toml": r#"collaborators = ["user_b"]"#,
2808                "a.rs": "let one = two",
2809            }),
2810        )
2811        .await;
2812        let project_a = cx_a.update(|cx| {
2813            Project::local(
2814                client_a.clone(),
2815                client_a.user_store.clone(),
2816                lang_registry.clone(),
2817                fs.clone(),
2818                cx,
2819            )
2820        });
2821        let (worktree_a, _) = project_a
2822            .update(cx_a, |p, cx| {
2823                p.find_or_create_local_worktree("/a", true, cx)
2824            })
2825            .await
2826            .unwrap();
2827        worktree_a
2828            .read_with(cx_a, |tree, _| tree.as_local().unwrap().scan_complete())
2829            .await;
2830        let project_id = project_a.update(cx_a, |p, _| p.next_remote_id()).await;
2831        let worktree_id = worktree_a.read_with(cx_a, |tree, _| tree.id());
2832        project_a.update(cx_a, |p, cx| p.share(cx)).await.unwrap();
2833
2834        // Join the worktree as client B.
2835        let project_b = Project::remote(
2836            project_id,
2837            client_b.clone(),
2838            client_b.user_store.clone(),
2839            lang_registry.clone(),
2840            fs.clone(),
2841            &mut cx_b.to_async(),
2842        )
2843        .await
2844        .unwrap();
2845
2846        let buffer_b = cx_b
2847            .background()
2848            .spawn(project_b.update(cx_b, |p, cx| p.open_buffer((worktree_id, "a.rs"), cx)))
2849            .await
2850            .unwrap();
2851
2852        let fake_language_server = fake_language_servers.next().await.unwrap();
2853        fake_language_server.handle_request::<lsp::request::Formatting, _, _>(|_, _| async move {
2854            Ok(Some(vec![
2855                lsp::TextEdit {
2856                    range: lsp::Range::new(lsp::Position::new(0, 4), lsp::Position::new(0, 4)),
2857                    new_text: "h".to_string(),
2858                },
2859                lsp::TextEdit {
2860                    range: lsp::Range::new(lsp::Position::new(0, 7), lsp::Position::new(0, 7)),
2861                    new_text: "y".to_string(),
2862                },
2863            ]))
2864        });
2865
2866        project_b
2867            .update(cx_b, |project, cx| {
2868                project.format(HashSet::from_iter([buffer_b.clone()]), true, cx)
2869            })
2870            .await
2871            .unwrap();
2872        assert_eq!(
2873            buffer_b.read_with(cx_b, |buffer, _| buffer.text()),
2874            "let honey = two"
2875        );
2876    }
2877
2878    #[gpui::test(iterations = 10)]
2879    async fn test_definition(cx_a: &mut TestAppContext, cx_b: &mut TestAppContext) {
2880        cx_a.foreground().forbid_parking();
2881        let lang_registry = Arc::new(LanguageRegistry::test());
2882        let fs = FakeFs::new(cx_a.background());
2883        fs.insert_tree(
2884            "/root-1",
2885            json!({
2886                ".zed.toml": r#"collaborators = ["user_b"]"#,
2887                "a.rs": "const ONE: usize = b::TWO + b::THREE;",
2888            }),
2889        )
2890        .await;
2891        fs.insert_tree(
2892            "/root-2",
2893            json!({
2894                "b.rs": "const TWO: usize = 2;\nconst THREE: usize = 3;",
2895            }),
2896        )
2897        .await;
2898
2899        // Set up a fake language server.
2900        let mut language = Language::new(
2901            LanguageConfig {
2902                name: "Rust".into(),
2903                path_suffixes: vec!["rs".to_string()],
2904                ..Default::default()
2905            },
2906            Some(tree_sitter_rust::language()),
2907        );
2908        let mut fake_language_servers = language.set_fake_lsp_adapter(Default::default());
2909        lang_registry.add(Arc::new(language));
2910
2911        // Connect to a server as 2 clients.
2912        let mut server = TestServer::start(cx_a.foreground(), cx_a.background()).await;
2913        let client_a = server.create_client(cx_a, "user_a").await;
2914        let client_b = server.create_client(cx_b, "user_b").await;
2915
2916        // Share a project as client A
2917        let project_a = cx_a.update(|cx| {
2918            Project::local(
2919                client_a.clone(),
2920                client_a.user_store.clone(),
2921                lang_registry.clone(),
2922                fs.clone(),
2923                cx,
2924            )
2925        });
2926        let (worktree_a, _) = project_a
2927            .update(cx_a, |p, cx| {
2928                p.find_or_create_local_worktree("/root-1", true, cx)
2929            })
2930            .await
2931            .unwrap();
2932        worktree_a
2933            .read_with(cx_a, |tree, _| tree.as_local().unwrap().scan_complete())
2934            .await;
2935        let project_id = project_a.update(cx_a, |p, _| p.next_remote_id()).await;
2936        let worktree_id = worktree_a.read_with(cx_a, |tree, _| tree.id());
2937        project_a.update(cx_a, |p, cx| p.share(cx)).await.unwrap();
2938
2939        // Join the worktree as client B.
2940        let project_b = Project::remote(
2941            project_id,
2942            client_b.clone(),
2943            client_b.user_store.clone(),
2944            lang_registry.clone(),
2945            fs.clone(),
2946            &mut cx_b.to_async(),
2947        )
2948        .await
2949        .unwrap();
2950
2951        // Open the file on client B.
2952        let buffer_b = cx_b
2953            .background()
2954            .spawn(project_b.update(cx_b, |p, cx| p.open_buffer((worktree_id, "a.rs"), cx)))
2955            .await
2956            .unwrap();
2957
2958        // Request the definition of a symbol as the guest.
2959        let fake_language_server = fake_language_servers.next().await.unwrap();
2960        fake_language_server.handle_request::<lsp::request::GotoDefinition, _, _>(
2961            |_, _| async move {
2962                Ok(Some(lsp::GotoDefinitionResponse::Scalar(
2963                    lsp::Location::new(
2964                        lsp::Url::from_file_path("/root-2/b.rs").unwrap(),
2965                        lsp::Range::new(lsp::Position::new(0, 6), lsp::Position::new(0, 9)),
2966                    ),
2967                )))
2968            },
2969        );
2970
2971        let definitions_1 = project_b
2972            .update(cx_b, |p, cx| p.definition(&buffer_b, 23, cx))
2973            .await
2974            .unwrap();
2975        cx_b.read(|cx| {
2976            assert_eq!(definitions_1.len(), 1);
2977            assert_eq!(project_b.read(cx).worktrees(cx).count(), 2);
2978            let target_buffer = definitions_1[0].buffer.read(cx);
2979            assert_eq!(
2980                target_buffer.text(),
2981                "const TWO: usize = 2;\nconst THREE: usize = 3;"
2982            );
2983            assert_eq!(
2984                definitions_1[0].range.to_point(target_buffer),
2985                Point::new(0, 6)..Point::new(0, 9)
2986            );
2987        });
2988
2989        // Try getting more definitions for the same buffer, ensuring the buffer gets reused from
2990        // the previous call to `definition`.
2991        fake_language_server.handle_request::<lsp::request::GotoDefinition, _, _>(
2992            |_, _| async move {
2993                Ok(Some(lsp::GotoDefinitionResponse::Scalar(
2994                    lsp::Location::new(
2995                        lsp::Url::from_file_path("/root-2/b.rs").unwrap(),
2996                        lsp::Range::new(lsp::Position::new(1, 6), lsp::Position::new(1, 11)),
2997                    ),
2998                )))
2999            },
3000        );
3001
3002        let definitions_2 = project_b
3003            .update(cx_b, |p, cx| p.definition(&buffer_b, 33, cx))
3004            .await
3005            .unwrap();
3006        cx_b.read(|cx| {
3007            assert_eq!(definitions_2.len(), 1);
3008            assert_eq!(project_b.read(cx).worktrees(cx).count(), 2);
3009            let target_buffer = definitions_2[0].buffer.read(cx);
3010            assert_eq!(
3011                target_buffer.text(),
3012                "const TWO: usize = 2;\nconst THREE: usize = 3;"
3013            );
3014            assert_eq!(
3015                definitions_2[0].range.to_point(target_buffer),
3016                Point::new(1, 6)..Point::new(1, 11)
3017            );
3018        });
3019        assert_eq!(definitions_1[0].buffer, definitions_2[0].buffer);
3020    }
3021
3022    #[gpui::test(iterations = 10)]
3023    async fn test_references(cx_a: &mut TestAppContext, cx_b: &mut TestAppContext) {
3024        cx_a.foreground().forbid_parking();
3025        let lang_registry = Arc::new(LanguageRegistry::test());
3026        let fs = FakeFs::new(cx_a.background());
3027        fs.insert_tree(
3028            "/root-1",
3029            json!({
3030                ".zed.toml": r#"collaborators = ["user_b"]"#,
3031                "one.rs": "const ONE: usize = 1;",
3032                "two.rs": "const TWO: usize = one::ONE + one::ONE;",
3033            }),
3034        )
3035        .await;
3036        fs.insert_tree(
3037            "/root-2",
3038            json!({
3039                "three.rs": "const THREE: usize = two::TWO + one::ONE;",
3040            }),
3041        )
3042        .await;
3043
3044        // Set up a fake language server.
3045        let mut language = Language::new(
3046            LanguageConfig {
3047                name: "Rust".into(),
3048                path_suffixes: vec!["rs".to_string()],
3049                ..Default::default()
3050            },
3051            Some(tree_sitter_rust::language()),
3052        );
3053        let mut fake_language_servers = language.set_fake_lsp_adapter(Default::default());
3054        lang_registry.add(Arc::new(language));
3055
3056        // Connect to a server as 2 clients.
3057        let mut server = TestServer::start(cx_a.foreground(), cx_a.background()).await;
3058        let client_a = server.create_client(cx_a, "user_a").await;
3059        let client_b = server.create_client(cx_b, "user_b").await;
3060
3061        // Share a project as client A
3062        let project_a = cx_a.update(|cx| {
3063            Project::local(
3064                client_a.clone(),
3065                client_a.user_store.clone(),
3066                lang_registry.clone(),
3067                fs.clone(),
3068                cx,
3069            )
3070        });
3071        let (worktree_a, _) = project_a
3072            .update(cx_a, |p, cx| {
3073                p.find_or_create_local_worktree("/root-1", true, cx)
3074            })
3075            .await
3076            .unwrap();
3077        worktree_a
3078            .read_with(cx_a, |tree, _| tree.as_local().unwrap().scan_complete())
3079            .await;
3080        let project_id = project_a.update(cx_a, |p, _| p.next_remote_id()).await;
3081        let worktree_id = worktree_a.read_with(cx_a, |tree, _| tree.id());
3082        project_a.update(cx_a, |p, cx| p.share(cx)).await.unwrap();
3083
3084        // Join the worktree as client B.
3085        let project_b = Project::remote(
3086            project_id,
3087            client_b.clone(),
3088            client_b.user_store.clone(),
3089            lang_registry.clone(),
3090            fs.clone(),
3091            &mut cx_b.to_async(),
3092        )
3093        .await
3094        .unwrap();
3095
3096        // Open the file on client B.
3097        let buffer_b = cx_b
3098            .background()
3099            .spawn(project_b.update(cx_b, |p, cx| p.open_buffer((worktree_id, "one.rs"), cx)))
3100            .await
3101            .unwrap();
3102
3103        // Request references to a symbol as the guest.
3104        let fake_language_server = fake_language_servers.next().await.unwrap();
3105        fake_language_server.handle_request::<lsp::request::References, _, _>(
3106            |params, _| async move {
3107                assert_eq!(
3108                    params.text_document_position.text_document.uri.as_str(),
3109                    "file:///root-1/one.rs"
3110                );
3111                Ok(Some(vec![
3112                    lsp::Location {
3113                        uri: lsp::Url::from_file_path("/root-1/two.rs").unwrap(),
3114                        range: lsp::Range::new(
3115                            lsp::Position::new(0, 24),
3116                            lsp::Position::new(0, 27),
3117                        ),
3118                    },
3119                    lsp::Location {
3120                        uri: lsp::Url::from_file_path("/root-1/two.rs").unwrap(),
3121                        range: lsp::Range::new(
3122                            lsp::Position::new(0, 35),
3123                            lsp::Position::new(0, 38),
3124                        ),
3125                    },
3126                    lsp::Location {
3127                        uri: lsp::Url::from_file_path("/root-2/three.rs").unwrap(),
3128                        range: lsp::Range::new(
3129                            lsp::Position::new(0, 37),
3130                            lsp::Position::new(0, 40),
3131                        ),
3132                    },
3133                ]))
3134            },
3135        );
3136
3137        let references = project_b
3138            .update(cx_b, |p, cx| p.references(&buffer_b, 7, cx))
3139            .await
3140            .unwrap();
3141        cx_b.read(|cx| {
3142            assert_eq!(references.len(), 3);
3143            assert_eq!(project_b.read(cx).worktrees(cx).count(), 2);
3144
3145            let two_buffer = references[0].buffer.read(cx);
3146            let three_buffer = references[2].buffer.read(cx);
3147            assert_eq!(
3148                two_buffer.file().unwrap().path().as_ref(),
3149                Path::new("two.rs")
3150            );
3151            assert_eq!(references[1].buffer, references[0].buffer);
3152            assert_eq!(
3153                three_buffer.file().unwrap().full_path(cx),
3154                Path::new("three.rs")
3155            );
3156
3157            assert_eq!(references[0].range.to_offset(&two_buffer), 24..27);
3158            assert_eq!(references[1].range.to_offset(&two_buffer), 35..38);
3159            assert_eq!(references[2].range.to_offset(&three_buffer), 37..40);
3160        });
3161    }
3162
3163    #[gpui::test(iterations = 10)]
3164    async fn test_project_search(cx_a: &mut TestAppContext, cx_b: &mut TestAppContext) {
3165        cx_a.foreground().forbid_parking();
3166        let lang_registry = Arc::new(LanguageRegistry::test());
3167        let fs = FakeFs::new(cx_a.background());
3168        fs.insert_tree(
3169            "/root-1",
3170            json!({
3171                ".zed.toml": r#"collaborators = ["user_b"]"#,
3172                "a": "hello world",
3173                "b": "goodnight moon",
3174                "c": "a world of goo",
3175                "d": "world champion of clown world",
3176            }),
3177        )
3178        .await;
3179        fs.insert_tree(
3180            "/root-2",
3181            json!({
3182                "e": "disney world is fun",
3183            }),
3184        )
3185        .await;
3186
3187        // Connect to a server as 2 clients.
3188        let mut server = TestServer::start(cx_a.foreground(), cx_a.background()).await;
3189        let client_a = server.create_client(cx_a, "user_a").await;
3190        let client_b = server.create_client(cx_b, "user_b").await;
3191
3192        // Share a project as client A
3193        let project_a = cx_a.update(|cx| {
3194            Project::local(
3195                client_a.clone(),
3196                client_a.user_store.clone(),
3197                lang_registry.clone(),
3198                fs.clone(),
3199                cx,
3200            )
3201        });
3202        let project_id = project_a.update(cx_a, |p, _| p.next_remote_id()).await;
3203
3204        let (worktree_1, _) = project_a
3205            .update(cx_a, |p, cx| {
3206                p.find_or_create_local_worktree("/root-1", true, cx)
3207            })
3208            .await
3209            .unwrap();
3210        worktree_1
3211            .read_with(cx_a, |tree, _| tree.as_local().unwrap().scan_complete())
3212            .await;
3213        let (worktree_2, _) = project_a
3214            .update(cx_a, |p, cx| {
3215                p.find_or_create_local_worktree("/root-2", true, cx)
3216            })
3217            .await
3218            .unwrap();
3219        worktree_2
3220            .read_with(cx_a, |tree, _| tree.as_local().unwrap().scan_complete())
3221            .await;
3222
3223        project_a.update(cx_a, |p, cx| p.share(cx)).await.unwrap();
3224
3225        // Join the worktree as client B.
3226        let project_b = Project::remote(
3227            project_id,
3228            client_b.clone(),
3229            client_b.user_store.clone(),
3230            lang_registry.clone(),
3231            fs.clone(),
3232            &mut cx_b.to_async(),
3233        )
3234        .await
3235        .unwrap();
3236
3237        let results = project_b
3238            .update(cx_b, |project, cx| {
3239                project.search(SearchQuery::text("world", false, false), cx)
3240            })
3241            .await
3242            .unwrap();
3243
3244        let mut ranges_by_path = results
3245            .into_iter()
3246            .map(|(buffer, ranges)| {
3247                buffer.read_with(cx_b, |buffer, cx| {
3248                    let path = buffer.file().unwrap().full_path(cx);
3249                    let offset_ranges = ranges
3250                        .into_iter()
3251                        .map(|range| range.to_offset(buffer))
3252                        .collect::<Vec<_>>();
3253                    (path, offset_ranges)
3254                })
3255            })
3256            .collect::<Vec<_>>();
3257        ranges_by_path.sort_by_key(|(path, _)| path.clone());
3258
3259        assert_eq!(
3260            ranges_by_path,
3261            &[
3262                (PathBuf::from("root-1/a"), vec![6..11]),
3263                (PathBuf::from("root-1/c"), vec![2..7]),
3264                (PathBuf::from("root-1/d"), vec![0..5, 24..29]),
3265                (PathBuf::from("root-2/e"), vec![7..12]),
3266            ]
3267        );
3268    }
3269
3270    #[gpui::test(iterations = 10)]
3271    async fn test_document_highlights(cx_a: &mut TestAppContext, cx_b: &mut TestAppContext) {
3272        cx_a.foreground().forbid_parking();
3273        let lang_registry = Arc::new(LanguageRegistry::test());
3274        let fs = FakeFs::new(cx_a.background());
3275        fs.insert_tree(
3276            "/root-1",
3277            json!({
3278                ".zed.toml": r#"collaborators = ["user_b"]"#,
3279                "main.rs": "fn double(number: i32) -> i32 { number + number }",
3280            }),
3281        )
3282        .await;
3283
3284        // Set up a fake language server.
3285        let mut language = Language::new(
3286            LanguageConfig {
3287                name: "Rust".into(),
3288                path_suffixes: vec!["rs".to_string()],
3289                ..Default::default()
3290            },
3291            Some(tree_sitter_rust::language()),
3292        );
3293        let mut fake_language_servers = language.set_fake_lsp_adapter(Default::default());
3294        lang_registry.add(Arc::new(language));
3295
3296        // Connect to a server as 2 clients.
3297        let mut server = TestServer::start(cx_a.foreground(), cx_a.background()).await;
3298        let client_a = server.create_client(cx_a, "user_a").await;
3299        let client_b = server.create_client(cx_b, "user_b").await;
3300
3301        // Share a project as client A
3302        let project_a = cx_a.update(|cx| {
3303            Project::local(
3304                client_a.clone(),
3305                client_a.user_store.clone(),
3306                lang_registry.clone(),
3307                fs.clone(),
3308                cx,
3309            )
3310        });
3311        let (worktree_a, _) = project_a
3312            .update(cx_a, |p, cx| {
3313                p.find_or_create_local_worktree("/root-1", true, cx)
3314            })
3315            .await
3316            .unwrap();
3317        worktree_a
3318            .read_with(cx_a, |tree, _| tree.as_local().unwrap().scan_complete())
3319            .await;
3320        let project_id = project_a.update(cx_a, |p, _| p.next_remote_id()).await;
3321        let worktree_id = worktree_a.read_with(cx_a, |tree, _| tree.id());
3322        project_a.update(cx_a, |p, cx| p.share(cx)).await.unwrap();
3323
3324        // Join the worktree as client B.
3325        let project_b = Project::remote(
3326            project_id,
3327            client_b.clone(),
3328            client_b.user_store.clone(),
3329            lang_registry.clone(),
3330            fs.clone(),
3331            &mut cx_b.to_async(),
3332        )
3333        .await
3334        .unwrap();
3335
3336        // Open the file on client B.
3337        let buffer_b = cx_b
3338            .background()
3339            .spawn(project_b.update(cx_b, |p, cx| p.open_buffer((worktree_id, "main.rs"), cx)))
3340            .await
3341            .unwrap();
3342
3343        // Request document highlights as the guest.
3344        let fake_language_server = fake_language_servers.next().await.unwrap();
3345        fake_language_server.handle_request::<lsp::request::DocumentHighlightRequest, _, _>(
3346            |params, _| async move {
3347                assert_eq!(
3348                    params
3349                        .text_document_position_params
3350                        .text_document
3351                        .uri
3352                        .as_str(),
3353                    "file:///root-1/main.rs"
3354                );
3355                assert_eq!(
3356                    params.text_document_position_params.position,
3357                    lsp::Position::new(0, 34)
3358                );
3359                Ok(Some(vec![
3360                    lsp::DocumentHighlight {
3361                        kind: Some(lsp::DocumentHighlightKind::WRITE),
3362                        range: lsp::Range::new(
3363                            lsp::Position::new(0, 10),
3364                            lsp::Position::new(0, 16),
3365                        ),
3366                    },
3367                    lsp::DocumentHighlight {
3368                        kind: Some(lsp::DocumentHighlightKind::READ),
3369                        range: lsp::Range::new(
3370                            lsp::Position::new(0, 32),
3371                            lsp::Position::new(0, 38),
3372                        ),
3373                    },
3374                    lsp::DocumentHighlight {
3375                        kind: Some(lsp::DocumentHighlightKind::READ),
3376                        range: lsp::Range::new(
3377                            lsp::Position::new(0, 41),
3378                            lsp::Position::new(0, 47),
3379                        ),
3380                    },
3381                ]))
3382            },
3383        );
3384
3385        let highlights = project_b
3386            .update(cx_b, |p, cx| p.document_highlights(&buffer_b, 34, cx))
3387            .await
3388            .unwrap();
3389        buffer_b.read_with(cx_b, |buffer, _| {
3390            let snapshot = buffer.snapshot();
3391
3392            let highlights = highlights
3393                .into_iter()
3394                .map(|highlight| (highlight.kind, highlight.range.to_offset(&snapshot)))
3395                .collect::<Vec<_>>();
3396            assert_eq!(
3397                highlights,
3398                &[
3399                    (lsp::DocumentHighlightKind::WRITE, 10..16),
3400                    (lsp::DocumentHighlightKind::READ, 32..38),
3401                    (lsp::DocumentHighlightKind::READ, 41..47)
3402                ]
3403            )
3404        });
3405    }
3406
3407    #[gpui::test(iterations = 10)]
3408    async fn test_project_symbols(cx_a: &mut TestAppContext, cx_b: &mut TestAppContext) {
3409        cx_a.foreground().forbid_parking();
3410        let lang_registry = Arc::new(LanguageRegistry::test());
3411        let fs = FakeFs::new(cx_a.background());
3412        fs.insert_tree(
3413            "/code",
3414            json!({
3415                "crate-1": {
3416                    ".zed.toml": r#"collaborators = ["user_b"]"#,
3417                    "one.rs": "const ONE: usize = 1;",
3418                },
3419                "crate-2": {
3420                    "two.rs": "const TWO: usize = 2; const THREE: usize = 3;",
3421                },
3422                "private": {
3423                    "passwords.txt": "the-password",
3424                }
3425            }),
3426        )
3427        .await;
3428
3429        // Set up a fake language server.
3430        let mut language = Language::new(
3431            LanguageConfig {
3432                name: "Rust".into(),
3433                path_suffixes: vec!["rs".to_string()],
3434                ..Default::default()
3435            },
3436            Some(tree_sitter_rust::language()),
3437        );
3438        let mut fake_language_servers = language.set_fake_lsp_adapter(Default::default());
3439        lang_registry.add(Arc::new(language));
3440
3441        // Connect to a server as 2 clients.
3442        let mut server = TestServer::start(cx_a.foreground(), cx_a.background()).await;
3443        let client_a = server.create_client(cx_a, "user_a").await;
3444        let client_b = server.create_client(cx_b, "user_b").await;
3445
3446        // Share a project as client A
3447        let project_a = cx_a.update(|cx| {
3448            Project::local(
3449                client_a.clone(),
3450                client_a.user_store.clone(),
3451                lang_registry.clone(),
3452                fs.clone(),
3453                cx,
3454            )
3455        });
3456        let (worktree_a, _) = project_a
3457            .update(cx_a, |p, cx| {
3458                p.find_or_create_local_worktree("/code/crate-1", true, cx)
3459            })
3460            .await
3461            .unwrap();
3462        worktree_a
3463            .read_with(cx_a, |tree, _| tree.as_local().unwrap().scan_complete())
3464            .await;
3465        let project_id = project_a.update(cx_a, |p, _| p.next_remote_id()).await;
3466        let worktree_id = worktree_a.read_with(cx_a, |tree, _| tree.id());
3467        project_a.update(cx_a, |p, cx| p.share(cx)).await.unwrap();
3468
3469        // Join the worktree as client B.
3470        let project_b = Project::remote(
3471            project_id,
3472            client_b.clone(),
3473            client_b.user_store.clone(),
3474            lang_registry.clone(),
3475            fs.clone(),
3476            &mut cx_b.to_async(),
3477        )
3478        .await
3479        .unwrap();
3480
3481        // Cause the language server to start.
3482        let _buffer = cx_b
3483            .background()
3484            .spawn(project_b.update(cx_b, |p, cx| p.open_buffer((worktree_id, "one.rs"), cx)))
3485            .await
3486            .unwrap();
3487
3488        let fake_language_server = fake_language_servers.next().await.unwrap();
3489        fake_language_server.handle_request::<lsp::request::WorkspaceSymbol, _, _>(
3490            |_, _| async move {
3491                #[allow(deprecated)]
3492                Ok(Some(vec![lsp::SymbolInformation {
3493                    name: "TWO".into(),
3494                    location: lsp::Location {
3495                        uri: lsp::Url::from_file_path("/code/crate-2/two.rs").unwrap(),
3496                        range: lsp::Range::new(lsp::Position::new(0, 6), lsp::Position::new(0, 9)),
3497                    },
3498                    kind: lsp::SymbolKind::CONSTANT,
3499                    tags: None,
3500                    container_name: None,
3501                    deprecated: None,
3502                }]))
3503            },
3504        );
3505
3506        // Request the definition of a symbol as the guest.
3507        let symbols = project_b
3508            .update(cx_b, |p, cx| p.symbols("two", cx))
3509            .await
3510            .unwrap();
3511        assert_eq!(symbols.len(), 1);
3512        assert_eq!(symbols[0].name, "TWO");
3513
3514        // Open one of the returned symbols.
3515        let buffer_b_2 = project_b
3516            .update(cx_b, |project, cx| {
3517                project.open_buffer_for_symbol(&symbols[0], cx)
3518            })
3519            .await
3520            .unwrap();
3521        buffer_b_2.read_with(cx_b, |buffer, _| {
3522            assert_eq!(
3523                buffer.file().unwrap().path().as_ref(),
3524                Path::new("../crate-2/two.rs")
3525            );
3526        });
3527
3528        // Attempt to craft a symbol and violate host's privacy by opening an arbitrary file.
3529        let mut fake_symbol = symbols[0].clone();
3530        fake_symbol.path = Path::new("/code/secrets").into();
3531        let error = project_b
3532            .update(cx_b, |project, cx| {
3533                project.open_buffer_for_symbol(&fake_symbol, cx)
3534            })
3535            .await
3536            .unwrap_err();
3537        assert!(error.to_string().contains("invalid symbol signature"));
3538    }
3539
3540    #[gpui::test(iterations = 10)]
3541    async fn test_open_buffer_while_getting_definition_pointing_to_it(
3542        cx_a: &mut TestAppContext,
3543        cx_b: &mut TestAppContext,
3544        mut rng: StdRng,
3545    ) {
3546        cx_a.foreground().forbid_parking();
3547        let lang_registry = Arc::new(LanguageRegistry::test());
3548        let fs = FakeFs::new(cx_a.background());
3549        fs.insert_tree(
3550            "/root",
3551            json!({
3552                ".zed.toml": r#"collaborators = ["user_b"]"#,
3553                "a.rs": "const ONE: usize = b::TWO;",
3554                "b.rs": "const TWO: usize = 2",
3555            }),
3556        )
3557        .await;
3558
3559        // Set up a fake language server.
3560        let mut language = Language::new(
3561            LanguageConfig {
3562                name: "Rust".into(),
3563                path_suffixes: vec!["rs".to_string()],
3564                ..Default::default()
3565            },
3566            Some(tree_sitter_rust::language()),
3567        );
3568        let mut fake_language_servers = language.set_fake_lsp_adapter(Default::default());
3569        lang_registry.add(Arc::new(language));
3570
3571        // Connect to a server as 2 clients.
3572        let mut server = TestServer::start(cx_a.foreground(), cx_a.background()).await;
3573        let client_a = server.create_client(cx_a, "user_a").await;
3574        let client_b = server.create_client(cx_b, "user_b").await;
3575
3576        // Share a project as client A
3577        let project_a = cx_a.update(|cx| {
3578            Project::local(
3579                client_a.clone(),
3580                client_a.user_store.clone(),
3581                lang_registry.clone(),
3582                fs.clone(),
3583                cx,
3584            )
3585        });
3586
3587        let (worktree_a, _) = project_a
3588            .update(cx_a, |p, cx| {
3589                p.find_or_create_local_worktree("/root", true, cx)
3590            })
3591            .await
3592            .unwrap();
3593        worktree_a
3594            .read_with(cx_a, |tree, _| tree.as_local().unwrap().scan_complete())
3595            .await;
3596        let project_id = project_a.update(cx_a, |p, _| p.next_remote_id()).await;
3597        let worktree_id = worktree_a.read_with(cx_a, |tree, _| tree.id());
3598        project_a.update(cx_a, |p, cx| p.share(cx)).await.unwrap();
3599
3600        // Join the worktree as client B.
3601        let project_b = Project::remote(
3602            project_id,
3603            client_b.clone(),
3604            client_b.user_store.clone(),
3605            lang_registry.clone(),
3606            fs.clone(),
3607            &mut cx_b.to_async(),
3608        )
3609        .await
3610        .unwrap();
3611
3612        let buffer_b1 = cx_b
3613            .background()
3614            .spawn(project_b.update(cx_b, |p, cx| p.open_buffer((worktree_id, "a.rs"), cx)))
3615            .await
3616            .unwrap();
3617
3618        let fake_language_server = fake_language_servers.next().await.unwrap();
3619        fake_language_server.handle_request::<lsp::request::GotoDefinition, _, _>(
3620            |_, _| async move {
3621                Ok(Some(lsp::GotoDefinitionResponse::Scalar(
3622                    lsp::Location::new(
3623                        lsp::Url::from_file_path("/root/b.rs").unwrap(),
3624                        lsp::Range::new(lsp::Position::new(0, 6), lsp::Position::new(0, 9)),
3625                    ),
3626                )))
3627            },
3628        );
3629
3630        let definitions;
3631        let buffer_b2;
3632        if rng.gen() {
3633            definitions = project_b.update(cx_b, |p, cx| p.definition(&buffer_b1, 23, cx));
3634            buffer_b2 = project_b.update(cx_b, |p, cx| p.open_buffer((worktree_id, "b.rs"), cx));
3635        } else {
3636            buffer_b2 = project_b.update(cx_b, |p, cx| p.open_buffer((worktree_id, "b.rs"), cx));
3637            definitions = project_b.update(cx_b, |p, cx| p.definition(&buffer_b1, 23, cx));
3638        }
3639
3640        let buffer_b2 = buffer_b2.await.unwrap();
3641        let definitions = definitions.await.unwrap();
3642        assert_eq!(definitions.len(), 1);
3643        assert_eq!(definitions[0].buffer, buffer_b2);
3644    }
3645
3646    #[gpui::test(iterations = 10)]
3647    async fn test_collaborating_with_code_actions(
3648        cx_a: &mut TestAppContext,
3649        cx_b: &mut TestAppContext,
3650    ) {
3651        cx_a.foreground().forbid_parking();
3652        let lang_registry = Arc::new(LanguageRegistry::test());
3653        let fs = FakeFs::new(cx_a.background());
3654        cx_b.update(|cx| editor::init(cx));
3655
3656        // Set up a fake language server.
3657        let mut language = Language::new(
3658            LanguageConfig {
3659                name: "Rust".into(),
3660                path_suffixes: vec!["rs".to_string()],
3661                ..Default::default()
3662            },
3663            Some(tree_sitter_rust::language()),
3664        );
3665        let mut fake_language_servers = language.set_fake_lsp_adapter(Default::default());
3666        lang_registry.add(Arc::new(language));
3667
3668        // Connect to a server as 2 clients.
3669        let mut server = TestServer::start(cx_a.foreground(), cx_a.background()).await;
3670        let client_a = server.create_client(cx_a, "user_a").await;
3671        let client_b = server.create_client(cx_b, "user_b").await;
3672
3673        // Share a project as client A
3674        fs.insert_tree(
3675            "/a",
3676            json!({
3677                ".zed.toml": r#"collaborators = ["user_b"]"#,
3678                "main.rs": "mod other;\nfn main() { let foo = other::foo(); }",
3679                "other.rs": "pub fn foo() -> usize { 4 }",
3680            }),
3681        )
3682        .await;
3683        let project_a = cx_a.update(|cx| {
3684            Project::local(
3685                client_a.clone(),
3686                client_a.user_store.clone(),
3687                lang_registry.clone(),
3688                fs.clone(),
3689                cx,
3690            )
3691        });
3692        let (worktree_a, _) = project_a
3693            .update(cx_a, |p, cx| {
3694                p.find_or_create_local_worktree("/a", true, cx)
3695            })
3696            .await
3697            .unwrap();
3698        worktree_a
3699            .read_with(cx_a, |tree, _| tree.as_local().unwrap().scan_complete())
3700            .await;
3701        let project_id = project_a.update(cx_a, |p, _| p.next_remote_id()).await;
3702        let worktree_id = worktree_a.read_with(cx_a, |tree, _| tree.id());
3703        project_a.update(cx_a, |p, cx| p.share(cx)).await.unwrap();
3704
3705        // Join the worktree as client B.
3706        let project_b = Project::remote(
3707            project_id,
3708            client_b.clone(),
3709            client_b.user_store.clone(),
3710            lang_registry.clone(),
3711            fs.clone(),
3712            &mut cx_b.to_async(),
3713        )
3714        .await
3715        .unwrap();
3716        let mut params = cx_b.update(WorkspaceParams::test);
3717        params.languages = lang_registry.clone();
3718        params.client = client_b.client.clone();
3719        params.user_store = client_b.user_store.clone();
3720        params.project = project_b;
3721
3722        let (_window_b, workspace_b) = cx_b.add_window(|cx| Workspace::new(&params, cx));
3723        let editor_b = workspace_b
3724            .update(cx_b, |workspace, cx| {
3725                workspace.open_path((worktree_id, "main.rs"), cx)
3726            })
3727            .await
3728            .unwrap()
3729            .downcast::<Editor>()
3730            .unwrap();
3731
3732        let mut fake_language_server = fake_language_servers.next().await.unwrap();
3733        fake_language_server
3734            .handle_request::<lsp::request::CodeActionRequest, _, _>(|params, _| async move {
3735                assert_eq!(
3736                    params.text_document.uri,
3737                    lsp::Url::from_file_path("/a/main.rs").unwrap(),
3738                );
3739                assert_eq!(params.range.start, lsp::Position::new(0, 0));
3740                assert_eq!(params.range.end, lsp::Position::new(0, 0));
3741                Ok(None)
3742            })
3743            .next()
3744            .await;
3745
3746        // Move cursor to a location that contains code actions.
3747        editor_b.update(cx_b, |editor, cx| {
3748            editor.select_ranges([Point::new(1, 31)..Point::new(1, 31)], None, cx);
3749            cx.focus(&editor_b);
3750        });
3751
3752        fake_language_server
3753            .handle_request::<lsp::request::CodeActionRequest, _, _>(|params, _| async move {
3754                assert_eq!(
3755                    params.text_document.uri,
3756                    lsp::Url::from_file_path("/a/main.rs").unwrap(),
3757                );
3758                assert_eq!(params.range.start, lsp::Position::new(1, 31));
3759                assert_eq!(params.range.end, lsp::Position::new(1, 31));
3760
3761                Ok(Some(vec![lsp::CodeActionOrCommand::CodeAction(
3762                    lsp::CodeAction {
3763                        title: "Inline into all callers".to_string(),
3764                        edit: Some(lsp::WorkspaceEdit {
3765                            changes: Some(
3766                                [
3767                                    (
3768                                        lsp::Url::from_file_path("/a/main.rs").unwrap(),
3769                                        vec![lsp::TextEdit::new(
3770                                            lsp::Range::new(
3771                                                lsp::Position::new(1, 22),
3772                                                lsp::Position::new(1, 34),
3773                                            ),
3774                                            "4".to_string(),
3775                                        )],
3776                                    ),
3777                                    (
3778                                        lsp::Url::from_file_path("/a/other.rs").unwrap(),
3779                                        vec![lsp::TextEdit::new(
3780                                            lsp::Range::new(
3781                                                lsp::Position::new(0, 0),
3782                                                lsp::Position::new(0, 27),
3783                                            ),
3784                                            "".to_string(),
3785                                        )],
3786                                    ),
3787                                ]
3788                                .into_iter()
3789                                .collect(),
3790                            ),
3791                            ..Default::default()
3792                        }),
3793                        data: Some(json!({
3794                            "codeActionParams": {
3795                                "range": {
3796                                    "start": {"line": 1, "column": 31},
3797                                    "end": {"line": 1, "column": 31},
3798                                }
3799                            }
3800                        })),
3801                        ..Default::default()
3802                    },
3803                )]))
3804            })
3805            .next()
3806            .await;
3807
3808        // Toggle code actions and wait for them to display.
3809        editor_b.update(cx_b, |editor, cx| {
3810            editor.toggle_code_actions(
3811                &ToggleCodeActions {
3812                    deployed_from_indicator: false,
3813                },
3814                cx,
3815            );
3816        });
3817        editor_b
3818            .condition(&cx_b, |editor, _| editor.context_menu_visible())
3819            .await;
3820
3821        fake_language_server.remove_request_handler::<lsp::request::CodeActionRequest>();
3822
3823        // Confirming the code action will trigger a resolve request.
3824        let confirm_action = workspace_b
3825            .update(cx_b, |workspace, cx| {
3826                Editor::confirm_code_action(workspace, &ConfirmCodeAction { item_ix: Some(0) }, cx)
3827            })
3828            .unwrap();
3829        fake_language_server.handle_request::<lsp::request::CodeActionResolveRequest, _, _>(
3830            |_, _| async move {
3831                Ok(lsp::CodeAction {
3832                    title: "Inline into all callers".to_string(),
3833                    edit: Some(lsp::WorkspaceEdit {
3834                        changes: Some(
3835                            [
3836                                (
3837                                    lsp::Url::from_file_path("/a/main.rs").unwrap(),
3838                                    vec![lsp::TextEdit::new(
3839                                        lsp::Range::new(
3840                                            lsp::Position::new(1, 22),
3841                                            lsp::Position::new(1, 34),
3842                                        ),
3843                                        "4".to_string(),
3844                                    )],
3845                                ),
3846                                (
3847                                    lsp::Url::from_file_path("/a/other.rs").unwrap(),
3848                                    vec![lsp::TextEdit::new(
3849                                        lsp::Range::new(
3850                                            lsp::Position::new(0, 0),
3851                                            lsp::Position::new(0, 27),
3852                                        ),
3853                                        "".to_string(),
3854                                    )],
3855                                ),
3856                            ]
3857                            .into_iter()
3858                            .collect(),
3859                        ),
3860                        ..Default::default()
3861                    }),
3862                    ..Default::default()
3863                })
3864            },
3865        );
3866
3867        // After the action is confirmed, an editor containing both modified files is opened.
3868        confirm_action.await.unwrap();
3869        let code_action_editor = workspace_b.read_with(cx_b, |workspace, cx| {
3870            workspace
3871                .active_item(cx)
3872                .unwrap()
3873                .downcast::<Editor>()
3874                .unwrap()
3875        });
3876        code_action_editor.update(cx_b, |editor, cx| {
3877            assert_eq!(editor.text(cx), "\nmod other;\nfn main() { let foo = 4; }");
3878            editor.undo(&Undo, cx);
3879            assert_eq!(
3880                editor.text(cx),
3881                "pub fn foo() -> usize { 4 }\nmod other;\nfn main() { let foo = other::foo(); }"
3882            );
3883            editor.redo(&Redo, cx);
3884            assert_eq!(editor.text(cx), "\nmod other;\nfn main() { let foo = 4; }");
3885        });
3886    }
3887
3888    #[gpui::test(iterations = 10)]
3889    async fn test_collaborating_with_renames(cx_a: &mut TestAppContext, cx_b: &mut TestAppContext) {
3890        cx_a.foreground().forbid_parking();
3891        let lang_registry = Arc::new(LanguageRegistry::test());
3892        let fs = FakeFs::new(cx_a.background());
3893        cx_b.update(|cx| editor::init(cx));
3894
3895        // Set up a fake language server.
3896        let mut language = Language::new(
3897            LanguageConfig {
3898                name: "Rust".into(),
3899                path_suffixes: vec!["rs".to_string()],
3900                ..Default::default()
3901            },
3902            Some(tree_sitter_rust::language()),
3903        );
3904        let mut fake_language_servers = language.set_fake_lsp_adapter(FakeLspAdapter {
3905            capabilities: lsp::ServerCapabilities {
3906                rename_provider: Some(lsp::OneOf::Right(lsp::RenameOptions {
3907                    prepare_provider: Some(true),
3908                    work_done_progress_options: Default::default(),
3909                })),
3910                ..Default::default()
3911            },
3912            ..Default::default()
3913        });
3914        lang_registry.add(Arc::new(language));
3915
3916        // Connect to a server as 2 clients.
3917        let mut server = TestServer::start(cx_a.foreground(), cx_a.background()).await;
3918        let client_a = server.create_client(cx_a, "user_a").await;
3919        let client_b = server.create_client(cx_b, "user_b").await;
3920
3921        // Share a project as client A
3922        fs.insert_tree(
3923            "/dir",
3924            json!({
3925                ".zed.toml": r#"collaborators = ["user_b"]"#,
3926                "one.rs": "const ONE: usize = 1;",
3927                "two.rs": "const TWO: usize = one::ONE + one::ONE;"
3928            }),
3929        )
3930        .await;
3931        let project_a = cx_a.update(|cx| {
3932            Project::local(
3933                client_a.clone(),
3934                client_a.user_store.clone(),
3935                lang_registry.clone(),
3936                fs.clone(),
3937                cx,
3938            )
3939        });
3940        let (worktree_a, _) = project_a
3941            .update(cx_a, |p, cx| {
3942                p.find_or_create_local_worktree("/dir", true, cx)
3943            })
3944            .await
3945            .unwrap();
3946        worktree_a
3947            .read_with(cx_a, |tree, _| tree.as_local().unwrap().scan_complete())
3948            .await;
3949        let project_id = project_a.update(cx_a, |p, _| p.next_remote_id()).await;
3950        let worktree_id = worktree_a.read_with(cx_a, |tree, _| tree.id());
3951        project_a.update(cx_a, |p, cx| p.share(cx)).await.unwrap();
3952
3953        // Join the worktree as client B.
3954        let project_b = Project::remote(
3955            project_id,
3956            client_b.clone(),
3957            client_b.user_store.clone(),
3958            lang_registry.clone(),
3959            fs.clone(),
3960            &mut cx_b.to_async(),
3961        )
3962        .await
3963        .unwrap();
3964        let mut params = cx_b.update(WorkspaceParams::test);
3965        params.languages = lang_registry.clone();
3966        params.client = client_b.client.clone();
3967        params.user_store = client_b.user_store.clone();
3968        params.project = project_b;
3969
3970        let (_window_b, workspace_b) = cx_b.add_window(|cx| Workspace::new(&params, cx));
3971        let editor_b = workspace_b
3972            .update(cx_b, |workspace, cx| {
3973                workspace.open_path((worktree_id, "one.rs"), cx)
3974            })
3975            .await
3976            .unwrap()
3977            .downcast::<Editor>()
3978            .unwrap();
3979        let fake_language_server = fake_language_servers.next().await.unwrap();
3980
3981        // Move cursor to a location that can be renamed.
3982        let prepare_rename = editor_b.update(cx_b, |editor, cx| {
3983            editor.select_ranges([7..7], None, cx);
3984            editor.rename(&Rename, cx).unwrap()
3985        });
3986
3987        fake_language_server
3988            .handle_request::<lsp::request::PrepareRenameRequest, _, _>(|params, _| async move {
3989                assert_eq!(params.text_document.uri.as_str(), "file:///dir/one.rs");
3990                assert_eq!(params.position, lsp::Position::new(0, 7));
3991                Ok(Some(lsp::PrepareRenameResponse::Range(lsp::Range::new(
3992                    lsp::Position::new(0, 6),
3993                    lsp::Position::new(0, 9),
3994                ))))
3995            })
3996            .next()
3997            .await
3998            .unwrap();
3999        prepare_rename.await.unwrap();
4000        editor_b.update(cx_b, |editor, cx| {
4001            let rename = editor.pending_rename().unwrap();
4002            let buffer = editor.buffer().read(cx).snapshot(cx);
4003            assert_eq!(
4004                rename.range.start.to_offset(&buffer)..rename.range.end.to_offset(&buffer),
4005                6..9
4006            );
4007            rename.editor.update(cx, |rename_editor, cx| {
4008                rename_editor.buffer().update(cx, |rename_buffer, cx| {
4009                    rename_buffer.edit([0..3], "THREE", cx);
4010                });
4011            });
4012        });
4013
4014        let confirm_rename = workspace_b.update(cx_b, |workspace, cx| {
4015            Editor::confirm_rename(workspace, &ConfirmRename, cx).unwrap()
4016        });
4017        fake_language_server
4018            .handle_request::<lsp::request::Rename, _, _>(|params, _| async move {
4019                assert_eq!(
4020                    params.text_document_position.text_document.uri.as_str(),
4021                    "file:///dir/one.rs"
4022                );
4023                assert_eq!(
4024                    params.text_document_position.position,
4025                    lsp::Position::new(0, 6)
4026                );
4027                assert_eq!(params.new_name, "THREE");
4028                Ok(Some(lsp::WorkspaceEdit {
4029                    changes: Some(
4030                        [
4031                            (
4032                                lsp::Url::from_file_path("/dir/one.rs").unwrap(),
4033                                vec![lsp::TextEdit::new(
4034                                    lsp::Range::new(
4035                                        lsp::Position::new(0, 6),
4036                                        lsp::Position::new(0, 9),
4037                                    ),
4038                                    "THREE".to_string(),
4039                                )],
4040                            ),
4041                            (
4042                                lsp::Url::from_file_path("/dir/two.rs").unwrap(),
4043                                vec![
4044                                    lsp::TextEdit::new(
4045                                        lsp::Range::new(
4046                                            lsp::Position::new(0, 24),
4047                                            lsp::Position::new(0, 27),
4048                                        ),
4049                                        "THREE".to_string(),
4050                                    ),
4051                                    lsp::TextEdit::new(
4052                                        lsp::Range::new(
4053                                            lsp::Position::new(0, 35),
4054                                            lsp::Position::new(0, 38),
4055                                        ),
4056                                        "THREE".to_string(),
4057                                    ),
4058                                ],
4059                            ),
4060                        ]
4061                        .into_iter()
4062                        .collect(),
4063                    ),
4064                    ..Default::default()
4065                }))
4066            })
4067            .next()
4068            .await
4069            .unwrap();
4070        confirm_rename.await.unwrap();
4071
4072        let rename_editor = workspace_b.read_with(cx_b, |workspace, cx| {
4073            workspace
4074                .active_item(cx)
4075                .unwrap()
4076                .downcast::<Editor>()
4077                .unwrap()
4078        });
4079        rename_editor.update(cx_b, |editor, cx| {
4080            assert_eq!(
4081                editor.text(cx),
4082                "const TWO: usize = one::THREE + one::THREE;\nconst THREE: usize = 1;"
4083            );
4084            editor.undo(&Undo, cx);
4085            assert_eq!(
4086                editor.text(cx),
4087                "const TWO: usize = one::ONE + one::ONE;\nconst ONE: usize = 1;"
4088            );
4089            editor.redo(&Redo, cx);
4090            assert_eq!(
4091                editor.text(cx),
4092                "const TWO: usize = one::THREE + one::THREE;\nconst THREE: usize = 1;"
4093            );
4094        });
4095
4096        // Ensure temporary rename edits cannot be undone/redone.
4097        editor_b.update(cx_b, |editor, cx| {
4098            editor.undo(&Undo, cx);
4099            assert_eq!(editor.text(cx), "const ONE: usize = 1;");
4100            editor.undo(&Undo, cx);
4101            assert_eq!(editor.text(cx), "const ONE: usize = 1;");
4102            editor.redo(&Redo, cx);
4103            assert_eq!(editor.text(cx), "const THREE: usize = 1;");
4104        })
4105    }
4106
4107    #[gpui::test(iterations = 10)]
4108    async fn test_basic_chat(cx_a: &mut TestAppContext, cx_b: &mut TestAppContext) {
4109        cx_a.foreground().forbid_parking();
4110
4111        // Connect to a server as 2 clients.
4112        let mut server = TestServer::start(cx_a.foreground(), cx_a.background()).await;
4113        let client_a = server.create_client(cx_a, "user_a").await;
4114        let client_b = server.create_client(cx_b, "user_b").await;
4115
4116        // Create an org that includes these 2 users.
4117        let db = &server.app_state.db;
4118        let org_id = db.create_org("Test Org", "test-org").await.unwrap();
4119        db.add_org_member(org_id, client_a.current_user_id(&cx_a), false)
4120            .await
4121            .unwrap();
4122        db.add_org_member(org_id, client_b.current_user_id(&cx_b), false)
4123            .await
4124            .unwrap();
4125
4126        // Create a channel that includes all the users.
4127        let channel_id = db.create_org_channel(org_id, "test-channel").await.unwrap();
4128        db.add_channel_member(channel_id, client_a.current_user_id(&cx_a), false)
4129            .await
4130            .unwrap();
4131        db.add_channel_member(channel_id, client_b.current_user_id(&cx_b), false)
4132            .await
4133            .unwrap();
4134        db.create_channel_message(
4135            channel_id,
4136            client_b.current_user_id(&cx_b),
4137            "hello A, it's B.",
4138            OffsetDateTime::now_utc(),
4139            1,
4140        )
4141        .await
4142        .unwrap();
4143
4144        let channels_a = cx_a
4145            .add_model(|cx| ChannelList::new(client_a.user_store.clone(), client_a.clone(), cx));
4146        channels_a
4147            .condition(cx_a, |list, _| list.available_channels().is_some())
4148            .await;
4149        channels_a.read_with(cx_a, |list, _| {
4150            assert_eq!(
4151                list.available_channels().unwrap(),
4152                &[ChannelDetails {
4153                    id: channel_id.to_proto(),
4154                    name: "test-channel".to_string()
4155                }]
4156            )
4157        });
4158        let channel_a = channels_a.update(cx_a, |this, cx| {
4159            this.get_channel(channel_id.to_proto(), cx).unwrap()
4160        });
4161        channel_a.read_with(cx_a, |channel, _| assert!(channel.messages().is_empty()));
4162        channel_a
4163            .condition(&cx_a, |channel, _| {
4164                channel_messages(channel)
4165                    == [("user_b".to_string(), "hello A, it's B.".to_string(), false)]
4166            })
4167            .await;
4168
4169        let channels_b = cx_b
4170            .add_model(|cx| ChannelList::new(client_b.user_store.clone(), client_b.clone(), cx));
4171        channels_b
4172            .condition(cx_b, |list, _| list.available_channels().is_some())
4173            .await;
4174        channels_b.read_with(cx_b, |list, _| {
4175            assert_eq!(
4176                list.available_channels().unwrap(),
4177                &[ChannelDetails {
4178                    id: channel_id.to_proto(),
4179                    name: "test-channel".to_string()
4180                }]
4181            )
4182        });
4183
4184        let channel_b = channels_b.update(cx_b, |this, cx| {
4185            this.get_channel(channel_id.to_proto(), cx).unwrap()
4186        });
4187        channel_b.read_with(cx_b, |channel, _| assert!(channel.messages().is_empty()));
4188        channel_b
4189            .condition(&cx_b, |channel, _| {
4190                channel_messages(channel)
4191                    == [("user_b".to_string(), "hello A, it's B.".to_string(), false)]
4192            })
4193            .await;
4194
4195        channel_a
4196            .update(cx_a, |channel, cx| {
4197                channel
4198                    .send_message("oh, hi B.".to_string(), cx)
4199                    .unwrap()
4200                    .detach();
4201                let task = channel.send_message("sup".to_string(), cx).unwrap();
4202                assert_eq!(
4203                    channel_messages(channel),
4204                    &[
4205                        ("user_b".to_string(), "hello A, it's B.".to_string(), false),
4206                        ("user_a".to_string(), "oh, hi B.".to_string(), true),
4207                        ("user_a".to_string(), "sup".to_string(), true)
4208                    ]
4209                );
4210                task
4211            })
4212            .await
4213            .unwrap();
4214
4215        channel_b
4216            .condition(&cx_b, |channel, _| {
4217                channel_messages(channel)
4218                    == [
4219                        ("user_b".to_string(), "hello A, it's B.".to_string(), false),
4220                        ("user_a".to_string(), "oh, hi B.".to_string(), false),
4221                        ("user_a".to_string(), "sup".to_string(), false),
4222                    ]
4223            })
4224            .await;
4225
4226        assert_eq!(
4227            server
4228                .state()
4229                .await
4230                .channel(channel_id)
4231                .unwrap()
4232                .connection_ids
4233                .len(),
4234            2
4235        );
4236        cx_b.update(|_| drop(channel_b));
4237        server
4238            .condition(|state| state.channel(channel_id).unwrap().connection_ids.len() == 1)
4239            .await;
4240
4241        cx_a.update(|_| drop(channel_a));
4242        server
4243            .condition(|state| state.channel(channel_id).is_none())
4244            .await;
4245    }
4246
4247    #[gpui::test(iterations = 10)]
4248    async fn test_chat_message_validation(cx_a: &mut TestAppContext) {
4249        cx_a.foreground().forbid_parking();
4250
4251        let mut server = TestServer::start(cx_a.foreground(), cx_a.background()).await;
4252        let client_a = server.create_client(cx_a, "user_a").await;
4253
4254        let db = &server.app_state.db;
4255        let org_id = db.create_org("Test Org", "test-org").await.unwrap();
4256        let channel_id = db.create_org_channel(org_id, "test-channel").await.unwrap();
4257        db.add_org_member(org_id, client_a.current_user_id(&cx_a), false)
4258            .await
4259            .unwrap();
4260        db.add_channel_member(channel_id, client_a.current_user_id(&cx_a), false)
4261            .await
4262            .unwrap();
4263
4264        let channels_a = cx_a
4265            .add_model(|cx| ChannelList::new(client_a.user_store.clone(), client_a.clone(), cx));
4266        channels_a
4267            .condition(cx_a, |list, _| list.available_channels().is_some())
4268            .await;
4269        let channel_a = channels_a.update(cx_a, |this, cx| {
4270            this.get_channel(channel_id.to_proto(), cx).unwrap()
4271        });
4272
4273        // Messages aren't allowed to be too long.
4274        channel_a
4275            .update(cx_a, |channel, cx| {
4276                let long_body = "this is long.\n".repeat(1024);
4277                channel.send_message(long_body, cx).unwrap()
4278            })
4279            .await
4280            .unwrap_err();
4281
4282        // Messages aren't allowed to be blank.
4283        channel_a.update(cx_a, |channel, cx| {
4284            channel.send_message(String::new(), cx).unwrap_err()
4285        });
4286
4287        // Leading and trailing whitespace are trimmed.
4288        channel_a
4289            .update(cx_a, |channel, cx| {
4290                channel
4291                    .send_message("\n surrounded by whitespace  \n".to_string(), cx)
4292                    .unwrap()
4293            })
4294            .await
4295            .unwrap();
4296        assert_eq!(
4297            db.get_channel_messages(channel_id, 10, None)
4298                .await
4299                .unwrap()
4300                .iter()
4301                .map(|m| &m.body)
4302                .collect::<Vec<_>>(),
4303            &["surrounded by whitespace"]
4304        );
4305    }
4306
4307    #[gpui::test(iterations = 10)]
4308    async fn test_chat_reconnection(cx_a: &mut TestAppContext, cx_b: &mut TestAppContext) {
4309        cx_a.foreground().forbid_parking();
4310
4311        // Connect to a server as 2 clients.
4312        let mut server = TestServer::start(cx_a.foreground(), cx_a.background()).await;
4313        let client_a = server.create_client(cx_a, "user_a").await;
4314        let client_b = server.create_client(cx_b, "user_b").await;
4315        let mut status_b = client_b.status();
4316
4317        // Create an org that includes these 2 users.
4318        let db = &server.app_state.db;
4319        let org_id = db.create_org("Test Org", "test-org").await.unwrap();
4320        db.add_org_member(org_id, client_a.current_user_id(&cx_a), false)
4321            .await
4322            .unwrap();
4323        db.add_org_member(org_id, client_b.current_user_id(&cx_b), false)
4324            .await
4325            .unwrap();
4326
4327        // Create a channel that includes all the users.
4328        let channel_id = db.create_org_channel(org_id, "test-channel").await.unwrap();
4329        db.add_channel_member(channel_id, client_a.current_user_id(&cx_a), false)
4330            .await
4331            .unwrap();
4332        db.add_channel_member(channel_id, client_b.current_user_id(&cx_b), false)
4333            .await
4334            .unwrap();
4335        db.create_channel_message(
4336            channel_id,
4337            client_b.current_user_id(&cx_b),
4338            "hello A, it's B.",
4339            OffsetDateTime::now_utc(),
4340            2,
4341        )
4342        .await
4343        .unwrap();
4344
4345        let channels_a = cx_a
4346            .add_model(|cx| ChannelList::new(client_a.user_store.clone(), client_a.clone(), cx));
4347        channels_a
4348            .condition(cx_a, |list, _| list.available_channels().is_some())
4349            .await;
4350
4351        channels_a.read_with(cx_a, |list, _| {
4352            assert_eq!(
4353                list.available_channels().unwrap(),
4354                &[ChannelDetails {
4355                    id: channel_id.to_proto(),
4356                    name: "test-channel".to_string()
4357                }]
4358            )
4359        });
4360        let channel_a = channels_a.update(cx_a, |this, cx| {
4361            this.get_channel(channel_id.to_proto(), cx).unwrap()
4362        });
4363        channel_a.read_with(cx_a, |channel, _| assert!(channel.messages().is_empty()));
4364        channel_a
4365            .condition(&cx_a, |channel, _| {
4366                channel_messages(channel)
4367                    == [("user_b".to_string(), "hello A, it's B.".to_string(), false)]
4368            })
4369            .await;
4370
4371        let channels_b = cx_b
4372            .add_model(|cx| ChannelList::new(client_b.user_store.clone(), client_b.clone(), cx));
4373        channels_b
4374            .condition(cx_b, |list, _| list.available_channels().is_some())
4375            .await;
4376        channels_b.read_with(cx_b, |list, _| {
4377            assert_eq!(
4378                list.available_channels().unwrap(),
4379                &[ChannelDetails {
4380                    id: channel_id.to_proto(),
4381                    name: "test-channel".to_string()
4382                }]
4383            )
4384        });
4385
4386        let channel_b = channels_b.update(cx_b, |this, cx| {
4387            this.get_channel(channel_id.to_proto(), cx).unwrap()
4388        });
4389        channel_b.read_with(cx_b, |channel, _| assert!(channel.messages().is_empty()));
4390        channel_b
4391            .condition(&cx_b, |channel, _| {
4392                channel_messages(channel)
4393                    == [("user_b".to_string(), "hello A, it's B.".to_string(), false)]
4394            })
4395            .await;
4396
4397        // Disconnect client B, ensuring we can still access its cached channel data.
4398        server.forbid_connections();
4399        server.disconnect_client(client_b.current_user_id(&cx_b));
4400        cx_b.foreground().advance_clock(Duration::from_secs(3));
4401        while !matches!(
4402            status_b.next().await,
4403            Some(client::Status::ReconnectionError { .. })
4404        ) {}
4405
4406        channels_b.read_with(cx_b, |channels, _| {
4407            assert_eq!(
4408                channels.available_channels().unwrap(),
4409                [ChannelDetails {
4410                    id: channel_id.to_proto(),
4411                    name: "test-channel".to_string()
4412                }]
4413            )
4414        });
4415        channel_b.read_with(cx_b, |channel, _| {
4416            assert_eq!(
4417                channel_messages(channel),
4418                [("user_b".to_string(), "hello A, it's B.".to_string(), false)]
4419            )
4420        });
4421
4422        // Send a message from client B while it is disconnected.
4423        channel_b
4424            .update(cx_b, |channel, cx| {
4425                let task = channel
4426                    .send_message("can you see this?".to_string(), cx)
4427                    .unwrap();
4428                assert_eq!(
4429                    channel_messages(channel),
4430                    &[
4431                        ("user_b".to_string(), "hello A, it's B.".to_string(), false),
4432                        ("user_b".to_string(), "can you see this?".to_string(), true)
4433                    ]
4434                );
4435                task
4436            })
4437            .await
4438            .unwrap_err();
4439
4440        // Send a message from client A while B is disconnected.
4441        channel_a
4442            .update(cx_a, |channel, cx| {
4443                channel
4444                    .send_message("oh, hi B.".to_string(), cx)
4445                    .unwrap()
4446                    .detach();
4447                let task = channel.send_message("sup".to_string(), cx).unwrap();
4448                assert_eq!(
4449                    channel_messages(channel),
4450                    &[
4451                        ("user_b".to_string(), "hello A, it's B.".to_string(), false),
4452                        ("user_a".to_string(), "oh, hi B.".to_string(), true),
4453                        ("user_a".to_string(), "sup".to_string(), true)
4454                    ]
4455                );
4456                task
4457            })
4458            .await
4459            .unwrap();
4460
4461        // Give client B a chance to reconnect.
4462        server.allow_connections();
4463        cx_b.foreground().advance_clock(Duration::from_secs(10));
4464
4465        // Verify that B sees the new messages upon reconnection, as well as the message client B
4466        // sent while offline.
4467        channel_b
4468            .condition(&cx_b, |channel, _| {
4469                channel_messages(channel)
4470                    == [
4471                        ("user_b".to_string(), "hello A, it's B.".to_string(), false),
4472                        ("user_a".to_string(), "oh, hi B.".to_string(), false),
4473                        ("user_a".to_string(), "sup".to_string(), false),
4474                        ("user_b".to_string(), "can you see this?".to_string(), false),
4475                    ]
4476            })
4477            .await;
4478
4479        // Ensure client A and B can communicate normally after reconnection.
4480        channel_a
4481            .update(cx_a, |channel, cx| {
4482                channel.send_message("you online?".to_string(), cx).unwrap()
4483            })
4484            .await
4485            .unwrap();
4486        channel_b
4487            .condition(&cx_b, |channel, _| {
4488                channel_messages(channel)
4489                    == [
4490                        ("user_b".to_string(), "hello A, it's B.".to_string(), false),
4491                        ("user_a".to_string(), "oh, hi B.".to_string(), false),
4492                        ("user_a".to_string(), "sup".to_string(), false),
4493                        ("user_b".to_string(), "can you see this?".to_string(), false),
4494                        ("user_a".to_string(), "you online?".to_string(), false),
4495                    ]
4496            })
4497            .await;
4498
4499        channel_b
4500            .update(cx_b, |channel, cx| {
4501                channel.send_message("yep".to_string(), cx).unwrap()
4502            })
4503            .await
4504            .unwrap();
4505        channel_a
4506            .condition(&cx_a, |channel, _| {
4507                channel_messages(channel)
4508                    == [
4509                        ("user_b".to_string(), "hello A, it's B.".to_string(), false),
4510                        ("user_a".to_string(), "oh, hi B.".to_string(), false),
4511                        ("user_a".to_string(), "sup".to_string(), false),
4512                        ("user_b".to_string(), "can you see this?".to_string(), false),
4513                        ("user_a".to_string(), "you online?".to_string(), false),
4514                        ("user_b".to_string(), "yep".to_string(), false),
4515                    ]
4516            })
4517            .await;
4518    }
4519
4520    #[gpui::test(iterations = 10)]
4521    async fn test_contacts(
4522        cx_a: &mut TestAppContext,
4523        cx_b: &mut TestAppContext,
4524        cx_c: &mut TestAppContext,
4525    ) {
4526        cx_a.foreground().forbid_parking();
4527        let lang_registry = Arc::new(LanguageRegistry::test());
4528        let fs = FakeFs::new(cx_a.background());
4529
4530        // Connect to a server as 3 clients.
4531        let mut server = TestServer::start(cx_a.foreground(), cx_a.background()).await;
4532        let client_a = server.create_client(cx_a, "user_a").await;
4533        let client_b = server.create_client(cx_b, "user_b").await;
4534        let client_c = server.create_client(cx_c, "user_c").await;
4535
4536        // Share a worktree as client A.
4537        fs.insert_tree(
4538            "/a",
4539            json!({
4540                ".zed.toml": r#"collaborators = ["user_b", "user_c"]"#,
4541            }),
4542        )
4543        .await;
4544
4545        let project_a = cx_a.update(|cx| {
4546            Project::local(
4547                client_a.clone(),
4548                client_a.user_store.clone(),
4549                lang_registry.clone(),
4550                fs.clone(),
4551                cx,
4552            )
4553        });
4554        let (worktree_a, _) = project_a
4555            .update(cx_a, |p, cx| {
4556                p.find_or_create_local_worktree("/a", true, cx)
4557            })
4558            .await
4559            .unwrap();
4560        worktree_a
4561            .read_with(cx_a, |tree, _| tree.as_local().unwrap().scan_complete())
4562            .await;
4563
4564        client_a
4565            .user_store
4566            .condition(&cx_a, |user_store, _| {
4567                contacts(user_store) == vec![("user_a", vec![("a", false, vec![])])]
4568            })
4569            .await;
4570        client_b
4571            .user_store
4572            .condition(&cx_b, |user_store, _| {
4573                contacts(user_store) == vec![("user_a", vec![("a", false, vec![])])]
4574            })
4575            .await;
4576        client_c
4577            .user_store
4578            .condition(&cx_c, |user_store, _| {
4579                contacts(user_store) == vec![("user_a", vec![("a", false, vec![])])]
4580            })
4581            .await;
4582
4583        let project_id = project_a
4584            .update(cx_a, |project, _| project.next_remote_id())
4585            .await;
4586        project_a
4587            .update(cx_a, |project, cx| project.share(cx))
4588            .await
4589            .unwrap();
4590        client_a
4591            .user_store
4592            .condition(&cx_a, |user_store, _| {
4593                contacts(user_store) == vec![("user_a", vec![("a", true, vec![])])]
4594            })
4595            .await;
4596        client_b
4597            .user_store
4598            .condition(&cx_b, |user_store, _| {
4599                contacts(user_store) == vec![("user_a", vec![("a", true, vec![])])]
4600            })
4601            .await;
4602        client_c
4603            .user_store
4604            .condition(&cx_c, |user_store, _| {
4605                contacts(user_store) == vec![("user_a", vec![("a", true, vec![])])]
4606            })
4607            .await;
4608
4609        let _project_b = Project::remote(
4610            project_id,
4611            client_b.clone(),
4612            client_b.user_store.clone(),
4613            lang_registry.clone(),
4614            fs.clone(),
4615            &mut cx_b.to_async(),
4616        )
4617        .await
4618        .unwrap();
4619
4620        client_a
4621            .user_store
4622            .condition(&cx_a, |user_store, _| {
4623                contacts(user_store) == vec![("user_a", vec![("a", true, vec!["user_b"])])]
4624            })
4625            .await;
4626        client_b
4627            .user_store
4628            .condition(&cx_b, |user_store, _| {
4629                contacts(user_store) == vec![("user_a", vec![("a", true, vec!["user_b"])])]
4630            })
4631            .await;
4632        client_c
4633            .user_store
4634            .condition(&cx_c, |user_store, _| {
4635                contacts(user_store) == vec![("user_a", vec![("a", true, vec!["user_b"])])]
4636            })
4637            .await;
4638
4639        project_a
4640            .condition(&cx_a, |project, _| {
4641                project.collaborators().contains_key(&client_b.peer_id)
4642            })
4643            .await;
4644
4645        cx_a.update(move |_| drop(project_a));
4646        client_a
4647            .user_store
4648            .condition(&cx_a, |user_store, _| contacts(user_store) == vec![])
4649            .await;
4650        client_b
4651            .user_store
4652            .condition(&cx_b, |user_store, _| contacts(user_store) == vec![])
4653            .await;
4654        client_c
4655            .user_store
4656            .condition(&cx_c, |user_store, _| contacts(user_store) == vec![])
4657            .await;
4658
4659        fn contacts(user_store: &UserStore) -> Vec<(&str, Vec<(&str, bool, Vec<&str>)>)> {
4660            user_store
4661                .contacts()
4662                .iter()
4663                .map(|contact| {
4664                    let worktrees = contact
4665                        .projects
4666                        .iter()
4667                        .map(|p| {
4668                            (
4669                                p.worktree_root_names[0].as_str(),
4670                                p.is_shared,
4671                                p.guests.iter().map(|p| p.github_login.as_str()).collect(),
4672                            )
4673                        })
4674                        .collect();
4675                    (contact.user.github_login.as_str(), worktrees)
4676                })
4677                .collect()
4678        }
4679    }
4680
4681    #[gpui::test(iterations = 10)]
4682    async fn test_following(cx_a: &mut TestAppContext, cx_b: &mut TestAppContext) {
4683        cx_a.foreground().forbid_parking();
4684        let fs = FakeFs::new(cx_a.background());
4685
4686        // 2 clients connect to a server.
4687        let mut server = TestServer::start(cx_a.foreground(), cx_a.background()).await;
4688        let mut client_a = server.create_client(cx_a, "user_a").await;
4689        let mut client_b = server.create_client(cx_b, "user_b").await;
4690        cx_a.update(editor::init);
4691        cx_b.update(editor::init);
4692
4693        // Client A shares a project.
4694        fs.insert_tree(
4695            "/a",
4696            json!({
4697                ".zed.toml": r#"collaborators = ["user_b"]"#,
4698                "1.txt": "one",
4699                "2.txt": "two",
4700                "3.txt": "three",
4701            }),
4702        )
4703        .await;
4704        let (project_a, worktree_id) = client_a.build_local_project(fs.clone(), "/a", cx_a).await;
4705        project_a
4706            .update(cx_a, |project, cx| project.share(cx))
4707            .await
4708            .unwrap();
4709
4710        // Client B joins the project.
4711        let project_b = client_b
4712            .build_remote_project(
4713                project_a
4714                    .read_with(cx_a, |project, _| project.remote_id())
4715                    .unwrap(),
4716                cx_b,
4717            )
4718            .await;
4719
4720        // Client A opens some editors.
4721        let workspace_a = client_a.build_workspace(&project_a, cx_a);
4722        let pane_a = workspace_a.read_with(cx_a, |workspace, _| workspace.active_pane().clone());
4723        let editor_a1 = workspace_a
4724            .update(cx_a, |workspace, cx| {
4725                workspace.open_path((worktree_id, "1.txt"), cx)
4726            })
4727            .await
4728            .unwrap()
4729            .downcast::<Editor>()
4730            .unwrap();
4731        let editor_a2 = workspace_a
4732            .update(cx_a, |workspace, cx| {
4733                workspace.open_path((worktree_id, "2.txt"), cx)
4734            })
4735            .await
4736            .unwrap()
4737            .downcast::<Editor>()
4738            .unwrap();
4739
4740        // Client B opens an editor.
4741        let workspace_b = client_b.build_workspace(&project_b, cx_b);
4742        let editor_b1 = workspace_b
4743            .update(cx_b, |workspace, cx| {
4744                workspace.open_path((worktree_id, "1.txt"), cx)
4745            })
4746            .await
4747            .unwrap()
4748            .downcast::<Editor>()
4749            .unwrap();
4750
4751        let client_a_id = project_b.read_with(cx_b, |project, _| {
4752            project.collaborators().values().next().unwrap().peer_id
4753        });
4754        let client_b_id = project_a.read_with(cx_a, |project, _| {
4755            project.collaborators().values().next().unwrap().peer_id
4756        });
4757
4758        // When client B starts following client A, all visible view states are replicated to client B.
4759        editor_a1.update(cx_a, |editor, cx| editor.select_ranges([0..1], None, cx));
4760        editor_a2.update(cx_a, |editor, cx| editor.select_ranges([2..3], None, cx));
4761        workspace_b
4762            .update(cx_b, |workspace, cx| {
4763                workspace
4764                    .toggle_follow(&ToggleFollow(client_a_id), cx)
4765                    .unwrap()
4766            })
4767            .await
4768            .unwrap();
4769        let editor_b2 = workspace_b.read_with(cx_b, |workspace, cx| {
4770            workspace
4771                .active_item(cx)
4772                .unwrap()
4773                .downcast::<Editor>()
4774                .unwrap()
4775        });
4776        assert!(cx_b.read(|cx| editor_b2.is_focused(cx)));
4777        assert_eq!(
4778            editor_b2.read_with(cx_b, |editor, cx| editor.project_path(cx)),
4779            Some((worktree_id, "2.txt").into())
4780        );
4781        assert_eq!(
4782            editor_b2.read_with(cx_b, |editor, cx| editor.selected_ranges(cx)),
4783            vec![2..3]
4784        );
4785        assert_eq!(
4786            editor_b1.read_with(cx_b, |editor, cx| editor.selected_ranges(cx)),
4787            vec![0..1]
4788        );
4789
4790        // When client A activates a different editor, client B does so as well.
4791        workspace_a.update(cx_a, |workspace, cx| {
4792            workspace.activate_item(&editor_a1, cx)
4793        });
4794        workspace_b
4795            .condition(cx_b, |workspace, cx| {
4796                workspace.active_item(cx).unwrap().id() == editor_b1.id()
4797            })
4798            .await;
4799
4800        // When client A navigates back and forth, client B does so as well.
4801        workspace_a
4802            .update(cx_a, |workspace, cx| {
4803                workspace::Pane::go_back(workspace, None, cx)
4804            })
4805            .await;
4806        workspace_b
4807            .condition(cx_b, |workspace, cx| {
4808                workspace.active_item(cx).unwrap().id() == editor_b2.id()
4809            })
4810            .await;
4811
4812        workspace_a
4813            .update(cx_a, |workspace, cx| {
4814                workspace::Pane::go_forward(workspace, None, cx)
4815            })
4816            .await;
4817        workspace_b
4818            .condition(cx_b, |workspace, cx| {
4819                workspace.active_item(cx).unwrap().id() == editor_b1.id()
4820            })
4821            .await;
4822
4823        // Changes to client A's editor are reflected on client B.
4824        editor_a1.update(cx_a, |editor, cx| {
4825            editor.select_ranges([1..1, 2..2], None, cx);
4826        });
4827        editor_b1
4828            .condition(cx_b, |editor, cx| {
4829                editor.selected_ranges(cx) == vec![1..1, 2..2]
4830            })
4831            .await;
4832
4833        editor_a1.update(cx_a, |editor, cx| editor.set_text("TWO", cx));
4834        editor_b1
4835            .condition(cx_b, |editor, cx| editor.text(cx) == "TWO")
4836            .await;
4837
4838        editor_a1.update(cx_a, |editor, cx| {
4839            editor.select_ranges([3..3], None, cx);
4840            editor.set_scroll_position(vec2f(0., 100.), cx);
4841        });
4842        editor_b1
4843            .condition(cx_b, |editor, cx| editor.selected_ranges(cx) == vec![3..3])
4844            .await;
4845
4846        // After unfollowing, client B stops receiving updates from client A.
4847        workspace_b.update(cx_b, |workspace, cx| {
4848            workspace.unfollow(&workspace.active_pane().clone(), cx)
4849        });
4850        workspace_a.update(cx_a, |workspace, cx| {
4851            workspace.activate_item(&editor_a2, cx)
4852        });
4853        cx_a.foreground().run_until_parked();
4854        assert_eq!(
4855            workspace_b.read_with(cx_b, |workspace, cx| workspace
4856                .active_item(cx)
4857                .unwrap()
4858                .id()),
4859            editor_b1.id()
4860        );
4861
4862        // Client A starts following client B.
4863        workspace_a
4864            .update(cx_a, |workspace, cx| {
4865                workspace
4866                    .toggle_follow(&ToggleFollow(client_b_id), cx)
4867                    .unwrap()
4868            })
4869            .await
4870            .unwrap();
4871        assert_eq!(
4872            workspace_a.read_with(cx_a, |workspace, _| workspace.leader_for_pane(&pane_a)),
4873            Some(client_b_id)
4874        );
4875        assert_eq!(
4876            workspace_a.read_with(cx_a, |workspace, cx| workspace
4877                .active_item(cx)
4878                .unwrap()
4879                .id()),
4880            editor_a1.id()
4881        );
4882
4883        // Following interrupts when client B disconnects.
4884        client_b.disconnect(&cx_b.to_async()).unwrap();
4885        cx_a.foreground().run_until_parked();
4886        assert_eq!(
4887            workspace_a.read_with(cx_a, |workspace, _| workspace.leader_for_pane(&pane_a)),
4888            None
4889        );
4890    }
4891
4892    #[gpui::test(iterations = 10)]
4893    async fn test_peers_following_each_other(cx_a: &mut TestAppContext, cx_b: &mut TestAppContext) {
4894        cx_a.foreground().forbid_parking();
4895        let fs = FakeFs::new(cx_a.background());
4896
4897        // 2 clients connect to a server.
4898        let mut server = TestServer::start(cx_a.foreground(), cx_a.background()).await;
4899        let mut client_a = server.create_client(cx_a, "user_a").await;
4900        let mut client_b = server.create_client(cx_b, "user_b").await;
4901        cx_a.update(editor::init);
4902        cx_b.update(editor::init);
4903
4904        // Client A shares a project.
4905        fs.insert_tree(
4906            "/a",
4907            json!({
4908                ".zed.toml": r#"collaborators = ["user_b"]"#,
4909                "1.txt": "one",
4910                "2.txt": "two",
4911                "3.txt": "three",
4912                "4.txt": "four",
4913            }),
4914        )
4915        .await;
4916        let (project_a, worktree_id) = client_a.build_local_project(fs.clone(), "/a", cx_a).await;
4917        project_a
4918            .update(cx_a, |project, cx| project.share(cx))
4919            .await
4920            .unwrap();
4921
4922        // Client B joins the project.
4923        let project_b = client_b
4924            .build_remote_project(
4925                project_a
4926                    .read_with(cx_a, |project, _| project.remote_id())
4927                    .unwrap(),
4928                cx_b,
4929            )
4930            .await;
4931
4932        // Client A opens some editors.
4933        let workspace_a = client_a.build_workspace(&project_a, cx_a);
4934        let pane_a1 = workspace_a.read_with(cx_a, |workspace, _| workspace.active_pane().clone());
4935        let _editor_a1 = workspace_a
4936            .update(cx_a, |workspace, cx| {
4937                workspace.open_path((worktree_id, "1.txt"), cx)
4938            })
4939            .await
4940            .unwrap()
4941            .downcast::<Editor>()
4942            .unwrap();
4943
4944        // Client B opens an editor.
4945        let workspace_b = client_b.build_workspace(&project_b, cx_b);
4946        let pane_b1 = workspace_b.read_with(cx_b, |workspace, _| workspace.active_pane().clone());
4947        let _editor_b1 = workspace_b
4948            .update(cx_b, |workspace, cx| {
4949                workspace.open_path((worktree_id, "2.txt"), cx)
4950            })
4951            .await
4952            .unwrap()
4953            .downcast::<Editor>()
4954            .unwrap();
4955
4956        // Clients A and B follow each other in split panes
4957        workspace_a
4958            .update(cx_a, |workspace, cx| {
4959                workspace.split_pane(workspace.active_pane().clone(), SplitDirection::Right, cx);
4960                assert_ne!(*workspace.active_pane(), pane_a1);
4961                let leader_id = *project_a.read(cx).collaborators().keys().next().unwrap();
4962                workspace
4963                    .toggle_follow(&workspace::ToggleFollow(leader_id), cx)
4964                    .unwrap()
4965            })
4966            .await
4967            .unwrap();
4968        workspace_b
4969            .update(cx_b, |workspace, cx| {
4970                workspace.split_pane(workspace.active_pane().clone(), SplitDirection::Right, cx);
4971                assert_ne!(*workspace.active_pane(), pane_b1);
4972                let leader_id = *project_b.read(cx).collaborators().keys().next().unwrap();
4973                workspace
4974                    .toggle_follow(&workspace::ToggleFollow(leader_id), cx)
4975                    .unwrap()
4976            })
4977            .await
4978            .unwrap();
4979
4980        workspace_a
4981            .update(cx_a, |workspace, cx| {
4982                workspace.activate_next_pane(cx);
4983                assert_eq!(*workspace.active_pane(), pane_a1);
4984                workspace.open_path((worktree_id, "3.txt"), cx)
4985            })
4986            .await
4987            .unwrap();
4988        workspace_b
4989            .update(cx_b, |workspace, cx| {
4990                workspace.activate_next_pane(cx);
4991                assert_eq!(*workspace.active_pane(), pane_b1);
4992                workspace.open_path((worktree_id, "4.txt"), cx)
4993            })
4994            .await
4995            .unwrap();
4996        cx_a.foreground().run_until_parked();
4997
4998        // Ensure leader updates don't change the active pane of followers
4999        workspace_a.read_with(cx_a, |workspace, _| {
5000            assert_eq!(*workspace.active_pane(), pane_a1);
5001        });
5002        workspace_b.read_with(cx_b, |workspace, _| {
5003            assert_eq!(*workspace.active_pane(), pane_b1);
5004        });
5005
5006        // Ensure peers following each other doesn't cause an infinite loop.
5007        assert_eq!(
5008            workspace_a.read_with(cx_a, |workspace, cx| workspace
5009                .active_item(cx)
5010                .unwrap()
5011                .project_path(cx)),
5012            Some((worktree_id, "3.txt").into())
5013        );
5014        workspace_a.update(cx_a, |workspace, cx| {
5015            assert_eq!(
5016                workspace.active_item(cx).unwrap().project_path(cx),
5017                Some((worktree_id, "3.txt").into())
5018            );
5019            workspace.activate_next_pane(cx);
5020            assert_eq!(
5021                workspace.active_item(cx).unwrap().project_path(cx),
5022                Some((worktree_id, "4.txt").into())
5023            );
5024        });
5025        workspace_b.update(cx_b, |workspace, cx| {
5026            assert_eq!(
5027                workspace.active_item(cx).unwrap().project_path(cx),
5028                Some((worktree_id, "4.txt").into())
5029            );
5030            workspace.activate_next_pane(cx);
5031            assert_eq!(
5032                workspace.active_item(cx).unwrap().project_path(cx),
5033                Some((worktree_id, "3.txt").into())
5034            );
5035        });
5036    }
5037
5038    #[gpui::test(iterations = 10)]
5039    async fn test_auto_unfollowing(cx_a: &mut TestAppContext, cx_b: &mut TestAppContext) {
5040        cx_a.foreground().forbid_parking();
5041        let fs = FakeFs::new(cx_a.background());
5042
5043        // 2 clients connect to a server.
5044        let mut server = TestServer::start(cx_a.foreground(), cx_a.background()).await;
5045        let mut client_a = server.create_client(cx_a, "user_a").await;
5046        let mut client_b = server.create_client(cx_b, "user_b").await;
5047        cx_a.update(editor::init);
5048        cx_b.update(editor::init);
5049
5050        // Client A shares a project.
5051        fs.insert_tree(
5052            "/a",
5053            json!({
5054                ".zed.toml": r#"collaborators = ["user_b"]"#,
5055                "1.txt": "one",
5056                "2.txt": "two",
5057                "3.txt": "three",
5058            }),
5059        )
5060        .await;
5061        let (project_a, worktree_id) = client_a.build_local_project(fs.clone(), "/a", cx_a).await;
5062        project_a
5063            .update(cx_a, |project, cx| project.share(cx))
5064            .await
5065            .unwrap();
5066
5067        // Client B joins the project.
5068        let project_b = client_b
5069            .build_remote_project(
5070                project_a
5071                    .read_with(cx_a, |project, _| project.remote_id())
5072                    .unwrap(),
5073                cx_b,
5074            )
5075            .await;
5076
5077        // Client A opens some editors.
5078        let workspace_a = client_a.build_workspace(&project_a, cx_a);
5079        let _editor_a1 = workspace_a
5080            .update(cx_a, |workspace, cx| {
5081                workspace.open_path((worktree_id, "1.txt"), cx)
5082            })
5083            .await
5084            .unwrap()
5085            .downcast::<Editor>()
5086            .unwrap();
5087
5088        // Client B starts following client A.
5089        let workspace_b = client_b.build_workspace(&project_b, cx_b);
5090        let pane_b = workspace_b.read_with(cx_b, |workspace, _| workspace.active_pane().clone());
5091        let leader_id = project_b.read_with(cx_b, |project, _| {
5092            project.collaborators().values().next().unwrap().peer_id
5093        });
5094        workspace_b
5095            .update(cx_b, |workspace, cx| {
5096                workspace
5097                    .toggle_follow(&ToggleFollow(leader_id), cx)
5098                    .unwrap()
5099            })
5100            .await
5101            .unwrap();
5102        assert_eq!(
5103            workspace_b.read_with(cx_b, |workspace, _| workspace.leader_for_pane(&pane_b)),
5104            Some(leader_id)
5105        );
5106        let editor_b2 = workspace_b.read_with(cx_b, |workspace, cx| {
5107            workspace
5108                .active_item(cx)
5109                .unwrap()
5110                .downcast::<Editor>()
5111                .unwrap()
5112        });
5113
5114        // When client B moves, it automatically stops following client A.
5115        editor_b2.update(cx_b, |editor, cx| editor.move_right(&editor::MoveRight, cx));
5116        assert_eq!(
5117            workspace_b.read_with(cx_b, |workspace, _| workspace.leader_for_pane(&pane_b)),
5118            None
5119        );
5120
5121        workspace_b
5122            .update(cx_b, |workspace, cx| {
5123                workspace
5124                    .toggle_follow(&ToggleFollow(leader_id), cx)
5125                    .unwrap()
5126            })
5127            .await
5128            .unwrap();
5129        assert_eq!(
5130            workspace_b.read_with(cx_b, |workspace, _| workspace.leader_for_pane(&pane_b)),
5131            Some(leader_id)
5132        );
5133
5134        // When client B edits, it automatically stops following client A.
5135        editor_b2.update(cx_b, |editor, cx| editor.insert("X", cx));
5136        assert_eq!(
5137            workspace_b.read_with(cx_b, |workspace, _| workspace.leader_for_pane(&pane_b)),
5138            None
5139        );
5140
5141        workspace_b
5142            .update(cx_b, |workspace, cx| {
5143                workspace
5144                    .toggle_follow(&ToggleFollow(leader_id), cx)
5145                    .unwrap()
5146            })
5147            .await
5148            .unwrap();
5149        assert_eq!(
5150            workspace_b.read_with(cx_b, |workspace, _| workspace.leader_for_pane(&pane_b)),
5151            Some(leader_id)
5152        );
5153
5154        // When client B scrolls, it automatically stops following client A.
5155        editor_b2.update(cx_b, |editor, cx| {
5156            editor.set_scroll_position(vec2f(0., 3.), cx)
5157        });
5158        assert_eq!(
5159            workspace_b.read_with(cx_b, |workspace, _| workspace.leader_for_pane(&pane_b)),
5160            None
5161        );
5162
5163        workspace_b
5164            .update(cx_b, |workspace, cx| {
5165                workspace
5166                    .toggle_follow(&ToggleFollow(leader_id), cx)
5167                    .unwrap()
5168            })
5169            .await
5170            .unwrap();
5171        assert_eq!(
5172            workspace_b.read_with(cx_b, |workspace, _| workspace.leader_for_pane(&pane_b)),
5173            Some(leader_id)
5174        );
5175
5176        // When client B activates a different pane, it continues following client A in the original pane.
5177        workspace_b.update(cx_b, |workspace, cx| {
5178            workspace.split_pane(pane_b.clone(), SplitDirection::Right, cx)
5179        });
5180        assert_eq!(
5181            workspace_b.read_with(cx_b, |workspace, _| workspace.leader_for_pane(&pane_b)),
5182            Some(leader_id)
5183        );
5184
5185        workspace_b.update(cx_b, |workspace, cx| workspace.activate_next_pane(cx));
5186        assert_eq!(
5187            workspace_b.read_with(cx_b, |workspace, _| workspace.leader_for_pane(&pane_b)),
5188            Some(leader_id)
5189        );
5190
5191        // When client B activates a different item in the original pane, it automatically stops following client A.
5192        workspace_b
5193            .update(cx_b, |workspace, cx| {
5194                workspace.open_path((worktree_id, "2.txt"), cx)
5195            })
5196            .await
5197            .unwrap();
5198        assert_eq!(
5199            workspace_b.read_with(cx_b, |workspace, _| workspace.leader_for_pane(&pane_b)),
5200            None
5201        );
5202    }
5203
5204    #[gpui::test(iterations = 100)]
5205    async fn test_random_collaboration(
5206        cx: &mut TestAppContext,
5207        deterministic: Arc<Deterministic>,
5208        rng: StdRng,
5209    ) {
5210        cx.foreground().forbid_parking();
5211        let max_peers = env::var("MAX_PEERS")
5212            .map(|i| i.parse().expect("invalid `MAX_PEERS` variable"))
5213            .unwrap_or(5);
5214        assert!(max_peers <= 5);
5215
5216        let max_operations = env::var("OPERATIONS")
5217            .map(|i| i.parse().expect("invalid `OPERATIONS` variable"))
5218            .unwrap_or(10);
5219
5220        let rng = Arc::new(Mutex::new(rng));
5221
5222        let guest_lang_registry = Arc::new(LanguageRegistry::test());
5223        let host_language_registry = Arc::new(LanguageRegistry::test());
5224
5225        let fs = FakeFs::new(cx.background());
5226        fs.insert_tree(
5227            "/_collab",
5228            json!({
5229                ".zed.toml": r#"collaborators = ["guest-1", "guest-2", "guest-3", "guest-4"]"#
5230            }),
5231        )
5232        .await;
5233
5234        let mut server = TestServer::start(cx.foreground(), cx.background()).await;
5235        let mut clients = Vec::new();
5236        let mut user_ids = Vec::new();
5237        let mut op_start_signals = Vec::new();
5238        let files = Arc::new(Mutex::new(Vec::new()));
5239
5240        let mut next_entity_id = 100000;
5241        let mut host_cx = TestAppContext::new(
5242            cx.foreground_platform(),
5243            cx.platform(),
5244            deterministic.build_foreground(next_entity_id),
5245            deterministic.build_background(),
5246            cx.font_cache(),
5247            cx.leak_detector(),
5248            next_entity_id,
5249        );
5250        let host = server.create_client(&mut host_cx, "host").await;
5251        let host_project = host_cx.update(|cx| {
5252            Project::local(
5253                host.client.clone(),
5254                host.user_store.clone(),
5255                host_language_registry.clone(),
5256                fs.clone(),
5257                cx,
5258            )
5259        });
5260        let host_project_id = host_project
5261            .update(&mut host_cx, |p, _| p.next_remote_id())
5262            .await;
5263
5264        let (collab_worktree, _) = host_project
5265            .update(&mut host_cx, |project, cx| {
5266                project.find_or_create_local_worktree("/_collab", true, cx)
5267            })
5268            .await
5269            .unwrap();
5270        collab_worktree
5271            .read_with(&host_cx, |tree, _| tree.as_local().unwrap().scan_complete())
5272            .await;
5273        host_project
5274            .update(&mut host_cx, |project, cx| project.share(cx))
5275            .await
5276            .unwrap();
5277
5278        // Set up fake language servers.
5279        let mut language = Language::new(
5280            LanguageConfig {
5281                name: "Rust".into(),
5282                path_suffixes: vec!["rs".to_string()],
5283                ..Default::default()
5284            },
5285            None,
5286        );
5287        let _fake_servers = language.set_fake_lsp_adapter(FakeLspAdapter {
5288            name: "the-fake-language-server",
5289            capabilities: lsp::LanguageServer::full_capabilities(),
5290            initializer: Some(Box::new({
5291                let rng = rng.clone();
5292                let files = files.clone();
5293                let project = host_project.downgrade();
5294                move |fake_server: &mut FakeLanguageServer| {
5295                    fake_server.handle_request::<lsp::request::Completion, _, _>(
5296                        |_, _| async move {
5297                            Ok(Some(lsp::CompletionResponse::Array(vec![
5298                                lsp::CompletionItem {
5299                                    text_edit: Some(lsp::CompletionTextEdit::Edit(lsp::TextEdit {
5300                                        range: lsp::Range::new(
5301                                            lsp::Position::new(0, 0),
5302                                            lsp::Position::new(0, 0),
5303                                        ),
5304                                        new_text: "the-new-text".to_string(),
5305                                    })),
5306                                    ..Default::default()
5307                                },
5308                            ])))
5309                        },
5310                    );
5311
5312                    fake_server.handle_request::<lsp::request::CodeActionRequest, _, _>(
5313                        |_, _| async move {
5314                            Ok(Some(vec![lsp::CodeActionOrCommand::CodeAction(
5315                                lsp::CodeAction {
5316                                    title: "the-code-action".to_string(),
5317                                    ..Default::default()
5318                                },
5319                            )]))
5320                        },
5321                    );
5322
5323                    fake_server.handle_request::<lsp::request::PrepareRenameRequest, _, _>(
5324                        |params, _| async move {
5325                            Ok(Some(lsp::PrepareRenameResponse::Range(lsp::Range::new(
5326                                params.position,
5327                                params.position,
5328                            ))))
5329                        },
5330                    );
5331
5332                    fake_server.handle_request::<lsp::request::GotoDefinition, _, _>({
5333                        let files = files.clone();
5334                        let rng = rng.clone();
5335                        move |_, _| {
5336                            let files = files.clone();
5337                            let rng = rng.clone();
5338                            async move {
5339                                let files = files.lock();
5340                                let mut rng = rng.lock();
5341                                let count = rng.gen_range::<usize, _>(1..3);
5342                                let files = (0..count)
5343                                    .map(|_| files.choose(&mut *rng).unwrap())
5344                                    .collect::<Vec<_>>();
5345                                log::info!("LSP: Returning definitions in files {:?}", &files);
5346                                Ok(Some(lsp::GotoDefinitionResponse::Array(
5347                                    files
5348                                        .into_iter()
5349                                        .map(|file| lsp::Location {
5350                                            uri: lsp::Url::from_file_path(file).unwrap(),
5351                                            range: Default::default(),
5352                                        })
5353                                        .collect(),
5354                                )))
5355                            }
5356                        }
5357                    });
5358
5359                    fake_server.handle_request::<lsp::request::DocumentHighlightRequest, _, _>({
5360                        let rng = rng.clone();
5361                        let project = project.clone();
5362                        move |params, mut cx| {
5363                            let highlights = if let Some(project) = project.upgrade(&cx) {
5364                                project.update(&mut cx, |project, cx| {
5365                                    let path = params
5366                                        .text_document_position_params
5367                                        .text_document
5368                                        .uri
5369                                        .to_file_path()
5370                                        .unwrap();
5371                                    let (worktree, relative_path) =
5372                                        project.find_local_worktree(&path, cx)?;
5373                                    let project_path =
5374                                        ProjectPath::from((worktree.read(cx).id(), relative_path));
5375                                    let buffer =
5376                                        project.get_open_buffer(&project_path, cx)?.read(cx);
5377
5378                                    let mut highlights = Vec::new();
5379                                    let highlight_count = rng.lock().gen_range(1..=5);
5380                                    let mut prev_end = 0;
5381                                    for _ in 0..highlight_count {
5382                                        let range =
5383                                            buffer.random_byte_range(prev_end, &mut *rng.lock());
5384
5385                                        highlights.push(lsp::DocumentHighlight {
5386                                            range: range_to_lsp(range.to_point_utf16(buffer)),
5387                                            kind: Some(lsp::DocumentHighlightKind::READ),
5388                                        });
5389                                        prev_end = range.end;
5390                                    }
5391                                    Some(highlights)
5392                                })
5393                            } else {
5394                                None
5395                            };
5396                            async move { Ok(highlights) }
5397                        }
5398                    });
5399                }
5400            })),
5401            ..Default::default()
5402        });
5403        host_language_registry.add(Arc::new(language));
5404
5405        let op_start_signal = futures::channel::mpsc::unbounded();
5406        user_ids.push(host.current_user_id(&host_cx));
5407        op_start_signals.push(op_start_signal.0);
5408        clients.push(host_cx.foreground().spawn(host.simulate_host(
5409            host_project,
5410            files,
5411            op_start_signal.1,
5412            rng.clone(),
5413            host_cx,
5414        )));
5415
5416        let disconnect_host_at = if rng.lock().gen_bool(0.2) {
5417            rng.lock().gen_range(0..max_operations)
5418        } else {
5419            max_operations
5420        };
5421        let mut available_guests = vec![
5422            "guest-1".to_string(),
5423            "guest-2".to_string(),
5424            "guest-3".to_string(),
5425            "guest-4".to_string(),
5426        ];
5427        let mut operations = 0;
5428        while operations < max_operations {
5429            if operations == disconnect_host_at {
5430                server.disconnect_client(user_ids[0]);
5431                cx.foreground().advance_clock(RECEIVE_TIMEOUT);
5432                drop(op_start_signals);
5433                let mut clients = futures::future::join_all(clients).await;
5434                cx.foreground().run_until_parked();
5435
5436                let (host, mut host_cx, host_err) = clients.remove(0);
5437                if let Some(host_err) = host_err {
5438                    log::error!("host error - {}", host_err);
5439                }
5440                host.project
5441                    .as_ref()
5442                    .unwrap()
5443                    .read_with(&host_cx, |project, _| assert!(!project.is_shared()));
5444                for (guest, mut guest_cx, guest_err) in clients {
5445                    if let Some(guest_err) = guest_err {
5446                        log::error!("{} error - {}", guest.username, guest_err);
5447                    }
5448                    let contacts = server
5449                        .store
5450                        .read()
5451                        .await
5452                        .contacts_for_user(guest.current_user_id(&guest_cx));
5453                    assert!(!contacts
5454                        .iter()
5455                        .flat_map(|contact| &contact.projects)
5456                        .any(|project| project.id == host_project_id));
5457                    guest
5458                        .project
5459                        .as_ref()
5460                        .unwrap()
5461                        .read_with(&guest_cx, |project, _| assert!(project.is_read_only()));
5462                    guest_cx.update(|_| drop(guest));
5463                }
5464                host_cx.update(|_| drop(host));
5465
5466                return;
5467            }
5468
5469            let distribution = rng.lock().gen_range(0..100);
5470            match distribution {
5471                0..=19 if !available_guests.is_empty() => {
5472                    let guest_ix = rng.lock().gen_range(0..available_guests.len());
5473                    let guest_username = available_guests.remove(guest_ix);
5474                    log::info!("Adding new connection for {}", guest_username);
5475                    next_entity_id += 100000;
5476                    let mut guest_cx = TestAppContext::new(
5477                        cx.foreground_platform(),
5478                        cx.platform(),
5479                        deterministic.build_foreground(next_entity_id),
5480                        deterministic.build_background(),
5481                        cx.font_cache(),
5482                        cx.leak_detector(),
5483                        next_entity_id,
5484                    );
5485                    let guest = server.create_client(&mut guest_cx, &guest_username).await;
5486                    let guest_project = Project::remote(
5487                        host_project_id,
5488                        guest.client.clone(),
5489                        guest.user_store.clone(),
5490                        guest_lang_registry.clone(),
5491                        FakeFs::new(cx.background()),
5492                        &mut guest_cx.to_async(),
5493                    )
5494                    .await
5495                    .unwrap();
5496                    let op_start_signal = futures::channel::mpsc::unbounded();
5497                    user_ids.push(guest.current_user_id(&guest_cx));
5498                    op_start_signals.push(op_start_signal.0);
5499                    clients.push(guest_cx.foreground().spawn(guest.simulate_guest(
5500                        guest_username.clone(),
5501                        guest_project,
5502                        op_start_signal.1,
5503                        rng.clone(),
5504                        guest_cx,
5505                    )));
5506
5507                    log::info!("Added connection for {}", guest_username);
5508                    operations += 1;
5509                }
5510                20..=29 if clients.len() > 1 => {
5511                    log::info!("Removing guest");
5512                    let guest_ix = rng.lock().gen_range(1..clients.len());
5513                    let removed_guest_id = user_ids.remove(guest_ix);
5514                    let guest = clients.remove(guest_ix);
5515                    op_start_signals.remove(guest_ix);
5516                    server.disconnect_client(removed_guest_id);
5517                    cx.foreground().advance_clock(RECEIVE_TIMEOUT);
5518                    let (guest, mut guest_cx, guest_err) = guest.await;
5519                    if let Some(guest_err) = guest_err {
5520                        log::error!("{} error - {}", guest.username, guest_err);
5521                    }
5522                    guest
5523                        .project
5524                        .as_ref()
5525                        .unwrap()
5526                        .read_with(&guest_cx, |project, _| assert!(project.is_read_only()));
5527                    for user_id in &user_ids {
5528                        for contact in server.store.read().await.contacts_for_user(*user_id) {
5529                            assert_ne!(
5530                                contact.user_id, removed_guest_id.0 as u64,
5531                                "removed guest is still a contact of another peer"
5532                            );
5533                            for project in contact.projects {
5534                                for project_guest_id in project.guests {
5535                                    assert_ne!(
5536                                        project_guest_id, removed_guest_id.0 as u64,
5537                                        "removed guest appears as still participating on a project"
5538                                    );
5539                                }
5540                            }
5541                        }
5542                    }
5543
5544                    log::info!("{} removed", guest.username);
5545                    available_guests.push(guest.username.clone());
5546                    guest_cx.update(|_| drop(guest));
5547
5548                    operations += 1;
5549                }
5550                _ => {
5551                    while operations < max_operations && rng.lock().gen_bool(0.7) {
5552                        op_start_signals
5553                            .choose(&mut *rng.lock())
5554                            .unwrap()
5555                            .unbounded_send(())
5556                            .unwrap();
5557                        operations += 1;
5558                    }
5559
5560                    if rng.lock().gen_bool(0.8) {
5561                        cx.foreground().run_until_parked();
5562                    }
5563                }
5564            }
5565        }
5566
5567        drop(op_start_signals);
5568        let mut clients = futures::future::join_all(clients).await;
5569        cx.foreground().run_until_parked();
5570
5571        let (host_client, mut host_cx, host_err) = clients.remove(0);
5572        if let Some(host_err) = host_err {
5573            panic!("host error - {}", host_err);
5574        }
5575        let host_project = host_client.project.as_ref().unwrap();
5576        let host_worktree_snapshots = host_project.read_with(&host_cx, |project, cx| {
5577            project
5578                .worktrees(cx)
5579                .map(|worktree| {
5580                    let snapshot = worktree.read(cx).snapshot();
5581                    (snapshot.id(), snapshot)
5582                })
5583                .collect::<BTreeMap<_, _>>()
5584        });
5585
5586        host_client
5587            .project
5588            .as_ref()
5589            .unwrap()
5590            .read_with(&host_cx, |project, cx| project.check_invariants(cx));
5591
5592        for (guest_client, mut guest_cx, guest_err) in clients.into_iter() {
5593            if let Some(guest_err) = guest_err {
5594                panic!("{} error - {}", guest_client.username, guest_err);
5595            }
5596            let worktree_snapshots =
5597                guest_client
5598                    .project
5599                    .as_ref()
5600                    .unwrap()
5601                    .read_with(&guest_cx, |project, cx| {
5602                        project
5603                            .worktrees(cx)
5604                            .map(|worktree| {
5605                                let worktree = worktree.read(cx);
5606                                (worktree.id(), worktree.snapshot())
5607                            })
5608                            .collect::<BTreeMap<_, _>>()
5609                    });
5610
5611            assert_eq!(
5612                worktree_snapshots.keys().collect::<Vec<_>>(),
5613                host_worktree_snapshots.keys().collect::<Vec<_>>(),
5614                "{} has different worktrees than the host",
5615                guest_client.username
5616            );
5617            for (id, host_snapshot) in &host_worktree_snapshots {
5618                let guest_snapshot = &worktree_snapshots[id];
5619                assert_eq!(
5620                    guest_snapshot.root_name(),
5621                    host_snapshot.root_name(),
5622                    "{} has different root name than the host for worktree {}",
5623                    guest_client.username,
5624                    id
5625                );
5626                assert_eq!(
5627                    guest_snapshot.entries(false).collect::<Vec<_>>(),
5628                    host_snapshot.entries(false).collect::<Vec<_>>(),
5629                    "{} has different snapshot than the host for worktree {}",
5630                    guest_client.username,
5631                    id
5632                );
5633            }
5634
5635            guest_client
5636                .project
5637                .as_ref()
5638                .unwrap()
5639                .read_with(&guest_cx, |project, cx| project.check_invariants(cx));
5640
5641            for guest_buffer in &guest_client.buffers {
5642                let buffer_id = guest_buffer.read_with(&guest_cx, |buffer, _| buffer.remote_id());
5643                let host_buffer = host_project.read_with(&host_cx, |project, cx| {
5644                    project.buffer_for_id(buffer_id, cx).expect(&format!(
5645                        "host does not have buffer for guest:{}, peer:{}, id:{}",
5646                        guest_client.username, guest_client.peer_id, buffer_id
5647                    ))
5648                });
5649                let path = host_buffer
5650                    .read_with(&host_cx, |buffer, cx| buffer.file().unwrap().full_path(cx));
5651
5652                assert_eq!(
5653                    guest_buffer.read_with(&guest_cx, |buffer, _| buffer.deferred_ops_len()),
5654                    0,
5655                    "{}, buffer {}, path {:?} has deferred operations",
5656                    guest_client.username,
5657                    buffer_id,
5658                    path,
5659                );
5660                assert_eq!(
5661                    guest_buffer.read_with(&guest_cx, |buffer, _| buffer.text()),
5662                    host_buffer.read_with(&host_cx, |buffer, _| buffer.text()),
5663                    "{}, buffer {}, path {:?}, differs from the host's buffer",
5664                    guest_client.username,
5665                    buffer_id,
5666                    path
5667                );
5668            }
5669
5670            guest_cx.update(|_| drop(guest_client));
5671        }
5672
5673        host_cx.update(|_| drop(host_client));
5674    }
5675
5676    struct TestServer {
5677        peer: Arc<Peer>,
5678        app_state: Arc<AppState>,
5679        server: Arc<Server>,
5680        foreground: Rc<executor::Foreground>,
5681        notifications: mpsc::UnboundedReceiver<()>,
5682        connection_killers: Arc<Mutex<HashMap<UserId, Arc<AtomicBool>>>>,
5683        forbid_connections: Arc<AtomicBool>,
5684        _test_db: TestDb,
5685    }
5686
5687    impl TestServer {
5688        async fn start(
5689            foreground: Rc<executor::Foreground>,
5690            background: Arc<executor::Background>,
5691        ) -> Self {
5692            let test_db = TestDb::fake(background);
5693            let app_state = Self::build_app_state(&test_db).await;
5694            let peer = Peer::new();
5695            let notifications = mpsc::unbounded();
5696            let server = Server::new(app_state.clone(), Some(notifications.0));
5697            Self {
5698                peer,
5699                app_state,
5700                server,
5701                foreground,
5702                notifications: notifications.1,
5703                connection_killers: Default::default(),
5704                forbid_connections: Default::default(),
5705                _test_db: test_db,
5706            }
5707        }
5708
5709        async fn create_client(&mut self, cx: &mut TestAppContext, name: &str) -> TestClient {
5710            cx.update(|cx| {
5711                let settings = Settings::test(cx);
5712                cx.set_global(settings);
5713            });
5714
5715            let http = FakeHttpClient::with_404_response();
5716            let user_id = self.app_state.db.create_user(name, false).await.unwrap();
5717            let client_name = name.to_string();
5718            let mut client = Client::new(http.clone());
5719            let server = self.server.clone();
5720            let connection_killers = self.connection_killers.clone();
5721            let forbid_connections = self.forbid_connections.clone();
5722            let (connection_id_tx, mut connection_id_rx) = mpsc::channel(16);
5723
5724            Arc::get_mut(&mut client)
5725                .unwrap()
5726                .override_authenticate(move |cx| {
5727                    cx.spawn(|_| async move {
5728                        let access_token = "the-token".to_string();
5729                        Ok(Credentials {
5730                            user_id: user_id.0 as u64,
5731                            access_token,
5732                        })
5733                    })
5734                })
5735                .override_establish_connection(move |credentials, cx| {
5736                    assert_eq!(credentials.user_id, user_id.0 as u64);
5737                    assert_eq!(credentials.access_token, "the-token");
5738
5739                    let server = server.clone();
5740                    let connection_killers = connection_killers.clone();
5741                    let forbid_connections = forbid_connections.clone();
5742                    let client_name = client_name.clone();
5743                    let connection_id_tx = connection_id_tx.clone();
5744                    cx.spawn(move |cx| async move {
5745                        if forbid_connections.load(SeqCst) {
5746                            Err(EstablishConnectionError::other(anyhow!(
5747                                "server is forbidding connections"
5748                            )))
5749                        } else {
5750                            let (client_conn, server_conn, killed) =
5751                                Connection::in_memory(cx.background());
5752                            connection_killers.lock().insert(user_id, killed);
5753                            cx.background()
5754                                .spawn(server.handle_connection(
5755                                    server_conn,
5756                                    client_name,
5757                                    user_id,
5758                                    Some(connection_id_tx),
5759                                    cx.background(),
5760                                ))
5761                                .detach();
5762                            Ok(client_conn)
5763                        }
5764                    })
5765                });
5766
5767            client
5768                .authenticate_and_connect(false, &cx.to_async())
5769                .await
5770                .unwrap();
5771
5772            Channel::init(&client);
5773            Project::init(&client);
5774            cx.update(|cx| {
5775                workspace::init(&client, cx);
5776            });
5777
5778            let peer_id = PeerId(connection_id_rx.next().await.unwrap().0);
5779            let user_store = cx.add_model(|cx| UserStore::new(client.clone(), http, cx));
5780
5781            let client = TestClient {
5782                client,
5783                peer_id,
5784                username: name.to_string(),
5785                user_store,
5786                language_registry: Arc::new(LanguageRegistry::test()),
5787                project: Default::default(),
5788                buffers: Default::default(),
5789            };
5790            client.wait_for_current_user(cx).await;
5791            client
5792        }
5793
5794        fn disconnect_client(&self, user_id: UserId) {
5795            self.connection_killers
5796                .lock()
5797                .remove(&user_id)
5798                .unwrap()
5799                .store(true, SeqCst);
5800        }
5801
5802        fn forbid_connections(&self) {
5803            self.forbid_connections.store(true, SeqCst);
5804        }
5805
5806        fn allow_connections(&self) {
5807            self.forbid_connections.store(false, SeqCst);
5808        }
5809
5810        async fn build_app_state(test_db: &TestDb) -> Arc<AppState> {
5811            Arc::new(AppState {
5812                db: test_db.db().clone(),
5813                api_token: Default::default(),
5814            })
5815        }
5816
5817        async fn state<'a>(&'a self) -> RwLockReadGuard<'a, Store> {
5818            self.server.store.read().await
5819        }
5820
5821        async fn condition<F>(&mut self, mut predicate: F)
5822        where
5823            F: FnMut(&Store) -> bool,
5824        {
5825            assert!(
5826                self.foreground.parking_forbidden(),
5827                "you must call forbid_parking to use server conditions so we don't block indefinitely"
5828            );
5829            while !(predicate)(&*self.server.store.read().await) {
5830                self.foreground.start_waiting();
5831                self.notifications.next().await;
5832                self.foreground.finish_waiting();
5833            }
5834        }
5835    }
5836
5837    impl Deref for TestServer {
5838        type Target = Server;
5839
5840        fn deref(&self) -> &Self::Target {
5841            &self.server
5842        }
5843    }
5844
5845    impl Drop for TestServer {
5846        fn drop(&mut self) {
5847            self.peer.reset();
5848        }
5849    }
5850
5851    struct TestClient {
5852        client: Arc<Client>,
5853        username: String,
5854        pub peer_id: PeerId,
5855        pub user_store: ModelHandle<UserStore>,
5856        language_registry: Arc<LanguageRegistry>,
5857        project: Option<ModelHandle<Project>>,
5858        buffers: HashSet<ModelHandle<language::Buffer>>,
5859    }
5860
5861    impl Deref for TestClient {
5862        type Target = Arc<Client>;
5863
5864        fn deref(&self) -> &Self::Target {
5865            &self.client
5866        }
5867    }
5868
5869    impl TestClient {
5870        pub fn current_user_id(&self, cx: &TestAppContext) -> UserId {
5871            UserId::from_proto(
5872                self.user_store
5873                    .read_with(cx, |user_store, _| user_store.current_user().unwrap().id),
5874            )
5875        }
5876
5877        async fn wait_for_current_user(&self, cx: &TestAppContext) {
5878            let mut authed_user = self
5879                .user_store
5880                .read_with(cx, |user_store, _| user_store.watch_current_user());
5881            while authed_user.next().await.unwrap().is_none() {}
5882        }
5883
5884        async fn build_local_project(
5885            &mut self,
5886            fs: Arc<FakeFs>,
5887            root_path: impl AsRef<Path>,
5888            cx: &mut TestAppContext,
5889        ) -> (ModelHandle<Project>, WorktreeId) {
5890            let project = cx.update(|cx| {
5891                Project::local(
5892                    self.client.clone(),
5893                    self.user_store.clone(),
5894                    self.language_registry.clone(),
5895                    fs,
5896                    cx,
5897                )
5898            });
5899            self.project = Some(project.clone());
5900            let (worktree, _) = project
5901                .update(cx, |p, cx| {
5902                    p.find_or_create_local_worktree(root_path, true, cx)
5903                })
5904                .await
5905                .unwrap();
5906            worktree
5907                .read_with(cx, |tree, _| tree.as_local().unwrap().scan_complete())
5908                .await;
5909            project
5910                .update(cx, |project, _| project.next_remote_id())
5911                .await;
5912            (project, worktree.read_with(cx, |tree, _| tree.id()))
5913        }
5914
5915        async fn build_remote_project(
5916            &mut self,
5917            project_id: u64,
5918            cx: &mut TestAppContext,
5919        ) -> ModelHandle<Project> {
5920            let project = Project::remote(
5921                project_id,
5922                self.client.clone(),
5923                self.user_store.clone(),
5924                self.language_registry.clone(),
5925                FakeFs::new(cx.background()),
5926                &mut cx.to_async(),
5927            )
5928            .await
5929            .unwrap();
5930            self.project = Some(project.clone());
5931            project
5932        }
5933
5934        fn build_workspace(
5935            &self,
5936            project: &ModelHandle<Project>,
5937            cx: &mut TestAppContext,
5938        ) -> ViewHandle<Workspace> {
5939            let (window_id, _) = cx.add_window(|_| EmptyView);
5940            cx.add_view(window_id, |cx| {
5941                let fs = project.read(cx).fs().clone();
5942                Workspace::new(
5943                    &WorkspaceParams {
5944                        fs,
5945                        project: project.clone(),
5946                        user_store: self.user_store.clone(),
5947                        languages: self.language_registry.clone(),
5948                        themes: ThemeRegistry::new((), cx.font_cache().clone()),
5949                        channel_list: cx.add_model(|cx| {
5950                            ChannelList::new(self.user_store.clone(), self.client.clone(), cx)
5951                        }),
5952                        client: self.client.clone(),
5953                    },
5954                    cx,
5955                )
5956            })
5957        }
5958
5959        async fn simulate_host(
5960            mut self,
5961            project: ModelHandle<Project>,
5962            files: Arc<Mutex<Vec<PathBuf>>>,
5963            op_start_signal: futures::channel::mpsc::UnboundedReceiver<()>,
5964            rng: Arc<Mutex<StdRng>>,
5965            mut cx: TestAppContext,
5966        ) -> (Self, TestAppContext, Option<anyhow::Error>) {
5967            async fn simulate_host_internal(
5968                client: &mut TestClient,
5969                project: ModelHandle<Project>,
5970                files: Arc<Mutex<Vec<PathBuf>>>,
5971                mut op_start_signal: futures::channel::mpsc::UnboundedReceiver<()>,
5972                rng: Arc<Mutex<StdRng>>,
5973                cx: &mut TestAppContext,
5974            ) -> anyhow::Result<()> {
5975                let fs = project.read_with(cx, |project, _| project.fs().clone());
5976
5977                while op_start_signal.next().await.is_some() {
5978                    let distribution = rng.lock().gen_range::<usize, _>(0..100);
5979                    match distribution {
5980                        0..=20 if !files.lock().is_empty() => {
5981                            let path = files.lock().choose(&mut *rng.lock()).unwrap().clone();
5982                            let mut path = path.as_path();
5983                            while let Some(parent_path) = path.parent() {
5984                                path = parent_path;
5985                                if rng.lock().gen() {
5986                                    break;
5987                                }
5988                            }
5989
5990                            log::info!("Host: find/create local worktree {:?}", path);
5991                            let find_or_create_worktree = project.update(cx, |project, cx| {
5992                                project.find_or_create_local_worktree(path, true, cx)
5993                            });
5994                            if rng.lock().gen() {
5995                                cx.background().spawn(find_or_create_worktree).detach();
5996                            } else {
5997                                find_or_create_worktree.await?;
5998                            }
5999                        }
6000                        10..=80 if !files.lock().is_empty() => {
6001                            let buffer = if client.buffers.is_empty() || rng.lock().gen() {
6002                                let file = files.lock().choose(&mut *rng.lock()).unwrap().clone();
6003                                let (worktree, path) = project
6004                                    .update(cx, |project, cx| {
6005                                        project.find_or_create_local_worktree(
6006                                            file.clone(),
6007                                            true,
6008                                            cx,
6009                                        )
6010                                    })
6011                                    .await?;
6012                                let project_path =
6013                                    worktree.read_with(cx, |worktree, _| (worktree.id(), path));
6014                                log::info!(
6015                                    "Host: opening path {:?}, worktree {}, relative_path {:?}",
6016                                    file,
6017                                    project_path.0,
6018                                    project_path.1
6019                                );
6020                                let buffer = project
6021                                    .update(cx, |project, cx| project.open_buffer(project_path, cx))
6022                                    .await
6023                                    .unwrap();
6024                                client.buffers.insert(buffer.clone());
6025                                buffer
6026                            } else {
6027                                client
6028                                    .buffers
6029                                    .iter()
6030                                    .choose(&mut *rng.lock())
6031                                    .unwrap()
6032                                    .clone()
6033                            };
6034
6035                            if rng.lock().gen_bool(0.1) {
6036                                cx.update(|cx| {
6037                                    log::info!(
6038                                        "Host: dropping buffer {:?}",
6039                                        buffer.read(cx).file().unwrap().full_path(cx)
6040                                    );
6041                                    client.buffers.remove(&buffer);
6042                                    drop(buffer);
6043                                });
6044                            } else {
6045                                buffer.update(cx, |buffer, cx| {
6046                                    log::info!(
6047                                        "Host: updating buffer {:?} ({})",
6048                                        buffer.file().unwrap().full_path(cx),
6049                                        buffer.remote_id()
6050                                    );
6051
6052                                    if rng.lock().gen_bool(0.7) {
6053                                        buffer.randomly_edit(&mut *rng.lock(), 5, cx);
6054                                    } else {
6055                                        buffer.randomly_undo_redo(&mut *rng.lock(), cx);
6056                                    }
6057                                });
6058                            }
6059                        }
6060                        _ => loop {
6061                            let path_component_count = rng.lock().gen_range::<usize, _>(1..=5);
6062                            let mut path = PathBuf::new();
6063                            path.push("/");
6064                            for _ in 0..path_component_count {
6065                                let letter = rng.lock().gen_range(b'a'..=b'z');
6066                                path.push(std::str::from_utf8(&[letter]).unwrap());
6067                            }
6068                            path.set_extension("rs");
6069                            let parent_path = path.parent().unwrap();
6070
6071                            log::info!("Host: creating file {:?}", path,);
6072
6073                            if fs.create_dir(&parent_path).await.is_ok()
6074                                && fs.create_file(&path, Default::default()).await.is_ok()
6075                            {
6076                                files.lock().push(path);
6077                                break;
6078                            } else {
6079                                log::info!("Host: cannot create file");
6080                            }
6081                        },
6082                    }
6083
6084                    cx.background().simulate_random_delay().await;
6085                }
6086
6087                Ok(())
6088            }
6089
6090            let result = simulate_host_internal(
6091                &mut self,
6092                project.clone(),
6093                files,
6094                op_start_signal,
6095                rng,
6096                &mut cx,
6097            )
6098            .await;
6099            log::info!("Host done");
6100            self.project = Some(project);
6101            (self, cx, result.err())
6102        }
6103
6104        pub async fn simulate_guest(
6105            mut self,
6106            guest_username: String,
6107            project: ModelHandle<Project>,
6108            op_start_signal: futures::channel::mpsc::UnboundedReceiver<()>,
6109            rng: Arc<Mutex<StdRng>>,
6110            mut cx: TestAppContext,
6111        ) -> (Self, TestAppContext, Option<anyhow::Error>) {
6112            async fn simulate_guest_internal(
6113                client: &mut TestClient,
6114                guest_username: &str,
6115                project: ModelHandle<Project>,
6116                mut op_start_signal: futures::channel::mpsc::UnboundedReceiver<()>,
6117                rng: Arc<Mutex<StdRng>>,
6118                cx: &mut TestAppContext,
6119            ) -> anyhow::Result<()> {
6120                while op_start_signal.next().await.is_some() {
6121                    let buffer = if client.buffers.is_empty() || rng.lock().gen() {
6122                        let worktree = if let Some(worktree) =
6123                            project.read_with(cx, |project, cx| {
6124                                project
6125                                    .worktrees(&cx)
6126                                    .filter(|worktree| {
6127                                        let worktree = worktree.read(cx);
6128                                        worktree.is_visible()
6129                                            && worktree.entries(false).any(|e| e.is_file())
6130                                    })
6131                                    .choose(&mut *rng.lock())
6132                            }) {
6133                            worktree
6134                        } else {
6135                            cx.background().simulate_random_delay().await;
6136                            continue;
6137                        };
6138
6139                        let (worktree_root_name, project_path) =
6140                            worktree.read_with(cx, |worktree, _| {
6141                                let entry = worktree
6142                                    .entries(false)
6143                                    .filter(|e| e.is_file())
6144                                    .choose(&mut *rng.lock())
6145                                    .unwrap();
6146                                (
6147                                    worktree.root_name().to_string(),
6148                                    (worktree.id(), entry.path.clone()),
6149                                )
6150                            });
6151                        log::info!(
6152                            "{}: opening path {:?} in worktree {} ({})",
6153                            guest_username,
6154                            project_path.1,
6155                            project_path.0,
6156                            worktree_root_name,
6157                        );
6158                        let buffer = project
6159                            .update(cx, |project, cx| {
6160                                project.open_buffer(project_path.clone(), cx)
6161                            })
6162                            .await?;
6163                        log::info!(
6164                            "{}: opened path {:?} in worktree {} ({}) with buffer id {}",
6165                            guest_username,
6166                            project_path.1,
6167                            project_path.0,
6168                            worktree_root_name,
6169                            buffer.read_with(cx, |buffer, _| buffer.remote_id())
6170                        );
6171                        client.buffers.insert(buffer.clone());
6172                        buffer
6173                    } else {
6174                        client
6175                            .buffers
6176                            .iter()
6177                            .choose(&mut *rng.lock())
6178                            .unwrap()
6179                            .clone()
6180                    };
6181
6182                    let choice = rng.lock().gen_range(0..100);
6183                    match choice {
6184                        0..=9 => {
6185                            cx.update(|cx| {
6186                                log::info!(
6187                                    "{}: dropping buffer {:?}",
6188                                    guest_username,
6189                                    buffer.read(cx).file().unwrap().full_path(cx)
6190                                );
6191                                client.buffers.remove(&buffer);
6192                                drop(buffer);
6193                            });
6194                        }
6195                        10..=19 => {
6196                            let completions = project.update(cx, |project, cx| {
6197                                log::info!(
6198                                    "{}: requesting completions for buffer {} ({:?})",
6199                                    guest_username,
6200                                    buffer.read(cx).remote_id(),
6201                                    buffer.read(cx).file().unwrap().full_path(cx)
6202                                );
6203                                let offset = rng.lock().gen_range(0..=buffer.read(cx).len());
6204                                project.completions(&buffer, offset, cx)
6205                            });
6206                            let completions = cx.background().spawn(async move {
6207                                completions
6208                                    .await
6209                                    .map_err(|err| anyhow!("completions request failed: {:?}", err))
6210                            });
6211                            if rng.lock().gen_bool(0.3) {
6212                                log::info!("{}: detaching completions request", guest_username);
6213                                cx.update(|cx| completions.detach_and_log_err(cx));
6214                            } else {
6215                                completions.await?;
6216                            }
6217                        }
6218                        20..=29 => {
6219                            let code_actions = project.update(cx, |project, cx| {
6220                                log::info!(
6221                                    "{}: requesting code actions for buffer {} ({:?})",
6222                                    guest_username,
6223                                    buffer.read(cx).remote_id(),
6224                                    buffer.read(cx).file().unwrap().full_path(cx)
6225                                );
6226                                let range = buffer.read(cx).random_byte_range(0, &mut *rng.lock());
6227                                project.code_actions(&buffer, range, cx)
6228                            });
6229                            let code_actions = cx.background().spawn(async move {
6230                                code_actions.await.map_err(|err| {
6231                                    anyhow!("code actions request failed: {:?}", err)
6232                                })
6233                            });
6234                            if rng.lock().gen_bool(0.3) {
6235                                log::info!("{}: detaching code actions request", guest_username);
6236                                cx.update(|cx| code_actions.detach_and_log_err(cx));
6237                            } else {
6238                                code_actions.await?;
6239                            }
6240                        }
6241                        30..=39 if buffer.read_with(cx, |buffer, _| buffer.is_dirty()) => {
6242                            let (requested_version, save) = buffer.update(cx, |buffer, cx| {
6243                                log::info!(
6244                                    "{}: saving buffer {} ({:?})",
6245                                    guest_username,
6246                                    buffer.remote_id(),
6247                                    buffer.file().unwrap().full_path(cx)
6248                                );
6249                                (buffer.version(), buffer.save(cx))
6250                            });
6251                            let save = cx.background().spawn(async move {
6252                                let (saved_version, _) = save
6253                                    .await
6254                                    .map_err(|err| anyhow!("save request failed: {:?}", err))?;
6255                                assert!(saved_version.observed_all(&requested_version));
6256                                Ok::<_, anyhow::Error>(())
6257                            });
6258                            if rng.lock().gen_bool(0.3) {
6259                                log::info!("{}: detaching save request", guest_username);
6260                                cx.update(|cx| save.detach_and_log_err(cx));
6261                            } else {
6262                                save.await?;
6263                            }
6264                        }
6265                        40..=44 => {
6266                            let prepare_rename = project.update(cx, |project, cx| {
6267                                log::info!(
6268                                    "{}: preparing rename for buffer {} ({:?})",
6269                                    guest_username,
6270                                    buffer.read(cx).remote_id(),
6271                                    buffer.read(cx).file().unwrap().full_path(cx)
6272                                );
6273                                let offset = rng.lock().gen_range(0..=buffer.read(cx).len());
6274                                project.prepare_rename(buffer, offset, cx)
6275                            });
6276                            let prepare_rename = cx.background().spawn(async move {
6277                                prepare_rename.await.map_err(|err| {
6278                                    anyhow!("prepare rename request failed: {:?}", err)
6279                                })
6280                            });
6281                            if rng.lock().gen_bool(0.3) {
6282                                log::info!("{}: detaching prepare rename request", guest_username);
6283                                cx.update(|cx| prepare_rename.detach_and_log_err(cx));
6284                            } else {
6285                                prepare_rename.await?;
6286                            }
6287                        }
6288                        45..=49 => {
6289                            let definitions = project.update(cx, |project, cx| {
6290                                log::info!(
6291                                    "{}: requesting definitions for buffer {} ({:?})",
6292                                    guest_username,
6293                                    buffer.read(cx).remote_id(),
6294                                    buffer.read(cx).file().unwrap().full_path(cx)
6295                                );
6296                                let offset = rng.lock().gen_range(0..=buffer.read(cx).len());
6297                                project.definition(&buffer, offset, cx)
6298                            });
6299                            let definitions = cx.background().spawn(async move {
6300                                definitions
6301                                    .await
6302                                    .map_err(|err| anyhow!("definitions request failed: {:?}", err))
6303                            });
6304                            if rng.lock().gen_bool(0.3) {
6305                                log::info!("{}: detaching definitions request", guest_username);
6306                                cx.update(|cx| definitions.detach_and_log_err(cx));
6307                            } else {
6308                                client
6309                                    .buffers
6310                                    .extend(definitions.await?.into_iter().map(|loc| loc.buffer));
6311                            }
6312                        }
6313                        50..=54 => {
6314                            let highlights = project.update(cx, |project, cx| {
6315                                log::info!(
6316                                    "{}: requesting highlights for buffer {} ({:?})",
6317                                    guest_username,
6318                                    buffer.read(cx).remote_id(),
6319                                    buffer.read(cx).file().unwrap().full_path(cx)
6320                                );
6321                                let offset = rng.lock().gen_range(0..=buffer.read(cx).len());
6322                                project.document_highlights(&buffer, offset, cx)
6323                            });
6324                            let highlights = cx.background().spawn(async move {
6325                                highlights
6326                                    .await
6327                                    .map_err(|err| anyhow!("highlights request failed: {:?}", err))
6328                            });
6329                            if rng.lock().gen_bool(0.3) {
6330                                log::info!("{}: detaching highlights request", guest_username);
6331                                cx.update(|cx| highlights.detach_and_log_err(cx));
6332                            } else {
6333                                highlights.await?;
6334                            }
6335                        }
6336                        55..=59 => {
6337                            let search = project.update(cx, |project, cx| {
6338                                let query = rng.lock().gen_range('a'..='z');
6339                                log::info!("{}: project-wide search {:?}", guest_username, query);
6340                                project.search(SearchQuery::text(query, false, false), cx)
6341                            });
6342                            let search = cx.background().spawn(async move {
6343                                search
6344                                    .await
6345                                    .map_err(|err| anyhow!("search request failed: {:?}", err))
6346                            });
6347                            if rng.lock().gen_bool(0.3) {
6348                                log::info!("{}: detaching search request", guest_username);
6349                                cx.update(|cx| search.detach_and_log_err(cx));
6350                            } else {
6351                                client.buffers.extend(search.await?.into_keys());
6352                            }
6353                        }
6354                        _ => {
6355                            buffer.update(cx, |buffer, cx| {
6356                                log::info!(
6357                                    "{}: updating buffer {} ({:?})",
6358                                    guest_username,
6359                                    buffer.remote_id(),
6360                                    buffer.file().unwrap().full_path(cx)
6361                                );
6362                                if rng.lock().gen_bool(0.7) {
6363                                    buffer.randomly_edit(&mut *rng.lock(), 5, cx);
6364                                } else {
6365                                    buffer.randomly_undo_redo(&mut *rng.lock(), cx);
6366                                }
6367                            });
6368                        }
6369                    }
6370                    cx.background().simulate_random_delay().await;
6371                }
6372                Ok(())
6373            }
6374
6375            let result = simulate_guest_internal(
6376                &mut self,
6377                &guest_username,
6378                project.clone(),
6379                op_start_signal,
6380                rng,
6381                &mut cx,
6382            )
6383            .await;
6384            log::info!("{}: done", guest_username);
6385
6386            self.project = Some(project);
6387            (self, cx, result.err())
6388        }
6389    }
6390
6391    impl Drop for TestClient {
6392        fn drop(&mut self) {
6393            self.client.tear_down();
6394        }
6395    }
6396
6397    impl Executor for Arc<gpui::executor::Background> {
6398        type Sleep = gpui::executor::Timer;
6399
6400        fn spawn_detached<F: 'static + Send + Future<Output = ()>>(&self, future: F) {
6401            self.spawn(future).detach();
6402        }
6403
6404        fn sleep(&self, duration: Duration) -> Self::Sleep {
6405            self.as_ref().timer(duration)
6406        }
6407    }
6408
6409    fn channel_messages(channel: &Channel) -> Vec<(String, String, bool)> {
6410        channel
6411            .messages()
6412            .cursor::<()>()
6413            .map(|m| {
6414                (
6415                    m.sender.github_login.clone(),
6416                    m.body.clone(),
6417                    m.is_pending(),
6418                )
6419            })
6420            .collect()
6421    }
6422
6423    struct EmptyView;
6424
6425    impl gpui::Entity for EmptyView {
6426        type Event = ();
6427    }
6428
6429    impl gpui::View for EmptyView {
6430        fn ui_name() -> &'static str {
6431            "empty view"
6432        }
6433
6434        fn render(&mut self, _: &mut gpui::RenderContext<Self>) -> gpui::ElementBox {
6435            gpui::Element::boxed(gpui::elements::Empty)
6436        }
6437    }
6438}