rpc.rs

   1mod store;
   2
   3use crate::{
   4    auth,
   5    db::{ChannelId, MessageId, UserId},
   6    AppState, Result,
   7};
   8use anyhow::anyhow;
   9use async_tungstenite::tungstenite::{
  10    protocol::CloseFrame as TungsteniteCloseFrame, Message as TungsteniteMessage,
  11};
  12use axum::{
  13    body::Body,
  14    extract::{
  15        ws::{CloseFrame as AxumCloseFrame, Message as AxumMessage},
  16        ConnectInfo, WebSocketUpgrade,
  17    },
  18    headers::{Header, HeaderName},
  19    http::StatusCode,
  20    middleware,
  21    response::{IntoResponse, Response},
  22    routing::get,
  23    Extension, Router, TypedHeader,
  24};
  25use collections::{HashMap, HashSet};
  26use futures::{channel::mpsc, future::BoxFuture, FutureExt, SinkExt, StreamExt, TryStreamExt};
  27use lazy_static::lazy_static;
  28use log::{as_debug, as_display};
  29use rpc::{
  30    proto::{self, AnyTypedEnvelope, EntityMessage, EnvelopedMessage, RequestMessage},
  31    Connection, ConnectionId, Peer, TypedEnvelope,
  32};
  33use std::{
  34    any::TypeId,
  35    future::Future,
  36    marker::PhantomData,
  37    net::SocketAddr,
  38    ops::{Deref, DerefMut},
  39    rc::Rc,
  40    sync::Arc,
  41    time::{Duration, Instant},
  42};
  43use store::{Store, Worktree};
  44use time::OffsetDateTime;
  45use tokio::{
  46    sync::{RwLock, RwLockReadGuard, RwLockWriteGuard},
  47    time::Sleep,
  48};
  49use tower::ServiceBuilder;
  50use util::ResultExt;
  51
  52type MessageHandler = Box<
  53    dyn Send + Sync + Fn(Arc<Server>, Box<dyn AnyTypedEnvelope>) -> BoxFuture<'static, Result<()>>,
  54>;
  55
  56pub struct Server {
  57    peer: Arc<Peer>,
  58    store: RwLock<Store>,
  59    app_state: Arc<AppState>,
  60    handlers: HashMap<TypeId, MessageHandler>,
  61    notifications: Option<mpsc::UnboundedSender<()>>,
  62}
  63
  64pub trait Executor: Send + Clone {
  65    type Sleep: Send + Future;
  66    fn spawn_detached<F: 'static + Send + Future<Output = ()>>(&self, future: F);
  67    fn sleep(&self, duration: Duration) -> Self::Sleep;
  68}
  69
  70#[derive(Clone)]
  71pub struct RealExecutor;
  72
  73const MESSAGE_COUNT_PER_PAGE: usize = 100;
  74const MAX_MESSAGE_LEN: usize = 1024;
  75
  76struct StoreReadGuard<'a> {
  77    guard: RwLockReadGuard<'a, Store>,
  78    _not_send: PhantomData<Rc<()>>,
  79}
  80
  81struct StoreWriteGuard<'a> {
  82    guard: RwLockWriteGuard<'a, Store>,
  83    _not_send: PhantomData<Rc<()>>,
  84}
  85
  86impl Server {
  87    pub fn new(
  88        app_state: Arc<AppState>,
  89        notifications: Option<mpsc::UnboundedSender<()>>,
  90    ) -> Arc<Self> {
  91        let mut server = Self {
  92            peer: Peer::new(),
  93            app_state,
  94            store: Default::default(),
  95            handlers: Default::default(),
  96            notifications,
  97        };
  98
  99        server
 100            .add_request_handler(Server::ping)
 101            .add_request_handler(Server::register_project)
 102            .add_message_handler(Server::unregister_project)
 103            .add_request_handler(Server::share_project)
 104            .add_message_handler(Server::unshare_project)
 105            .add_sync_request_handler(Server::join_project)
 106            .add_message_handler(Server::leave_project)
 107            .add_request_handler(Server::register_worktree)
 108            .add_message_handler(Server::unregister_worktree)
 109            .add_request_handler(Server::update_worktree)
 110            .add_message_handler(Server::start_language_server)
 111            .add_message_handler(Server::update_language_server)
 112            .add_message_handler(Server::update_diagnostic_summary)
 113            .add_request_handler(Server::forward_project_request::<proto::GetDefinition>)
 114            .add_request_handler(Server::forward_project_request::<proto::GetReferences>)
 115            .add_request_handler(Server::forward_project_request::<proto::SearchProject>)
 116            .add_request_handler(Server::forward_project_request::<proto::GetDocumentHighlights>)
 117            .add_request_handler(Server::forward_project_request::<proto::GetProjectSymbols>)
 118            .add_request_handler(Server::forward_project_request::<proto::OpenBufferForSymbol>)
 119            .add_request_handler(Server::forward_project_request::<proto::OpenBufferById>)
 120            .add_request_handler(Server::forward_project_request::<proto::OpenBufferByPath>)
 121            .add_request_handler(Server::forward_project_request::<proto::GetCompletions>)
 122            .add_request_handler(
 123                Server::forward_project_request::<proto::ApplyCompletionAdditionalEdits>,
 124            )
 125            .add_request_handler(Server::forward_project_request::<proto::GetCodeActions>)
 126            .add_request_handler(Server::forward_project_request::<proto::ApplyCodeAction>)
 127            .add_request_handler(Server::forward_project_request::<proto::PrepareRename>)
 128            .add_request_handler(Server::forward_project_request::<proto::PerformRename>)
 129            .add_request_handler(Server::forward_project_request::<proto::ReloadBuffers>)
 130            .add_request_handler(Server::forward_project_request::<proto::FormatBuffers>)
 131            .add_request_handler(Server::update_buffer)
 132            .add_message_handler(Server::update_buffer_file)
 133            .add_message_handler(Server::buffer_reloaded)
 134            .add_message_handler(Server::buffer_saved)
 135            .add_request_handler(Server::save_buffer)
 136            .add_request_handler(Server::get_channels)
 137            .add_request_handler(Server::get_users)
 138            .add_request_handler(Server::join_channel)
 139            .add_message_handler(Server::leave_channel)
 140            .add_request_handler(Server::send_channel_message)
 141            .add_request_handler(Server::follow)
 142            .add_message_handler(Server::unfollow)
 143            .add_message_handler(Server::update_followers)
 144            .add_request_handler(Server::get_channel_messages);
 145
 146        Arc::new(server)
 147    }
 148
 149    fn add_message_handler<F, Fut, M>(&mut self, handler: F) -> &mut Self
 150    where
 151        F: 'static + Send + Sync + Fn(Arc<Self>, TypedEnvelope<M>) -> Fut,
 152        Fut: 'static + Send + Future<Output = Result<()>>,
 153        M: EnvelopedMessage,
 154    {
 155        let prev_handler = self.handlers.insert(
 156            TypeId::of::<M>(),
 157            Box::new(move |server, envelope| {
 158                let envelope = envelope.into_any().downcast::<TypedEnvelope<M>>().unwrap();
 159                (handler)(server, *envelope).boxed()
 160            }),
 161        );
 162        if prev_handler.is_some() {
 163            panic!("registered a handler for the same message twice");
 164        }
 165        self
 166    }
 167
 168    fn add_request_handler<F, Fut, M>(&mut self, handler: F) -> &mut Self
 169    where
 170        F: 'static + Send + Sync + Fn(Arc<Self>, TypedEnvelope<M>) -> Fut,
 171        Fut: 'static + Send + Future<Output = Result<M::Response>>,
 172        M: RequestMessage,
 173    {
 174        self.add_message_handler(move |server, envelope| {
 175            let receipt = envelope.receipt();
 176            let response = (handler)(server.clone(), envelope);
 177            async move {
 178                match response.await {
 179                    Ok(response) => {
 180                        server.peer.respond(receipt, response)?;
 181                        Ok(())
 182                    }
 183                    Err(error) => {
 184                        server.peer.respond_with_error(
 185                            receipt,
 186                            proto::Error {
 187                                message: error.to_string(),
 188                            },
 189                        )?;
 190                        Err(error)
 191                    }
 192                }
 193            }
 194        })
 195    }
 196
 197    /// Handle a request while holding a lock to the store. This is useful when we're registering
 198    /// a connection but we want to respond on the connection before anybody else can send on it.
 199    fn add_sync_request_handler<F, M>(&mut self, handler: F) -> &mut Self
 200    where
 201        F: 'static
 202            + Send
 203            + Sync
 204            + Fn(Arc<Self>, &mut Store, TypedEnvelope<M>) -> Result<M::Response>,
 205        M: RequestMessage,
 206    {
 207        let handler = Arc::new(handler);
 208        self.add_message_handler(move |server, envelope| {
 209            let receipt = envelope.receipt();
 210            let handler = handler.clone();
 211            async move {
 212                let mut store = server.store.write().await;
 213                let response = (handler)(server.clone(), &mut *store, envelope);
 214                match response {
 215                    Ok(response) => {
 216                        server.peer.respond(receipt, response)?;
 217                        Ok(())
 218                    }
 219                    Err(error) => {
 220                        server.peer.respond_with_error(
 221                            receipt,
 222                            proto::Error {
 223                                message: error.to_string(),
 224                            },
 225                        )?;
 226                        Err(error)
 227                    }
 228                }
 229            }
 230        })
 231    }
 232
 233    pub fn handle_connection<E: Executor>(
 234        self: &Arc<Self>,
 235        connection: Connection,
 236        addr: String,
 237        user_id: UserId,
 238        mut send_connection_id: Option<mpsc::Sender<ConnectionId>>,
 239        executor: E,
 240    ) -> impl Future<Output = ()> {
 241        let mut this = self.clone();
 242        async move {
 243            let (connection_id, handle_io, mut incoming_rx) = this
 244                .peer
 245                .add_connection(connection, {
 246                    let executor = executor.clone();
 247                    move |duration| {
 248                        let timer = executor.sleep(duration);
 249                        async move {
 250                            timer.await;
 251                        }
 252                    }
 253                })
 254                .await;
 255
 256            if let Some(send_connection_id) = send_connection_id.as_mut() {
 257                let _ = send_connection_id.send(connection_id).await;
 258            }
 259
 260            {
 261                let mut state = this.state_mut().await;
 262                state.add_connection(connection_id, user_id);
 263                this.update_contacts_for_users(&*state, &[user_id]);
 264            }
 265
 266            let handle_io = handle_io.fuse();
 267            futures::pin_mut!(handle_io);
 268            loop {
 269                let next_message = incoming_rx.next().fuse();
 270                futures::pin_mut!(next_message);
 271                futures::select_biased! {
 272                    result = handle_io => {
 273                        if let Err(err) = result {
 274                            log::error!("error handling rpc connection {:?} - {:?}", addr, err);
 275                        }
 276                        break;
 277                    }
 278                    message = next_message => {
 279                        if let Some(message) = message {
 280                            let start_time = Instant::now();
 281                            let type_name = message.payload_type_name();
 282                            log::info!(connection_id = connection_id.0, type_name = type_name; "rpc message received");
 283                            if let Some(handler) = this.handlers.get(&message.payload_type_id()) {
 284                                let notifications = this.notifications.clone();
 285                                let is_background = message.is_background();
 286                                let handle_message = (handler)(this.clone(), message);
 287                                let handle_message = async move {
 288                                    if let Err(err) = handle_message.await {
 289                                        log::error!(connection_id = connection_id.0, type = type_name, error = as_display!(err); "rpc message error");
 290                                    } else {
 291                                        log::info!(connection_id = connection_id.0, type = type_name, duration = as_debug!(start_time.elapsed()); "rpc message handled");
 292                                    }
 293                                    if let Some(mut notifications) = notifications {
 294                                        let _ = notifications.send(()).await;
 295                                    }
 296                                };
 297                                if is_background {
 298                                    executor.spawn_detached(handle_message);
 299                                } else {
 300                                    handle_message.await;
 301                                }
 302                            } else {
 303                                log::warn!("unhandled message: {}", type_name);
 304                            }
 305                        } else {
 306                            log::info!(address = as_debug!(addr); "rpc connection closed");
 307                            break;
 308                        }
 309                    }
 310                }
 311            }
 312
 313            if let Err(err) = this.sign_out(connection_id).await {
 314                log::error!("error signing out connection {:?} - {:?}", addr, err);
 315            }
 316        }
 317    }
 318
 319    async fn sign_out(self: &mut Arc<Self>, connection_id: ConnectionId) -> Result<()> {
 320        self.peer.disconnect(connection_id);
 321        let mut state = self.state_mut().await;
 322        let removed_connection = state.remove_connection(connection_id)?;
 323
 324        for (project_id, project) in removed_connection.hosted_projects {
 325            if let Some(share) = project.share {
 326                broadcast(
 327                    connection_id,
 328                    share.guests.keys().copied().collect(),
 329                    |conn_id| {
 330                        self.peer
 331                            .send(conn_id, proto::UnshareProject { project_id })
 332                    },
 333                );
 334            }
 335        }
 336
 337        for (project_id, peer_ids) in removed_connection.guest_project_ids {
 338            broadcast(connection_id, peer_ids, |conn_id| {
 339                self.peer.send(
 340                    conn_id,
 341                    proto::RemoveProjectCollaborator {
 342                        project_id,
 343                        peer_id: connection_id.0,
 344                    },
 345                )
 346            });
 347        }
 348
 349        self.update_contacts_for_users(&*state, removed_connection.contact_ids.iter());
 350        Ok(())
 351    }
 352
 353    async fn ping(self: Arc<Server>, _: TypedEnvelope<proto::Ping>) -> Result<proto::Ack> {
 354        Ok(proto::Ack {})
 355    }
 356
 357    async fn register_project(
 358        self: Arc<Server>,
 359        request: TypedEnvelope<proto::RegisterProject>,
 360    ) -> Result<proto::RegisterProjectResponse> {
 361        let project_id = {
 362            let mut state = self.state_mut().await;
 363            let user_id = state.user_id_for_connection(request.sender_id)?;
 364            state.register_project(request.sender_id, user_id)
 365        };
 366        Ok(proto::RegisterProjectResponse { project_id })
 367    }
 368
 369    async fn unregister_project(
 370        self: Arc<Server>,
 371        request: TypedEnvelope<proto::UnregisterProject>,
 372    ) -> Result<()> {
 373        let mut state = self.state_mut().await;
 374        let project = state.unregister_project(request.payload.project_id, request.sender_id)?;
 375        self.update_contacts_for_users(&*state, &project.authorized_user_ids());
 376        Ok(())
 377    }
 378
 379    async fn share_project(
 380        self: Arc<Server>,
 381        request: TypedEnvelope<proto::ShareProject>,
 382    ) -> Result<proto::Ack> {
 383        let mut state = self.state_mut().await;
 384        let project = state.share_project(request.payload.project_id, request.sender_id)?;
 385        self.update_contacts_for_users(&mut *state, &project.authorized_user_ids);
 386        Ok(proto::Ack {})
 387    }
 388
 389    async fn unshare_project(
 390        self: Arc<Server>,
 391        request: TypedEnvelope<proto::UnshareProject>,
 392    ) -> Result<()> {
 393        let project_id = request.payload.project_id;
 394        let mut state = self.state_mut().await;
 395        let project = state.unshare_project(project_id, request.sender_id)?;
 396        broadcast(request.sender_id, project.connection_ids, |conn_id| {
 397            self.peer
 398                .send(conn_id, proto::UnshareProject { project_id })
 399        });
 400        self.update_contacts_for_users(&mut *state, &project.authorized_user_ids);
 401        Ok(())
 402    }
 403
 404    fn join_project(
 405        self: Arc<Server>,
 406        state: &mut Store,
 407        request: TypedEnvelope<proto::JoinProject>,
 408    ) -> Result<proto::JoinProjectResponse> {
 409        let project_id = request.payload.project_id;
 410
 411        let user_id = state.user_id_for_connection(request.sender_id)?;
 412        let (response, connection_ids, contact_user_ids) = state
 413            .join_project(request.sender_id, user_id, project_id)
 414            .and_then(|joined| {
 415                let share = joined.project.share()?;
 416                let peer_count = share.guests.len();
 417                let mut collaborators = Vec::with_capacity(peer_count);
 418                collaborators.push(proto::Collaborator {
 419                    peer_id: joined.project.host_connection_id.0,
 420                    replica_id: 0,
 421                    user_id: joined.project.host_user_id.to_proto(),
 422                });
 423                let worktrees = share
 424                    .worktrees
 425                    .iter()
 426                    .filter_map(|(id, shared_worktree)| {
 427                        let worktree = joined.project.worktrees.get(&id)?;
 428                        Some(proto::Worktree {
 429                            id: *id,
 430                            root_name: worktree.root_name.clone(),
 431                            entries: shared_worktree.entries.values().cloned().collect(),
 432                            diagnostic_summaries: shared_worktree
 433                                .diagnostic_summaries
 434                                .values()
 435                                .cloned()
 436                                .collect(),
 437                            visible: worktree.visible,
 438                        })
 439                    })
 440                    .collect();
 441                for (peer_conn_id, (peer_replica_id, peer_user_id)) in &share.guests {
 442                    if *peer_conn_id != request.sender_id {
 443                        collaborators.push(proto::Collaborator {
 444                            peer_id: peer_conn_id.0,
 445                            replica_id: *peer_replica_id as u32,
 446                            user_id: peer_user_id.to_proto(),
 447                        });
 448                    }
 449                }
 450                let response = proto::JoinProjectResponse {
 451                    worktrees,
 452                    replica_id: joined.replica_id as u32,
 453                    collaborators,
 454                    language_servers: joined.project.language_servers.clone(),
 455                };
 456                let connection_ids = joined.project.connection_ids();
 457                let contact_user_ids = joined.project.authorized_user_ids();
 458                Ok((response, connection_ids, contact_user_ids))
 459            })?;
 460
 461        broadcast(request.sender_id, connection_ids, |conn_id| {
 462            self.peer.send(
 463                conn_id,
 464                proto::AddProjectCollaborator {
 465                    project_id,
 466                    collaborator: Some(proto::Collaborator {
 467                        peer_id: request.sender_id.0,
 468                        replica_id: response.replica_id,
 469                        user_id: user_id.to_proto(),
 470                    }),
 471                },
 472            )
 473        });
 474        self.update_contacts_for_users(state, &contact_user_ids);
 475        Ok(response)
 476    }
 477
 478    async fn leave_project(
 479        self: Arc<Server>,
 480        request: TypedEnvelope<proto::LeaveProject>,
 481    ) -> Result<()> {
 482        let sender_id = request.sender_id;
 483        let project_id = request.payload.project_id;
 484        let mut state = self.state_mut().await;
 485        let worktree = state.leave_project(sender_id, project_id)?;
 486        broadcast(sender_id, worktree.connection_ids, |conn_id| {
 487            self.peer.send(
 488                conn_id,
 489                proto::RemoveProjectCollaborator {
 490                    project_id,
 491                    peer_id: sender_id.0,
 492                },
 493            )
 494        });
 495        self.update_contacts_for_users(&*state, &worktree.authorized_user_ids);
 496        Ok(())
 497    }
 498
 499    async fn register_worktree(
 500        self: Arc<Server>,
 501        request: TypedEnvelope<proto::RegisterWorktree>,
 502    ) -> Result<proto::Ack> {
 503        let mut contact_user_ids = HashSet::default();
 504        for github_login in &request.payload.authorized_logins {
 505            let contact_user_id = self.app_state.db.create_user(github_login, false).await?;
 506            contact_user_ids.insert(contact_user_id);
 507        }
 508
 509        let mut state = self.state_mut().await;
 510        let host_user_id = state.user_id_for_connection(request.sender_id)?;
 511        contact_user_ids.insert(host_user_id);
 512
 513        let contact_user_ids = contact_user_ids.into_iter().collect::<Vec<_>>();
 514        let guest_connection_ids = state
 515            .read_project(request.payload.project_id, request.sender_id)?
 516            .guest_connection_ids();
 517        state.register_worktree(
 518            request.payload.project_id,
 519            request.payload.worktree_id,
 520            request.sender_id,
 521            Worktree {
 522                authorized_user_ids: contact_user_ids.clone(),
 523                root_name: request.payload.root_name.clone(),
 524                visible: request.payload.visible,
 525            },
 526        )?;
 527
 528        broadcast(request.sender_id, guest_connection_ids, |connection_id| {
 529            self.peer
 530                .forward_send(request.sender_id, connection_id, request.payload.clone())
 531        });
 532        self.update_contacts_for_users(&*state, &contact_user_ids);
 533        Ok(proto::Ack {})
 534    }
 535
 536    async fn unregister_worktree(
 537        self: Arc<Server>,
 538        request: TypedEnvelope<proto::UnregisterWorktree>,
 539    ) -> Result<()> {
 540        let project_id = request.payload.project_id;
 541        let worktree_id = request.payload.worktree_id;
 542        let mut state = self.state_mut().await;
 543        let (worktree, guest_connection_ids) =
 544            state.unregister_worktree(project_id, worktree_id, request.sender_id)?;
 545        broadcast(request.sender_id, guest_connection_ids, |conn_id| {
 546            self.peer.send(
 547                conn_id,
 548                proto::UnregisterWorktree {
 549                    project_id,
 550                    worktree_id,
 551                },
 552            )
 553        });
 554        self.update_contacts_for_users(&*state, &worktree.authorized_user_ids);
 555        Ok(())
 556    }
 557
 558    async fn update_worktree(
 559        self: Arc<Server>,
 560        request: TypedEnvelope<proto::UpdateWorktree>,
 561    ) -> Result<proto::Ack> {
 562        let connection_ids = self.state_mut().await.update_worktree(
 563            request.sender_id,
 564            request.payload.project_id,
 565            request.payload.worktree_id,
 566            &request.payload.removed_entries,
 567            &request.payload.updated_entries,
 568        )?;
 569
 570        broadcast(request.sender_id, connection_ids, |connection_id| {
 571            self.peer
 572                .forward_send(request.sender_id, connection_id, request.payload.clone())
 573        });
 574
 575        Ok(proto::Ack {})
 576    }
 577
 578    async fn update_diagnostic_summary(
 579        self: Arc<Server>,
 580        request: TypedEnvelope<proto::UpdateDiagnosticSummary>,
 581    ) -> Result<()> {
 582        let summary = request
 583            .payload
 584            .summary
 585            .clone()
 586            .ok_or_else(|| anyhow!("invalid summary"))?;
 587        let receiver_ids = self.state_mut().await.update_diagnostic_summary(
 588            request.payload.project_id,
 589            request.payload.worktree_id,
 590            request.sender_id,
 591            summary,
 592        )?;
 593
 594        broadcast(request.sender_id, receiver_ids, |connection_id| {
 595            self.peer
 596                .forward_send(request.sender_id, connection_id, request.payload.clone())
 597        });
 598        Ok(())
 599    }
 600
 601    async fn start_language_server(
 602        self: Arc<Server>,
 603        request: TypedEnvelope<proto::StartLanguageServer>,
 604    ) -> Result<()> {
 605        let receiver_ids = self.state_mut().await.start_language_server(
 606            request.payload.project_id,
 607            request.sender_id,
 608            request
 609                .payload
 610                .server
 611                .clone()
 612                .ok_or_else(|| anyhow!("invalid language server"))?,
 613        )?;
 614        broadcast(request.sender_id, receiver_ids, |connection_id| {
 615            self.peer
 616                .forward_send(request.sender_id, connection_id, request.payload.clone())
 617        });
 618        Ok(())
 619    }
 620
 621    async fn update_language_server(
 622        self: Arc<Server>,
 623        request: TypedEnvelope<proto::UpdateLanguageServer>,
 624    ) -> Result<()> {
 625        let receiver_ids = self
 626            .state()
 627            .await
 628            .project_connection_ids(request.payload.project_id, request.sender_id)?;
 629        broadcast(request.sender_id, receiver_ids, |connection_id| {
 630            self.peer
 631                .forward_send(request.sender_id, connection_id, request.payload.clone())
 632        });
 633        Ok(())
 634    }
 635
 636    async fn forward_project_request<T>(
 637        self: Arc<Server>,
 638        request: TypedEnvelope<T>,
 639    ) -> Result<T::Response>
 640    where
 641        T: EntityMessage + RequestMessage,
 642    {
 643        let host_connection_id = self
 644            .state()
 645            .await
 646            .read_project(request.payload.remote_entity_id(), request.sender_id)?
 647            .host_connection_id;
 648        Ok(self
 649            .peer
 650            .forward_request(request.sender_id, host_connection_id, request.payload)
 651            .await?)
 652    }
 653
 654    async fn save_buffer(
 655        self: Arc<Server>,
 656        request: TypedEnvelope<proto::SaveBuffer>,
 657    ) -> Result<proto::BufferSaved> {
 658        let host = self
 659            .state()
 660            .await
 661            .read_project(request.payload.project_id, request.sender_id)?
 662            .host_connection_id;
 663        let response = self
 664            .peer
 665            .forward_request(request.sender_id, host, request.payload.clone())
 666            .await?;
 667
 668        let mut guests = self
 669            .state()
 670            .await
 671            .read_project(request.payload.project_id, request.sender_id)?
 672            .connection_ids();
 673        guests.retain(|guest_connection_id| *guest_connection_id != request.sender_id);
 674        broadcast(host, guests, |conn_id| {
 675            self.peer.forward_send(host, conn_id, response.clone())
 676        });
 677
 678        Ok(response)
 679    }
 680
 681    async fn update_buffer(
 682        self: Arc<Server>,
 683        request: TypedEnvelope<proto::UpdateBuffer>,
 684    ) -> Result<proto::Ack> {
 685        let receiver_ids = self
 686            .state()
 687            .await
 688            .project_connection_ids(request.payload.project_id, request.sender_id)?;
 689        broadcast(request.sender_id, receiver_ids, |connection_id| {
 690            self.peer
 691                .forward_send(request.sender_id, connection_id, request.payload.clone())
 692        });
 693        Ok(proto::Ack {})
 694    }
 695
 696    async fn update_buffer_file(
 697        self: Arc<Server>,
 698        request: TypedEnvelope<proto::UpdateBufferFile>,
 699    ) -> Result<()> {
 700        let receiver_ids = self
 701            .state()
 702            .await
 703            .project_connection_ids(request.payload.project_id, request.sender_id)?;
 704        broadcast(request.sender_id, receiver_ids, |connection_id| {
 705            self.peer
 706                .forward_send(request.sender_id, connection_id, request.payload.clone())
 707        });
 708        Ok(())
 709    }
 710
 711    async fn buffer_reloaded(
 712        self: Arc<Server>,
 713        request: TypedEnvelope<proto::BufferReloaded>,
 714    ) -> Result<()> {
 715        let receiver_ids = self
 716            .state()
 717            .await
 718            .project_connection_ids(request.payload.project_id, request.sender_id)?;
 719        broadcast(request.sender_id, receiver_ids, |connection_id| {
 720            self.peer
 721                .forward_send(request.sender_id, connection_id, request.payload.clone())
 722        });
 723        Ok(())
 724    }
 725
 726    async fn buffer_saved(
 727        self: Arc<Server>,
 728        request: TypedEnvelope<proto::BufferSaved>,
 729    ) -> Result<()> {
 730        let receiver_ids = self
 731            .state()
 732            .await
 733            .project_connection_ids(request.payload.project_id, request.sender_id)?;
 734        broadcast(request.sender_id, receiver_ids, |connection_id| {
 735            self.peer
 736                .forward_send(request.sender_id, connection_id, request.payload.clone())
 737        });
 738        Ok(())
 739    }
 740
 741    async fn follow(
 742        self: Arc<Self>,
 743        request: TypedEnvelope<proto::Follow>,
 744    ) -> Result<proto::FollowResponse> {
 745        let leader_id = ConnectionId(request.payload.leader_id);
 746        let follower_id = request.sender_id;
 747        if !self
 748            .state()
 749            .await
 750            .project_connection_ids(request.payload.project_id, follower_id)?
 751            .contains(&leader_id)
 752        {
 753            Err(anyhow!("no such peer"))?;
 754        }
 755        let mut response = self
 756            .peer
 757            .forward_request(request.sender_id, leader_id, request.payload)
 758            .await?;
 759        response
 760            .views
 761            .retain(|view| view.leader_id != Some(follower_id.0));
 762        Ok(response)
 763    }
 764
 765    async fn unfollow(self: Arc<Self>, request: TypedEnvelope<proto::Unfollow>) -> Result<()> {
 766        let leader_id = ConnectionId(request.payload.leader_id);
 767        if !self
 768            .state()
 769            .await
 770            .project_connection_ids(request.payload.project_id, request.sender_id)?
 771            .contains(&leader_id)
 772        {
 773            Err(anyhow!("no such peer"))?;
 774        }
 775        self.peer
 776            .forward_send(request.sender_id, leader_id, request.payload)?;
 777        Ok(())
 778    }
 779
 780    async fn update_followers(
 781        self: Arc<Self>,
 782        request: TypedEnvelope<proto::UpdateFollowers>,
 783    ) -> Result<()> {
 784        let connection_ids = self
 785            .state()
 786            .await
 787            .project_connection_ids(request.payload.project_id, request.sender_id)?;
 788        let leader_id = request
 789            .payload
 790            .variant
 791            .as_ref()
 792            .and_then(|variant| match variant {
 793                proto::update_followers::Variant::CreateView(payload) => payload.leader_id,
 794                proto::update_followers::Variant::UpdateView(payload) => payload.leader_id,
 795                proto::update_followers::Variant::UpdateActiveView(payload) => payload.leader_id,
 796            });
 797        for follower_id in &request.payload.follower_ids {
 798            let follower_id = ConnectionId(*follower_id);
 799            if connection_ids.contains(&follower_id) && Some(follower_id.0) != leader_id {
 800                self.peer
 801                    .forward_send(request.sender_id, follower_id, request.payload.clone())?;
 802            }
 803        }
 804        Ok(())
 805    }
 806
 807    async fn get_channels(
 808        self: Arc<Server>,
 809        request: TypedEnvelope<proto::GetChannels>,
 810    ) -> Result<proto::GetChannelsResponse> {
 811        let user_id = self
 812            .state()
 813            .await
 814            .user_id_for_connection(request.sender_id)?;
 815        let channels = self.app_state.db.get_accessible_channels(user_id).await?;
 816        Ok(proto::GetChannelsResponse {
 817            channels: channels
 818                .into_iter()
 819                .map(|chan| proto::Channel {
 820                    id: chan.id.to_proto(),
 821                    name: chan.name,
 822                })
 823                .collect(),
 824        })
 825    }
 826
 827    async fn get_users(
 828        self: Arc<Server>,
 829        request: TypedEnvelope<proto::GetUsers>,
 830    ) -> Result<proto::GetUsersResponse> {
 831        let user_ids = request
 832            .payload
 833            .user_ids
 834            .into_iter()
 835            .map(UserId::from_proto)
 836            .collect();
 837        let users = self
 838            .app_state
 839            .db
 840            .get_users_by_ids(user_ids)
 841            .await?
 842            .into_iter()
 843            .map(|user| proto::User {
 844                id: user.id.to_proto(),
 845                avatar_url: format!("https://github.com/{}.png?size=128", user.github_login),
 846                github_login: user.github_login,
 847            })
 848            .collect();
 849        Ok(proto::GetUsersResponse { users })
 850    }
 851
 852    fn update_contacts_for_users<'a>(
 853        self: &Arc<Self>,
 854        state: &Store,
 855        user_ids: impl IntoIterator<Item = &'a UserId>,
 856    ) {
 857        for user_id in user_ids {
 858            let contacts = state.contacts_for_user(*user_id);
 859            for connection_id in state.connection_ids_for_user(*user_id) {
 860                self.peer
 861                    .send(
 862                        connection_id,
 863                        proto::UpdateContacts {
 864                            contacts: contacts.clone(),
 865                        },
 866                    )
 867                    .log_err();
 868            }
 869        }
 870    }
 871
 872    async fn join_channel(
 873        self: Arc<Self>,
 874        request: TypedEnvelope<proto::JoinChannel>,
 875    ) -> Result<proto::JoinChannelResponse> {
 876        let user_id = self
 877            .state()
 878            .await
 879            .user_id_for_connection(request.sender_id)?;
 880        let channel_id = ChannelId::from_proto(request.payload.channel_id);
 881        if !self
 882            .app_state
 883            .db
 884            .can_user_access_channel(user_id, channel_id)
 885            .await?
 886        {
 887            Err(anyhow!("access denied"))?;
 888        }
 889
 890        self.state_mut()
 891            .await
 892            .join_channel(request.sender_id, channel_id);
 893        let messages = self
 894            .app_state
 895            .db
 896            .get_channel_messages(channel_id, MESSAGE_COUNT_PER_PAGE, None)
 897            .await?
 898            .into_iter()
 899            .map(|msg| proto::ChannelMessage {
 900                id: msg.id.to_proto(),
 901                body: msg.body,
 902                timestamp: msg.sent_at.unix_timestamp() as u64,
 903                sender_id: msg.sender_id.to_proto(),
 904                nonce: Some(msg.nonce.as_u128().into()),
 905            })
 906            .collect::<Vec<_>>();
 907        Ok(proto::JoinChannelResponse {
 908            done: messages.len() < MESSAGE_COUNT_PER_PAGE,
 909            messages,
 910        })
 911    }
 912
 913    async fn leave_channel(
 914        self: Arc<Self>,
 915        request: TypedEnvelope<proto::LeaveChannel>,
 916    ) -> Result<()> {
 917        let user_id = self
 918            .state()
 919            .await
 920            .user_id_for_connection(request.sender_id)?;
 921        let channel_id = ChannelId::from_proto(request.payload.channel_id);
 922        if !self
 923            .app_state
 924            .db
 925            .can_user_access_channel(user_id, channel_id)
 926            .await?
 927        {
 928            Err(anyhow!("access denied"))?;
 929        }
 930
 931        self.state_mut()
 932            .await
 933            .leave_channel(request.sender_id, channel_id);
 934
 935        Ok(())
 936    }
 937
 938    async fn send_channel_message(
 939        self: Arc<Self>,
 940        request: TypedEnvelope<proto::SendChannelMessage>,
 941    ) -> Result<proto::SendChannelMessageResponse> {
 942        let channel_id = ChannelId::from_proto(request.payload.channel_id);
 943        let user_id;
 944        let connection_ids;
 945        {
 946            let state = self.state().await;
 947            user_id = state.user_id_for_connection(request.sender_id)?;
 948            connection_ids = state.channel_connection_ids(channel_id)?;
 949        }
 950
 951        // Validate the message body.
 952        let body = request.payload.body.trim().to_string();
 953        if body.len() > MAX_MESSAGE_LEN {
 954            return Err(anyhow!("message is too long"))?;
 955        }
 956        if body.is_empty() {
 957            return Err(anyhow!("message can't be blank"))?;
 958        }
 959
 960        let timestamp = OffsetDateTime::now_utc();
 961        let nonce = request
 962            .payload
 963            .nonce
 964            .ok_or_else(|| anyhow!("nonce can't be blank"))?;
 965
 966        let message_id = self
 967            .app_state
 968            .db
 969            .create_channel_message(channel_id, user_id, &body, timestamp, nonce.clone().into())
 970            .await?
 971            .to_proto();
 972        let message = proto::ChannelMessage {
 973            sender_id: user_id.to_proto(),
 974            id: message_id,
 975            body,
 976            timestamp: timestamp.unix_timestamp() as u64,
 977            nonce: Some(nonce),
 978        };
 979        broadcast(request.sender_id, connection_ids, |conn_id| {
 980            self.peer.send(
 981                conn_id,
 982                proto::ChannelMessageSent {
 983                    channel_id: channel_id.to_proto(),
 984                    message: Some(message.clone()),
 985                },
 986            )
 987        });
 988        Ok(proto::SendChannelMessageResponse {
 989            message: Some(message),
 990        })
 991    }
 992
 993    async fn get_channel_messages(
 994        self: Arc<Self>,
 995        request: TypedEnvelope<proto::GetChannelMessages>,
 996    ) -> Result<proto::GetChannelMessagesResponse> {
 997        let user_id = self
 998            .state()
 999            .await
1000            .user_id_for_connection(request.sender_id)?;
1001        let channel_id = ChannelId::from_proto(request.payload.channel_id);
1002        if !self
1003            .app_state
1004            .db
1005            .can_user_access_channel(user_id, channel_id)
1006            .await?
1007        {
1008            Err(anyhow!("access denied"))?;
1009        }
1010
1011        let messages = self
1012            .app_state
1013            .db
1014            .get_channel_messages(
1015                channel_id,
1016                MESSAGE_COUNT_PER_PAGE,
1017                Some(MessageId::from_proto(request.payload.before_message_id)),
1018            )
1019            .await?
1020            .into_iter()
1021            .map(|msg| proto::ChannelMessage {
1022                id: msg.id.to_proto(),
1023                body: msg.body,
1024                timestamp: msg.sent_at.unix_timestamp() as u64,
1025                sender_id: msg.sender_id.to_proto(),
1026                nonce: Some(msg.nonce.as_u128().into()),
1027            })
1028            .collect::<Vec<_>>();
1029
1030        Ok(proto::GetChannelMessagesResponse {
1031            done: messages.len() < MESSAGE_COUNT_PER_PAGE,
1032            messages,
1033        })
1034    }
1035
1036    async fn state<'a>(self: &'a Arc<Self>) -> StoreReadGuard<'a> {
1037        #[cfg(test)]
1038        tokio::task::yield_now().await;
1039        let guard = self.store.read().await;
1040        #[cfg(test)]
1041        tokio::task::yield_now().await;
1042        StoreReadGuard {
1043            guard,
1044            _not_send: PhantomData,
1045        }
1046    }
1047
1048    async fn state_mut<'a>(self: &'a Arc<Self>) -> StoreWriteGuard<'a> {
1049        #[cfg(test)]
1050        tokio::task::yield_now().await;
1051        let guard = self.store.write().await;
1052        #[cfg(test)]
1053        tokio::task::yield_now().await;
1054        StoreWriteGuard {
1055            guard,
1056            _not_send: PhantomData,
1057        }
1058    }
1059}
1060
1061impl<'a> Deref for StoreReadGuard<'a> {
1062    type Target = Store;
1063
1064    fn deref(&self) -> &Self::Target {
1065        &*self.guard
1066    }
1067}
1068
1069impl<'a> Deref for StoreWriteGuard<'a> {
1070    type Target = Store;
1071
1072    fn deref(&self) -> &Self::Target {
1073        &*self.guard
1074    }
1075}
1076
1077impl<'a> DerefMut for StoreWriteGuard<'a> {
1078    fn deref_mut(&mut self) -> &mut Self::Target {
1079        &mut *self.guard
1080    }
1081}
1082
1083impl<'a> Drop for StoreWriteGuard<'a> {
1084    fn drop(&mut self) {
1085        #[cfg(test)]
1086        self.check_invariants();
1087    }
1088}
1089
1090impl Executor for RealExecutor {
1091    type Sleep = Sleep;
1092
1093    fn spawn_detached<F: 'static + Send + Future<Output = ()>>(&self, future: F) {
1094        tokio::task::spawn(future);
1095    }
1096
1097    fn sleep(&self, duration: Duration) -> Self::Sleep {
1098        tokio::time::sleep(duration)
1099    }
1100}
1101
1102fn broadcast<F>(sender_id: ConnectionId, receiver_ids: Vec<ConnectionId>, mut f: F)
1103where
1104    F: FnMut(ConnectionId) -> anyhow::Result<()>,
1105{
1106    for receiver_id in receiver_ids {
1107        if receiver_id != sender_id {
1108            f(receiver_id).log_err();
1109        }
1110    }
1111}
1112
1113lazy_static! {
1114    static ref ZED_PROTOCOL_VERSION: HeaderName = HeaderName::from_static("x-zed-protocol-version");
1115}
1116
1117pub struct ProtocolVersion(u32);
1118
1119impl Header for ProtocolVersion {
1120    fn name() -> &'static HeaderName {
1121        &ZED_PROTOCOL_VERSION
1122    }
1123
1124    fn decode<'i, I>(values: &mut I) -> Result<Self, axum::headers::Error>
1125    where
1126        Self: Sized,
1127        I: Iterator<Item = &'i axum::http::HeaderValue>,
1128    {
1129        let version = values
1130            .next()
1131            .ok_or_else(|| axum::headers::Error::invalid())?
1132            .to_str()
1133            .map_err(|_| axum::headers::Error::invalid())?
1134            .parse()
1135            .map_err(|_| axum::headers::Error::invalid())?;
1136        Ok(Self(version))
1137    }
1138
1139    fn encode<E: Extend<axum::http::HeaderValue>>(&self, values: &mut E) {
1140        values.extend([self.0.to_string().parse().unwrap()]);
1141    }
1142}
1143
1144pub fn routes(app_state: Arc<AppState>) -> Router<Body> {
1145    let server = Server::new(app_state.clone(), None);
1146    Router::new()
1147        .route("/rpc", get(handle_websocket_request))
1148        .layer(
1149            ServiceBuilder::new()
1150                .layer(Extension(app_state))
1151                .layer(middleware::from_fn(auth::validate_header))
1152                .layer(Extension(server)),
1153        )
1154}
1155
1156pub async fn handle_websocket_request(
1157    TypedHeader(ProtocolVersion(protocol_version)): TypedHeader<ProtocolVersion>,
1158    ConnectInfo(socket_address): ConnectInfo<SocketAddr>,
1159    Extension(server): Extension<Arc<Server>>,
1160    Extension(user_id): Extension<UserId>,
1161    ws: WebSocketUpgrade,
1162) -> Response {
1163    if protocol_version != rpc::PROTOCOL_VERSION {
1164        return (
1165            StatusCode::UPGRADE_REQUIRED,
1166            "client must be upgraded".to_string(),
1167        )
1168            .into_response();
1169    }
1170    let socket_address = socket_address.to_string();
1171    ws.on_upgrade(move |socket| {
1172        let socket = socket
1173            .map_ok(to_tungstenite_message)
1174            .err_into()
1175            .with(|message| async move { Ok(to_axum_message(message)) });
1176        let connection = Connection::new(Box::pin(socket));
1177        server.handle_connection(connection, socket_address, user_id, None, RealExecutor)
1178    })
1179}
1180
1181fn to_axum_message(message: TungsteniteMessage) -> AxumMessage {
1182    match message {
1183        TungsteniteMessage::Text(payload) => AxumMessage::Text(payload),
1184        TungsteniteMessage::Binary(payload) => AxumMessage::Binary(payload),
1185        TungsteniteMessage::Ping(payload) => AxumMessage::Ping(payload),
1186        TungsteniteMessage::Pong(payload) => AxumMessage::Pong(payload),
1187        TungsteniteMessage::Close(frame) => AxumMessage::Close(frame.map(|frame| AxumCloseFrame {
1188            code: frame.code.into(),
1189            reason: frame.reason,
1190        })),
1191    }
1192}
1193
1194fn to_tungstenite_message(message: AxumMessage) -> TungsteniteMessage {
1195    match message {
1196        AxumMessage::Text(payload) => TungsteniteMessage::Text(payload),
1197        AxumMessage::Binary(payload) => TungsteniteMessage::Binary(payload),
1198        AxumMessage::Ping(payload) => TungsteniteMessage::Ping(payload),
1199        AxumMessage::Pong(payload) => TungsteniteMessage::Pong(payload),
1200        AxumMessage::Close(frame) => {
1201            TungsteniteMessage::Close(frame.map(|frame| TungsteniteCloseFrame {
1202                code: frame.code.into(),
1203                reason: frame.reason,
1204            }))
1205        }
1206    }
1207}
1208
1209#[cfg(test)]
1210mod tests {
1211    use super::*;
1212    use crate::{
1213        db::{tests::TestDb, UserId},
1214        AppState,
1215    };
1216    use ::rpc::Peer;
1217    use client::{
1218        self, test::FakeHttpClient, Channel, ChannelDetails, ChannelList, Client, Credentials,
1219        EstablishConnectionError, UserStore, RECEIVE_TIMEOUT,
1220    };
1221    use collections::BTreeMap;
1222    use editor::{
1223        self, ConfirmCodeAction, ConfirmCompletion, ConfirmRename, Editor, Input, Redo, Rename,
1224        ToOffset, ToggleCodeActions, Undo,
1225    };
1226    use gpui::{
1227        executor::{self, Deterministic},
1228        geometry::vector::vec2f,
1229        ModelHandle, TestAppContext, ViewHandle,
1230    };
1231    use language::{
1232        range_to_lsp, tree_sitter_rust, Diagnostic, DiagnosticEntry, FakeLspAdapter, Language,
1233        LanguageConfig, LanguageRegistry, OffsetRangeExt, Point, Rope,
1234    };
1235    use lsp::{self, FakeLanguageServer};
1236    use parking_lot::Mutex;
1237    use project::{
1238        fs::{FakeFs, Fs as _},
1239        search::SearchQuery,
1240        worktree::WorktreeHandle,
1241        DiagnosticSummary, Project, ProjectPath, WorktreeId,
1242    };
1243    use rand::prelude::*;
1244    use rpc::PeerId;
1245    use serde_json::json;
1246    use settings::Settings;
1247    use sqlx::types::time::OffsetDateTime;
1248    use std::{
1249        env,
1250        ops::Deref,
1251        path::{Path, PathBuf},
1252        rc::Rc,
1253        sync::{
1254            atomic::{AtomicBool, Ordering::SeqCst},
1255            Arc,
1256        },
1257        time::Duration,
1258    };
1259    use theme::ThemeRegistry;
1260    use workspace::{Item, SplitDirection, ToggleFollow, Workspace, WorkspaceParams};
1261
1262    #[cfg(test)]
1263    #[ctor::ctor]
1264    fn init_logger() {
1265        if std::env::var("RUST_LOG").is_ok() {
1266            env_logger::init();
1267        }
1268    }
1269
1270    #[gpui::test(iterations = 10)]
1271    async fn test_share_project(cx_a: &mut TestAppContext, cx_b: &mut TestAppContext) {
1272        let (window_b, _) = cx_b.add_window(|_| EmptyView);
1273        let lang_registry = Arc::new(LanguageRegistry::test());
1274        let fs = FakeFs::new(cx_a.background());
1275        cx_a.foreground().forbid_parking();
1276
1277        // Connect to a server as 2 clients.
1278        let mut server = TestServer::start(cx_a.foreground(), cx_a.background()).await;
1279        let client_a = server.create_client(cx_a, "user_a").await;
1280        let client_b = server.create_client(cx_b, "user_b").await;
1281
1282        // Share a project as client A
1283        fs.insert_tree(
1284            "/a",
1285            json!({
1286                ".zed.toml": r#"collaborators = ["user_b"]"#,
1287                "a.txt": "a-contents",
1288                "b.txt": "b-contents",
1289            }),
1290        )
1291        .await;
1292        let project_a = cx_a.update(|cx| {
1293            Project::local(
1294                client_a.clone(),
1295                client_a.user_store.clone(),
1296                lang_registry.clone(),
1297                fs.clone(),
1298                cx,
1299            )
1300        });
1301        let (worktree_a, _) = project_a
1302            .update(cx_a, |p, cx| {
1303                p.find_or_create_local_worktree("/a", true, cx)
1304            })
1305            .await
1306            .unwrap();
1307        let worktree_id = worktree_a.read_with(cx_a, |tree, _| tree.id());
1308        worktree_a
1309            .read_with(cx_a, |tree, _| tree.as_local().unwrap().scan_complete())
1310            .await;
1311        let project_id = project_a.update(cx_a, |p, _| p.next_remote_id()).await;
1312        project_a.update(cx_a, |p, cx| p.share(cx)).await.unwrap();
1313
1314        // Join that project as client B
1315        let project_b = Project::remote(
1316            project_id,
1317            client_b.clone(),
1318            client_b.user_store.clone(),
1319            lang_registry.clone(),
1320            fs.clone(),
1321            &mut cx_b.to_async(),
1322        )
1323        .await
1324        .unwrap();
1325
1326        let replica_id_b = project_b.read_with(cx_b, |project, _| {
1327            assert_eq!(
1328                project
1329                    .collaborators()
1330                    .get(&client_a.peer_id)
1331                    .unwrap()
1332                    .user
1333                    .github_login,
1334                "user_a"
1335            );
1336            project.replica_id()
1337        });
1338        project_a
1339            .condition(&cx_a, |tree, _| {
1340                tree.collaborators()
1341                    .get(&client_b.peer_id)
1342                    .map_or(false, |collaborator| {
1343                        collaborator.replica_id == replica_id_b
1344                            && collaborator.user.github_login == "user_b"
1345                    })
1346            })
1347            .await;
1348
1349        // Open the same file as client B and client A.
1350        let buffer_b = project_b
1351            .update(cx_b, |p, cx| p.open_buffer((worktree_id, "b.txt"), cx))
1352            .await
1353            .unwrap();
1354        buffer_b.read_with(cx_b, |buf, _| assert_eq!(buf.text(), "b-contents"));
1355        project_a.read_with(cx_a, |project, cx| {
1356            assert!(project.has_open_buffer((worktree_id, "b.txt"), cx))
1357        });
1358        let buffer_a = project_a
1359            .update(cx_a, |p, cx| p.open_buffer((worktree_id, "b.txt"), cx))
1360            .await
1361            .unwrap();
1362
1363        let editor_b = cx_b.add_view(window_b, |cx| Editor::for_buffer(buffer_b, None, cx));
1364
1365        // TODO
1366        // // Create a selection set as client B and see that selection set as client A.
1367        // buffer_a
1368        //     .condition(&cx_a, |buffer, _| buffer.selection_sets().count() == 1)
1369        //     .await;
1370
1371        // Edit the buffer as client B and see that edit as client A.
1372        editor_b.update(cx_b, |editor, cx| {
1373            editor.handle_input(&Input("ok, ".into()), cx)
1374        });
1375        buffer_a
1376            .condition(&cx_a, |buffer, _| buffer.text() == "ok, b-contents")
1377            .await;
1378
1379        // TODO
1380        // // Remove the selection set as client B, see those selections disappear as client A.
1381        cx_b.update(move |_| drop(editor_b));
1382        // buffer_a
1383        //     .condition(&cx_a, |buffer, _| buffer.selection_sets().count() == 0)
1384        //     .await;
1385
1386        // Dropping the client B's project removes client B from client A's collaborators.
1387        cx_b.update(move |_| drop(project_b));
1388        project_a
1389            .condition(&cx_a, |project, _| project.collaborators().is_empty())
1390            .await;
1391    }
1392
1393    #[gpui::test(iterations = 10)]
1394    async fn test_unshare_project(cx_a: &mut TestAppContext, cx_b: &mut TestAppContext) {
1395        let lang_registry = Arc::new(LanguageRegistry::test());
1396        let fs = FakeFs::new(cx_a.background());
1397        cx_a.foreground().forbid_parking();
1398
1399        // Connect to a server as 2 clients.
1400        let mut server = TestServer::start(cx_a.foreground(), cx_a.background()).await;
1401        let client_a = server.create_client(cx_a, "user_a").await;
1402        let client_b = server.create_client(cx_b, "user_b").await;
1403
1404        // Share a project as client A
1405        fs.insert_tree(
1406            "/a",
1407            json!({
1408                ".zed.toml": r#"collaborators = ["user_b"]"#,
1409                "a.txt": "a-contents",
1410                "b.txt": "b-contents",
1411            }),
1412        )
1413        .await;
1414        let project_a = cx_a.update(|cx| {
1415            Project::local(
1416                client_a.clone(),
1417                client_a.user_store.clone(),
1418                lang_registry.clone(),
1419                fs.clone(),
1420                cx,
1421            )
1422        });
1423        let (worktree_a, _) = project_a
1424            .update(cx_a, |p, cx| {
1425                p.find_or_create_local_worktree("/a", true, cx)
1426            })
1427            .await
1428            .unwrap();
1429        worktree_a
1430            .read_with(cx_a, |tree, _| tree.as_local().unwrap().scan_complete())
1431            .await;
1432        let project_id = project_a.update(cx_a, |p, _| p.next_remote_id()).await;
1433        let worktree_id = worktree_a.read_with(cx_a, |tree, _| tree.id());
1434        project_a.update(cx_a, |p, cx| p.share(cx)).await.unwrap();
1435        assert!(worktree_a.read_with(cx_a, |tree, _| tree.as_local().unwrap().is_shared()));
1436
1437        // Join that project as client B
1438        let project_b = Project::remote(
1439            project_id,
1440            client_b.clone(),
1441            client_b.user_store.clone(),
1442            lang_registry.clone(),
1443            fs.clone(),
1444            &mut cx_b.to_async(),
1445        )
1446        .await
1447        .unwrap();
1448        project_b
1449            .update(cx_b, |p, cx| p.open_buffer((worktree_id, "a.txt"), cx))
1450            .await
1451            .unwrap();
1452
1453        // Unshare the project as client A
1454        project_a.update(cx_a, |project, cx| project.unshare(cx));
1455        project_b
1456            .condition(cx_b, |project, _| project.is_read_only())
1457            .await;
1458        assert!(worktree_a.read_with(cx_a, |tree, _| !tree.as_local().unwrap().is_shared()));
1459        cx_b.update(|_| {
1460            drop(project_b);
1461        });
1462
1463        // Share the project again and ensure guests can still join.
1464        project_a
1465            .update(cx_a, |project, cx| project.share(cx))
1466            .await
1467            .unwrap();
1468        assert!(worktree_a.read_with(cx_a, |tree, _| tree.as_local().unwrap().is_shared()));
1469
1470        let project_b2 = Project::remote(
1471            project_id,
1472            client_b.clone(),
1473            client_b.user_store.clone(),
1474            lang_registry.clone(),
1475            fs.clone(),
1476            &mut cx_b.to_async(),
1477        )
1478        .await
1479        .unwrap();
1480        project_b2
1481            .update(cx_b, |p, cx| p.open_buffer((worktree_id, "a.txt"), cx))
1482            .await
1483            .unwrap();
1484    }
1485
1486    #[gpui::test(iterations = 10)]
1487    async fn test_host_disconnect(cx_a: &mut TestAppContext, cx_b: &mut TestAppContext) {
1488        let lang_registry = Arc::new(LanguageRegistry::test());
1489        let fs = FakeFs::new(cx_a.background());
1490        cx_a.foreground().forbid_parking();
1491
1492        // Connect to a server as 2 clients.
1493        let mut server = TestServer::start(cx_a.foreground(), cx_a.background()).await;
1494        let client_a = server.create_client(cx_a, "user_a").await;
1495        let client_b = server.create_client(cx_b, "user_b").await;
1496
1497        // Share a project as client A
1498        fs.insert_tree(
1499            "/a",
1500            json!({
1501                ".zed.toml": r#"collaborators = ["user_b"]"#,
1502                "a.txt": "a-contents",
1503                "b.txt": "b-contents",
1504            }),
1505        )
1506        .await;
1507        let project_a = cx_a.update(|cx| {
1508            Project::local(
1509                client_a.clone(),
1510                client_a.user_store.clone(),
1511                lang_registry.clone(),
1512                fs.clone(),
1513                cx,
1514            )
1515        });
1516        let (worktree_a, _) = project_a
1517            .update(cx_a, |p, cx| {
1518                p.find_or_create_local_worktree("/a", true, cx)
1519            })
1520            .await
1521            .unwrap();
1522        worktree_a
1523            .read_with(cx_a, |tree, _| tree.as_local().unwrap().scan_complete())
1524            .await;
1525        let project_id = project_a.update(cx_a, |p, _| p.next_remote_id()).await;
1526        let worktree_id = worktree_a.read_with(cx_a, |tree, _| tree.id());
1527        project_a.update(cx_a, |p, cx| p.share(cx)).await.unwrap();
1528        assert!(worktree_a.read_with(cx_a, |tree, _| tree.as_local().unwrap().is_shared()));
1529
1530        // Join that project as client B
1531        let project_b = Project::remote(
1532            project_id,
1533            client_b.clone(),
1534            client_b.user_store.clone(),
1535            lang_registry.clone(),
1536            fs.clone(),
1537            &mut cx_b.to_async(),
1538        )
1539        .await
1540        .unwrap();
1541        project_b
1542            .update(cx_b, |p, cx| p.open_buffer((worktree_id, "a.txt"), cx))
1543            .await
1544            .unwrap();
1545
1546        // Drop client A's connection. Collaborators should disappear and the project should not be shown as shared.
1547        server.disconnect_client(client_a.current_user_id(cx_a));
1548        cx_a.foreground().advance_clock(rpc::RECEIVE_TIMEOUT);
1549        project_a
1550            .condition(cx_a, |project, _| project.collaborators().is_empty())
1551            .await;
1552        project_a.read_with(cx_a, |project, _| assert!(!project.is_shared()));
1553        project_b
1554            .condition(cx_b, |project, _| project.is_read_only())
1555            .await;
1556        assert!(worktree_a.read_with(cx_a, |tree, _| !tree.as_local().unwrap().is_shared()));
1557        cx_b.update(|_| {
1558            drop(project_b);
1559        });
1560
1561        // Await reconnection
1562        let project_id = project_a.update(cx_a, |p, _| p.next_remote_id()).await;
1563
1564        // Share the project again and ensure guests can still join.
1565        project_a
1566            .update(cx_a, |project, cx| project.share(cx))
1567            .await
1568            .unwrap();
1569        assert!(worktree_a.read_with(cx_a, |tree, _| tree.as_local().unwrap().is_shared()));
1570
1571        let project_b2 = Project::remote(
1572            project_id,
1573            client_b.clone(),
1574            client_b.user_store.clone(),
1575            lang_registry.clone(),
1576            fs.clone(),
1577            &mut cx_b.to_async(),
1578        )
1579        .await
1580        .unwrap();
1581        project_b2
1582            .update(cx_b, |p, cx| p.open_buffer((worktree_id, "a.txt"), cx))
1583            .await
1584            .unwrap();
1585    }
1586
1587    #[gpui::test(iterations = 10)]
1588    async fn test_propagate_saves_and_fs_changes(
1589        cx_a: &mut TestAppContext,
1590        cx_b: &mut TestAppContext,
1591        cx_c: &mut TestAppContext,
1592    ) {
1593        let lang_registry = Arc::new(LanguageRegistry::test());
1594        let fs = FakeFs::new(cx_a.background());
1595        cx_a.foreground().forbid_parking();
1596
1597        // Connect to a server as 3 clients.
1598        let mut server = TestServer::start(cx_a.foreground(), cx_a.background()).await;
1599        let client_a = server.create_client(cx_a, "user_a").await;
1600        let client_b = server.create_client(cx_b, "user_b").await;
1601        let client_c = server.create_client(cx_c, "user_c").await;
1602
1603        // Share a worktree as client A.
1604        fs.insert_tree(
1605            "/a",
1606            json!({
1607                ".zed.toml": r#"collaborators = ["user_b", "user_c"]"#,
1608                "file1": "",
1609                "file2": ""
1610            }),
1611        )
1612        .await;
1613        let project_a = cx_a.update(|cx| {
1614            Project::local(
1615                client_a.clone(),
1616                client_a.user_store.clone(),
1617                lang_registry.clone(),
1618                fs.clone(),
1619                cx,
1620            )
1621        });
1622        let (worktree_a, _) = project_a
1623            .update(cx_a, |p, cx| {
1624                p.find_or_create_local_worktree("/a", true, cx)
1625            })
1626            .await
1627            .unwrap();
1628        worktree_a
1629            .read_with(cx_a, |tree, _| tree.as_local().unwrap().scan_complete())
1630            .await;
1631        let project_id = project_a.update(cx_a, |p, _| p.next_remote_id()).await;
1632        let worktree_id = worktree_a.read_with(cx_a, |tree, _| tree.id());
1633        project_a.update(cx_a, |p, cx| p.share(cx)).await.unwrap();
1634
1635        // Join that worktree as clients B and C.
1636        let project_b = Project::remote(
1637            project_id,
1638            client_b.clone(),
1639            client_b.user_store.clone(),
1640            lang_registry.clone(),
1641            fs.clone(),
1642            &mut cx_b.to_async(),
1643        )
1644        .await
1645        .unwrap();
1646        let project_c = Project::remote(
1647            project_id,
1648            client_c.clone(),
1649            client_c.user_store.clone(),
1650            lang_registry.clone(),
1651            fs.clone(),
1652            &mut cx_c.to_async(),
1653        )
1654        .await
1655        .unwrap();
1656        let worktree_b = project_b.read_with(cx_b, |p, cx| p.worktrees(cx).next().unwrap());
1657        let worktree_c = project_c.read_with(cx_c, |p, cx| p.worktrees(cx).next().unwrap());
1658
1659        // Open and edit a buffer as both guests B and C.
1660        let buffer_b = project_b
1661            .update(cx_b, |p, cx| p.open_buffer((worktree_id, "file1"), cx))
1662            .await
1663            .unwrap();
1664        let buffer_c = project_c
1665            .update(cx_c, |p, cx| p.open_buffer((worktree_id, "file1"), cx))
1666            .await
1667            .unwrap();
1668        buffer_b.update(cx_b, |buf, cx| buf.edit([0..0], "i-am-b, ", cx));
1669        buffer_c.update(cx_c, |buf, cx| buf.edit([0..0], "i-am-c, ", cx));
1670
1671        // Open and edit that buffer as the host.
1672        let buffer_a = project_a
1673            .update(cx_a, |p, cx| p.open_buffer((worktree_id, "file1"), cx))
1674            .await
1675            .unwrap();
1676
1677        buffer_a
1678            .condition(cx_a, |buf, _| buf.text() == "i-am-c, i-am-b, ")
1679            .await;
1680        buffer_a.update(cx_a, |buf, cx| {
1681            buf.edit([buf.len()..buf.len()], "i-am-a", cx)
1682        });
1683
1684        // Wait for edits to propagate
1685        buffer_a
1686            .condition(cx_a, |buf, _| buf.text() == "i-am-c, i-am-b, i-am-a")
1687            .await;
1688        buffer_b
1689            .condition(cx_b, |buf, _| buf.text() == "i-am-c, i-am-b, i-am-a")
1690            .await;
1691        buffer_c
1692            .condition(cx_c, |buf, _| buf.text() == "i-am-c, i-am-b, i-am-a")
1693            .await;
1694
1695        // Edit the buffer as the host and concurrently save as guest B.
1696        let save_b = buffer_b.update(cx_b, |buf, cx| buf.save(cx));
1697        buffer_a.update(cx_a, |buf, cx| buf.edit([0..0], "hi-a, ", cx));
1698        save_b.await.unwrap();
1699        assert_eq!(
1700            fs.load("/a/file1".as_ref()).await.unwrap(),
1701            "hi-a, i-am-c, i-am-b, i-am-a"
1702        );
1703        buffer_a.read_with(cx_a, |buf, _| assert!(!buf.is_dirty()));
1704        buffer_b.read_with(cx_b, |buf, _| assert!(!buf.is_dirty()));
1705        buffer_c.condition(cx_c, |buf, _| !buf.is_dirty()).await;
1706
1707        worktree_a.flush_fs_events(cx_a).await;
1708
1709        // Make changes on host's file system, see those changes on guest worktrees.
1710        fs.rename(
1711            "/a/file1".as_ref(),
1712            "/a/file1-renamed".as_ref(),
1713            Default::default(),
1714        )
1715        .await
1716        .unwrap();
1717
1718        fs.rename("/a/file2".as_ref(), "/a/file3".as_ref(), Default::default())
1719            .await
1720            .unwrap();
1721        fs.insert_file(Path::new("/a/file4"), "4".into()).await;
1722
1723        worktree_a
1724            .condition(&cx_a, |tree, _| {
1725                tree.paths()
1726                    .map(|p| p.to_string_lossy())
1727                    .collect::<Vec<_>>()
1728                    == [".zed.toml", "file1-renamed", "file3", "file4"]
1729            })
1730            .await;
1731        worktree_b
1732            .condition(&cx_b, |tree, _| {
1733                tree.paths()
1734                    .map(|p| p.to_string_lossy())
1735                    .collect::<Vec<_>>()
1736                    == [".zed.toml", "file1-renamed", "file3", "file4"]
1737            })
1738            .await;
1739        worktree_c
1740            .condition(&cx_c, |tree, _| {
1741                tree.paths()
1742                    .map(|p| p.to_string_lossy())
1743                    .collect::<Vec<_>>()
1744                    == [".zed.toml", "file1-renamed", "file3", "file4"]
1745            })
1746            .await;
1747
1748        // Ensure buffer files are updated as well.
1749        buffer_a
1750            .condition(&cx_a, |buf, _| {
1751                buf.file().unwrap().path().to_str() == Some("file1-renamed")
1752            })
1753            .await;
1754        buffer_b
1755            .condition(&cx_b, |buf, _| {
1756                buf.file().unwrap().path().to_str() == Some("file1-renamed")
1757            })
1758            .await;
1759        buffer_c
1760            .condition(&cx_c, |buf, _| {
1761                buf.file().unwrap().path().to_str() == Some("file1-renamed")
1762            })
1763            .await;
1764    }
1765
1766    #[gpui::test(iterations = 10)]
1767    async fn test_buffer_conflict_after_save(cx_a: &mut TestAppContext, cx_b: &mut TestAppContext) {
1768        cx_a.foreground().forbid_parking();
1769        let lang_registry = Arc::new(LanguageRegistry::test());
1770        let fs = FakeFs::new(cx_a.background());
1771
1772        // Connect to a server as 2 clients.
1773        let mut server = TestServer::start(cx_a.foreground(), cx_a.background()).await;
1774        let client_a = server.create_client(cx_a, "user_a").await;
1775        let client_b = server.create_client(cx_b, "user_b").await;
1776
1777        // Share a project as client A
1778        fs.insert_tree(
1779            "/dir",
1780            json!({
1781                ".zed.toml": r#"collaborators = ["user_b", "user_c"]"#,
1782                "a.txt": "a-contents",
1783            }),
1784        )
1785        .await;
1786
1787        let project_a = cx_a.update(|cx| {
1788            Project::local(
1789                client_a.clone(),
1790                client_a.user_store.clone(),
1791                lang_registry.clone(),
1792                fs.clone(),
1793                cx,
1794            )
1795        });
1796        let (worktree_a, _) = project_a
1797            .update(cx_a, |p, cx| {
1798                p.find_or_create_local_worktree("/dir", true, cx)
1799            })
1800            .await
1801            .unwrap();
1802        worktree_a
1803            .read_with(cx_a, |tree, _| tree.as_local().unwrap().scan_complete())
1804            .await;
1805        let project_id = project_a.update(cx_a, |p, _| p.next_remote_id()).await;
1806        let worktree_id = worktree_a.read_with(cx_a, |tree, _| tree.id());
1807        project_a.update(cx_a, |p, cx| p.share(cx)).await.unwrap();
1808
1809        // Join that project as client B
1810        let project_b = Project::remote(
1811            project_id,
1812            client_b.clone(),
1813            client_b.user_store.clone(),
1814            lang_registry.clone(),
1815            fs.clone(),
1816            &mut cx_b.to_async(),
1817        )
1818        .await
1819        .unwrap();
1820
1821        // Open a buffer as client B
1822        let buffer_b = project_b
1823            .update(cx_b, |p, cx| p.open_buffer((worktree_id, "a.txt"), cx))
1824            .await
1825            .unwrap();
1826
1827        buffer_b.update(cx_b, |buf, cx| buf.edit([0..0], "world ", cx));
1828        buffer_b.read_with(cx_b, |buf, _| {
1829            assert!(buf.is_dirty());
1830            assert!(!buf.has_conflict());
1831        });
1832
1833        buffer_b.update(cx_b, |buf, cx| buf.save(cx)).await.unwrap();
1834        buffer_b
1835            .condition(&cx_b, |buffer_b, _| !buffer_b.is_dirty())
1836            .await;
1837        buffer_b.read_with(cx_b, |buf, _| {
1838            assert!(!buf.has_conflict());
1839        });
1840
1841        buffer_b.update(cx_b, |buf, cx| buf.edit([0..0], "hello ", cx));
1842        buffer_b.read_with(cx_b, |buf, _| {
1843            assert!(buf.is_dirty());
1844            assert!(!buf.has_conflict());
1845        });
1846    }
1847
1848    #[gpui::test(iterations = 10)]
1849    async fn test_buffer_reloading(cx_a: &mut TestAppContext, cx_b: &mut TestAppContext) {
1850        cx_a.foreground().forbid_parking();
1851        let lang_registry = Arc::new(LanguageRegistry::test());
1852        let fs = FakeFs::new(cx_a.background());
1853
1854        // Connect to a server as 2 clients.
1855        let mut server = TestServer::start(cx_a.foreground(), cx_a.background()).await;
1856        let client_a = server.create_client(cx_a, "user_a").await;
1857        let client_b = server.create_client(cx_b, "user_b").await;
1858
1859        // Share a project as client A
1860        fs.insert_tree(
1861            "/dir",
1862            json!({
1863                ".zed.toml": r#"collaborators = ["user_b", "user_c"]"#,
1864                "a.txt": "a-contents",
1865            }),
1866        )
1867        .await;
1868
1869        let project_a = cx_a.update(|cx| {
1870            Project::local(
1871                client_a.clone(),
1872                client_a.user_store.clone(),
1873                lang_registry.clone(),
1874                fs.clone(),
1875                cx,
1876            )
1877        });
1878        let (worktree_a, _) = project_a
1879            .update(cx_a, |p, cx| {
1880                p.find_or_create_local_worktree("/dir", true, cx)
1881            })
1882            .await
1883            .unwrap();
1884        worktree_a
1885            .read_with(cx_a, |tree, _| tree.as_local().unwrap().scan_complete())
1886            .await;
1887        let project_id = project_a.update(cx_a, |p, _| p.next_remote_id()).await;
1888        let worktree_id = worktree_a.read_with(cx_a, |tree, _| tree.id());
1889        project_a.update(cx_a, |p, cx| p.share(cx)).await.unwrap();
1890
1891        // Join that project as client B
1892        let project_b = Project::remote(
1893            project_id,
1894            client_b.clone(),
1895            client_b.user_store.clone(),
1896            lang_registry.clone(),
1897            fs.clone(),
1898            &mut cx_b.to_async(),
1899        )
1900        .await
1901        .unwrap();
1902        let _worktree_b = project_b.update(cx_b, |p, cx| p.worktrees(cx).next().unwrap());
1903
1904        // Open a buffer as client B
1905        let buffer_b = project_b
1906            .update(cx_b, |p, cx| p.open_buffer((worktree_id, "a.txt"), cx))
1907            .await
1908            .unwrap();
1909        buffer_b.read_with(cx_b, |buf, _| {
1910            assert!(!buf.is_dirty());
1911            assert!(!buf.has_conflict());
1912        });
1913
1914        fs.save(Path::new("/dir/a.txt"), &"new contents".into())
1915            .await
1916            .unwrap();
1917        buffer_b
1918            .condition(&cx_b, |buf, _| {
1919                buf.text() == "new contents" && !buf.is_dirty()
1920            })
1921            .await;
1922        buffer_b.read_with(cx_b, |buf, _| {
1923            assert!(!buf.has_conflict());
1924        });
1925    }
1926
1927    #[gpui::test(iterations = 10)]
1928    async fn test_editing_while_guest_opens_buffer(
1929        cx_a: &mut TestAppContext,
1930        cx_b: &mut TestAppContext,
1931    ) {
1932        cx_a.foreground().forbid_parking();
1933        let lang_registry = Arc::new(LanguageRegistry::test());
1934        let fs = FakeFs::new(cx_a.background());
1935
1936        // Connect to a server as 2 clients.
1937        let mut server = TestServer::start(cx_a.foreground(), cx_a.background()).await;
1938        let client_a = server.create_client(cx_a, "user_a").await;
1939        let client_b = server.create_client(cx_b, "user_b").await;
1940
1941        // Share a project as client A
1942        fs.insert_tree(
1943            "/dir",
1944            json!({
1945                ".zed.toml": r#"collaborators = ["user_b"]"#,
1946                "a.txt": "a-contents",
1947            }),
1948        )
1949        .await;
1950        let project_a = cx_a.update(|cx| {
1951            Project::local(
1952                client_a.clone(),
1953                client_a.user_store.clone(),
1954                lang_registry.clone(),
1955                fs.clone(),
1956                cx,
1957            )
1958        });
1959        let (worktree_a, _) = project_a
1960            .update(cx_a, |p, cx| {
1961                p.find_or_create_local_worktree("/dir", true, cx)
1962            })
1963            .await
1964            .unwrap();
1965        worktree_a
1966            .read_with(cx_a, |tree, _| tree.as_local().unwrap().scan_complete())
1967            .await;
1968        let project_id = project_a.update(cx_a, |p, _| p.next_remote_id()).await;
1969        let worktree_id = worktree_a.read_with(cx_a, |tree, _| tree.id());
1970        project_a.update(cx_a, |p, cx| p.share(cx)).await.unwrap();
1971
1972        // Join that project as client B
1973        let project_b = Project::remote(
1974            project_id,
1975            client_b.clone(),
1976            client_b.user_store.clone(),
1977            lang_registry.clone(),
1978            fs.clone(),
1979            &mut cx_b.to_async(),
1980        )
1981        .await
1982        .unwrap();
1983
1984        // Open a buffer as client A
1985        let buffer_a = project_a
1986            .update(cx_a, |p, cx| p.open_buffer((worktree_id, "a.txt"), cx))
1987            .await
1988            .unwrap();
1989
1990        // Start opening the same buffer as client B
1991        let buffer_b = cx_b
1992            .background()
1993            .spawn(project_b.update(cx_b, |p, cx| p.open_buffer((worktree_id, "a.txt"), cx)));
1994
1995        // Edit the buffer as client A while client B is still opening it.
1996        cx_b.background().simulate_random_delay().await;
1997        buffer_a.update(cx_a, |buf, cx| buf.edit([0..0], "X", cx));
1998        cx_b.background().simulate_random_delay().await;
1999        buffer_a.update(cx_a, |buf, cx| buf.edit([1..1], "Y", cx));
2000
2001        let text = buffer_a.read_with(cx_a, |buf, _| buf.text());
2002        let buffer_b = buffer_b.await.unwrap();
2003        buffer_b.condition(&cx_b, |buf, _| buf.text() == text).await;
2004    }
2005
2006    #[gpui::test(iterations = 10)]
2007    async fn test_leaving_worktree_while_opening_buffer(
2008        cx_a: &mut TestAppContext,
2009        cx_b: &mut TestAppContext,
2010    ) {
2011        cx_a.foreground().forbid_parking();
2012        let lang_registry = Arc::new(LanguageRegistry::test());
2013        let fs = FakeFs::new(cx_a.background());
2014
2015        // Connect to a server as 2 clients.
2016        let mut server = TestServer::start(cx_a.foreground(), cx_a.background()).await;
2017        let client_a = server.create_client(cx_a, "user_a").await;
2018        let client_b = server.create_client(cx_b, "user_b").await;
2019
2020        // Share a project as client A
2021        fs.insert_tree(
2022            "/dir",
2023            json!({
2024                ".zed.toml": r#"collaborators = ["user_b"]"#,
2025                "a.txt": "a-contents",
2026            }),
2027        )
2028        .await;
2029        let project_a = cx_a.update(|cx| {
2030            Project::local(
2031                client_a.clone(),
2032                client_a.user_store.clone(),
2033                lang_registry.clone(),
2034                fs.clone(),
2035                cx,
2036            )
2037        });
2038        let (worktree_a, _) = project_a
2039            .update(cx_a, |p, cx| {
2040                p.find_or_create_local_worktree("/dir", true, cx)
2041            })
2042            .await
2043            .unwrap();
2044        worktree_a
2045            .read_with(cx_a, |tree, _| tree.as_local().unwrap().scan_complete())
2046            .await;
2047        let project_id = project_a.update(cx_a, |p, _| p.next_remote_id()).await;
2048        let worktree_id = worktree_a.read_with(cx_a, |tree, _| tree.id());
2049        project_a.update(cx_a, |p, cx| p.share(cx)).await.unwrap();
2050
2051        // Join that project as client B
2052        let project_b = Project::remote(
2053            project_id,
2054            client_b.clone(),
2055            client_b.user_store.clone(),
2056            lang_registry.clone(),
2057            fs.clone(),
2058            &mut cx_b.to_async(),
2059        )
2060        .await
2061        .unwrap();
2062
2063        // See that a guest has joined as client A.
2064        project_a
2065            .condition(&cx_a, |p, _| p.collaborators().len() == 1)
2066            .await;
2067
2068        // Begin opening a buffer as client B, but leave the project before the open completes.
2069        let buffer_b = cx_b
2070            .background()
2071            .spawn(project_b.update(cx_b, |p, cx| p.open_buffer((worktree_id, "a.txt"), cx)));
2072        cx_b.update(|_| drop(project_b));
2073        drop(buffer_b);
2074
2075        // See that the guest has left.
2076        project_a
2077            .condition(&cx_a, |p, _| p.collaborators().len() == 0)
2078            .await;
2079    }
2080
2081    #[gpui::test(iterations = 10)]
2082    async fn test_leaving_project(cx_a: &mut TestAppContext, cx_b: &mut TestAppContext) {
2083        cx_a.foreground().forbid_parking();
2084        let lang_registry = Arc::new(LanguageRegistry::test());
2085        let fs = FakeFs::new(cx_a.background());
2086
2087        // Connect to a server as 2 clients.
2088        let mut server = TestServer::start(cx_a.foreground(), cx_a.background()).await;
2089        let client_a = server.create_client(cx_a, "user_a").await;
2090        let client_b = server.create_client(cx_b, "user_b").await;
2091
2092        // Share a project as client A
2093        fs.insert_tree(
2094            "/a",
2095            json!({
2096                ".zed.toml": r#"collaborators = ["user_b"]"#,
2097                "a.txt": "a-contents",
2098                "b.txt": "b-contents",
2099            }),
2100        )
2101        .await;
2102        let project_a = cx_a.update(|cx| {
2103            Project::local(
2104                client_a.clone(),
2105                client_a.user_store.clone(),
2106                lang_registry.clone(),
2107                fs.clone(),
2108                cx,
2109            )
2110        });
2111        let (worktree_a, _) = project_a
2112            .update(cx_a, |p, cx| {
2113                p.find_or_create_local_worktree("/a", true, cx)
2114            })
2115            .await
2116            .unwrap();
2117        worktree_a
2118            .read_with(cx_a, |tree, _| tree.as_local().unwrap().scan_complete())
2119            .await;
2120        let project_id = project_a
2121            .update(cx_a, |project, _| project.next_remote_id())
2122            .await;
2123        project_a
2124            .update(cx_a, |project, cx| project.share(cx))
2125            .await
2126            .unwrap();
2127
2128        // Join that project as client B
2129        let _project_b = Project::remote(
2130            project_id,
2131            client_b.clone(),
2132            client_b.user_store.clone(),
2133            lang_registry.clone(),
2134            fs.clone(),
2135            &mut cx_b.to_async(),
2136        )
2137        .await
2138        .unwrap();
2139
2140        // Client A sees that a guest has joined.
2141        project_a
2142            .condition(cx_a, |p, _| p.collaborators().len() == 1)
2143            .await;
2144
2145        // Drop client B's connection and ensure client A observes client B leaving the project.
2146        client_b.disconnect(&cx_b.to_async()).unwrap();
2147        project_a
2148            .condition(cx_a, |p, _| p.collaborators().len() == 0)
2149            .await;
2150
2151        // Rejoin the project as client B
2152        let _project_b = Project::remote(
2153            project_id,
2154            client_b.clone(),
2155            client_b.user_store.clone(),
2156            lang_registry.clone(),
2157            fs.clone(),
2158            &mut cx_b.to_async(),
2159        )
2160        .await
2161        .unwrap();
2162
2163        // Client A sees that a guest has re-joined.
2164        project_a
2165            .condition(cx_a, |p, _| p.collaborators().len() == 1)
2166            .await;
2167
2168        // Simulate connection loss for client B and ensure client A observes client B leaving the project.
2169        client_b.wait_for_current_user(cx_b).await;
2170        server.disconnect_client(client_b.current_user_id(cx_b));
2171        cx_a.foreground().advance_clock(Duration::from_secs(3));
2172        project_a
2173            .condition(cx_a, |p, _| p.collaborators().len() == 0)
2174            .await;
2175    }
2176
2177    #[gpui::test(iterations = 10)]
2178    async fn test_collaborating_with_diagnostics(
2179        cx_a: &mut TestAppContext,
2180        cx_b: &mut TestAppContext,
2181    ) {
2182        cx_a.foreground().forbid_parking();
2183        let lang_registry = Arc::new(LanguageRegistry::test());
2184        let fs = FakeFs::new(cx_a.background());
2185
2186        // Set up a fake language server.
2187        let mut language = Language::new(
2188            LanguageConfig {
2189                name: "Rust".into(),
2190                path_suffixes: vec!["rs".to_string()],
2191                ..Default::default()
2192            },
2193            Some(tree_sitter_rust::language()),
2194        );
2195        let mut fake_language_servers = language.set_fake_lsp_adapter(Default::default());
2196        lang_registry.add(Arc::new(language));
2197
2198        // Connect to a server as 2 clients.
2199        let mut server = TestServer::start(cx_a.foreground(), cx_a.background()).await;
2200        let client_a = server.create_client(cx_a, "user_a").await;
2201        let client_b = server.create_client(cx_b, "user_b").await;
2202
2203        // Share a project as client A
2204        fs.insert_tree(
2205            "/a",
2206            json!({
2207                ".zed.toml": r#"collaborators = ["user_b"]"#,
2208                "a.rs": "let one = two",
2209                "other.rs": "",
2210            }),
2211        )
2212        .await;
2213        let project_a = cx_a.update(|cx| {
2214            Project::local(
2215                client_a.clone(),
2216                client_a.user_store.clone(),
2217                lang_registry.clone(),
2218                fs.clone(),
2219                cx,
2220            )
2221        });
2222        let (worktree_a, _) = project_a
2223            .update(cx_a, |p, cx| {
2224                p.find_or_create_local_worktree("/a", true, cx)
2225            })
2226            .await
2227            .unwrap();
2228        worktree_a
2229            .read_with(cx_a, |tree, _| tree.as_local().unwrap().scan_complete())
2230            .await;
2231        let project_id = project_a.update(cx_a, |p, _| p.next_remote_id()).await;
2232        let worktree_id = worktree_a.read_with(cx_a, |tree, _| tree.id());
2233        project_a.update(cx_a, |p, cx| p.share(cx)).await.unwrap();
2234
2235        // Cause the language server to start.
2236        let _ = cx_a
2237            .background()
2238            .spawn(project_a.update(cx_a, |project, cx| {
2239                project.open_buffer(
2240                    ProjectPath {
2241                        worktree_id,
2242                        path: Path::new("other.rs").into(),
2243                    },
2244                    cx,
2245                )
2246            }))
2247            .await
2248            .unwrap();
2249
2250        // Simulate a language server reporting errors for a file.
2251        let mut fake_language_server = fake_language_servers.next().await.unwrap();
2252        fake_language_server
2253            .receive_notification::<lsp::notification::DidOpenTextDocument>()
2254            .await;
2255        fake_language_server.notify::<lsp::notification::PublishDiagnostics>(
2256            lsp::PublishDiagnosticsParams {
2257                uri: lsp::Url::from_file_path("/a/a.rs").unwrap(),
2258                version: None,
2259                diagnostics: vec![lsp::Diagnostic {
2260                    severity: Some(lsp::DiagnosticSeverity::ERROR),
2261                    range: lsp::Range::new(lsp::Position::new(0, 4), lsp::Position::new(0, 7)),
2262                    message: "message 1".to_string(),
2263                    ..Default::default()
2264                }],
2265            },
2266        );
2267
2268        // Wait for server to see the diagnostics update.
2269        server
2270            .condition(|store| {
2271                let worktree = store
2272                    .project(project_id)
2273                    .unwrap()
2274                    .share
2275                    .as_ref()
2276                    .unwrap()
2277                    .worktrees
2278                    .get(&worktree_id.to_proto())
2279                    .unwrap();
2280
2281                !worktree.diagnostic_summaries.is_empty()
2282            })
2283            .await;
2284
2285        // Join the worktree as client B.
2286        let project_b = Project::remote(
2287            project_id,
2288            client_b.clone(),
2289            client_b.user_store.clone(),
2290            lang_registry.clone(),
2291            fs.clone(),
2292            &mut cx_b.to_async(),
2293        )
2294        .await
2295        .unwrap();
2296
2297        project_b.read_with(cx_b, |project, cx| {
2298            assert_eq!(
2299                project.diagnostic_summaries(cx).collect::<Vec<_>>(),
2300                &[(
2301                    ProjectPath {
2302                        worktree_id,
2303                        path: Arc::from(Path::new("a.rs")),
2304                    },
2305                    DiagnosticSummary {
2306                        error_count: 1,
2307                        warning_count: 0,
2308                        ..Default::default()
2309                    },
2310                )]
2311            )
2312        });
2313
2314        // Simulate a language server reporting more errors for a file.
2315        fake_language_server.notify::<lsp::notification::PublishDiagnostics>(
2316            lsp::PublishDiagnosticsParams {
2317                uri: lsp::Url::from_file_path("/a/a.rs").unwrap(),
2318                version: None,
2319                diagnostics: vec![
2320                    lsp::Diagnostic {
2321                        severity: Some(lsp::DiagnosticSeverity::ERROR),
2322                        range: lsp::Range::new(lsp::Position::new(0, 4), lsp::Position::new(0, 7)),
2323                        message: "message 1".to_string(),
2324                        ..Default::default()
2325                    },
2326                    lsp::Diagnostic {
2327                        severity: Some(lsp::DiagnosticSeverity::WARNING),
2328                        range: lsp::Range::new(
2329                            lsp::Position::new(0, 10),
2330                            lsp::Position::new(0, 13),
2331                        ),
2332                        message: "message 2".to_string(),
2333                        ..Default::default()
2334                    },
2335                ],
2336            },
2337        );
2338
2339        // Client b gets the updated summaries
2340        project_b
2341            .condition(&cx_b, |project, cx| {
2342                project.diagnostic_summaries(cx).collect::<Vec<_>>()
2343                    == &[(
2344                        ProjectPath {
2345                            worktree_id,
2346                            path: Arc::from(Path::new("a.rs")),
2347                        },
2348                        DiagnosticSummary {
2349                            error_count: 1,
2350                            warning_count: 1,
2351                            ..Default::default()
2352                        },
2353                    )]
2354            })
2355            .await;
2356
2357        // Open the file with the errors on client B. They should be present.
2358        let buffer_b = cx_b
2359            .background()
2360            .spawn(project_b.update(cx_b, |p, cx| p.open_buffer((worktree_id, "a.rs"), cx)))
2361            .await
2362            .unwrap();
2363
2364        buffer_b.read_with(cx_b, |buffer, _| {
2365            assert_eq!(
2366                buffer
2367                    .snapshot()
2368                    .diagnostics_in_range::<_, Point>(0..buffer.len(), false)
2369                    .map(|entry| entry)
2370                    .collect::<Vec<_>>(),
2371                &[
2372                    DiagnosticEntry {
2373                        range: Point::new(0, 4)..Point::new(0, 7),
2374                        diagnostic: Diagnostic {
2375                            group_id: 0,
2376                            message: "message 1".to_string(),
2377                            severity: lsp::DiagnosticSeverity::ERROR,
2378                            is_primary: true,
2379                            ..Default::default()
2380                        }
2381                    },
2382                    DiagnosticEntry {
2383                        range: Point::new(0, 10)..Point::new(0, 13),
2384                        diagnostic: Diagnostic {
2385                            group_id: 1,
2386                            severity: lsp::DiagnosticSeverity::WARNING,
2387                            message: "message 2".to_string(),
2388                            is_primary: true,
2389                            ..Default::default()
2390                        }
2391                    }
2392                ]
2393            );
2394        });
2395
2396        // Simulate a language server reporting no errors for a file.
2397        fake_language_server.notify::<lsp::notification::PublishDiagnostics>(
2398            lsp::PublishDiagnosticsParams {
2399                uri: lsp::Url::from_file_path("/a/a.rs").unwrap(),
2400                version: None,
2401                diagnostics: vec![],
2402            },
2403        );
2404        project_a
2405            .condition(cx_a, |project, cx| {
2406                project.diagnostic_summaries(cx).collect::<Vec<_>>() == &[]
2407            })
2408            .await;
2409        project_b
2410            .condition(cx_b, |project, cx| {
2411                project.diagnostic_summaries(cx).collect::<Vec<_>>() == &[]
2412            })
2413            .await;
2414    }
2415
2416    #[gpui::test(iterations = 10)]
2417    async fn test_collaborating_with_completion(
2418        cx_a: &mut TestAppContext,
2419        cx_b: &mut TestAppContext,
2420    ) {
2421        cx_a.foreground().forbid_parking();
2422        let lang_registry = Arc::new(LanguageRegistry::test());
2423        let fs = FakeFs::new(cx_a.background());
2424
2425        // Set up a fake language server.
2426        let mut language = Language::new(
2427            LanguageConfig {
2428                name: "Rust".into(),
2429                path_suffixes: vec!["rs".to_string()],
2430                ..Default::default()
2431            },
2432            Some(tree_sitter_rust::language()),
2433        );
2434        let mut fake_language_servers = language.set_fake_lsp_adapter(FakeLspAdapter {
2435            capabilities: lsp::ServerCapabilities {
2436                completion_provider: Some(lsp::CompletionOptions {
2437                    trigger_characters: Some(vec![".".to_string()]),
2438                    ..Default::default()
2439                }),
2440                ..Default::default()
2441            },
2442            ..Default::default()
2443        });
2444        lang_registry.add(Arc::new(language));
2445
2446        // Connect to a server as 2 clients.
2447        let mut server = TestServer::start(cx_a.foreground(), cx_a.background()).await;
2448        let client_a = server.create_client(cx_a, "user_a").await;
2449        let client_b = server.create_client(cx_b, "user_b").await;
2450
2451        // Share a project as client A
2452        fs.insert_tree(
2453            "/a",
2454            json!({
2455                ".zed.toml": r#"collaborators = ["user_b"]"#,
2456                "main.rs": "fn main() { a }",
2457                "other.rs": "",
2458            }),
2459        )
2460        .await;
2461        let project_a = cx_a.update(|cx| {
2462            Project::local(
2463                client_a.clone(),
2464                client_a.user_store.clone(),
2465                lang_registry.clone(),
2466                fs.clone(),
2467                cx,
2468            )
2469        });
2470        let (worktree_a, _) = project_a
2471            .update(cx_a, |p, cx| {
2472                p.find_or_create_local_worktree("/a", true, cx)
2473            })
2474            .await
2475            .unwrap();
2476        worktree_a
2477            .read_with(cx_a, |tree, _| tree.as_local().unwrap().scan_complete())
2478            .await;
2479        let project_id = project_a.update(cx_a, |p, _| p.next_remote_id()).await;
2480        let worktree_id = worktree_a.read_with(cx_a, |tree, _| tree.id());
2481        project_a.update(cx_a, |p, cx| p.share(cx)).await.unwrap();
2482
2483        // Join the worktree as client B.
2484        let project_b = Project::remote(
2485            project_id,
2486            client_b.clone(),
2487            client_b.user_store.clone(),
2488            lang_registry.clone(),
2489            fs.clone(),
2490            &mut cx_b.to_async(),
2491        )
2492        .await
2493        .unwrap();
2494
2495        // Open a file in an editor as the guest.
2496        let buffer_b = project_b
2497            .update(cx_b, |p, cx| p.open_buffer((worktree_id, "main.rs"), cx))
2498            .await
2499            .unwrap();
2500        let (window_b, _) = cx_b.add_window(|_| EmptyView);
2501        let editor_b = cx_b.add_view(window_b, |cx| {
2502            Editor::for_buffer(buffer_b.clone(), Some(project_b.clone()), cx)
2503        });
2504
2505        let fake_language_server = fake_language_servers.next().await.unwrap();
2506        buffer_b
2507            .condition(&cx_b, |buffer, _| !buffer.completion_triggers().is_empty())
2508            .await;
2509
2510        // Type a completion trigger character as the guest.
2511        editor_b.update(cx_b, |editor, cx| {
2512            editor.select_ranges([13..13], None, cx);
2513            editor.handle_input(&Input(".".into()), cx);
2514            cx.focus(&editor_b);
2515        });
2516
2517        // Receive a completion request as the host's language server.
2518        // Return some completions from the host's language server.
2519        cx_a.foreground().start_waiting();
2520        fake_language_server
2521            .handle_request::<lsp::request::Completion, _, _>(|params, _| async move {
2522                assert_eq!(
2523                    params.text_document_position.text_document.uri,
2524                    lsp::Url::from_file_path("/a/main.rs").unwrap(),
2525                );
2526                assert_eq!(
2527                    params.text_document_position.position,
2528                    lsp::Position::new(0, 14),
2529                );
2530
2531                Ok(Some(lsp::CompletionResponse::Array(vec![
2532                    lsp::CompletionItem {
2533                        label: "first_method(…)".into(),
2534                        detail: Some("fn(&mut self, B) -> C".into()),
2535                        text_edit: Some(lsp::CompletionTextEdit::Edit(lsp::TextEdit {
2536                            new_text: "first_method($1)".to_string(),
2537                            range: lsp::Range::new(
2538                                lsp::Position::new(0, 14),
2539                                lsp::Position::new(0, 14),
2540                            ),
2541                        })),
2542                        insert_text_format: Some(lsp::InsertTextFormat::SNIPPET),
2543                        ..Default::default()
2544                    },
2545                    lsp::CompletionItem {
2546                        label: "second_method(…)".into(),
2547                        detail: Some("fn(&mut self, C) -> D<E>".into()),
2548                        text_edit: Some(lsp::CompletionTextEdit::Edit(lsp::TextEdit {
2549                            new_text: "second_method()".to_string(),
2550                            range: lsp::Range::new(
2551                                lsp::Position::new(0, 14),
2552                                lsp::Position::new(0, 14),
2553                            ),
2554                        })),
2555                        insert_text_format: Some(lsp::InsertTextFormat::SNIPPET),
2556                        ..Default::default()
2557                    },
2558                ])))
2559            })
2560            .next()
2561            .await
2562            .unwrap();
2563        cx_a.foreground().finish_waiting();
2564
2565        // Open the buffer on the host.
2566        let buffer_a = project_a
2567            .update(cx_a, |p, cx| p.open_buffer((worktree_id, "main.rs"), cx))
2568            .await
2569            .unwrap();
2570        buffer_a
2571            .condition(&cx_a, |buffer, _| buffer.text() == "fn main() { a. }")
2572            .await;
2573
2574        // Confirm a completion on the guest.
2575        editor_b
2576            .condition(&cx_b, |editor, _| editor.context_menu_visible())
2577            .await;
2578        editor_b.update(cx_b, |editor, cx| {
2579            editor.confirm_completion(&ConfirmCompletion { item_ix: Some(0) }, cx);
2580            assert_eq!(editor.text(cx), "fn main() { a.first_method() }");
2581        });
2582
2583        // Return a resolved completion from the host's language server.
2584        // The resolved completion has an additional text edit.
2585        fake_language_server.handle_request::<lsp::request::ResolveCompletionItem, _, _>(
2586            |params, _| async move {
2587                assert_eq!(params.label, "first_method(…)");
2588                Ok(lsp::CompletionItem {
2589                    label: "first_method(…)".into(),
2590                    detail: Some("fn(&mut self, B) -> C".into()),
2591                    text_edit: Some(lsp::CompletionTextEdit::Edit(lsp::TextEdit {
2592                        new_text: "first_method($1)".to_string(),
2593                        range: lsp::Range::new(
2594                            lsp::Position::new(0, 14),
2595                            lsp::Position::new(0, 14),
2596                        ),
2597                    })),
2598                    additional_text_edits: Some(vec![lsp::TextEdit {
2599                        new_text: "use d::SomeTrait;\n".to_string(),
2600                        range: lsp::Range::new(lsp::Position::new(0, 0), lsp::Position::new(0, 0)),
2601                    }]),
2602                    insert_text_format: Some(lsp::InsertTextFormat::SNIPPET),
2603                    ..Default::default()
2604                })
2605            },
2606        );
2607
2608        // The additional edit is applied.
2609        buffer_a
2610            .condition(&cx_a, |buffer, _| {
2611                buffer.text() == "use d::SomeTrait;\nfn main() { a.first_method() }"
2612            })
2613            .await;
2614        buffer_b
2615            .condition(&cx_b, |buffer, _| {
2616                buffer.text() == "use d::SomeTrait;\nfn main() { a.first_method() }"
2617            })
2618            .await;
2619    }
2620
2621    #[gpui::test(iterations = 10)]
2622    async fn test_reloading_buffer_manually(cx_a: &mut TestAppContext, cx_b: &mut TestAppContext) {
2623        cx_a.foreground().forbid_parking();
2624        let lang_registry = Arc::new(LanguageRegistry::test());
2625        let fs = FakeFs::new(cx_a.background());
2626
2627        // Connect to a server as 2 clients.
2628        let mut server = TestServer::start(cx_a.foreground(), cx_a.background()).await;
2629        let client_a = server.create_client(cx_a, "user_a").await;
2630        let client_b = server.create_client(cx_b, "user_b").await;
2631
2632        // Share a project as client A
2633        fs.insert_tree(
2634            "/a",
2635            json!({
2636                ".zed.toml": r#"collaborators = ["user_b"]"#,
2637                "a.rs": "let one = 1;",
2638            }),
2639        )
2640        .await;
2641        let project_a = cx_a.update(|cx| {
2642            Project::local(
2643                client_a.clone(),
2644                client_a.user_store.clone(),
2645                lang_registry.clone(),
2646                fs.clone(),
2647                cx,
2648            )
2649        });
2650        let (worktree_a, _) = project_a
2651            .update(cx_a, |p, cx| {
2652                p.find_or_create_local_worktree("/a", true, cx)
2653            })
2654            .await
2655            .unwrap();
2656        worktree_a
2657            .read_with(cx_a, |tree, _| tree.as_local().unwrap().scan_complete())
2658            .await;
2659        let project_id = project_a.update(cx_a, |p, _| p.next_remote_id()).await;
2660        let worktree_id = worktree_a.read_with(cx_a, |tree, _| tree.id());
2661        project_a.update(cx_a, |p, cx| p.share(cx)).await.unwrap();
2662        let buffer_a = project_a
2663            .update(cx_a, |p, cx| p.open_buffer((worktree_id, "a.rs"), cx))
2664            .await
2665            .unwrap();
2666
2667        // Join the worktree as client B.
2668        let project_b = Project::remote(
2669            project_id,
2670            client_b.clone(),
2671            client_b.user_store.clone(),
2672            lang_registry.clone(),
2673            fs.clone(),
2674            &mut cx_b.to_async(),
2675        )
2676        .await
2677        .unwrap();
2678
2679        let buffer_b = cx_b
2680            .background()
2681            .spawn(project_b.update(cx_b, |p, cx| p.open_buffer((worktree_id, "a.rs"), cx)))
2682            .await
2683            .unwrap();
2684        buffer_b.update(cx_b, |buffer, cx| {
2685            buffer.edit([4..7], "six", cx);
2686            buffer.edit([10..11], "6", cx);
2687            assert_eq!(buffer.text(), "let six = 6;");
2688            assert!(buffer.is_dirty());
2689            assert!(!buffer.has_conflict());
2690        });
2691        buffer_a
2692            .condition(cx_a, |buffer, _| buffer.text() == "let six = 6;")
2693            .await;
2694
2695        fs.save(Path::new("/a/a.rs"), &Rope::from("let seven = 7;"))
2696            .await
2697            .unwrap();
2698        buffer_a
2699            .condition(cx_a, |buffer, _| buffer.has_conflict())
2700            .await;
2701        buffer_b
2702            .condition(cx_b, |buffer, _| buffer.has_conflict())
2703            .await;
2704
2705        project_b
2706            .update(cx_b, |project, cx| {
2707                project.reload_buffers(HashSet::from_iter([buffer_b.clone()]), true, cx)
2708            })
2709            .await
2710            .unwrap();
2711        buffer_a.read_with(cx_a, |buffer, _| {
2712            assert_eq!(buffer.text(), "let seven = 7;");
2713            assert!(!buffer.is_dirty());
2714            assert!(!buffer.has_conflict());
2715        });
2716        buffer_b.read_with(cx_b, |buffer, _| {
2717            assert_eq!(buffer.text(), "let seven = 7;");
2718            assert!(!buffer.is_dirty());
2719            assert!(!buffer.has_conflict());
2720        });
2721
2722        buffer_a.update(cx_a, |buffer, cx| {
2723            // Undoing on the host is a no-op when the reload was initiated by the guest.
2724            buffer.undo(cx);
2725            assert_eq!(buffer.text(), "let seven = 7;");
2726            assert!(!buffer.is_dirty());
2727            assert!(!buffer.has_conflict());
2728        });
2729        buffer_b.update(cx_b, |buffer, cx| {
2730            // Undoing on the guest rolls back the buffer to before it was reloaded but the conflict gets cleared.
2731            buffer.undo(cx);
2732            assert_eq!(buffer.text(), "let six = 6;");
2733            assert!(buffer.is_dirty());
2734            assert!(!buffer.has_conflict());
2735        });
2736    }
2737
2738    #[gpui::test(iterations = 10)]
2739    async fn test_formatting_buffer(cx_a: &mut TestAppContext, cx_b: &mut TestAppContext) {
2740        cx_a.foreground().forbid_parking();
2741        let lang_registry = Arc::new(LanguageRegistry::test());
2742        let fs = FakeFs::new(cx_a.background());
2743
2744        // Set up a fake language server.
2745        let mut language = Language::new(
2746            LanguageConfig {
2747                name: "Rust".into(),
2748                path_suffixes: vec!["rs".to_string()],
2749                ..Default::default()
2750            },
2751            Some(tree_sitter_rust::language()),
2752        );
2753        let mut fake_language_servers = language.set_fake_lsp_adapter(Default::default());
2754        lang_registry.add(Arc::new(language));
2755
2756        // Connect to a server as 2 clients.
2757        let mut server = TestServer::start(cx_a.foreground(), cx_a.background()).await;
2758        let client_a = server.create_client(cx_a, "user_a").await;
2759        let client_b = server.create_client(cx_b, "user_b").await;
2760
2761        // Share a project as client A
2762        fs.insert_tree(
2763            "/a",
2764            json!({
2765                ".zed.toml": r#"collaborators = ["user_b"]"#,
2766                "a.rs": "let one = two",
2767            }),
2768        )
2769        .await;
2770        let project_a = cx_a.update(|cx| {
2771            Project::local(
2772                client_a.clone(),
2773                client_a.user_store.clone(),
2774                lang_registry.clone(),
2775                fs.clone(),
2776                cx,
2777            )
2778        });
2779        let (worktree_a, _) = project_a
2780            .update(cx_a, |p, cx| {
2781                p.find_or_create_local_worktree("/a", true, cx)
2782            })
2783            .await
2784            .unwrap();
2785        worktree_a
2786            .read_with(cx_a, |tree, _| tree.as_local().unwrap().scan_complete())
2787            .await;
2788        let project_id = project_a.update(cx_a, |p, _| p.next_remote_id()).await;
2789        let worktree_id = worktree_a.read_with(cx_a, |tree, _| tree.id());
2790        project_a.update(cx_a, |p, cx| p.share(cx)).await.unwrap();
2791
2792        // Join the worktree as client B.
2793        let project_b = Project::remote(
2794            project_id,
2795            client_b.clone(),
2796            client_b.user_store.clone(),
2797            lang_registry.clone(),
2798            fs.clone(),
2799            &mut cx_b.to_async(),
2800        )
2801        .await
2802        .unwrap();
2803
2804        let buffer_b = cx_b
2805            .background()
2806            .spawn(project_b.update(cx_b, |p, cx| p.open_buffer((worktree_id, "a.rs"), cx)))
2807            .await
2808            .unwrap();
2809
2810        let fake_language_server = fake_language_servers.next().await.unwrap();
2811        fake_language_server.handle_request::<lsp::request::Formatting, _, _>(|_, _| async move {
2812            Ok(Some(vec![
2813                lsp::TextEdit {
2814                    range: lsp::Range::new(lsp::Position::new(0, 4), lsp::Position::new(0, 4)),
2815                    new_text: "h".to_string(),
2816                },
2817                lsp::TextEdit {
2818                    range: lsp::Range::new(lsp::Position::new(0, 7), lsp::Position::new(0, 7)),
2819                    new_text: "y".to_string(),
2820                },
2821            ]))
2822        });
2823
2824        project_b
2825            .update(cx_b, |project, cx| {
2826                project.format(HashSet::from_iter([buffer_b.clone()]), true, cx)
2827            })
2828            .await
2829            .unwrap();
2830        assert_eq!(
2831            buffer_b.read_with(cx_b, |buffer, _| buffer.text()),
2832            "let honey = two"
2833        );
2834    }
2835
2836    #[gpui::test(iterations = 10)]
2837    async fn test_definition(cx_a: &mut TestAppContext, cx_b: &mut TestAppContext) {
2838        cx_a.foreground().forbid_parking();
2839        let lang_registry = Arc::new(LanguageRegistry::test());
2840        let fs = FakeFs::new(cx_a.background());
2841        fs.insert_tree(
2842            "/root-1",
2843            json!({
2844                ".zed.toml": r#"collaborators = ["user_b"]"#,
2845                "a.rs": "const ONE: usize = b::TWO + b::THREE;",
2846            }),
2847        )
2848        .await;
2849        fs.insert_tree(
2850            "/root-2",
2851            json!({
2852                "b.rs": "const TWO: usize = 2;\nconst THREE: usize = 3;",
2853            }),
2854        )
2855        .await;
2856
2857        // Set up a fake language server.
2858        let mut language = Language::new(
2859            LanguageConfig {
2860                name: "Rust".into(),
2861                path_suffixes: vec!["rs".to_string()],
2862                ..Default::default()
2863            },
2864            Some(tree_sitter_rust::language()),
2865        );
2866        let mut fake_language_servers = language.set_fake_lsp_adapter(Default::default());
2867        lang_registry.add(Arc::new(language));
2868
2869        // Connect to a server as 2 clients.
2870        let mut server = TestServer::start(cx_a.foreground(), cx_a.background()).await;
2871        let client_a = server.create_client(cx_a, "user_a").await;
2872        let client_b = server.create_client(cx_b, "user_b").await;
2873
2874        // Share a project as client A
2875        let project_a = cx_a.update(|cx| {
2876            Project::local(
2877                client_a.clone(),
2878                client_a.user_store.clone(),
2879                lang_registry.clone(),
2880                fs.clone(),
2881                cx,
2882            )
2883        });
2884        let (worktree_a, _) = project_a
2885            .update(cx_a, |p, cx| {
2886                p.find_or_create_local_worktree("/root-1", true, cx)
2887            })
2888            .await
2889            .unwrap();
2890        worktree_a
2891            .read_with(cx_a, |tree, _| tree.as_local().unwrap().scan_complete())
2892            .await;
2893        let project_id = project_a.update(cx_a, |p, _| p.next_remote_id()).await;
2894        let worktree_id = worktree_a.read_with(cx_a, |tree, _| tree.id());
2895        project_a.update(cx_a, |p, cx| p.share(cx)).await.unwrap();
2896
2897        // Join the worktree as client B.
2898        let project_b = Project::remote(
2899            project_id,
2900            client_b.clone(),
2901            client_b.user_store.clone(),
2902            lang_registry.clone(),
2903            fs.clone(),
2904            &mut cx_b.to_async(),
2905        )
2906        .await
2907        .unwrap();
2908
2909        // Open the file on client B.
2910        let buffer_b = cx_b
2911            .background()
2912            .spawn(project_b.update(cx_b, |p, cx| p.open_buffer((worktree_id, "a.rs"), cx)))
2913            .await
2914            .unwrap();
2915
2916        // Request the definition of a symbol as the guest.
2917        let fake_language_server = fake_language_servers.next().await.unwrap();
2918        fake_language_server.handle_request::<lsp::request::GotoDefinition, _, _>(
2919            |_, _| async move {
2920                Ok(Some(lsp::GotoDefinitionResponse::Scalar(
2921                    lsp::Location::new(
2922                        lsp::Url::from_file_path("/root-2/b.rs").unwrap(),
2923                        lsp::Range::new(lsp::Position::new(0, 6), lsp::Position::new(0, 9)),
2924                    ),
2925                )))
2926            },
2927        );
2928
2929        let definitions_1 = project_b
2930            .update(cx_b, |p, cx| p.definition(&buffer_b, 23, cx))
2931            .await
2932            .unwrap();
2933        cx_b.read(|cx| {
2934            assert_eq!(definitions_1.len(), 1);
2935            assert_eq!(project_b.read(cx).worktrees(cx).count(), 2);
2936            let target_buffer = definitions_1[0].buffer.read(cx);
2937            assert_eq!(
2938                target_buffer.text(),
2939                "const TWO: usize = 2;\nconst THREE: usize = 3;"
2940            );
2941            assert_eq!(
2942                definitions_1[0].range.to_point(target_buffer),
2943                Point::new(0, 6)..Point::new(0, 9)
2944            );
2945        });
2946
2947        // Try getting more definitions for the same buffer, ensuring the buffer gets reused from
2948        // the previous call to `definition`.
2949        fake_language_server.handle_request::<lsp::request::GotoDefinition, _, _>(
2950            |_, _| async move {
2951                Ok(Some(lsp::GotoDefinitionResponse::Scalar(
2952                    lsp::Location::new(
2953                        lsp::Url::from_file_path("/root-2/b.rs").unwrap(),
2954                        lsp::Range::new(lsp::Position::new(1, 6), lsp::Position::new(1, 11)),
2955                    ),
2956                )))
2957            },
2958        );
2959
2960        let definitions_2 = project_b
2961            .update(cx_b, |p, cx| p.definition(&buffer_b, 33, cx))
2962            .await
2963            .unwrap();
2964        cx_b.read(|cx| {
2965            assert_eq!(definitions_2.len(), 1);
2966            assert_eq!(project_b.read(cx).worktrees(cx).count(), 2);
2967            let target_buffer = definitions_2[0].buffer.read(cx);
2968            assert_eq!(
2969                target_buffer.text(),
2970                "const TWO: usize = 2;\nconst THREE: usize = 3;"
2971            );
2972            assert_eq!(
2973                definitions_2[0].range.to_point(target_buffer),
2974                Point::new(1, 6)..Point::new(1, 11)
2975            );
2976        });
2977        assert_eq!(definitions_1[0].buffer, definitions_2[0].buffer);
2978    }
2979
2980    #[gpui::test(iterations = 10)]
2981    async fn test_references(cx_a: &mut TestAppContext, cx_b: &mut TestAppContext) {
2982        cx_a.foreground().forbid_parking();
2983        let lang_registry = Arc::new(LanguageRegistry::test());
2984        let fs = FakeFs::new(cx_a.background());
2985        fs.insert_tree(
2986            "/root-1",
2987            json!({
2988                ".zed.toml": r#"collaborators = ["user_b"]"#,
2989                "one.rs": "const ONE: usize = 1;",
2990                "two.rs": "const TWO: usize = one::ONE + one::ONE;",
2991            }),
2992        )
2993        .await;
2994        fs.insert_tree(
2995            "/root-2",
2996            json!({
2997                "three.rs": "const THREE: usize = two::TWO + one::ONE;",
2998            }),
2999        )
3000        .await;
3001
3002        // Set up a fake language server.
3003        let mut language = Language::new(
3004            LanguageConfig {
3005                name: "Rust".into(),
3006                path_suffixes: vec!["rs".to_string()],
3007                ..Default::default()
3008            },
3009            Some(tree_sitter_rust::language()),
3010        );
3011        let mut fake_language_servers = language.set_fake_lsp_adapter(Default::default());
3012        lang_registry.add(Arc::new(language));
3013
3014        // Connect to a server as 2 clients.
3015        let mut server = TestServer::start(cx_a.foreground(), cx_a.background()).await;
3016        let client_a = server.create_client(cx_a, "user_a").await;
3017        let client_b = server.create_client(cx_b, "user_b").await;
3018
3019        // Share a project as client A
3020        let project_a = cx_a.update(|cx| {
3021            Project::local(
3022                client_a.clone(),
3023                client_a.user_store.clone(),
3024                lang_registry.clone(),
3025                fs.clone(),
3026                cx,
3027            )
3028        });
3029        let (worktree_a, _) = project_a
3030            .update(cx_a, |p, cx| {
3031                p.find_or_create_local_worktree("/root-1", true, cx)
3032            })
3033            .await
3034            .unwrap();
3035        worktree_a
3036            .read_with(cx_a, |tree, _| tree.as_local().unwrap().scan_complete())
3037            .await;
3038        let project_id = project_a.update(cx_a, |p, _| p.next_remote_id()).await;
3039        let worktree_id = worktree_a.read_with(cx_a, |tree, _| tree.id());
3040        project_a.update(cx_a, |p, cx| p.share(cx)).await.unwrap();
3041
3042        // Join the worktree as client B.
3043        let project_b = Project::remote(
3044            project_id,
3045            client_b.clone(),
3046            client_b.user_store.clone(),
3047            lang_registry.clone(),
3048            fs.clone(),
3049            &mut cx_b.to_async(),
3050        )
3051        .await
3052        .unwrap();
3053
3054        // Open the file on client B.
3055        let buffer_b = cx_b
3056            .background()
3057            .spawn(project_b.update(cx_b, |p, cx| p.open_buffer((worktree_id, "one.rs"), cx)))
3058            .await
3059            .unwrap();
3060
3061        // Request references to a symbol as the guest.
3062        let fake_language_server = fake_language_servers.next().await.unwrap();
3063        fake_language_server.handle_request::<lsp::request::References, _, _>(
3064            |params, _| async move {
3065                assert_eq!(
3066                    params.text_document_position.text_document.uri.as_str(),
3067                    "file:///root-1/one.rs"
3068                );
3069                Ok(Some(vec![
3070                    lsp::Location {
3071                        uri: lsp::Url::from_file_path("/root-1/two.rs").unwrap(),
3072                        range: lsp::Range::new(
3073                            lsp::Position::new(0, 24),
3074                            lsp::Position::new(0, 27),
3075                        ),
3076                    },
3077                    lsp::Location {
3078                        uri: lsp::Url::from_file_path("/root-1/two.rs").unwrap(),
3079                        range: lsp::Range::new(
3080                            lsp::Position::new(0, 35),
3081                            lsp::Position::new(0, 38),
3082                        ),
3083                    },
3084                    lsp::Location {
3085                        uri: lsp::Url::from_file_path("/root-2/three.rs").unwrap(),
3086                        range: lsp::Range::new(
3087                            lsp::Position::new(0, 37),
3088                            lsp::Position::new(0, 40),
3089                        ),
3090                    },
3091                ]))
3092            },
3093        );
3094
3095        let references = project_b
3096            .update(cx_b, |p, cx| p.references(&buffer_b, 7, cx))
3097            .await
3098            .unwrap();
3099        cx_b.read(|cx| {
3100            assert_eq!(references.len(), 3);
3101            assert_eq!(project_b.read(cx).worktrees(cx).count(), 2);
3102
3103            let two_buffer = references[0].buffer.read(cx);
3104            let three_buffer = references[2].buffer.read(cx);
3105            assert_eq!(
3106                two_buffer.file().unwrap().path().as_ref(),
3107                Path::new("two.rs")
3108            );
3109            assert_eq!(references[1].buffer, references[0].buffer);
3110            assert_eq!(
3111                three_buffer.file().unwrap().full_path(cx),
3112                Path::new("three.rs")
3113            );
3114
3115            assert_eq!(references[0].range.to_offset(&two_buffer), 24..27);
3116            assert_eq!(references[1].range.to_offset(&two_buffer), 35..38);
3117            assert_eq!(references[2].range.to_offset(&three_buffer), 37..40);
3118        });
3119    }
3120
3121    #[gpui::test(iterations = 10)]
3122    async fn test_project_search(cx_a: &mut TestAppContext, cx_b: &mut TestAppContext) {
3123        cx_a.foreground().forbid_parking();
3124        let lang_registry = Arc::new(LanguageRegistry::test());
3125        let fs = FakeFs::new(cx_a.background());
3126        fs.insert_tree(
3127            "/root-1",
3128            json!({
3129                ".zed.toml": r#"collaborators = ["user_b"]"#,
3130                "a": "hello world",
3131                "b": "goodnight moon",
3132                "c": "a world of goo",
3133                "d": "world champion of clown world",
3134            }),
3135        )
3136        .await;
3137        fs.insert_tree(
3138            "/root-2",
3139            json!({
3140                "e": "disney world is fun",
3141            }),
3142        )
3143        .await;
3144
3145        // Connect to a server as 2 clients.
3146        let mut server = TestServer::start(cx_a.foreground(), cx_a.background()).await;
3147        let client_a = server.create_client(cx_a, "user_a").await;
3148        let client_b = server.create_client(cx_b, "user_b").await;
3149
3150        // Share a project as client A
3151        let project_a = cx_a.update(|cx| {
3152            Project::local(
3153                client_a.clone(),
3154                client_a.user_store.clone(),
3155                lang_registry.clone(),
3156                fs.clone(),
3157                cx,
3158            )
3159        });
3160        let project_id = project_a.update(cx_a, |p, _| p.next_remote_id()).await;
3161
3162        let (worktree_1, _) = project_a
3163            .update(cx_a, |p, cx| {
3164                p.find_or_create_local_worktree("/root-1", true, cx)
3165            })
3166            .await
3167            .unwrap();
3168        worktree_1
3169            .read_with(cx_a, |tree, _| tree.as_local().unwrap().scan_complete())
3170            .await;
3171        let (worktree_2, _) = project_a
3172            .update(cx_a, |p, cx| {
3173                p.find_or_create_local_worktree("/root-2", true, cx)
3174            })
3175            .await
3176            .unwrap();
3177        worktree_2
3178            .read_with(cx_a, |tree, _| tree.as_local().unwrap().scan_complete())
3179            .await;
3180
3181        project_a.update(cx_a, |p, cx| p.share(cx)).await.unwrap();
3182
3183        // Join the worktree as client B.
3184        let project_b = Project::remote(
3185            project_id,
3186            client_b.clone(),
3187            client_b.user_store.clone(),
3188            lang_registry.clone(),
3189            fs.clone(),
3190            &mut cx_b.to_async(),
3191        )
3192        .await
3193        .unwrap();
3194
3195        let results = project_b
3196            .update(cx_b, |project, cx| {
3197                project.search(SearchQuery::text("world", false, false), cx)
3198            })
3199            .await
3200            .unwrap();
3201
3202        let mut ranges_by_path = results
3203            .into_iter()
3204            .map(|(buffer, ranges)| {
3205                buffer.read_with(cx_b, |buffer, cx| {
3206                    let path = buffer.file().unwrap().full_path(cx);
3207                    let offset_ranges = ranges
3208                        .into_iter()
3209                        .map(|range| range.to_offset(buffer))
3210                        .collect::<Vec<_>>();
3211                    (path, offset_ranges)
3212                })
3213            })
3214            .collect::<Vec<_>>();
3215        ranges_by_path.sort_by_key(|(path, _)| path.clone());
3216
3217        assert_eq!(
3218            ranges_by_path,
3219            &[
3220                (PathBuf::from("root-1/a"), vec![6..11]),
3221                (PathBuf::from("root-1/c"), vec![2..7]),
3222                (PathBuf::from("root-1/d"), vec![0..5, 24..29]),
3223                (PathBuf::from("root-2/e"), vec![7..12]),
3224            ]
3225        );
3226    }
3227
3228    #[gpui::test(iterations = 10)]
3229    async fn test_document_highlights(cx_a: &mut TestAppContext, cx_b: &mut TestAppContext) {
3230        cx_a.foreground().forbid_parking();
3231        let lang_registry = Arc::new(LanguageRegistry::test());
3232        let fs = FakeFs::new(cx_a.background());
3233        fs.insert_tree(
3234            "/root-1",
3235            json!({
3236                ".zed.toml": r#"collaborators = ["user_b"]"#,
3237                "main.rs": "fn double(number: i32) -> i32 { number + number }",
3238            }),
3239        )
3240        .await;
3241
3242        // Set up a fake language server.
3243        let mut language = Language::new(
3244            LanguageConfig {
3245                name: "Rust".into(),
3246                path_suffixes: vec!["rs".to_string()],
3247                ..Default::default()
3248            },
3249            Some(tree_sitter_rust::language()),
3250        );
3251        let mut fake_language_servers = language.set_fake_lsp_adapter(Default::default());
3252        lang_registry.add(Arc::new(language));
3253
3254        // Connect to a server as 2 clients.
3255        let mut server = TestServer::start(cx_a.foreground(), cx_a.background()).await;
3256        let client_a = server.create_client(cx_a, "user_a").await;
3257        let client_b = server.create_client(cx_b, "user_b").await;
3258
3259        // Share a project as client A
3260        let project_a = cx_a.update(|cx| {
3261            Project::local(
3262                client_a.clone(),
3263                client_a.user_store.clone(),
3264                lang_registry.clone(),
3265                fs.clone(),
3266                cx,
3267            )
3268        });
3269        let (worktree_a, _) = project_a
3270            .update(cx_a, |p, cx| {
3271                p.find_or_create_local_worktree("/root-1", true, cx)
3272            })
3273            .await
3274            .unwrap();
3275        worktree_a
3276            .read_with(cx_a, |tree, _| tree.as_local().unwrap().scan_complete())
3277            .await;
3278        let project_id = project_a.update(cx_a, |p, _| p.next_remote_id()).await;
3279        let worktree_id = worktree_a.read_with(cx_a, |tree, _| tree.id());
3280        project_a.update(cx_a, |p, cx| p.share(cx)).await.unwrap();
3281
3282        // Join the worktree as client B.
3283        let project_b = Project::remote(
3284            project_id,
3285            client_b.clone(),
3286            client_b.user_store.clone(),
3287            lang_registry.clone(),
3288            fs.clone(),
3289            &mut cx_b.to_async(),
3290        )
3291        .await
3292        .unwrap();
3293
3294        // Open the file on client B.
3295        let buffer_b = cx_b
3296            .background()
3297            .spawn(project_b.update(cx_b, |p, cx| p.open_buffer((worktree_id, "main.rs"), cx)))
3298            .await
3299            .unwrap();
3300
3301        // Request document highlights as the guest.
3302        let fake_language_server = fake_language_servers.next().await.unwrap();
3303        fake_language_server.handle_request::<lsp::request::DocumentHighlightRequest, _, _>(
3304            |params, _| async move {
3305                assert_eq!(
3306                    params
3307                        .text_document_position_params
3308                        .text_document
3309                        .uri
3310                        .as_str(),
3311                    "file:///root-1/main.rs"
3312                );
3313                assert_eq!(
3314                    params.text_document_position_params.position,
3315                    lsp::Position::new(0, 34)
3316                );
3317                Ok(Some(vec![
3318                    lsp::DocumentHighlight {
3319                        kind: Some(lsp::DocumentHighlightKind::WRITE),
3320                        range: lsp::Range::new(
3321                            lsp::Position::new(0, 10),
3322                            lsp::Position::new(0, 16),
3323                        ),
3324                    },
3325                    lsp::DocumentHighlight {
3326                        kind: Some(lsp::DocumentHighlightKind::READ),
3327                        range: lsp::Range::new(
3328                            lsp::Position::new(0, 32),
3329                            lsp::Position::new(0, 38),
3330                        ),
3331                    },
3332                    lsp::DocumentHighlight {
3333                        kind: Some(lsp::DocumentHighlightKind::READ),
3334                        range: lsp::Range::new(
3335                            lsp::Position::new(0, 41),
3336                            lsp::Position::new(0, 47),
3337                        ),
3338                    },
3339                ]))
3340            },
3341        );
3342
3343        let highlights = project_b
3344            .update(cx_b, |p, cx| p.document_highlights(&buffer_b, 34, cx))
3345            .await
3346            .unwrap();
3347        buffer_b.read_with(cx_b, |buffer, _| {
3348            let snapshot = buffer.snapshot();
3349
3350            let highlights = highlights
3351                .into_iter()
3352                .map(|highlight| (highlight.kind, highlight.range.to_offset(&snapshot)))
3353                .collect::<Vec<_>>();
3354            assert_eq!(
3355                highlights,
3356                &[
3357                    (lsp::DocumentHighlightKind::WRITE, 10..16),
3358                    (lsp::DocumentHighlightKind::READ, 32..38),
3359                    (lsp::DocumentHighlightKind::READ, 41..47)
3360                ]
3361            )
3362        });
3363    }
3364
3365    #[gpui::test(iterations = 10)]
3366    async fn test_project_symbols(cx_a: &mut TestAppContext, cx_b: &mut TestAppContext) {
3367        cx_a.foreground().forbid_parking();
3368        let lang_registry = Arc::new(LanguageRegistry::test());
3369        let fs = FakeFs::new(cx_a.background());
3370        fs.insert_tree(
3371            "/code",
3372            json!({
3373                "crate-1": {
3374                    ".zed.toml": r#"collaborators = ["user_b"]"#,
3375                    "one.rs": "const ONE: usize = 1;",
3376                },
3377                "crate-2": {
3378                    "two.rs": "const TWO: usize = 2; const THREE: usize = 3;",
3379                },
3380                "private": {
3381                    "passwords.txt": "the-password",
3382                }
3383            }),
3384        )
3385        .await;
3386
3387        // Set up a fake language server.
3388        let mut language = Language::new(
3389            LanguageConfig {
3390                name: "Rust".into(),
3391                path_suffixes: vec!["rs".to_string()],
3392                ..Default::default()
3393            },
3394            Some(tree_sitter_rust::language()),
3395        );
3396        let mut fake_language_servers = language.set_fake_lsp_adapter(Default::default());
3397        lang_registry.add(Arc::new(language));
3398
3399        // Connect to a server as 2 clients.
3400        let mut server = TestServer::start(cx_a.foreground(), cx_a.background()).await;
3401        let client_a = server.create_client(cx_a, "user_a").await;
3402        let client_b = server.create_client(cx_b, "user_b").await;
3403
3404        // Share a project as client A
3405        let project_a = cx_a.update(|cx| {
3406            Project::local(
3407                client_a.clone(),
3408                client_a.user_store.clone(),
3409                lang_registry.clone(),
3410                fs.clone(),
3411                cx,
3412            )
3413        });
3414        let (worktree_a, _) = project_a
3415            .update(cx_a, |p, cx| {
3416                p.find_or_create_local_worktree("/code/crate-1", true, cx)
3417            })
3418            .await
3419            .unwrap();
3420        worktree_a
3421            .read_with(cx_a, |tree, _| tree.as_local().unwrap().scan_complete())
3422            .await;
3423        let project_id = project_a.update(cx_a, |p, _| p.next_remote_id()).await;
3424        let worktree_id = worktree_a.read_with(cx_a, |tree, _| tree.id());
3425        project_a.update(cx_a, |p, cx| p.share(cx)).await.unwrap();
3426
3427        // Join the worktree as client B.
3428        let project_b = Project::remote(
3429            project_id,
3430            client_b.clone(),
3431            client_b.user_store.clone(),
3432            lang_registry.clone(),
3433            fs.clone(),
3434            &mut cx_b.to_async(),
3435        )
3436        .await
3437        .unwrap();
3438
3439        // Cause the language server to start.
3440        let _buffer = cx_b
3441            .background()
3442            .spawn(project_b.update(cx_b, |p, cx| p.open_buffer((worktree_id, "one.rs"), cx)))
3443            .await
3444            .unwrap();
3445
3446        let fake_language_server = fake_language_servers.next().await.unwrap();
3447        fake_language_server.handle_request::<lsp::request::WorkspaceSymbol, _, _>(
3448            |_, _| async move {
3449                #[allow(deprecated)]
3450                Ok(Some(vec![lsp::SymbolInformation {
3451                    name: "TWO".into(),
3452                    location: lsp::Location {
3453                        uri: lsp::Url::from_file_path("/code/crate-2/two.rs").unwrap(),
3454                        range: lsp::Range::new(lsp::Position::new(0, 6), lsp::Position::new(0, 9)),
3455                    },
3456                    kind: lsp::SymbolKind::CONSTANT,
3457                    tags: None,
3458                    container_name: None,
3459                    deprecated: None,
3460                }]))
3461            },
3462        );
3463
3464        // Request the definition of a symbol as the guest.
3465        let symbols = project_b
3466            .update(cx_b, |p, cx| p.symbols("two", cx))
3467            .await
3468            .unwrap();
3469        assert_eq!(symbols.len(), 1);
3470        assert_eq!(symbols[0].name, "TWO");
3471
3472        // Open one of the returned symbols.
3473        let buffer_b_2 = project_b
3474            .update(cx_b, |project, cx| {
3475                project.open_buffer_for_symbol(&symbols[0], cx)
3476            })
3477            .await
3478            .unwrap();
3479        buffer_b_2.read_with(cx_b, |buffer, _| {
3480            assert_eq!(
3481                buffer.file().unwrap().path().as_ref(),
3482                Path::new("../crate-2/two.rs")
3483            );
3484        });
3485
3486        // Attempt to craft a symbol and violate host's privacy by opening an arbitrary file.
3487        let mut fake_symbol = symbols[0].clone();
3488        fake_symbol.path = Path::new("/code/secrets").into();
3489        let error = project_b
3490            .update(cx_b, |project, cx| {
3491                project.open_buffer_for_symbol(&fake_symbol, cx)
3492            })
3493            .await
3494            .unwrap_err();
3495        assert!(error.to_string().contains("invalid symbol signature"));
3496    }
3497
3498    #[gpui::test(iterations = 10)]
3499    async fn test_open_buffer_while_getting_definition_pointing_to_it(
3500        cx_a: &mut TestAppContext,
3501        cx_b: &mut TestAppContext,
3502        mut rng: StdRng,
3503    ) {
3504        cx_a.foreground().forbid_parking();
3505        let lang_registry = Arc::new(LanguageRegistry::test());
3506        let fs = FakeFs::new(cx_a.background());
3507        fs.insert_tree(
3508            "/root",
3509            json!({
3510                ".zed.toml": r#"collaborators = ["user_b"]"#,
3511                "a.rs": "const ONE: usize = b::TWO;",
3512                "b.rs": "const TWO: usize = 2",
3513            }),
3514        )
3515        .await;
3516
3517        // Set up a fake language server.
3518        let mut language = Language::new(
3519            LanguageConfig {
3520                name: "Rust".into(),
3521                path_suffixes: vec!["rs".to_string()],
3522                ..Default::default()
3523            },
3524            Some(tree_sitter_rust::language()),
3525        );
3526        let mut fake_language_servers = language.set_fake_lsp_adapter(Default::default());
3527        lang_registry.add(Arc::new(language));
3528
3529        // Connect to a server as 2 clients.
3530        let mut server = TestServer::start(cx_a.foreground(), cx_a.background()).await;
3531        let client_a = server.create_client(cx_a, "user_a").await;
3532        let client_b = server.create_client(cx_b, "user_b").await;
3533
3534        // Share a project as client A
3535        let project_a = cx_a.update(|cx| {
3536            Project::local(
3537                client_a.clone(),
3538                client_a.user_store.clone(),
3539                lang_registry.clone(),
3540                fs.clone(),
3541                cx,
3542            )
3543        });
3544
3545        let (worktree_a, _) = project_a
3546            .update(cx_a, |p, cx| {
3547                p.find_or_create_local_worktree("/root", true, cx)
3548            })
3549            .await
3550            .unwrap();
3551        worktree_a
3552            .read_with(cx_a, |tree, _| tree.as_local().unwrap().scan_complete())
3553            .await;
3554        let project_id = project_a.update(cx_a, |p, _| p.next_remote_id()).await;
3555        let worktree_id = worktree_a.read_with(cx_a, |tree, _| tree.id());
3556        project_a.update(cx_a, |p, cx| p.share(cx)).await.unwrap();
3557
3558        // Join the worktree as client B.
3559        let project_b = Project::remote(
3560            project_id,
3561            client_b.clone(),
3562            client_b.user_store.clone(),
3563            lang_registry.clone(),
3564            fs.clone(),
3565            &mut cx_b.to_async(),
3566        )
3567        .await
3568        .unwrap();
3569
3570        let buffer_b1 = cx_b
3571            .background()
3572            .spawn(project_b.update(cx_b, |p, cx| p.open_buffer((worktree_id, "a.rs"), cx)))
3573            .await
3574            .unwrap();
3575
3576        let fake_language_server = fake_language_servers.next().await.unwrap();
3577        fake_language_server.handle_request::<lsp::request::GotoDefinition, _, _>(
3578            |_, _| async move {
3579                Ok(Some(lsp::GotoDefinitionResponse::Scalar(
3580                    lsp::Location::new(
3581                        lsp::Url::from_file_path("/root/b.rs").unwrap(),
3582                        lsp::Range::new(lsp::Position::new(0, 6), lsp::Position::new(0, 9)),
3583                    ),
3584                )))
3585            },
3586        );
3587
3588        let definitions;
3589        let buffer_b2;
3590        if rng.gen() {
3591            definitions = project_b.update(cx_b, |p, cx| p.definition(&buffer_b1, 23, cx));
3592            buffer_b2 = project_b.update(cx_b, |p, cx| p.open_buffer((worktree_id, "b.rs"), cx));
3593        } else {
3594            buffer_b2 = project_b.update(cx_b, |p, cx| p.open_buffer((worktree_id, "b.rs"), cx));
3595            definitions = project_b.update(cx_b, |p, cx| p.definition(&buffer_b1, 23, cx));
3596        }
3597
3598        let buffer_b2 = buffer_b2.await.unwrap();
3599        let definitions = definitions.await.unwrap();
3600        assert_eq!(definitions.len(), 1);
3601        assert_eq!(definitions[0].buffer, buffer_b2);
3602    }
3603
3604    #[gpui::test(iterations = 10)]
3605    async fn test_collaborating_with_code_actions(
3606        cx_a: &mut TestAppContext,
3607        cx_b: &mut TestAppContext,
3608    ) {
3609        cx_a.foreground().forbid_parking();
3610        let lang_registry = Arc::new(LanguageRegistry::test());
3611        let fs = FakeFs::new(cx_a.background());
3612        cx_b.update(|cx| editor::init(cx));
3613
3614        // Set up a fake language server.
3615        let mut language = Language::new(
3616            LanguageConfig {
3617                name: "Rust".into(),
3618                path_suffixes: vec!["rs".to_string()],
3619                ..Default::default()
3620            },
3621            Some(tree_sitter_rust::language()),
3622        );
3623        let mut fake_language_servers = language.set_fake_lsp_adapter(Default::default());
3624        lang_registry.add(Arc::new(language));
3625
3626        // Connect to a server as 2 clients.
3627        let mut server = TestServer::start(cx_a.foreground(), cx_a.background()).await;
3628        let client_a = server.create_client(cx_a, "user_a").await;
3629        let client_b = server.create_client(cx_b, "user_b").await;
3630
3631        // Share a project as client A
3632        fs.insert_tree(
3633            "/a",
3634            json!({
3635                ".zed.toml": r#"collaborators = ["user_b"]"#,
3636                "main.rs": "mod other;\nfn main() { let foo = other::foo(); }",
3637                "other.rs": "pub fn foo() -> usize { 4 }",
3638            }),
3639        )
3640        .await;
3641        let project_a = cx_a.update(|cx| {
3642            Project::local(
3643                client_a.clone(),
3644                client_a.user_store.clone(),
3645                lang_registry.clone(),
3646                fs.clone(),
3647                cx,
3648            )
3649        });
3650        let (worktree_a, _) = project_a
3651            .update(cx_a, |p, cx| {
3652                p.find_or_create_local_worktree("/a", true, cx)
3653            })
3654            .await
3655            .unwrap();
3656        worktree_a
3657            .read_with(cx_a, |tree, _| tree.as_local().unwrap().scan_complete())
3658            .await;
3659        let project_id = project_a.update(cx_a, |p, _| p.next_remote_id()).await;
3660        let worktree_id = worktree_a.read_with(cx_a, |tree, _| tree.id());
3661        project_a.update(cx_a, |p, cx| p.share(cx)).await.unwrap();
3662
3663        // Join the worktree as client B.
3664        let project_b = Project::remote(
3665            project_id,
3666            client_b.clone(),
3667            client_b.user_store.clone(),
3668            lang_registry.clone(),
3669            fs.clone(),
3670            &mut cx_b.to_async(),
3671        )
3672        .await
3673        .unwrap();
3674        let mut params = cx_b.update(WorkspaceParams::test);
3675        params.languages = lang_registry.clone();
3676        params.client = client_b.client.clone();
3677        params.user_store = client_b.user_store.clone();
3678        params.project = project_b;
3679
3680        let (_window_b, workspace_b) = cx_b.add_window(|cx| Workspace::new(&params, cx));
3681        let editor_b = workspace_b
3682            .update(cx_b, |workspace, cx| {
3683                workspace.open_path((worktree_id, "main.rs"), cx)
3684            })
3685            .await
3686            .unwrap()
3687            .downcast::<Editor>()
3688            .unwrap();
3689
3690        let mut fake_language_server = fake_language_servers.next().await.unwrap();
3691        fake_language_server
3692            .handle_request::<lsp::request::CodeActionRequest, _, _>(|params, _| async move {
3693                assert_eq!(
3694                    params.text_document.uri,
3695                    lsp::Url::from_file_path("/a/main.rs").unwrap(),
3696                );
3697                assert_eq!(params.range.start, lsp::Position::new(0, 0));
3698                assert_eq!(params.range.end, lsp::Position::new(0, 0));
3699                Ok(None)
3700            })
3701            .next()
3702            .await;
3703
3704        // Move cursor to a location that contains code actions.
3705        editor_b.update(cx_b, |editor, cx| {
3706            editor.select_ranges([Point::new(1, 31)..Point::new(1, 31)], None, cx);
3707            cx.focus(&editor_b);
3708        });
3709
3710        fake_language_server
3711            .handle_request::<lsp::request::CodeActionRequest, _, _>(|params, _| async move {
3712                assert_eq!(
3713                    params.text_document.uri,
3714                    lsp::Url::from_file_path("/a/main.rs").unwrap(),
3715                );
3716                assert_eq!(params.range.start, lsp::Position::new(1, 31));
3717                assert_eq!(params.range.end, lsp::Position::new(1, 31));
3718
3719                Ok(Some(vec![lsp::CodeActionOrCommand::CodeAction(
3720                    lsp::CodeAction {
3721                        title: "Inline into all callers".to_string(),
3722                        edit: Some(lsp::WorkspaceEdit {
3723                            changes: Some(
3724                                [
3725                                    (
3726                                        lsp::Url::from_file_path("/a/main.rs").unwrap(),
3727                                        vec![lsp::TextEdit::new(
3728                                            lsp::Range::new(
3729                                                lsp::Position::new(1, 22),
3730                                                lsp::Position::new(1, 34),
3731                                            ),
3732                                            "4".to_string(),
3733                                        )],
3734                                    ),
3735                                    (
3736                                        lsp::Url::from_file_path("/a/other.rs").unwrap(),
3737                                        vec![lsp::TextEdit::new(
3738                                            lsp::Range::new(
3739                                                lsp::Position::new(0, 0),
3740                                                lsp::Position::new(0, 27),
3741                                            ),
3742                                            "".to_string(),
3743                                        )],
3744                                    ),
3745                                ]
3746                                .into_iter()
3747                                .collect(),
3748                            ),
3749                            ..Default::default()
3750                        }),
3751                        data: Some(json!({
3752                            "codeActionParams": {
3753                                "range": {
3754                                    "start": {"line": 1, "column": 31},
3755                                    "end": {"line": 1, "column": 31},
3756                                }
3757                            }
3758                        })),
3759                        ..Default::default()
3760                    },
3761                )]))
3762            })
3763            .next()
3764            .await;
3765
3766        // Toggle code actions and wait for them to display.
3767        editor_b.update(cx_b, |editor, cx| {
3768            editor.toggle_code_actions(
3769                &ToggleCodeActions {
3770                    deployed_from_indicator: false,
3771                },
3772                cx,
3773            );
3774        });
3775        editor_b
3776            .condition(&cx_b, |editor, _| editor.context_menu_visible())
3777            .await;
3778
3779        fake_language_server.remove_request_handler::<lsp::request::CodeActionRequest>();
3780
3781        // Confirming the code action will trigger a resolve request.
3782        let confirm_action = workspace_b
3783            .update(cx_b, |workspace, cx| {
3784                Editor::confirm_code_action(workspace, &ConfirmCodeAction { item_ix: Some(0) }, cx)
3785            })
3786            .unwrap();
3787        fake_language_server.handle_request::<lsp::request::CodeActionResolveRequest, _, _>(
3788            |_, _| async move {
3789                Ok(lsp::CodeAction {
3790                    title: "Inline into all callers".to_string(),
3791                    edit: Some(lsp::WorkspaceEdit {
3792                        changes: Some(
3793                            [
3794                                (
3795                                    lsp::Url::from_file_path("/a/main.rs").unwrap(),
3796                                    vec![lsp::TextEdit::new(
3797                                        lsp::Range::new(
3798                                            lsp::Position::new(1, 22),
3799                                            lsp::Position::new(1, 34),
3800                                        ),
3801                                        "4".to_string(),
3802                                    )],
3803                                ),
3804                                (
3805                                    lsp::Url::from_file_path("/a/other.rs").unwrap(),
3806                                    vec![lsp::TextEdit::new(
3807                                        lsp::Range::new(
3808                                            lsp::Position::new(0, 0),
3809                                            lsp::Position::new(0, 27),
3810                                        ),
3811                                        "".to_string(),
3812                                    )],
3813                                ),
3814                            ]
3815                            .into_iter()
3816                            .collect(),
3817                        ),
3818                        ..Default::default()
3819                    }),
3820                    ..Default::default()
3821                })
3822            },
3823        );
3824
3825        // After the action is confirmed, an editor containing both modified files is opened.
3826        confirm_action.await.unwrap();
3827        let code_action_editor = workspace_b.read_with(cx_b, |workspace, cx| {
3828            workspace
3829                .active_item(cx)
3830                .unwrap()
3831                .downcast::<Editor>()
3832                .unwrap()
3833        });
3834        code_action_editor.update(cx_b, |editor, cx| {
3835            assert_eq!(editor.text(cx), "mod other;\nfn main() { let foo = 4; }\n");
3836            editor.undo(&Undo, cx);
3837            assert_eq!(
3838                editor.text(cx),
3839                "mod other;\nfn main() { let foo = other::foo(); }\npub fn foo() -> usize { 4 }"
3840            );
3841            editor.redo(&Redo, cx);
3842            assert_eq!(editor.text(cx), "mod other;\nfn main() { let foo = 4; }\n");
3843        });
3844    }
3845
3846    #[gpui::test(iterations = 10)]
3847    async fn test_collaborating_with_renames(cx_a: &mut TestAppContext, cx_b: &mut TestAppContext) {
3848        cx_a.foreground().forbid_parking();
3849        let lang_registry = Arc::new(LanguageRegistry::test());
3850        let fs = FakeFs::new(cx_a.background());
3851        cx_b.update(|cx| editor::init(cx));
3852
3853        // Set up a fake language server.
3854        let mut language = Language::new(
3855            LanguageConfig {
3856                name: "Rust".into(),
3857                path_suffixes: vec!["rs".to_string()],
3858                ..Default::default()
3859            },
3860            Some(tree_sitter_rust::language()),
3861        );
3862        let mut fake_language_servers = language.set_fake_lsp_adapter(FakeLspAdapter {
3863            capabilities: lsp::ServerCapabilities {
3864                rename_provider: Some(lsp::OneOf::Right(lsp::RenameOptions {
3865                    prepare_provider: Some(true),
3866                    work_done_progress_options: Default::default(),
3867                })),
3868                ..Default::default()
3869            },
3870            ..Default::default()
3871        });
3872        lang_registry.add(Arc::new(language));
3873
3874        // Connect to a server as 2 clients.
3875        let mut server = TestServer::start(cx_a.foreground(), cx_a.background()).await;
3876        let client_a = server.create_client(cx_a, "user_a").await;
3877        let client_b = server.create_client(cx_b, "user_b").await;
3878
3879        // Share a project as client A
3880        fs.insert_tree(
3881            "/dir",
3882            json!({
3883                ".zed.toml": r#"collaborators = ["user_b"]"#,
3884                "one.rs": "const ONE: usize = 1;",
3885                "two.rs": "const TWO: usize = one::ONE + one::ONE;"
3886            }),
3887        )
3888        .await;
3889        let project_a = cx_a.update(|cx| {
3890            Project::local(
3891                client_a.clone(),
3892                client_a.user_store.clone(),
3893                lang_registry.clone(),
3894                fs.clone(),
3895                cx,
3896            )
3897        });
3898        let (worktree_a, _) = project_a
3899            .update(cx_a, |p, cx| {
3900                p.find_or_create_local_worktree("/dir", true, cx)
3901            })
3902            .await
3903            .unwrap();
3904        worktree_a
3905            .read_with(cx_a, |tree, _| tree.as_local().unwrap().scan_complete())
3906            .await;
3907        let project_id = project_a.update(cx_a, |p, _| p.next_remote_id()).await;
3908        let worktree_id = worktree_a.read_with(cx_a, |tree, _| tree.id());
3909        project_a.update(cx_a, |p, cx| p.share(cx)).await.unwrap();
3910
3911        // Join the worktree as client B.
3912        let project_b = Project::remote(
3913            project_id,
3914            client_b.clone(),
3915            client_b.user_store.clone(),
3916            lang_registry.clone(),
3917            fs.clone(),
3918            &mut cx_b.to_async(),
3919        )
3920        .await
3921        .unwrap();
3922        let mut params = cx_b.update(WorkspaceParams::test);
3923        params.languages = lang_registry.clone();
3924        params.client = client_b.client.clone();
3925        params.user_store = client_b.user_store.clone();
3926        params.project = project_b;
3927
3928        let (_window_b, workspace_b) = cx_b.add_window(|cx| Workspace::new(&params, cx));
3929        let editor_b = workspace_b
3930            .update(cx_b, |workspace, cx| {
3931                workspace.open_path((worktree_id, "one.rs"), cx)
3932            })
3933            .await
3934            .unwrap()
3935            .downcast::<Editor>()
3936            .unwrap();
3937        let fake_language_server = fake_language_servers.next().await.unwrap();
3938
3939        // Move cursor to a location that can be renamed.
3940        let prepare_rename = editor_b.update(cx_b, |editor, cx| {
3941            editor.select_ranges([7..7], None, cx);
3942            editor.rename(&Rename, cx).unwrap()
3943        });
3944
3945        fake_language_server
3946            .handle_request::<lsp::request::PrepareRenameRequest, _, _>(|params, _| async move {
3947                assert_eq!(params.text_document.uri.as_str(), "file:///dir/one.rs");
3948                assert_eq!(params.position, lsp::Position::new(0, 7));
3949                Ok(Some(lsp::PrepareRenameResponse::Range(lsp::Range::new(
3950                    lsp::Position::new(0, 6),
3951                    lsp::Position::new(0, 9),
3952                ))))
3953            })
3954            .next()
3955            .await
3956            .unwrap();
3957        prepare_rename.await.unwrap();
3958        editor_b.update(cx_b, |editor, cx| {
3959            let rename = editor.pending_rename().unwrap();
3960            let buffer = editor.buffer().read(cx).snapshot(cx);
3961            assert_eq!(
3962                rename.range.start.to_offset(&buffer)..rename.range.end.to_offset(&buffer),
3963                6..9
3964            );
3965            rename.editor.update(cx, |rename_editor, cx| {
3966                rename_editor.buffer().update(cx, |rename_buffer, cx| {
3967                    rename_buffer.edit([0..3], "THREE", cx);
3968                });
3969            });
3970        });
3971
3972        let confirm_rename = workspace_b.update(cx_b, |workspace, cx| {
3973            Editor::confirm_rename(workspace, &ConfirmRename, cx).unwrap()
3974        });
3975        fake_language_server
3976            .handle_request::<lsp::request::Rename, _, _>(|params, _| async move {
3977                assert_eq!(
3978                    params.text_document_position.text_document.uri.as_str(),
3979                    "file:///dir/one.rs"
3980                );
3981                assert_eq!(
3982                    params.text_document_position.position,
3983                    lsp::Position::new(0, 6)
3984                );
3985                assert_eq!(params.new_name, "THREE");
3986                Ok(Some(lsp::WorkspaceEdit {
3987                    changes: Some(
3988                        [
3989                            (
3990                                lsp::Url::from_file_path("/dir/one.rs").unwrap(),
3991                                vec![lsp::TextEdit::new(
3992                                    lsp::Range::new(
3993                                        lsp::Position::new(0, 6),
3994                                        lsp::Position::new(0, 9),
3995                                    ),
3996                                    "THREE".to_string(),
3997                                )],
3998                            ),
3999                            (
4000                                lsp::Url::from_file_path("/dir/two.rs").unwrap(),
4001                                vec![
4002                                    lsp::TextEdit::new(
4003                                        lsp::Range::new(
4004                                            lsp::Position::new(0, 24),
4005                                            lsp::Position::new(0, 27),
4006                                        ),
4007                                        "THREE".to_string(),
4008                                    ),
4009                                    lsp::TextEdit::new(
4010                                        lsp::Range::new(
4011                                            lsp::Position::new(0, 35),
4012                                            lsp::Position::new(0, 38),
4013                                        ),
4014                                        "THREE".to_string(),
4015                                    ),
4016                                ],
4017                            ),
4018                        ]
4019                        .into_iter()
4020                        .collect(),
4021                    ),
4022                    ..Default::default()
4023                }))
4024            })
4025            .next()
4026            .await
4027            .unwrap();
4028        confirm_rename.await.unwrap();
4029
4030        let rename_editor = workspace_b.read_with(cx_b, |workspace, cx| {
4031            workspace
4032                .active_item(cx)
4033                .unwrap()
4034                .downcast::<Editor>()
4035                .unwrap()
4036        });
4037        rename_editor.update(cx_b, |editor, cx| {
4038            assert_eq!(
4039                editor.text(cx),
4040                "const THREE: usize = 1;\nconst TWO: usize = one::THREE + one::THREE;"
4041            );
4042            editor.undo(&Undo, cx);
4043            assert_eq!(
4044                editor.text(cx),
4045                "const ONE: usize = 1;\nconst TWO: usize = one::ONE + one::ONE;"
4046            );
4047            editor.redo(&Redo, cx);
4048            assert_eq!(
4049                editor.text(cx),
4050                "const THREE: usize = 1;\nconst TWO: usize = one::THREE + one::THREE;"
4051            );
4052        });
4053
4054        // Ensure temporary rename edits cannot be undone/redone.
4055        editor_b.update(cx_b, |editor, cx| {
4056            editor.undo(&Undo, cx);
4057            assert_eq!(editor.text(cx), "const ONE: usize = 1;");
4058            editor.undo(&Undo, cx);
4059            assert_eq!(editor.text(cx), "const ONE: usize = 1;");
4060            editor.redo(&Redo, cx);
4061            assert_eq!(editor.text(cx), "const THREE: usize = 1;");
4062        })
4063    }
4064
4065    #[gpui::test(iterations = 10)]
4066    async fn test_basic_chat(cx_a: &mut TestAppContext, cx_b: &mut TestAppContext) {
4067        cx_a.foreground().forbid_parking();
4068
4069        // Connect to a server as 2 clients.
4070        let mut server = TestServer::start(cx_a.foreground(), cx_a.background()).await;
4071        let client_a = server.create_client(cx_a, "user_a").await;
4072        let client_b = server.create_client(cx_b, "user_b").await;
4073
4074        // Create an org that includes these 2 users.
4075        let db = &server.app_state.db;
4076        let org_id = db.create_org("Test Org", "test-org").await.unwrap();
4077        db.add_org_member(org_id, client_a.current_user_id(&cx_a), false)
4078            .await
4079            .unwrap();
4080        db.add_org_member(org_id, client_b.current_user_id(&cx_b), false)
4081            .await
4082            .unwrap();
4083
4084        // Create a channel that includes all the users.
4085        let channel_id = db.create_org_channel(org_id, "test-channel").await.unwrap();
4086        db.add_channel_member(channel_id, client_a.current_user_id(&cx_a), false)
4087            .await
4088            .unwrap();
4089        db.add_channel_member(channel_id, client_b.current_user_id(&cx_b), false)
4090            .await
4091            .unwrap();
4092        db.create_channel_message(
4093            channel_id,
4094            client_b.current_user_id(&cx_b),
4095            "hello A, it's B.",
4096            OffsetDateTime::now_utc(),
4097            1,
4098        )
4099        .await
4100        .unwrap();
4101
4102        let channels_a = cx_a
4103            .add_model(|cx| ChannelList::new(client_a.user_store.clone(), client_a.clone(), cx));
4104        channels_a
4105            .condition(cx_a, |list, _| list.available_channels().is_some())
4106            .await;
4107        channels_a.read_with(cx_a, |list, _| {
4108            assert_eq!(
4109                list.available_channels().unwrap(),
4110                &[ChannelDetails {
4111                    id: channel_id.to_proto(),
4112                    name: "test-channel".to_string()
4113                }]
4114            )
4115        });
4116        let channel_a = channels_a.update(cx_a, |this, cx| {
4117            this.get_channel(channel_id.to_proto(), cx).unwrap()
4118        });
4119        channel_a.read_with(cx_a, |channel, _| assert!(channel.messages().is_empty()));
4120        channel_a
4121            .condition(&cx_a, |channel, _| {
4122                channel_messages(channel)
4123                    == [("user_b".to_string(), "hello A, it's B.".to_string(), false)]
4124            })
4125            .await;
4126
4127        let channels_b = cx_b
4128            .add_model(|cx| ChannelList::new(client_b.user_store.clone(), client_b.clone(), cx));
4129        channels_b
4130            .condition(cx_b, |list, _| list.available_channels().is_some())
4131            .await;
4132        channels_b.read_with(cx_b, |list, _| {
4133            assert_eq!(
4134                list.available_channels().unwrap(),
4135                &[ChannelDetails {
4136                    id: channel_id.to_proto(),
4137                    name: "test-channel".to_string()
4138                }]
4139            )
4140        });
4141
4142        let channel_b = channels_b.update(cx_b, |this, cx| {
4143            this.get_channel(channel_id.to_proto(), cx).unwrap()
4144        });
4145        channel_b.read_with(cx_b, |channel, _| assert!(channel.messages().is_empty()));
4146        channel_b
4147            .condition(&cx_b, |channel, _| {
4148                channel_messages(channel)
4149                    == [("user_b".to_string(), "hello A, it's B.".to_string(), false)]
4150            })
4151            .await;
4152
4153        channel_a
4154            .update(cx_a, |channel, cx| {
4155                channel
4156                    .send_message("oh, hi B.".to_string(), cx)
4157                    .unwrap()
4158                    .detach();
4159                let task = channel.send_message("sup".to_string(), cx).unwrap();
4160                assert_eq!(
4161                    channel_messages(channel),
4162                    &[
4163                        ("user_b".to_string(), "hello A, it's B.".to_string(), false),
4164                        ("user_a".to_string(), "oh, hi B.".to_string(), true),
4165                        ("user_a".to_string(), "sup".to_string(), true)
4166                    ]
4167                );
4168                task
4169            })
4170            .await
4171            .unwrap();
4172
4173        channel_b
4174            .condition(&cx_b, |channel, _| {
4175                channel_messages(channel)
4176                    == [
4177                        ("user_b".to_string(), "hello A, it's B.".to_string(), false),
4178                        ("user_a".to_string(), "oh, hi B.".to_string(), false),
4179                        ("user_a".to_string(), "sup".to_string(), false),
4180                    ]
4181            })
4182            .await;
4183
4184        assert_eq!(
4185            server
4186                .state()
4187                .await
4188                .channel(channel_id)
4189                .unwrap()
4190                .connection_ids
4191                .len(),
4192            2
4193        );
4194        cx_b.update(|_| drop(channel_b));
4195        server
4196            .condition(|state| state.channel(channel_id).unwrap().connection_ids.len() == 1)
4197            .await;
4198
4199        cx_a.update(|_| drop(channel_a));
4200        server
4201            .condition(|state| state.channel(channel_id).is_none())
4202            .await;
4203    }
4204
4205    #[gpui::test(iterations = 10)]
4206    async fn test_chat_message_validation(cx_a: &mut TestAppContext) {
4207        cx_a.foreground().forbid_parking();
4208
4209        let mut server = TestServer::start(cx_a.foreground(), cx_a.background()).await;
4210        let client_a = server.create_client(cx_a, "user_a").await;
4211
4212        let db = &server.app_state.db;
4213        let org_id = db.create_org("Test Org", "test-org").await.unwrap();
4214        let channel_id = db.create_org_channel(org_id, "test-channel").await.unwrap();
4215        db.add_org_member(org_id, client_a.current_user_id(&cx_a), false)
4216            .await
4217            .unwrap();
4218        db.add_channel_member(channel_id, client_a.current_user_id(&cx_a), false)
4219            .await
4220            .unwrap();
4221
4222        let channels_a = cx_a
4223            .add_model(|cx| ChannelList::new(client_a.user_store.clone(), client_a.clone(), cx));
4224        channels_a
4225            .condition(cx_a, |list, _| list.available_channels().is_some())
4226            .await;
4227        let channel_a = channels_a.update(cx_a, |this, cx| {
4228            this.get_channel(channel_id.to_proto(), cx).unwrap()
4229        });
4230
4231        // Messages aren't allowed to be too long.
4232        channel_a
4233            .update(cx_a, |channel, cx| {
4234                let long_body = "this is long.\n".repeat(1024);
4235                channel.send_message(long_body, cx).unwrap()
4236            })
4237            .await
4238            .unwrap_err();
4239
4240        // Messages aren't allowed to be blank.
4241        channel_a.update(cx_a, |channel, cx| {
4242            channel.send_message(String::new(), cx).unwrap_err()
4243        });
4244
4245        // Leading and trailing whitespace are trimmed.
4246        channel_a
4247            .update(cx_a, |channel, cx| {
4248                channel
4249                    .send_message("\n surrounded by whitespace  \n".to_string(), cx)
4250                    .unwrap()
4251            })
4252            .await
4253            .unwrap();
4254        assert_eq!(
4255            db.get_channel_messages(channel_id, 10, None)
4256                .await
4257                .unwrap()
4258                .iter()
4259                .map(|m| &m.body)
4260                .collect::<Vec<_>>(),
4261            &["surrounded by whitespace"]
4262        );
4263    }
4264
4265    #[gpui::test(iterations = 10)]
4266    async fn test_chat_reconnection(cx_a: &mut TestAppContext, cx_b: &mut TestAppContext) {
4267        cx_a.foreground().forbid_parking();
4268
4269        // Connect to a server as 2 clients.
4270        let mut server = TestServer::start(cx_a.foreground(), cx_a.background()).await;
4271        let client_a = server.create_client(cx_a, "user_a").await;
4272        let client_b = server.create_client(cx_b, "user_b").await;
4273        let mut status_b = client_b.status();
4274
4275        // Create an org that includes these 2 users.
4276        let db = &server.app_state.db;
4277        let org_id = db.create_org("Test Org", "test-org").await.unwrap();
4278        db.add_org_member(org_id, client_a.current_user_id(&cx_a), false)
4279            .await
4280            .unwrap();
4281        db.add_org_member(org_id, client_b.current_user_id(&cx_b), false)
4282            .await
4283            .unwrap();
4284
4285        // Create a channel that includes all the users.
4286        let channel_id = db.create_org_channel(org_id, "test-channel").await.unwrap();
4287        db.add_channel_member(channel_id, client_a.current_user_id(&cx_a), false)
4288            .await
4289            .unwrap();
4290        db.add_channel_member(channel_id, client_b.current_user_id(&cx_b), false)
4291            .await
4292            .unwrap();
4293        db.create_channel_message(
4294            channel_id,
4295            client_b.current_user_id(&cx_b),
4296            "hello A, it's B.",
4297            OffsetDateTime::now_utc(),
4298            2,
4299        )
4300        .await
4301        .unwrap();
4302
4303        let channels_a = cx_a
4304            .add_model(|cx| ChannelList::new(client_a.user_store.clone(), client_a.clone(), cx));
4305        channels_a
4306            .condition(cx_a, |list, _| list.available_channels().is_some())
4307            .await;
4308
4309        channels_a.read_with(cx_a, |list, _| {
4310            assert_eq!(
4311                list.available_channels().unwrap(),
4312                &[ChannelDetails {
4313                    id: channel_id.to_proto(),
4314                    name: "test-channel".to_string()
4315                }]
4316            )
4317        });
4318        let channel_a = channels_a.update(cx_a, |this, cx| {
4319            this.get_channel(channel_id.to_proto(), cx).unwrap()
4320        });
4321        channel_a.read_with(cx_a, |channel, _| assert!(channel.messages().is_empty()));
4322        channel_a
4323            .condition(&cx_a, |channel, _| {
4324                channel_messages(channel)
4325                    == [("user_b".to_string(), "hello A, it's B.".to_string(), false)]
4326            })
4327            .await;
4328
4329        let channels_b = cx_b
4330            .add_model(|cx| ChannelList::new(client_b.user_store.clone(), client_b.clone(), cx));
4331        channels_b
4332            .condition(cx_b, |list, _| list.available_channels().is_some())
4333            .await;
4334        channels_b.read_with(cx_b, |list, _| {
4335            assert_eq!(
4336                list.available_channels().unwrap(),
4337                &[ChannelDetails {
4338                    id: channel_id.to_proto(),
4339                    name: "test-channel".to_string()
4340                }]
4341            )
4342        });
4343
4344        let channel_b = channels_b.update(cx_b, |this, cx| {
4345            this.get_channel(channel_id.to_proto(), cx).unwrap()
4346        });
4347        channel_b.read_with(cx_b, |channel, _| assert!(channel.messages().is_empty()));
4348        channel_b
4349            .condition(&cx_b, |channel, _| {
4350                channel_messages(channel)
4351                    == [("user_b".to_string(), "hello A, it's B.".to_string(), false)]
4352            })
4353            .await;
4354
4355        // Disconnect client B, ensuring we can still access its cached channel data.
4356        server.forbid_connections();
4357        server.disconnect_client(client_b.current_user_id(&cx_b));
4358        cx_b.foreground().advance_clock(Duration::from_secs(3));
4359        while !matches!(
4360            status_b.next().await,
4361            Some(client::Status::ReconnectionError { .. })
4362        ) {}
4363
4364        channels_b.read_with(cx_b, |channels, _| {
4365            assert_eq!(
4366                channels.available_channels().unwrap(),
4367                [ChannelDetails {
4368                    id: channel_id.to_proto(),
4369                    name: "test-channel".to_string()
4370                }]
4371            )
4372        });
4373        channel_b.read_with(cx_b, |channel, _| {
4374            assert_eq!(
4375                channel_messages(channel),
4376                [("user_b".to_string(), "hello A, it's B.".to_string(), false)]
4377            )
4378        });
4379
4380        // Send a message from client B while it is disconnected.
4381        channel_b
4382            .update(cx_b, |channel, cx| {
4383                let task = channel
4384                    .send_message("can you see this?".to_string(), cx)
4385                    .unwrap();
4386                assert_eq!(
4387                    channel_messages(channel),
4388                    &[
4389                        ("user_b".to_string(), "hello A, it's B.".to_string(), false),
4390                        ("user_b".to_string(), "can you see this?".to_string(), true)
4391                    ]
4392                );
4393                task
4394            })
4395            .await
4396            .unwrap_err();
4397
4398        // Send a message from client A while B is disconnected.
4399        channel_a
4400            .update(cx_a, |channel, cx| {
4401                channel
4402                    .send_message("oh, hi B.".to_string(), cx)
4403                    .unwrap()
4404                    .detach();
4405                let task = channel.send_message("sup".to_string(), cx).unwrap();
4406                assert_eq!(
4407                    channel_messages(channel),
4408                    &[
4409                        ("user_b".to_string(), "hello A, it's B.".to_string(), false),
4410                        ("user_a".to_string(), "oh, hi B.".to_string(), true),
4411                        ("user_a".to_string(), "sup".to_string(), true)
4412                    ]
4413                );
4414                task
4415            })
4416            .await
4417            .unwrap();
4418
4419        // Give client B a chance to reconnect.
4420        server.allow_connections();
4421        cx_b.foreground().advance_clock(Duration::from_secs(10));
4422
4423        // Verify that B sees the new messages upon reconnection, as well as the message client B
4424        // sent while offline.
4425        channel_b
4426            .condition(&cx_b, |channel, _| {
4427                channel_messages(channel)
4428                    == [
4429                        ("user_b".to_string(), "hello A, it's B.".to_string(), false),
4430                        ("user_a".to_string(), "oh, hi B.".to_string(), false),
4431                        ("user_a".to_string(), "sup".to_string(), false),
4432                        ("user_b".to_string(), "can you see this?".to_string(), false),
4433                    ]
4434            })
4435            .await;
4436
4437        // Ensure client A and B can communicate normally after reconnection.
4438        channel_a
4439            .update(cx_a, |channel, cx| {
4440                channel.send_message("you online?".to_string(), cx).unwrap()
4441            })
4442            .await
4443            .unwrap();
4444        channel_b
4445            .condition(&cx_b, |channel, _| {
4446                channel_messages(channel)
4447                    == [
4448                        ("user_b".to_string(), "hello A, it's B.".to_string(), false),
4449                        ("user_a".to_string(), "oh, hi B.".to_string(), false),
4450                        ("user_a".to_string(), "sup".to_string(), false),
4451                        ("user_b".to_string(), "can you see this?".to_string(), false),
4452                        ("user_a".to_string(), "you online?".to_string(), false),
4453                    ]
4454            })
4455            .await;
4456
4457        channel_b
4458            .update(cx_b, |channel, cx| {
4459                channel.send_message("yep".to_string(), cx).unwrap()
4460            })
4461            .await
4462            .unwrap();
4463        channel_a
4464            .condition(&cx_a, |channel, _| {
4465                channel_messages(channel)
4466                    == [
4467                        ("user_b".to_string(), "hello A, it's B.".to_string(), false),
4468                        ("user_a".to_string(), "oh, hi B.".to_string(), false),
4469                        ("user_a".to_string(), "sup".to_string(), false),
4470                        ("user_b".to_string(), "can you see this?".to_string(), false),
4471                        ("user_a".to_string(), "you online?".to_string(), false),
4472                        ("user_b".to_string(), "yep".to_string(), false),
4473                    ]
4474            })
4475            .await;
4476    }
4477
4478    #[gpui::test(iterations = 10)]
4479    async fn test_contacts(
4480        cx_a: &mut TestAppContext,
4481        cx_b: &mut TestAppContext,
4482        cx_c: &mut TestAppContext,
4483    ) {
4484        cx_a.foreground().forbid_parking();
4485        let lang_registry = Arc::new(LanguageRegistry::test());
4486        let fs = FakeFs::new(cx_a.background());
4487
4488        // Connect to a server as 3 clients.
4489        let mut server = TestServer::start(cx_a.foreground(), cx_a.background()).await;
4490        let client_a = server.create_client(cx_a, "user_a").await;
4491        let client_b = server.create_client(cx_b, "user_b").await;
4492        let client_c = server.create_client(cx_c, "user_c").await;
4493
4494        // Share a worktree as client A.
4495        fs.insert_tree(
4496            "/a",
4497            json!({
4498                ".zed.toml": r#"collaborators = ["user_b", "user_c"]"#,
4499            }),
4500        )
4501        .await;
4502
4503        let project_a = cx_a.update(|cx| {
4504            Project::local(
4505                client_a.clone(),
4506                client_a.user_store.clone(),
4507                lang_registry.clone(),
4508                fs.clone(),
4509                cx,
4510            )
4511        });
4512        let (worktree_a, _) = project_a
4513            .update(cx_a, |p, cx| {
4514                p.find_or_create_local_worktree("/a", true, cx)
4515            })
4516            .await
4517            .unwrap();
4518        worktree_a
4519            .read_with(cx_a, |tree, _| tree.as_local().unwrap().scan_complete())
4520            .await;
4521
4522        client_a
4523            .user_store
4524            .condition(&cx_a, |user_store, _| {
4525                contacts(user_store) == vec![("user_a", vec![("a", false, vec![])])]
4526            })
4527            .await;
4528        client_b
4529            .user_store
4530            .condition(&cx_b, |user_store, _| {
4531                contacts(user_store) == vec![("user_a", vec![("a", false, vec![])])]
4532            })
4533            .await;
4534        client_c
4535            .user_store
4536            .condition(&cx_c, |user_store, _| {
4537                contacts(user_store) == vec![("user_a", vec![("a", false, vec![])])]
4538            })
4539            .await;
4540
4541        let project_id = project_a
4542            .update(cx_a, |project, _| project.next_remote_id())
4543            .await;
4544        project_a
4545            .update(cx_a, |project, cx| project.share(cx))
4546            .await
4547            .unwrap();
4548        client_a
4549            .user_store
4550            .condition(&cx_a, |user_store, _| {
4551                contacts(user_store) == vec![("user_a", vec![("a", true, vec![])])]
4552            })
4553            .await;
4554        client_b
4555            .user_store
4556            .condition(&cx_b, |user_store, _| {
4557                contacts(user_store) == vec![("user_a", vec![("a", true, vec![])])]
4558            })
4559            .await;
4560        client_c
4561            .user_store
4562            .condition(&cx_c, |user_store, _| {
4563                contacts(user_store) == vec![("user_a", vec![("a", true, vec![])])]
4564            })
4565            .await;
4566
4567        let _project_b = Project::remote(
4568            project_id,
4569            client_b.clone(),
4570            client_b.user_store.clone(),
4571            lang_registry.clone(),
4572            fs.clone(),
4573            &mut cx_b.to_async(),
4574        )
4575        .await
4576        .unwrap();
4577
4578        client_a
4579            .user_store
4580            .condition(&cx_a, |user_store, _| {
4581                contacts(user_store) == vec![("user_a", vec![("a", true, vec!["user_b"])])]
4582            })
4583            .await;
4584        client_b
4585            .user_store
4586            .condition(&cx_b, |user_store, _| {
4587                contacts(user_store) == vec![("user_a", vec![("a", true, vec!["user_b"])])]
4588            })
4589            .await;
4590        client_c
4591            .user_store
4592            .condition(&cx_c, |user_store, _| {
4593                contacts(user_store) == vec![("user_a", vec![("a", true, vec!["user_b"])])]
4594            })
4595            .await;
4596
4597        project_a
4598            .condition(&cx_a, |project, _| {
4599                project.collaborators().contains_key(&client_b.peer_id)
4600            })
4601            .await;
4602
4603        cx_a.update(move |_| drop(project_a));
4604        client_a
4605            .user_store
4606            .condition(&cx_a, |user_store, _| contacts(user_store) == vec![])
4607            .await;
4608        client_b
4609            .user_store
4610            .condition(&cx_b, |user_store, _| contacts(user_store) == vec![])
4611            .await;
4612        client_c
4613            .user_store
4614            .condition(&cx_c, |user_store, _| contacts(user_store) == vec![])
4615            .await;
4616
4617        fn contacts(user_store: &UserStore) -> Vec<(&str, Vec<(&str, bool, Vec<&str>)>)> {
4618            user_store
4619                .contacts()
4620                .iter()
4621                .map(|contact| {
4622                    let worktrees = contact
4623                        .projects
4624                        .iter()
4625                        .map(|p| {
4626                            (
4627                                p.worktree_root_names[0].as_str(),
4628                                p.is_shared,
4629                                p.guests.iter().map(|p| p.github_login.as_str()).collect(),
4630                            )
4631                        })
4632                        .collect();
4633                    (contact.user.github_login.as_str(), worktrees)
4634                })
4635                .collect()
4636        }
4637    }
4638
4639    #[gpui::test(iterations = 10)]
4640    async fn test_following(cx_a: &mut TestAppContext, cx_b: &mut TestAppContext) {
4641        cx_a.foreground().forbid_parking();
4642        let fs = FakeFs::new(cx_a.background());
4643
4644        // 2 clients connect to a server.
4645        let mut server = TestServer::start(cx_a.foreground(), cx_a.background()).await;
4646        let mut client_a = server.create_client(cx_a, "user_a").await;
4647        let mut client_b = server.create_client(cx_b, "user_b").await;
4648        cx_a.update(editor::init);
4649        cx_b.update(editor::init);
4650
4651        // Client A shares a project.
4652        fs.insert_tree(
4653            "/a",
4654            json!({
4655                ".zed.toml": r#"collaborators = ["user_b"]"#,
4656                "1.txt": "one",
4657                "2.txt": "two",
4658                "3.txt": "three",
4659            }),
4660        )
4661        .await;
4662        let (project_a, worktree_id) = client_a.build_local_project(fs.clone(), "/a", cx_a).await;
4663        project_a
4664            .update(cx_a, |project, cx| project.share(cx))
4665            .await
4666            .unwrap();
4667
4668        // Client B joins the project.
4669        let project_b = client_b
4670            .build_remote_project(
4671                project_a
4672                    .read_with(cx_a, |project, _| project.remote_id())
4673                    .unwrap(),
4674                cx_b,
4675            )
4676            .await;
4677
4678        // Client A opens some editors.
4679        let workspace_a = client_a.build_workspace(&project_a, cx_a);
4680        let pane_a = workspace_a.read_with(cx_a, |workspace, _| workspace.active_pane().clone());
4681        let editor_a1 = workspace_a
4682            .update(cx_a, |workspace, cx| {
4683                workspace.open_path((worktree_id, "1.txt"), cx)
4684            })
4685            .await
4686            .unwrap()
4687            .downcast::<Editor>()
4688            .unwrap();
4689        let editor_a2 = workspace_a
4690            .update(cx_a, |workspace, cx| {
4691                workspace.open_path((worktree_id, "2.txt"), cx)
4692            })
4693            .await
4694            .unwrap()
4695            .downcast::<Editor>()
4696            .unwrap();
4697
4698        // Client B opens an editor.
4699        let workspace_b = client_b.build_workspace(&project_b, cx_b);
4700        let editor_b1 = workspace_b
4701            .update(cx_b, |workspace, cx| {
4702                workspace.open_path((worktree_id, "1.txt"), cx)
4703            })
4704            .await
4705            .unwrap()
4706            .downcast::<Editor>()
4707            .unwrap();
4708
4709        let client_a_id = project_b.read_with(cx_b, |project, _| {
4710            project.collaborators().values().next().unwrap().peer_id
4711        });
4712        let client_b_id = project_a.read_with(cx_a, |project, _| {
4713            project.collaborators().values().next().unwrap().peer_id
4714        });
4715
4716        // When client B starts following client A, all visible view states are replicated to client B.
4717        editor_a1.update(cx_a, |editor, cx| editor.select_ranges([0..1], None, cx));
4718        editor_a2.update(cx_a, |editor, cx| editor.select_ranges([2..3], None, cx));
4719        workspace_b
4720            .update(cx_b, |workspace, cx| {
4721                workspace
4722                    .toggle_follow(&ToggleFollow(client_a_id), cx)
4723                    .unwrap()
4724            })
4725            .await
4726            .unwrap();
4727        let editor_b2 = workspace_b.read_with(cx_b, |workspace, cx| {
4728            workspace
4729                .active_item(cx)
4730                .unwrap()
4731                .downcast::<Editor>()
4732                .unwrap()
4733        });
4734        assert!(cx_b.read(|cx| editor_b2.is_focused(cx)));
4735        assert_eq!(
4736            editor_b2.read_with(cx_b, |editor, cx| editor.project_path(cx)),
4737            Some((worktree_id, "2.txt").into())
4738        );
4739        assert_eq!(
4740            editor_b2.read_with(cx_b, |editor, cx| editor.selected_ranges(cx)),
4741            vec![2..3]
4742        );
4743        assert_eq!(
4744            editor_b1.read_with(cx_b, |editor, cx| editor.selected_ranges(cx)),
4745            vec![0..1]
4746        );
4747
4748        // When client A activates a different editor, client B does so as well.
4749        workspace_a.update(cx_a, |workspace, cx| {
4750            workspace.activate_item(&editor_a1, cx)
4751        });
4752        workspace_b
4753            .condition(cx_b, |workspace, cx| {
4754                workspace.active_item(cx).unwrap().id() == editor_b1.id()
4755            })
4756            .await;
4757
4758        // When client A navigates back and forth, client B does so as well.
4759        workspace_a
4760            .update(cx_a, |workspace, cx| {
4761                workspace::Pane::go_back(workspace, None, cx)
4762            })
4763            .await;
4764        workspace_b
4765            .condition(cx_b, |workspace, cx| {
4766                workspace.active_item(cx).unwrap().id() == editor_b2.id()
4767            })
4768            .await;
4769
4770        workspace_a
4771            .update(cx_a, |workspace, cx| {
4772                workspace::Pane::go_forward(workspace, None, cx)
4773            })
4774            .await;
4775        workspace_b
4776            .condition(cx_b, |workspace, cx| {
4777                workspace.active_item(cx).unwrap().id() == editor_b1.id()
4778            })
4779            .await;
4780
4781        // Changes to client A's editor are reflected on client B.
4782        editor_a1.update(cx_a, |editor, cx| {
4783            editor.select_ranges([1..1, 2..2], None, cx);
4784        });
4785        editor_b1
4786            .condition(cx_b, |editor, cx| {
4787                editor.selected_ranges(cx) == vec![1..1, 2..2]
4788            })
4789            .await;
4790
4791        editor_a1.update(cx_a, |editor, cx| editor.set_text("TWO", cx));
4792        editor_b1
4793            .condition(cx_b, |editor, cx| editor.text(cx) == "TWO")
4794            .await;
4795
4796        editor_a1.update(cx_a, |editor, cx| {
4797            editor.select_ranges([3..3], None, cx);
4798            editor.set_scroll_position(vec2f(0., 100.), cx);
4799        });
4800        editor_b1
4801            .condition(cx_b, |editor, cx| editor.selected_ranges(cx) == vec![3..3])
4802            .await;
4803
4804        // After unfollowing, client B stops receiving updates from client A.
4805        workspace_b.update(cx_b, |workspace, cx| {
4806            workspace.unfollow(&workspace.active_pane().clone(), cx)
4807        });
4808        workspace_a.update(cx_a, |workspace, cx| {
4809            workspace.activate_item(&editor_a2, cx)
4810        });
4811        cx_a.foreground().run_until_parked();
4812        assert_eq!(
4813            workspace_b.read_with(cx_b, |workspace, cx| workspace
4814                .active_item(cx)
4815                .unwrap()
4816                .id()),
4817            editor_b1.id()
4818        );
4819
4820        // Client A starts following client B.
4821        workspace_a
4822            .update(cx_a, |workspace, cx| {
4823                workspace
4824                    .toggle_follow(&ToggleFollow(client_b_id), cx)
4825                    .unwrap()
4826            })
4827            .await
4828            .unwrap();
4829        assert_eq!(
4830            workspace_a.read_with(cx_a, |workspace, _| workspace.leader_for_pane(&pane_a)),
4831            Some(client_b_id)
4832        );
4833        assert_eq!(
4834            workspace_a.read_with(cx_a, |workspace, cx| workspace
4835                .active_item(cx)
4836                .unwrap()
4837                .id()),
4838            editor_a1.id()
4839        );
4840
4841        // Following interrupts when client B disconnects.
4842        client_b.disconnect(&cx_b.to_async()).unwrap();
4843        cx_a.foreground().run_until_parked();
4844        assert_eq!(
4845            workspace_a.read_with(cx_a, |workspace, _| workspace.leader_for_pane(&pane_a)),
4846            None
4847        );
4848    }
4849
4850    #[gpui::test(iterations = 10)]
4851    async fn test_peers_following_each_other(cx_a: &mut TestAppContext, cx_b: &mut TestAppContext) {
4852        cx_a.foreground().forbid_parking();
4853        let fs = FakeFs::new(cx_a.background());
4854
4855        // 2 clients connect to a server.
4856        let mut server = TestServer::start(cx_a.foreground(), cx_a.background()).await;
4857        let mut client_a = server.create_client(cx_a, "user_a").await;
4858        let mut client_b = server.create_client(cx_b, "user_b").await;
4859        cx_a.update(editor::init);
4860        cx_b.update(editor::init);
4861
4862        // Client A shares a project.
4863        fs.insert_tree(
4864            "/a",
4865            json!({
4866                ".zed.toml": r#"collaborators = ["user_b"]"#,
4867                "1.txt": "one",
4868                "2.txt": "two",
4869                "3.txt": "three",
4870                "4.txt": "four",
4871            }),
4872        )
4873        .await;
4874        let (project_a, worktree_id) = client_a.build_local_project(fs.clone(), "/a", cx_a).await;
4875        project_a
4876            .update(cx_a, |project, cx| project.share(cx))
4877            .await
4878            .unwrap();
4879
4880        // Client B joins the project.
4881        let project_b = client_b
4882            .build_remote_project(
4883                project_a
4884                    .read_with(cx_a, |project, _| project.remote_id())
4885                    .unwrap(),
4886                cx_b,
4887            )
4888            .await;
4889
4890        // Client A opens some editors.
4891        let workspace_a = client_a.build_workspace(&project_a, cx_a);
4892        let pane_a1 = workspace_a.read_with(cx_a, |workspace, _| workspace.active_pane().clone());
4893        let _editor_a1 = workspace_a
4894            .update(cx_a, |workspace, cx| {
4895                workspace.open_path((worktree_id, "1.txt"), cx)
4896            })
4897            .await
4898            .unwrap()
4899            .downcast::<Editor>()
4900            .unwrap();
4901
4902        // Client B opens an editor.
4903        let workspace_b = client_b.build_workspace(&project_b, cx_b);
4904        let pane_b1 = workspace_b.read_with(cx_b, |workspace, _| workspace.active_pane().clone());
4905        let _editor_b1 = workspace_b
4906            .update(cx_b, |workspace, cx| {
4907                workspace.open_path((worktree_id, "2.txt"), cx)
4908            })
4909            .await
4910            .unwrap()
4911            .downcast::<Editor>()
4912            .unwrap();
4913
4914        // Clients A and B follow each other in split panes
4915        workspace_a
4916            .update(cx_a, |workspace, cx| {
4917                workspace.split_pane(workspace.active_pane().clone(), SplitDirection::Right, cx);
4918                assert_ne!(*workspace.active_pane(), pane_a1);
4919                let leader_id = *project_a.read(cx).collaborators().keys().next().unwrap();
4920                workspace
4921                    .toggle_follow(&workspace::ToggleFollow(leader_id), cx)
4922                    .unwrap()
4923            })
4924            .await
4925            .unwrap();
4926        workspace_b
4927            .update(cx_b, |workspace, cx| {
4928                workspace.split_pane(workspace.active_pane().clone(), SplitDirection::Right, cx);
4929                assert_ne!(*workspace.active_pane(), pane_b1);
4930                let leader_id = *project_b.read(cx).collaborators().keys().next().unwrap();
4931                workspace
4932                    .toggle_follow(&workspace::ToggleFollow(leader_id), cx)
4933                    .unwrap()
4934            })
4935            .await
4936            .unwrap();
4937
4938        workspace_a
4939            .update(cx_a, |workspace, cx| {
4940                workspace.activate_next_pane(cx);
4941                assert_eq!(*workspace.active_pane(), pane_a1);
4942                workspace.open_path((worktree_id, "3.txt"), cx)
4943            })
4944            .await
4945            .unwrap();
4946        workspace_b
4947            .update(cx_b, |workspace, cx| {
4948                workspace.activate_next_pane(cx);
4949                assert_eq!(*workspace.active_pane(), pane_b1);
4950                workspace.open_path((worktree_id, "4.txt"), cx)
4951            })
4952            .await
4953            .unwrap();
4954        cx_a.foreground().run_until_parked();
4955
4956        // Ensure leader updates don't change the active pane of followers
4957        workspace_a.read_with(cx_a, |workspace, _| {
4958            assert_eq!(*workspace.active_pane(), pane_a1);
4959        });
4960        workspace_b.read_with(cx_b, |workspace, _| {
4961            assert_eq!(*workspace.active_pane(), pane_b1);
4962        });
4963
4964        // Ensure peers following each other doesn't cause an infinite loop.
4965        assert_eq!(
4966            workspace_a.read_with(cx_a, |workspace, cx| workspace
4967                .active_item(cx)
4968                .unwrap()
4969                .project_path(cx)),
4970            Some((worktree_id, "3.txt").into())
4971        );
4972        workspace_a.update(cx_a, |workspace, cx| {
4973            assert_eq!(
4974                workspace.active_item(cx).unwrap().project_path(cx),
4975                Some((worktree_id, "3.txt").into())
4976            );
4977            workspace.activate_next_pane(cx);
4978            assert_eq!(
4979                workspace.active_item(cx).unwrap().project_path(cx),
4980                Some((worktree_id, "4.txt").into())
4981            );
4982        });
4983        workspace_b.update(cx_b, |workspace, cx| {
4984            assert_eq!(
4985                workspace.active_item(cx).unwrap().project_path(cx),
4986                Some((worktree_id, "4.txt").into())
4987            );
4988            workspace.activate_next_pane(cx);
4989            assert_eq!(
4990                workspace.active_item(cx).unwrap().project_path(cx),
4991                Some((worktree_id, "3.txt").into())
4992            );
4993        });
4994    }
4995
4996    #[gpui::test(iterations = 10)]
4997    async fn test_auto_unfollowing(cx_a: &mut TestAppContext, cx_b: &mut TestAppContext) {
4998        cx_a.foreground().forbid_parking();
4999        let fs = FakeFs::new(cx_a.background());
5000
5001        // 2 clients connect to a server.
5002        let mut server = TestServer::start(cx_a.foreground(), cx_a.background()).await;
5003        let mut client_a = server.create_client(cx_a, "user_a").await;
5004        let mut client_b = server.create_client(cx_b, "user_b").await;
5005        cx_a.update(editor::init);
5006        cx_b.update(editor::init);
5007
5008        // Client A shares a project.
5009        fs.insert_tree(
5010            "/a",
5011            json!({
5012                ".zed.toml": r#"collaborators = ["user_b"]"#,
5013                "1.txt": "one",
5014                "2.txt": "two",
5015                "3.txt": "three",
5016            }),
5017        )
5018        .await;
5019        let (project_a, worktree_id) = client_a.build_local_project(fs.clone(), "/a", cx_a).await;
5020        project_a
5021            .update(cx_a, |project, cx| project.share(cx))
5022            .await
5023            .unwrap();
5024
5025        // Client B joins the project.
5026        let project_b = client_b
5027            .build_remote_project(
5028                project_a
5029                    .read_with(cx_a, |project, _| project.remote_id())
5030                    .unwrap(),
5031                cx_b,
5032            )
5033            .await;
5034
5035        // Client A opens some editors.
5036        let workspace_a = client_a.build_workspace(&project_a, cx_a);
5037        let _editor_a1 = workspace_a
5038            .update(cx_a, |workspace, cx| {
5039                workspace.open_path((worktree_id, "1.txt"), cx)
5040            })
5041            .await
5042            .unwrap()
5043            .downcast::<Editor>()
5044            .unwrap();
5045
5046        // Client B starts following client A.
5047        let workspace_b = client_b.build_workspace(&project_b, cx_b);
5048        let pane_b = workspace_b.read_with(cx_b, |workspace, _| workspace.active_pane().clone());
5049        let leader_id = project_b.read_with(cx_b, |project, _| {
5050            project.collaborators().values().next().unwrap().peer_id
5051        });
5052        workspace_b
5053            .update(cx_b, |workspace, cx| {
5054                workspace
5055                    .toggle_follow(&ToggleFollow(leader_id), cx)
5056                    .unwrap()
5057            })
5058            .await
5059            .unwrap();
5060        assert_eq!(
5061            workspace_b.read_with(cx_b, |workspace, _| workspace.leader_for_pane(&pane_b)),
5062            Some(leader_id)
5063        );
5064        let editor_b2 = workspace_b.read_with(cx_b, |workspace, cx| {
5065            workspace
5066                .active_item(cx)
5067                .unwrap()
5068                .downcast::<Editor>()
5069                .unwrap()
5070        });
5071
5072        // When client B moves, it automatically stops following client A.
5073        editor_b2.update(cx_b, |editor, cx| editor.move_right(&editor::MoveRight, cx));
5074        assert_eq!(
5075            workspace_b.read_with(cx_b, |workspace, _| workspace.leader_for_pane(&pane_b)),
5076            None
5077        );
5078
5079        workspace_b
5080            .update(cx_b, |workspace, cx| {
5081                workspace
5082                    .toggle_follow(&ToggleFollow(leader_id), cx)
5083                    .unwrap()
5084            })
5085            .await
5086            .unwrap();
5087        assert_eq!(
5088            workspace_b.read_with(cx_b, |workspace, _| workspace.leader_for_pane(&pane_b)),
5089            Some(leader_id)
5090        );
5091
5092        // When client B edits, it automatically stops following client A.
5093        editor_b2.update(cx_b, |editor, cx| editor.insert("X", cx));
5094        assert_eq!(
5095            workspace_b.read_with(cx_b, |workspace, _| workspace.leader_for_pane(&pane_b)),
5096            None
5097        );
5098
5099        workspace_b
5100            .update(cx_b, |workspace, cx| {
5101                workspace
5102                    .toggle_follow(&ToggleFollow(leader_id), cx)
5103                    .unwrap()
5104            })
5105            .await
5106            .unwrap();
5107        assert_eq!(
5108            workspace_b.read_with(cx_b, |workspace, _| workspace.leader_for_pane(&pane_b)),
5109            Some(leader_id)
5110        );
5111
5112        // When client B scrolls, it automatically stops following client A.
5113        editor_b2.update(cx_b, |editor, cx| {
5114            editor.set_scroll_position(vec2f(0., 3.), cx)
5115        });
5116        assert_eq!(
5117            workspace_b.read_with(cx_b, |workspace, _| workspace.leader_for_pane(&pane_b)),
5118            None
5119        );
5120
5121        workspace_b
5122            .update(cx_b, |workspace, cx| {
5123                workspace
5124                    .toggle_follow(&ToggleFollow(leader_id), cx)
5125                    .unwrap()
5126            })
5127            .await
5128            .unwrap();
5129        assert_eq!(
5130            workspace_b.read_with(cx_b, |workspace, _| workspace.leader_for_pane(&pane_b)),
5131            Some(leader_id)
5132        );
5133
5134        // When client B activates a different pane, it continues following client A in the original pane.
5135        workspace_b.update(cx_b, |workspace, cx| {
5136            workspace.split_pane(pane_b.clone(), SplitDirection::Right, cx)
5137        });
5138        assert_eq!(
5139            workspace_b.read_with(cx_b, |workspace, _| workspace.leader_for_pane(&pane_b)),
5140            Some(leader_id)
5141        );
5142
5143        workspace_b.update(cx_b, |workspace, cx| workspace.activate_next_pane(cx));
5144        assert_eq!(
5145            workspace_b.read_with(cx_b, |workspace, _| workspace.leader_for_pane(&pane_b)),
5146            Some(leader_id)
5147        );
5148
5149        // When client B activates a different item in the original pane, it automatically stops following client A.
5150        workspace_b
5151            .update(cx_b, |workspace, cx| {
5152                workspace.open_path((worktree_id, "2.txt"), cx)
5153            })
5154            .await
5155            .unwrap();
5156        assert_eq!(
5157            workspace_b.read_with(cx_b, |workspace, _| workspace.leader_for_pane(&pane_b)),
5158            None
5159        );
5160    }
5161
5162    #[gpui::test(iterations = 100)]
5163    async fn test_random_collaboration(
5164        cx: &mut TestAppContext,
5165        deterministic: Arc<Deterministic>,
5166        rng: StdRng,
5167    ) {
5168        cx.foreground().forbid_parking();
5169        let max_peers = env::var("MAX_PEERS")
5170            .map(|i| i.parse().expect("invalid `MAX_PEERS` variable"))
5171            .unwrap_or(5);
5172        assert!(max_peers <= 5);
5173
5174        let max_operations = env::var("OPERATIONS")
5175            .map(|i| i.parse().expect("invalid `OPERATIONS` variable"))
5176            .unwrap_or(10);
5177
5178        let rng = Arc::new(Mutex::new(rng));
5179
5180        let guest_lang_registry = Arc::new(LanguageRegistry::test());
5181        let host_language_registry = Arc::new(LanguageRegistry::test());
5182
5183        let fs = FakeFs::new(cx.background());
5184        fs.insert_tree(
5185            "/_collab",
5186            json!({
5187                ".zed.toml": r#"collaborators = ["guest-1", "guest-2", "guest-3", "guest-4"]"#
5188            }),
5189        )
5190        .await;
5191
5192        let mut server = TestServer::start(cx.foreground(), cx.background()).await;
5193        let mut clients = Vec::new();
5194        let mut user_ids = Vec::new();
5195        let mut op_start_signals = Vec::new();
5196        let files = Arc::new(Mutex::new(Vec::new()));
5197
5198        let mut next_entity_id = 100000;
5199        let mut host_cx = TestAppContext::new(
5200            cx.foreground_platform(),
5201            cx.platform(),
5202            deterministic.build_foreground(next_entity_id),
5203            deterministic.build_background(),
5204            cx.font_cache(),
5205            cx.leak_detector(),
5206            next_entity_id,
5207        );
5208        let host = server.create_client(&mut host_cx, "host").await;
5209        let host_project = host_cx.update(|cx| {
5210            Project::local(
5211                host.client.clone(),
5212                host.user_store.clone(),
5213                host_language_registry.clone(),
5214                fs.clone(),
5215                cx,
5216            )
5217        });
5218        let host_project_id = host_project
5219            .update(&mut host_cx, |p, _| p.next_remote_id())
5220            .await;
5221
5222        let (collab_worktree, _) = host_project
5223            .update(&mut host_cx, |project, cx| {
5224                project.find_or_create_local_worktree("/_collab", true, cx)
5225            })
5226            .await
5227            .unwrap();
5228        collab_worktree
5229            .read_with(&host_cx, |tree, _| tree.as_local().unwrap().scan_complete())
5230            .await;
5231        host_project
5232            .update(&mut host_cx, |project, cx| project.share(cx))
5233            .await
5234            .unwrap();
5235
5236        // Set up fake language servers.
5237        let mut language = Language::new(
5238            LanguageConfig {
5239                name: "Rust".into(),
5240                path_suffixes: vec!["rs".to_string()],
5241                ..Default::default()
5242            },
5243            None,
5244        );
5245        let _fake_servers = language.set_fake_lsp_adapter(FakeLspAdapter {
5246            name: "the-fake-language-server",
5247            capabilities: lsp::LanguageServer::full_capabilities(),
5248            initializer: Some(Box::new({
5249                let rng = rng.clone();
5250                let files = files.clone();
5251                let project = host_project.downgrade();
5252                move |fake_server: &mut FakeLanguageServer| {
5253                    fake_server.handle_request::<lsp::request::Completion, _, _>(
5254                        |_, _| async move {
5255                            Ok(Some(lsp::CompletionResponse::Array(vec![
5256                                lsp::CompletionItem {
5257                                    text_edit: Some(lsp::CompletionTextEdit::Edit(lsp::TextEdit {
5258                                        range: lsp::Range::new(
5259                                            lsp::Position::new(0, 0),
5260                                            lsp::Position::new(0, 0),
5261                                        ),
5262                                        new_text: "the-new-text".to_string(),
5263                                    })),
5264                                    ..Default::default()
5265                                },
5266                            ])))
5267                        },
5268                    );
5269
5270                    fake_server.handle_request::<lsp::request::CodeActionRequest, _, _>(
5271                        |_, _| async move {
5272                            Ok(Some(vec![lsp::CodeActionOrCommand::CodeAction(
5273                                lsp::CodeAction {
5274                                    title: "the-code-action".to_string(),
5275                                    ..Default::default()
5276                                },
5277                            )]))
5278                        },
5279                    );
5280
5281                    fake_server.handle_request::<lsp::request::PrepareRenameRequest, _, _>(
5282                        |params, _| async move {
5283                            Ok(Some(lsp::PrepareRenameResponse::Range(lsp::Range::new(
5284                                params.position,
5285                                params.position,
5286                            ))))
5287                        },
5288                    );
5289
5290                    fake_server.handle_request::<lsp::request::GotoDefinition, _, _>({
5291                        let files = files.clone();
5292                        let rng = rng.clone();
5293                        move |_, _| {
5294                            let files = files.clone();
5295                            let rng = rng.clone();
5296                            async move {
5297                                let files = files.lock();
5298                                let mut rng = rng.lock();
5299                                let count = rng.gen_range::<usize, _>(1..3);
5300                                let files = (0..count)
5301                                    .map(|_| files.choose(&mut *rng).unwrap())
5302                                    .collect::<Vec<_>>();
5303                                log::info!("LSP: Returning definitions in files {:?}", &files);
5304                                Ok(Some(lsp::GotoDefinitionResponse::Array(
5305                                    files
5306                                        .into_iter()
5307                                        .map(|file| lsp::Location {
5308                                            uri: lsp::Url::from_file_path(file).unwrap(),
5309                                            range: Default::default(),
5310                                        })
5311                                        .collect(),
5312                                )))
5313                            }
5314                        }
5315                    });
5316
5317                    fake_server.handle_request::<lsp::request::DocumentHighlightRequest, _, _>({
5318                        let rng = rng.clone();
5319                        let project = project.clone();
5320                        move |params, mut cx| {
5321                            let highlights = if let Some(project) = project.upgrade(&cx) {
5322                                project.update(&mut cx, |project, cx| {
5323                                    let path = params
5324                                        .text_document_position_params
5325                                        .text_document
5326                                        .uri
5327                                        .to_file_path()
5328                                        .unwrap();
5329                                    let (worktree, relative_path) =
5330                                        project.find_local_worktree(&path, cx)?;
5331                                    let project_path =
5332                                        ProjectPath::from((worktree.read(cx).id(), relative_path));
5333                                    let buffer =
5334                                        project.get_open_buffer(&project_path, cx)?.read(cx);
5335
5336                                    let mut highlights = Vec::new();
5337                                    let highlight_count = rng.lock().gen_range(1..=5);
5338                                    let mut prev_end = 0;
5339                                    for _ in 0..highlight_count {
5340                                        let range =
5341                                            buffer.random_byte_range(prev_end, &mut *rng.lock());
5342
5343                                        highlights.push(lsp::DocumentHighlight {
5344                                            range: range_to_lsp(range.to_point_utf16(buffer)),
5345                                            kind: Some(lsp::DocumentHighlightKind::READ),
5346                                        });
5347                                        prev_end = range.end;
5348                                    }
5349                                    Some(highlights)
5350                                })
5351                            } else {
5352                                None
5353                            };
5354                            async move { Ok(highlights) }
5355                        }
5356                    });
5357                }
5358            })),
5359            ..Default::default()
5360        });
5361        host_language_registry.add(Arc::new(language));
5362
5363        let op_start_signal = futures::channel::mpsc::unbounded();
5364        user_ids.push(host.current_user_id(&host_cx));
5365        op_start_signals.push(op_start_signal.0);
5366        clients.push(host_cx.foreground().spawn(host.simulate_host(
5367            host_project,
5368            files,
5369            op_start_signal.1,
5370            rng.clone(),
5371            host_cx,
5372        )));
5373
5374        let disconnect_host_at = if rng.lock().gen_bool(0.2) {
5375            rng.lock().gen_range(0..max_operations)
5376        } else {
5377            max_operations
5378        };
5379        let mut available_guests = vec![
5380            "guest-1".to_string(),
5381            "guest-2".to_string(),
5382            "guest-3".to_string(),
5383            "guest-4".to_string(),
5384        ];
5385        let mut operations = 0;
5386        while operations < max_operations {
5387            if operations == disconnect_host_at {
5388                server.disconnect_client(user_ids[0]);
5389                cx.foreground().advance_clock(RECEIVE_TIMEOUT);
5390                drop(op_start_signals);
5391                let mut clients = futures::future::join_all(clients).await;
5392                cx.foreground().run_until_parked();
5393
5394                let (host, mut host_cx, host_err) = clients.remove(0);
5395                if let Some(host_err) = host_err {
5396                    log::error!("host error - {}", host_err);
5397                }
5398                host.project
5399                    .as_ref()
5400                    .unwrap()
5401                    .read_with(&host_cx, |project, _| assert!(!project.is_shared()));
5402                for (guest, mut guest_cx, guest_err) in clients {
5403                    if let Some(guest_err) = guest_err {
5404                        log::error!("{} error - {}", guest.username, guest_err);
5405                    }
5406                    let contacts = server
5407                        .store
5408                        .read()
5409                        .await
5410                        .contacts_for_user(guest.current_user_id(&guest_cx));
5411                    assert!(!contacts
5412                        .iter()
5413                        .flat_map(|contact| &contact.projects)
5414                        .any(|project| project.id == host_project_id));
5415                    guest
5416                        .project
5417                        .as_ref()
5418                        .unwrap()
5419                        .read_with(&guest_cx, |project, _| assert!(project.is_read_only()));
5420                    guest_cx.update(|_| drop(guest));
5421                }
5422                host_cx.update(|_| drop(host));
5423
5424                return;
5425            }
5426
5427            let distribution = rng.lock().gen_range(0..100);
5428            match distribution {
5429                0..=19 if !available_guests.is_empty() => {
5430                    let guest_ix = rng.lock().gen_range(0..available_guests.len());
5431                    let guest_username = available_guests.remove(guest_ix);
5432                    log::info!("Adding new connection for {}", guest_username);
5433                    next_entity_id += 100000;
5434                    let mut guest_cx = TestAppContext::new(
5435                        cx.foreground_platform(),
5436                        cx.platform(),
5437                        deterministic.build_foreground(next_entity_id),
5438                        deterministic.build_background(),
5439                        cx.font_cache(),
5440                        cx.leak_detector(),
5441                        next_entity_id,
5442                    );
5443                    let guest = server.create_client(&mut guest_cx, &guest_username).await;
5444                    let guest_project = Project::remote(
5445                        host_project_id,
5446                        guest.client.clone(),
5447                        guest.user_store.clone(),
5448                        guest_lang_registry.clone(),
5449                        FakeFs::new(cx.background()),
5450                        &mut guest_cx.to_async(),
5451                    )
5452                    .await
5453                    .unwrap();
5454                    let op_start_signal = futures::channel::mpsc::unbounded();
5455                    user_ids.push(guest.current_user_id(&guest_cx));
5456                    op_start_signals.push(op_start_signal.0);
5457                    clients.push(guest_cx.foreground().spawn(guest.simulate_guest(
5458                        guest_username.clone(),
5459                        guest_project,
5460                        op_start_signal.1,
5461                        rng.clone(),
5462                        guest_cx,
5463                    )));
5464
5465                    log::info!("Added connection for {}", guest_username);
5466                    operations += 1;
5467                }
5468                20..=29 if clients.len() > 1 => {
5469                    log::info!("Removing guest");
5470                    let guest_ix = rng.lock().gen_range(1..clients.len());
5471                    let removed_guest_id = user_ids.remove(guest_ix);
5472                    let guest = clients.remove(guest_ix);
5473                    op_start_signals.remove(guest_ix);
5474                    server.disconnect_client(removed_guest_id);
5475                    cx.foreground().advance_clock(RECEIVE_TIMEOUT);
5476                    let (guest, mut guest_cx, guest_err) = guest.await;
5477                    if let Some(guest_err) = guest_err {
5478                        log::error!("{} error - {}", guest.username, guest_err);
5479                    }
5480                    guest
5481                        .project
5482                        .as_ref()
5483                        .unwrap()
5484                        .read_with(&guest_cx, |project, _| assert!(project.is_read_only()));
5485                    for user_id in &user_ids {
5486                        for contact in server.store.read().await.contacts_for_user(*user_id) {
5487                            assert_ne!(
5488                                contact.user_id, removed_guest_id.0 as u64,
5489                                "removed guest is still a contact of another peer"
5490                            );
5491                            for project in contact.projects {
5492                                for project_guest_id in project.guests {
5493                                    assert_ne!(
5494                                        project_guest_id, removed_guest_id.0 as u64,
5495                                        "removed guest appears as still participating on a project"
5496                                    );
5497                                }
5498                            }
5499                        }
5500                    }
5501
5502                    log::info!("{} removed", guest.username);
5503                    available_guests.push(guest.username.clone());
5504                    guest_cx.update(|_| drop(guest));
5505
5506                    operations += 1;
5507                }
5508                _ => {
5509                    while operations < max_operations && rng.lock().gen_bool(0.7) {
5510                        op_start_signals
5511                            .choose(&mut *rng.lock())
5512                            .unwrap()
5513                            .unbounded_send(())
5514                            .unwrap();
5515                        operations += 1;
5516                    }
5517
5518                    if rng.lock().gen_bool(0.8) {
5519                        cx.foreground().run_until_parked();
5520                    }
5521                }
5522            }
5523        }
5524
5525        drop(op_start_signals);
5526        let mut clients = futures::future::join_all(clients).await;
5527        cx.foreground().run_until_parked();
5528
5529        let (host_client, mut host_cx, host_err) = clients.remove(0);
5530        if let Some(host_err) = host_err {
5531            panic!("host error - {}", host_err);
5532        }
5533        let host_project = host_client.project.as_ref().unwrap();
5534        let host_worktree_snapshots = host_project.read_with(&host_cx, |project, cx| {
5535            project
5536                .worktrees(cx)
5537                .map(|worktree| {
5538                    let snapshot = worktree.read(cx).snapshot();
5539                    (snapshot.id(), snapshot)
5540                })
5541                .collect::<BTreeMap<_, _>>()
5542        });
5543
5544        host_client
5545            .project
5546            .as_ref()
5547            .unwrap()
5548            .read_with(&host_cx, |project, cx| project.check_invariants(cx));
5549
5550        for (guest_client, mut guest_cx, guest_err) in clients.into_iter() {
5551            if let Some(guest_err) = guest_err {
5552                panic!("{} error - {}", guest_client.username, guest_err);
5553            }
5554            let worktree_snapshots =
5555                guest_client
5556                    .project
5557                    .as_ref()
5558                    .unwrap()
5559                    .read_with(&guest_cx, |project, cx| {
5560                        project
5561                            .worktrees(cx)
5562                            .map(|worktree| {
5563                                let worktree = worktree.read(cx);
5564                                (worktree.id(), worktree.snapshot())
5565                            })
5566                            .collect::<BTreeMap<_, _>>()
5567                    });
5568
5569            assert_eq!(
5570                worktree_snapshots.keys().collect::<Vec<_>>(),
5571                host_worktree_snapshots.keys().collect::<Vec<_>>(),
5572                "{} has different worktrees than the host",
5573                guest_client.username
5574            );
5575            for (id, host_snapshot) in &host_worktree_snapshots {
5576                let guest_snapshot = &worktree_snapshots[id];
5577                assert_eq!(
5578                    guest_snapshot.root_name(),
5579                    host_snapshot.root_name(),
5580                    "{} has different root name than the host for worktree {}",
5581                    guest_client.username,
5582                    id
5583                );
5584                assert_eq!(
5585                    guest_snapshot.entries(false).collect::<Vec<_>>(),
5586                    host_snapshot.entries(false).collect::<Vec<_>>(),
5587                    "{} has different snapshot than the host for worktree {}",
5588                    guest_client.username,
5589                    id
5590                );
5591            }
5592
5593            guest_client
5594                .project
5595                .as_ref()
5596                .unwrap()
5597                .read_with(&guest_cx, |project, cx| project.check_invariants(cx));
5598
5599            for guest_buffer in &guest_client.buffers {
5600                let buffer_id = guest_buffer.read_with(&guest_cx, |buffer, _| buffer.remote_id());
5601                let host_buffer = host_project.read_with(&host_cx, |project, cx| {
5602                    project.buffer_for_id(buffer_id, cx).expect(&format!(
5603                        "host does not have buffer for guest:{}, peer:{}, id:{}",
5604                        guest_client.username, guest_client.peer_id, buffer_id
5605                    ))
5606                });
5607                let path = host_buffer
5608                    .read_with(&host_cx, |buffer, cx| buffer.file().unwrap().full_path(cx));
5609
5610                assert_eq!(
5611                    guest_buffer.read_with(&guest_cx, |buffer, _| buffer.deferred_ops_len()),
5612                    0,
5613                    "{}, buffer {}, path {:?} has deferred operations",
5614                    guest_client.username,
5615                    buffer_id,
5616                    path,
5617                );
5618                assert_eq!(
5619                    guest_buffer.read_with(&guest_cx, |buffer, _| buffer.text()),
5620                    host_buffer.read_with(&host_cx, |buffer, _| buffer.text()),
5621                    "{}, buffer {}, path {:?}, differs from the host's buffer",
5622                    guest_client.username,
5623                    buffer_id,
5624                    path
5625                );
5626            }
5627
5628            guest_cx.update(|_| drop(guest_client));
5629        }
5630
5631        host_cx.update(|_| drop(host_client));
5632    }
5633
5634    struct TestServer {
5635        peer: Arc<Peer>,
5636        app_state: Arc<AppState>,
5637        server: Arc<Server>,
5638        foreground: Rc<executor::Foreground>,
5639        notifications: mpsc::UnboundedReceiver<()>,
5640        connection_killers: Arc<Mutex<HashMap<UserId, Arc<AtomicBool>>>>,
5641        forbid_connections: Arc<AtomicBool>,
5642        _test_db: TestDb,
5643    }
5644
5645    impl TestServer {
5646        async fn start(
5647            foreground: Rc<executor::Foreground>,
5648            background: Arc<executor::Background>,
5649        ) -> Self {
5650            let test_db = TestDb::fake(background);
5651            let app_state = Self::build_app_state(&test_db).await;
5652            let peer = Peer::new();
5653            let notifications = mpsc::unbounded();
5654            let server = Server::new(app_state.clone(), Some(notifications.0));
5655            Self {
5656                peer,
5657                app_state,
5658                server,
5659                foreground,
5660                notifications: notifications.1,
5661                connection_killers: Default::default(),
5662                forbid_connections: Default::default(),
5663                _test_db: test_db,
5664            }
5665        }
5666
5667        async fn create_client(&mut self, cx: &mut TestAppContext, name: &str) -> TestClient {
5668            cx.update(|cx| {
5669                let settings = Settings::test(cx);
5670                cx.set_global(settings);
5671            });
5672
5673            let http = FakeHttpClient::with_404_response();
5674            let user_id = self.app_state.db.create_user(name, false).await.unwrap();
5675            let client_name = name.to_string();
5676            let mut client = Client::new(http.clone());
5677            let server = self.server.clone();
5678            let connection_killers = self.connection_killers.clone();
5679            let forbid_connections = self.forbid_connections.clone();
5680            let (connection_id_tx, mut connection_id_rx) = mpsc::channel(16);
5681
5682            Arc::get_mut(&mut client)
5683                .unwrap()
5684                .override_authenticate(move |cx| {
5685                    cx.spawn(|_| async move {
5686                        let access_token = "the-token".to_string();
5687                        Ok(Credentials {
5688                            user_id: user_id.0 as u64,
5689                            access_token,
5690                        })
5691                    })
5692                })
5693                .override_establish_connection(move |credentials, cx| {
5694                    assert_eq!(credentials.user_id, user_id.0 as u64);
5695                    assert_eq!(credentials.access_token, "the-token");
5696
5697                    let server = server.clone();
5698                    let connection_killers = connection_killers.clone();
5699                    let forbid_connections = forbid_connections.clone();
5700                    let client_name = client_name.clone();
5701                    let connection_id_tx = connection_id_tx.clone();
5702                    cx.spawn(move |cx| async move {
5703                        if forbid_connections.load(SeqCst) {
5704                            Err(EstablishConnectionError::other(anyhow!(
5705                                "server is forbidding connections"
5706                            )))
5707                        } else {
5708                            let (client_conn, server_conn, killed) =
5709                                Connection::in_memory(cx.background());
5710                            connection_killers.lock().insert(user_id, killed);
5711                            cx.background()
5712                                .spawn(server.handle_connection(
5713                                    server_conn,
5714                                    client_name,
5715                                    user_id,
5716                                    Some(connection_id_tx),
5717                                    cx.background(),
5718                                ))
5719                                .detach();
5720                            Ok(client_conn)
5721                        }
5722                    })
5723                });
5724
5725            client
5726                .authenticate_and_connect(false, &cx.to_async())
5727                .await
5728                .unwrap();
5729
5730            Channel::init(&client);
5731            Project::init(&client);
5732            cx.update(|cx| {
5733                workspace::init(&client, cx);
5734            });
5735
5736            let peer_id = PeerId(connection_id_rx.next().await.unwrap().0);
5737            let user_store = cx.add_model(|cx| UserStore::new(client.clone(), http, cx));
5738
5739            let client = TestClient {
5740                client,
5741                peer_id,
5742                username: name.to_string(),
5743                user_store,
5744                language_registry: Arc::new(LanguageRegistry::test()),
5745                project: Default::default(),
5746                buffers: Default::default(),
5747            };
5748            client.wait_for_current_user(cx).await;
5749            client
5750        }
5751
5752        fn disconnect_client(&self, user_id: UserId) {
5753            self.connection_killers
5754                .lock()
5755                .remove(&user_id)
5756                .unwrap()
5757                .store(true, SeqCst);
5758        }
5759
5760        fn forbid_connections(&self) {
5761            self.forbid_connections.store(true, SeqCst);
5762        }
5763
5764        fn allow_connections(&self) {
5765            self.forbid_connections.store(false, SeqCst);
5766        }
5767
5768        async fn build_app_state(test_db: &TestDb) -> Arc<AppState> {
5769            Arc::new(AppState {
5770                db: test_db.db().clone(),
5771                api_token: Default::default(),
5772            })
5773        }
5774
5775        async fn state<'a>(&'a self) -> RwLockReadGuard<'a, Store> {
5776            self.server.store.read().await
5777        }
5778
5779        async fn condition<F>(&mut self, mut predicate: F)
5780        where
5781            F: FnMut(&Store) -> bool,
5782        {
5783            assert!(
5784                self.foreground.parking_forbidden(),
5785                "you must call forbid_parking to use server conditions so we don't block indefinitely"
5786            );
5787            while !(predicate)(&*self.server.store.read().await) {
5788                self.foreground.start_waiting();
5789                self.notifications.next().await;
5790                self.foreground.finish_waiting();
5791            }
5792        }
5793    }
5794
5795    impl Deref for TestServer {
5796        type Target = Server;
5797
5798        fn deref(&self) -> &Self::Target {
5799            &self.server
5800        }
5801    }
5802
5803    impl Drop for TestServer {
5804        fn drop(&mut self) {
5805            self.peer.reset();
5806        }
5807    }
5808
5809    struct TestClient {
5810        client: Arc<Client>,
5811        username: String,
5812        pub peer_id: PeerId,
5813        pub user_store: ModelHandle<UserStore>,
5814        language_registry: Arc<LanguageRegistry>,
5815        project: Option<ModelHandle<Project>>,
5816        buffers: HashSet<ModelHandle<language::Buffer>>,
5817    }
5818
5819    impl Deref for TestClient {
5820        type Target = Arc<Client>;
5821
5822        fn deref(&self) -> &Self::Target {
5823            &self.client
5824        }
5825    }
5826
5827    impl TestClient {
5828        pub fn current_user_id(&self, cx: &TestAppContext) -> UserId {
5829            UserId::from_proto(
5830                self.user_store
5831                    .read_with(cx, |user_store, _| user_store.current_user().unwrap().id),
5832            )
5833        }
5834
5835        async fn wait_for_current_user(&self, cx: &TestAppContext) {
5836            let mut authed_user = self
5837                .user_store
5838                .read_with(cx, |user_store, _| user_store.watch_current_user());
5839            while authed_user.next().await.unwrap().is_none() {}
5840        }
5841
5842        async fn build_local_project(
5843            &mut self,
5844            fs: Arc<FakeFs>,
5845            root_path: impl AsRef<Path>,
5846            cx: &mut TestAppContext,
5847        ) -> (ModelHandle<Project>, WorktreeId) {
5848            let project = cx.update(|cx| {
5849                Project::local(
5850                    self.client.clone(),
5851                    self.user_store.clone(),
5852                    self.language_registry.clone(),
5853                    fs,
5854                    cx,
5855                )
5856            });
5857            self.project = Some(project.clone());
5858            let (worktree, _) = project
5859                .update(cx, |p, cx| {
5860                    p.find_or_create_local_worktree(root_path, true, cx)
5861                })
5862                .await
5863                .unwrap();
5864            worktree
5865                .read_with(cx, |tree, _| tree.as_local().unwrap().scan_complete())
5866                .await;
5867            project
5868                .update(cx, |project, _| project.next_remote_id())
5869                .await;
5870            (project, worktree.read_with(cx, |tree, _| tree.id()))
5871        }
5872
5873        async fn build_remote_project(
5874            &mut self,
5875            project_id: u64,
5876            cx: &mut TestAppContext,
5877        ) -> ModelHandle<Project> {
5878            let project = Project::remote(
5879                project_id,
5880                self.client.clone(),
5881                self.user_store.clone(),
5882                self.language_registry.clone(),
5883                FakeFs::new(cx.background()),
5884                &mut cx.to_async(),
5885            )
5886            .await
5887            .unwrap();
5888            self.project = Some(project.clone());
5889            project
5890        }
5891
5892        fn build_workspace(
5893            &self,
5894            project: &ModelHandle<Project>,
5895            cx: &mut TestAppContext,
5896        ) -> ViewHandle<Workspace> {
5897            let (window_id, _) = cx.add_window(|_| EmptyView);
5898            cx.add_view(window_id, |cx| {
5899                let fs = project.read(cx).fs().clone();
5900                Workspace::new(
5901                    &WorkspaceParams {
5902                        fs,
5903                        project: project.clone(),
5904                        user_store: self.user_store.clone(),
5905                        languages: self.language_registry.clone(),
5906                        themes: ThemeRegistry::new((), cx.font_cache().clone()),
5907                        channel_list: cx.add_model(|cx| {
5908                            ChannelList::new(self.user_store.clone(), self.client.clone(), cx)
5909                        }),
5910                        client: self.client.clone(),
5911                    },
5912                    cx,
5913                )
5914            })
5915        }
5916
5917        async fn simulate_host(
5918            mut self,
5919            project: ModelHandle<Project>,
5920            files: Arc<Mutex<Vec<PathBuf>>>,
5921            op_start_signal: futures::channel::mpsc::UnboundedReceiver<()>,
5922            rng: Arc<Mutex<StdRng>>,
5923            mut cx: TestAppContext,
5924        ) -> (Self, TestAppContext, Option<anyhow::Error>) {
5925            async fn simulate_host_internal(
5926                client: &mut TestClient,
5927                project: ModelHandle<Project>,
5928                files: Arc<Mutex<Vec<PathBuf>>>,
5929                mut op_start_signal: futures::channel::mpsc::UnboundedReceiver<()>,
5930                rng: Arc<Mutex<StdRng>>,
5931                cx: &mut TestAppContext,
5932            ) -> anyhow::Result<()> {
5933                let fs = project.read_with(cx, |project, _| project.fs().clone());
5934
5935                while op_start_signal.next().await.is_some() {
5936                    let distribution = rng.lock().gen_range::<usize, _>(0..100);
5937                    match distribution {
5938                        0..=20 if !files.lock().is_empty() => {
5939                            let path = files.lock().choose(&mut *rng.lock()).unwrap().clone();
5940                            let mut path = path.as_path();
5941                            while let Some(parent_path) = path.parent() {
5942                                path = parent_path;
5943                                if rng.lock().gen() {
5944                                    break;
5945                                }
5946                            }
5947
5948                            log::info!("Host: find/create local worktree {:?}", path);
5949                            let find_or_create_worktree = project.update(cx, |project, cx| {
5950                                project.find_or_create_local_worktree(path, true, cx)
5951                            });
5952                            if rng.lock().gen() {
5953                                cx.background().spawn(find_or_create_worktree).detach();
5954                            } else {
5955                                find_or_create_worktree.await?;
5956                            }
5957                        }
5958                        10..=80 if !files.lock().is_empty() => {
5959                            let buffer = if client.buffers.is_empty() || rng.lock().gen() {
5960                                let file = files.lock().choose(&mut *rng.lock()).unwrap().clone();
5961                                let (worktree, path) = project
5962                                    .update(cx, |project, cx| {
5963                                        project.find_or_create_local_worktree(
5964                                            file.clone(),
5965                                            true,
5966                                            cx,
5967                                        )
5968                                    })
5969                                    .await?;
5970                                let project_path =
5971                                    worktree.read_with(cx, |worktree, _| (worktree.id(), path));
5972                                log::info!(
5973                                    "Host: opening path {:?}, worktree {}, relative_path {:?}",
5974                                    file,
5975                                    project_path.0,
5976                                    project_path.1
5977                                );
5978                                let buffer = project
5979                                    .update(cx, |project, cx| project.open_buffer(project_path, cx))
5980                                    .await
5981                                    .unwrap();
5982                                client.buffers.insert(buffer.clone());
5983                                buffer
5984                            } else {
5985                                client
5986                                    .buffers
5987                                    .iter()
5988                                    .choose(&mut *rng.lock())
5989                                    .unwrap()
5990                                    .clone()
5991                            };
5992
5993                            if rng.lock().gen_bool(0.1) {
5994                                cx.update(|cx| {
5995                                    log::info!(
5996                                        "Host: dropping buffer {:?}",
5997                                        buffer.read(cx).file().unwrap().full_path(cx)
5998                                    );
5999                                    client.buffers.remove(&buffer);
6000                                    drop(buffer);
6001                                });
6002                            } else {
6003                                buffer.update(cx, |buffer, cx| {
6004                                    log::info!(
6005                                        "Host: updating buffer {:?} ({})",
6006                                        buffer.file().unwrap().full_path(cx),
6007                                        buffer.remote_id()
6008                                    );
6009
6010                                    if rng.lock().gen_bool(0.7) {
6011                                        buffer.randomly_edit(&mut *rng.lock(), 5, cx);
6012                                    } else {
6013                                        buffer.randomly_undo_redo(&mut *rng.lock(), cx);
6014                                    }
6015                                });
6016                            }
6017                        }
6018                        _ => loop {
6019                            let path_component_count = rng.lock().gen_range::<usize, _>(1..=5);
6020                            let mut path = PathBuf::new();
6021                            path.push("/");
6022                            for _ in 0..path_component_count {
6023                                let letter = rng.lock().gen_range(b'a'..=b'z');
6024                                path.push(std::str::from_utf8(&[letter]).unwrap());
6025                            }
6026                            path.set_extension("rs");
6027                            let parent_path = path.parent().unwrap();
6028
6029                            log::info!("Host: creating file {:?}", path,);
6030
6031                            if fs.create_dir(&parent_path).await.is_ok()
6032                                && fs.create_file(&path, Default::default()).await.is_ok()
6033                            {
6034                                files.lock().push(path);
6035                                break;
6036                            } else {
6037                                log::info!("Host: cannot create file");
6038                            }
6039                        },
6040                    }
6041
6042                    cx.background().simulate_random_delay().await;
6043                }
6044
6045                Ok(())
6046            }
6047
6048            let result = simulate_host_internal(
6049                &mut self,
6050                project.clone(),
6051                files,
6052                op_start_signal,
6053                rng,
6054                &mut cx,
6055            )
6056            .await;
6057            log::info!("Host done");
6058            self.project = Some(project);
6059            (self, cx, result.err())
6060        }
6061
6062        pub async fn simulate_guest(
6063            mut self,
6064            guest_username: String,
6065            project: ModelHandle<Project>,
6066            op_start_signal: futures::channel::mpsc::UnboundedReceiver<()>,
6067            rng: Arc<Mutex<StdRng>>,
6068            mut cx: TestAppContext,
6069        ) -> (Self, TestAppContext, Option<anyhow::Error>) {
6070            async fn simulate_guest_internal(
6071                client: &mut TestClient,
6072                guest_username: &str,
6073                project: ModelHandle<Project>,
6074                mut op_start_signal: futures::channel::mpsc::UnboundedReceiver<()>,
6075                rng: Arc<Mutex<StdRng>>,
6076                cx: &mut TestAppContext,
6077            ) -> anyhow::Result<()> {
6078                while op_start_signal.next().await.is_some() {
6079                    let buffer = if client.buffers.is_empty() || rng.lock().gen() {
6080                        let worktree = if let Some(worktree) =
6081                            project.read_with(cx, |project, cx| {
6082                                project
6083                                    .worktrees(&cx)
6084                                    .filter(|worktree| {
6085                                        let worktree = worktree.read(cx);
6086                                        worktree.is_visible()
6087                                            && worktree.entries(false).any(|e| e.is_file())
6088                                    })
6089                                    .choose(&mut *rng.lock())
6090                            }) {
6091                            worktree
6092                        } else {
6093                            cx.background().simulate_random_delay().await;
6094                            continue;
6095                        };
6096
6097                        let (worktree_root_name, project_path) =
6098                            worktree.read_with(cx, |worktree, _| {
6099                                let entry = worktree
6100                                    .entries(false)
6101                                    .filter(|e| e.is_file())
6102                                    .choose(&mut *rng.lock())
6103                                    .unwrap();
6104                                (
6105                                    worktree.root_name().to_string(),
6106                                    (worktree.id(), entry.path.clone()),
6107                                )
6108                            });
6109                        log::info!(
6110                            "{}: opening path {:?} in worktree {} ({})",
6111                            guest_username,
6112                            project_path.1,
6113                            project_path.0,
6114                            worktree_root_name,
6115                        );
6116                        let buffer = project
6117                            .update(cx, |project, cx| {
6118                                project.open_buffer(project_path.clone(), cx)
6119                            })
6120                            .await?;
6121                        log::info!(
6122                            "{}: opened path {:?} in worktree {} ({}) with buffer id {}",
6123                            guest_username,
6124                            project_path.1,
6125                            project_path.0,
6126                            worktree_root_name,
6127                            buffer.read_with(cx, |buffer, _| buffer.remote_id())
6128                        );
6129                        client.buffers.insert(buffer.clone());
6130                        buffer
6131                    } else {
6132                        client
6133                            .buffers
6134                            .iter()
6135                            .choose(&mut *rng.lock())
6136                            .unwrap()
6137                            .clone()
6138                    };
6139
6140                    let choice = rng.lock().gen_range(0..100);
6141                    match choice {
6142                        0..=9 => {
6143                            cx.update(|cx| {
6144                                log::info!(
6145                                    "{}: dropping buffer {:?}",
6146                                    guest_username,
6147                                    buffer.read(cx).file().unwrap().full_path(cx)
6148                                );
6149                                client.buffers.remove(&buffer);
6150                                drop(buffer);
6151                            });
6152                        }
6153                        10..=19 => {
6154                            let completions = project.update(cx, |project, cx| {
6155                                log::info!(
6156                                    "{}: requesting completions for buffer {} ({:?})",
6157                                    guest_username,
6158                                    buffer.read(cx).remote_id(),
6159                                    buffer.read(cx).file().unwrap().full_path(cx)
6160                                );
6161                                let offset = rng.lock().gen_range(0..=buffer.read(cx).len());
6162                                project.completions(&buffer, offset, cx)
6163                            });
6164                            let completions = cx.background().spawn(async move {
6165                                completions
6166                                    .await
6167                                    .map_err(|err| anyhow!("completions request failed: {:?}", err))
6168                            });
6169                            if rng.lock().gen_bool(0.3) {
6170                                log::info!("{}: detaching completions request", guest_username);
6171                                cx.update(|cx| completions.detach_and_log_err(cx));
6172                            } else {
6173                                completions.await?;
6174                            }
6175                        }
6176                        20..=29 => {
6177                            let code_actions = project.update(cx, |project, cx| {
6178                                log::info!(
6179                                    "{}: requesting code actions for buffer {} ({:?})",
6180                                    guest_username,
6181                                    buffer.read(cx).remote_id(),
6182                                    buffer.read(cx).file().unwrap().full_path(cx)
6183                                );
6184                                let range = buffer.read(cx).random_byte_range(0, &mut *rng.lock());
6185                                project.code_actions(&buffer, range, cx)
6186                            });
6187                            let code_actions = cx.background().spawn(async move {
6188                                code_actions.await.map_err(|err| {
6189                                    anyhow!("code actions request failed: {:?}", err)
6190                                })
6191                            });
6192                            if rng.lock().gen_bool(0.3) {
6193                                log::info!("{}: detaching code actions request", guest_username);
6194                                cx.update(|cx| code_actions.detach_and_log_err(cx));
6195                            } else {
6196                                code_actions.await?;
6197                            }
6198                        }
6199                        30..=39 if buffer.read_with(cx, |buffer, _| buffer.is_dirty()) => {
6200                            let (requested_version, save) = buffer.update(cx, |buffer, cx| {
6201                                log::info!(
6202                                    "{}: saving buffer {} ({:?})",
6203                                    guest_username,
6204                                    buffer.remote_id(),
6205                                    buffer.file().unwrap().full_path(cx)
6206                                );
6207                                (buffer.version(), buffer.save(cx))
6208                            });
6209                            let save = cx.background().spawn(async move {
6210                                let (saved_version, _) = save
6211                                    .await
6212                                    .map_err(|err| anyhow!("save request failed: {:?}", err))?;
6213                                assert!(saved_version.observed_all(&requested_version));
6214                                Ok::<_, anyhow::Error>(())
6215                            });
6216                            if rng.lock().gen_bool(0.3) {
6217                                log::info!("{}: detaching save request", guest_username);
6218                                cx.update(|cx| save.detach_and_log_err(cx));
6219                            } else {
6220                                save.await?;
6221                            }
6222                        }
6223                        40..=44 => {
6224                            let prepare_rename = project.update(cx, |project, cx| {
6225                                log::info!(
6226                                    "{}: preparing rename for buffer {} ({:?})",
6227                                    guest_username,
6228                                    buffer.read(cx).remote_id(),
6229                                    buffer.read(cx).file().unwrap().full_path(cx)
6230                                );
6231                                let offset = rng.lock().gen_range(0..=buffer.read(cx).len());
6232                                project.prepare_rename(buffer, offset, cx)
6233                            });
6234                            let prepare_rename = cx.background().spawn(async move {
6235                                prepare_rename.await.map_err(|err| {
6236                                    anyhow!("prepare rename request failed: {:?}", err)
6237                                })
6238                            });
6239                            if rng.lock().gen_bool(0.3) {
6240                                log::info!("{}: detaching prepare rename request", guest_username);
6241                                cx.update(|cx| prepare_rename.detach_and_log_err(cx));
6242                            } else {
6243                                prepare_rename.await?;
6244                            }
6245                        }
6246                        45..=49 => {
6247                            let definitions = project.update(cx, |project, cx| {
6248                                log::info!(
6249                                    "{}: requesting definitions for buffer {} ({:?})",
6250                                    guest_username,
6251                                    buffer.read(cx).remote_id(),
6252                                    buffer.read(cx).file().unwrap().full_path(cx)
6253                                );
6254                                let offset = rng.lock().gen_range(0..=buffer.read(cx).len());
6255                                project.definition(&buffer, offset, cx)
6256                            });
6257                            let definitions = cx.background().spawn(async move {
6258                                definitions
6259                                    .await
6260                                    .map_err(|err| anyhow!("definitions request failed: {:?}", err))
6261                            });
6262                            if rng.lock().gen_bool(0.3) {
6263                                log::info!("{}: detaching definitions request", guest_username);
6264                                cx.update(|cx| definitions.detach_and_log_err(cx));
6265                            } else {
6266                                client
6267                                    .buffers
6268                                    .extend(definitions.await?.into_iter().map(|loc| loc.buffer));
6269                            }
6270                        }
6271                        50..=54 => {
6272                            let highlights = project.update(cx, |project, cx| {
6273                                log::info!(
6274                                    "{}: requesting highlights for buffer {} ({:?})",
6275                                    guest_username,
6276                                    buffer.read(cx).remote_id(),
6277                                    buffer.read(cx).file().unwrap().full_path(cx)
6278                                );
6279                                let offset = rng.lock().gen_range(0..=buffer.read(cx).len());
6280                                project.document_highlights(&buffer, offset, cx)
6281                            });
6282                            let highlights = cx.background().spawn(async move {
6283                                highlights
6284                                    .await
6285                                    .map_err(|err| anyhow!("highlights request failed: {:?}", err))
6286                            });
6287                            if rng.lock().gen_bool(0.3) {
6288                                log::info!("{}: detaching highlights request", guest_username);
6289                                cx.update(|cx| highlights.detach_and_log_err(cx));
6290                            } else {
6291                                highlights.await?;
6292                            }
6293                        }
6294                        55..=59 => {
6295                            let search = project.update(cx, |project, cx| {
6296                                let query = rng.lock().gen_range('a'..='z');
6297                                log::info!("{}: project-wide search {:?}", guest_username, query);
6298                                project.search(SearchQuery::text(query, false, false), cx)
6299                            });
6300                            let search = cx.background().spawn(async move {
6301                                search
6302                                    .await
6303                                    .map_err(|err| anyhow!("search request failed: {:?}", err))
6304                            });
6305                            if rng.lock().gen_bool(0.3) {
6306                                log::info!("{}: detaching search request", guest_username);
6307                                cx.update(|cx| search.detach_and_log_err(cx));
6308                            } else {
6309                                client.buffers.extend(search.await?.into_keys());
6310                            }
6311                        }
6312                        _ => {
6313                            buffer.update(cx, |buffer, cx| {
6314                                log::info!(
6315                                    "{}: updating buffer {} ({:?})",
6316                                    guest_username,
6317                                    buffer.remote_id(),
6318                                    buffer.file().unwrap().full_path(cx)
6319                                );
6320                                if rng.lock().gen_bool(0.7) {
6321                                    buffer.randomly_edit(&mut *rng.lock(), 5, cx);
6322                                } else {
6323                                    buffer.randomly_undo_redo(&mut *rng.lock(), cx);
6324                                }
6325                            });
6326                        }
6327                    }
6328                    cx.background().simulate_random_delay().await;
6329                }
6330                Ok(())
6331            }
6332
6333            let result = simulate_guest_internal(
6334                &mut self,
6335                &guest_username,
6336                project.clone(),
6337                op_start_signal,
6338                rng,
6339                &mut cx,
6340            )
6341            .await;
6342            log::info!("{}: done", guest_username);
6343
6344            self.project = Some(project);
6345            (self, cx, result.err())
6346        }
6347    }
6348
6349    impl Drop for TestClient {
6350        fn drop(&mut self) {
6351            self.client.tear_down();
6352        }
6353    }
6354
6355    impl Executor for Arc<gpui::executor::Background> {
6356        type Sleep = gpui::executor::Timer;
6357
6358        fn spawn_detached<F: 'static + Send + Future<Output = ()>>(&self, future: F) {
6359            self.spawn(future).detach();
6360        }
6361
6362        fn sleep(&self, duration: Duration) -> Self::Sleep {
6363            self.as_ref().timer(duration)
6364        }
6365    }
6366
6367    fn channel_messages(channel: &Channel) -> Vec<(String, String, bool)> {
6368        channel
6369            .messages()
6370            .cursor::<()>()
6371            .map(|m| {
6372                (
6373                    m.sender.github_login.clone(),
6374                    m.body.clone(),
6375                    m.is_pending(),
6376                )
6377            })
6378            .collect()
6379    }
6380
6381    struct EmptyView;
6382
6383    impl gpui::Entity for EmptyView {
6384        type Event = ();
6385    }
6386
6387    impl gpui::View for EmptyView {
6388        fn ui_name() -> &'static str {
6389            "empty view"
6390        }
6391
6392        fn render(&mut self, _: &mut gpui::RenderContext<Self>) -> gpui::ElementBox {
6393            gpui::Element::boxed(gpui::elements::Empty)
6394        }
6395    }
6396}