rpc.rs

   1mod store;
   2
   3use crate::{
   4    auth,
   5    db::{ChannelId, MessageId, UserId},
   6    AppState, Result,
   7};
   8use anyhow::anyhow;
   9use async_tungstenite::tungstenite::{
  10    protocol::CloseFrame as TungsteniteCloseFrame, Message as TungsteniteMessage,
  11};
  12use axum::{
  13    body::Body,
  14    extract::{
  15        ws::{CloseFrame as AxumCloseFrame, Message as AxumMessage},
  16        ConnectInfo, WebSocketUpgrade,
  17    },
  18    headers::{Header, HeaderName},
  19    http::StatusCode,
  20    middleware,
  21    response::{IntoResponse, Response},
  22    routing::get,
  23    Extension, Router, TypedHeader,
  24};
  25use collections::{HashMap, HashSet};
  26use futures::{channel::mpsc, future::BoxFuture, FutureExt, SinkExt, StreamExt, TryStreamExt};
  27use lazy_static::lazy_static;
  28use log::{as_debug, as_display};
  29use rpc::{
  30    proto::{self, AnyTypedEnvelope, EntityMessage, EnvelopedMessage, RequestMessage},
  31    Connection, ConnectionId, Peer, TypedEnvelope,
  32};
  33use std::{
  34    any::TypeId,
  35    future::Future,
  36    marker::PhantomData,
  37    net::SocketAddr,
  38    ops::{Deref, DerefMut},
  39    rc::Rc,
  40    sync::Arc,
  41    time::{Duration, Instant},
  42};
  43use store::{Store, Worktree};
  44use time::OffsetDateTime;
  45use tokio::{
  46    sync::{RwLock, RwLockReadGuard, RwLockWriteGuard},
  47    time::Sleep,
  48};
  49use tower::ServiceBuilder;
  50use tracing::{info_span, Instrument};
  51use util::ResultExt;
  52
  53type MessageHandler = Box<
  54    dyn Send + Sync + Fn(Arc<Server>, Box<dyn AnyTypedEnvelope>) -> BoxFuture<'static, Result<()>>,
  55>;
  56
  57pub struct Server {
  58    peer: Arc<Peer>,
  59    store: RwLock<Store>,
  60    app_state: Arc<AppState>,
  61    handlers: HashMap<TypeId, MessageHandler>,
  62    notifications: Option<mpsc::UnboundedSender<()>>,
  63}
  64
  65pub trait Executor: Send + Clone {
  66    type Sleep: Send + Future;
  67    fn spawn_detached<F: 'static + Send + Future<Output = ()>>(&self, future: F);
  68    fn sleep(&self, duration: Duration) -> Self::Sleep;
  69}
  70
  71#[derive(Clone)]
  72pub struct RealExecutor;
  73
  74const MESSAGE_COUNT_PER_PAGE: usize = 100;
  75const MAX_MESSAGE_LEN: usize = 1024;
  76
  77struct StoreReadGuard<'a> {
  78    guard: RwLockReadGuard<'a, Store>,
  79    _not_send: PhantomData<Rc<()>>,
  80}
  81
  82struct StoreWriteGuard<'a> {
  83    guard: RwLockWriteGuard<'a, Store>,
  84    _not_send: PhantomData<Rc<()>>,
  85}
  86
  87impl Server {
  88    pub fn new(
  89        app_state: Arc<AppState>,
  90        notifications: Option<mpsc::UnboundedSender<()>>,
  91    ) -> Arc<Self> {
  92        let mut server = Self {
  93            peer: Peer::new(),
  94            app_state,
  95            store: Default::default(),
  96            handlers: Default::default(),
  97            notifications,
  98        };
  99
 100        server
 101            .add_request_handler(Server::ping)
 102            .add_request_handler(Server::register_project)
 103            .add_message_handler(Server::unregister_project)
 104            .add_request_handler(Server::share_project)
 105            .add_message_handler(Server::unshare_project)
 106            .add_sync_request_handler(Server::join_project)
 107            .add_message_handler(Server::leave_project)
 108            .add_request_handler(Server::register_worktree)
 109            .add_message_handler(Server::unregister_worktree)
 110            .add_request_handler(Server::update_worktree)
 111            .add_message_handler(Server::start_language_server)
 112            .add_message_handler(Server::update_language_server)
 113            .add_message_handler(Server::update_diagnostic_summary)
 114            .add_request_handler(Server::forward_project_request::<proto::GetDefinition>)
 115            .add_request_handler(Server::forward_project_request::<proto::GetReferences>)
 116            .add_request_handler(Server::forward_project_request::<proto::SearchProject>)
 117            .add_request_handler(Server::forward_project_request::<proto::GetDocumentHighlights>)
 118            .add_request_handler(Server::forward_project_request::<proto::GetProjectSymbols>)
 119            .add_request_handler(Server::forward_project_request::<proto::OpenBufferForSymbol>)
 120            .add_request_handler(Server::forward_project_request::<proto::OpenBufferById>)
 121            .add_request_handler(Server::forward_project_request::<proto::OpenBufferByPath>)
 122            .add_request_handler(Server::forward_project_request::<proto::GetCompletions>)
 123            .add_request_handler(
 124                Server::forward_project_request::<proto::ApplyCompletionAdditionalEdits>,
 125            )
 126            .add_request_handler(Server::forward_project_request::<proto::GetCodeActions>)
 127            .add_request_handler(Server::forward_project_request::<proto::ApplyCodeAction>)
 128            .add_request_handler(Server::forward_project_request::<proto::PrepareRename>)
 129            .add_request_handler(Server::forward_project_request::<proto::PerformRename>)
 130            .add_request_handler(Server::forward_project_request::<proto::ReloadBuffers>)
 131            .add_request_handler(Server::forward_project_request::<proto::FormatBuffers>)
 132            .add_request_handler(Server::update_buffer)
 133            .add_message_handler(Server::update_buffer_file)
 134            .add_message_handler(Server::buffer_reloaded)
 135            .add_message_handler(Server::buffer_saved)
 136            .add_request_handler(Server::save_buffer)
 137            .add_request_handler(Server::get_channels)
 138            .add_request_handler(Server::get_users)
 139            .add_request_handler(Server::join_channel)
 140            .add_message_handler(Server::leave_channel)
 141            .add_request_handler(Server::send_channel_message)
 142            .add_request_handler(Server::follow)
 143            .add_message_handler(Server::unfollow)
 144            .add_message_handler(Server::update_followers)
 145            .add_request_handler(Server::get_channel_messages);
 146
 147        Arc::new(server)
 148    }
 149
 150    fn add_message_handler<F, Fut, M>(&mut self, handler: F) -> &mut Self
 151    where
 152        F: 'static + Send + Sync + Fn(Arc<Self>, TypedEnvelope<M>) -> Fut,
 153        Fut: 'static + Send + Future<Output = Result<()>>,
 154        M: EnvelopedMessage,
 155    {
 156        let prev_handler = self.handlers.insert(
 157            TypeId::of::<M>(),
 158            Box::new(move |server, envelope| {
 159                let envelope = envelope.into_any().downcast::<TypedEnvelope<M>>().unwrap();
 160                let span = info_span!(
 161                    "handle message",
 162                    payload_type = envelope.payload_type_name()
 163                );
 164                (handler)(server, *envelope).instrument(span).boxed()
 165            }),
 166        );
 167        if prev_handler.is_some() {
 168            panic!("registered a handler for the same message twice");
 169        }
 170        self
 171    }
 172
 173    fn add_request_handler<F, Fut, M>(&mut self, handler: F) -> &mut Self
 174    where
 175        F: 'static + Send + Sync + Fn(Arc<Self>, TypedEnvelope<M>) -> Fut,
 176        Fut: 'static + Send + Future<Output = Result<M::Response>>,
 177        M: RequestMessage,
 178    {
 179        self.add_message_handler(move |server, envelope| {
 180            let receipt = envelope.receipt();
 181            let response = (handler)(server.clone(), envelope);
 182            async move {
 183                match response.await {
 184                    Ok(response) => {
 185                        server.peer.respond(receipt, response)?;
 186                        Ok(())
 187                    }
 188                    Err(error) => {
 189                        server.peer.respond_with_error(
 190                            receipt,
 191                            proto::Error {
 192                                message: error.to_string(),
 193                            },
 194                        )?;
 195                        Err(error)
 196                    }
 197                }
 198            }
 199        })
 200    }
 201
 202    /// Handle a request while holding a lock to the store. This is useful when we're registering
 203    /// a connection but we want to respond on the connection before anybody else can send on it.
 204    fn add_sync_request_handler<F, M>(&mut self, handler: F) -> &mut Self
 205    where
 206        F: 'static
 207            + Send
 208            + Sync
 209            + Fn(Arc<Self>, &mut Store, TypedEnvelope<M>) -> Result<M::Response>,
 210        M: RequestMessage,
 211    {
 212        let handler = Arc::new(handler);
 213        self.add_message_handler(move |server, envelope| {
 214            let receipt = envelope.receipt();
 215            let handler = handler.clone();
 216            async move {
 217                let mut store = server.store.write().await;
 218                let response = (handler)(server.clone(), &mut *store, envelope);
 219                match response {
 220                    Ok(response) => {
 221                        server.peer.respond(receipt, response)?;
 222                        Ok(())
 223                    }
 224                    Err(error) => {
 225                        server.peer.respond_with_error(
 226                            receipt,
 227                            proto::Error {
 228                                message: error.to_string(),
 229                            },
 230                        )?;
 231                        Err(error)
 232                    }
 233                }
 234            }
 235        })
 236    }
 237
 238    pub fn handle_connection<E: Executor>(
 239        self: &Arc<Self>,
 240        connection: Connection,
 241        addr: String,
 242        user_id: UserId,
 243        mut send_connection_id: Option<mpsc::Sender<ConnectionId>>,
 244        executor: E,
 245    ) -> impl Future<Output = ()> {
 246        let mut this = self.clone();
 247        async move {
 248            let (connection_id, handle_io, mut incoming_rx) = this
 249                .peer
 250                .add_connection(connection, {
 251                    let executor = executor.clone();
 252                    move |duration| {
 253                        let timer = executor.sleep(duration);
 254                        async move {
 255                            timer.await;
 256                        }
 257                    }
 258                })
 259                .await;
 260
 261            if let Some(send_connection_id) = send_connection_id.as_mut() {
 262                let _ = send_connection_id.send(connection_id).await;
 263            }
 264
 265            {
 266                let mut state = this.state_mut().await;
 267                state.add_connection(connection_id, user_id);
 268                this.update_contacts_for_users(&*state, &[user_id]);
 269            }
 270
 271            let handle_io = handle_io.fuse();
 272            futures::pin_mut!(handle_io);
 273            loop {
 274                let next_message = incoming_rx.next().fuse();
 275                futures::pin_mut!(next_message);
 276                futures::select_biased! {
 277                    result = handle_io => {
 278                        if let Err(err) = result {
 279                            log::error!("error handling rpc connection {:?} - {:?}", addr, err);
 280                        }
 281                        break;
 282                    }
 283                    message = next_message => {
 284                        if let Some(message) = message {
 285                            let start_time = Instant::now();
 286                            let type_name = message.payload_type_name();
 287                            log::info!(connection_id = connection_id.0, type_name = type_name; "rpc message received");
 288                            if let Some(handler) = this.handlers.get(&message.payload_type_id()) {
 289                                let notifications = this.notifications.clone();
 290                                let is_background = message.is_background();
 291                                let handle_message = (handler)(this.clone(), message);
 292                                let handle_message = async move {
 293                                    if let Err(err) = handle_message.await {
 294                                        log::error!(connection_id = connection_id.0, type = type_name, error = as_display!(err); "rpc message error");
 295                                    } else {
 296                                        log::info!(connection_id = connection_id.0, type = type_name, duration = as_debug!(start_time.elapsed()); "rpc message handled");
 297                                    }
 298                                    if let Some(mut notifications) = notifications {
 299                                        let _ = notifications.send(()).await;
 300                                    }
 301                                };
 302                                if is_background {
 303                                    executor.spawn_detached(handle_message);
 304                                } else {
 305                                    handle_message.await;
 306                                }
 307                            } else {
 308                                log::warn!("unhandled message: {}", type_name);
 309                            }
 310                        } else {
 311                            log::info!(address = as_debug!(addr); "rpc connection closed");
 312                            break;
 313                        }
 314                    }
 315                }
 316            }
 317
 318            if let Err(err) = this.sign_out(connection_id).await {
 319                log::error!("error signing out connection {:?} - {:?}", addr, err);
 320            }
 321        }
 322    }
 323
 324    async fn sign_out(self: &mut Arc<Self>, connection_id: ConnectionId) -> Result<()> {
 325        self.peer.disconnect(connection_id);
 326        let mut state = self.state_mut().await;
 327        let removed_connection = state.remove_connection(connection_id)?;
 328
 329        for (project_id, project) in removed_connection.hosted_projects {
 330            if let Some(share) = project.share {
 331                broadcast(
 332                    connection_id,
 333                    share.guests.keys().copied().collect(),
 334                    |conn_id| {
 335                        self.peer
 336                            .send(conn_id, proto::UnshareProject { project_id })
 337                    },
 338                );
 339            }
 340        }
 341
 342        for (project_id, peer_ids) in removed_connection.guest_project_ids {
 343            broadcast(connection_id, peer_ids, |conn_id| {
 344                self.peer.send(
 345                    conn_id,
 346                    proto::RemoveProjectCollaborator {
 347                        project_id,
 348                        peer_id: connection_id.0,
 349                    },
 350                )
 351            });
 352        }
 353
 354        self.update_contacts_for_users(&*state, removed_connection.contact_ids.iter());
 355        Ok(())
 356    }
 357
 358    async fn ping(self: Arc<Server>, _: TypedEnvelope<proto::Ping>) -> Result<proto::Ack> {
 359        Ok(proto::Ack {})
 360    }
 361
 362    async fn register_project(
 363        self: Arc<Server>,
 364        request: TypedEnvelope<proto::RegisterProject>,
 365    ) -> Result<proto::RegisterProjectResponse> {
 366        let project_id = {
 367            let mut state = self.state_mut().await;
 368            let user_id = state.user_id_for_connection(request.sender_id)?;
 369            state.register_project(request.sender_id, user_id)
 370        };
 371        Ok(proto::RegisterProjectResponse { project_id })
 372    }
 373
 374    async fn unregister_project(
 375        self: Arc<Server>,
 376        request: TypedEnvelope<proto::UnregisterProject>,
 377    ) -> Result<()> {
 378        let mut state = self.state_mut().await;
 379        let project = state.unregister_project(request.payload.project_id, request.sender_id)?;
 380        self.update_contacts_for_users(&*state, &project.authorized_user_ids());
 381        Ok(())
 382    }
 383
 384    async fn share_project(
 385        self: Arc<Server>,
 386        request: TypedEnvelope<proto::ShareProject>,
 387    ) -> Result<proto::Ack> {
 388        let mut state = self.state_mut().await;
 389        let project = state.share_project(request.payload.project_id, request.sender_id)?;
 390        self.update_contacts_for_users(&mut *state, &project.authorized_user_ids);
 391        Ok(proto::Ack {})
 392    }
 393
 394    async fn unshare_project(
 395        self: Arc<Server>,
 396        request: TypedEnvelope<proto::UnshareProject>,
 397    ) -> Result<()> {
 398        let project_id = request.payload.project_id;
 399        let mut state = self.state_mut().await;
 400        let project = state.unshare_project(project_id, request.sender_id)?;
 401        broadcast(request.sender_id, project.connection_ids, |conn_id| {
 402            self.peer
 403                .send(conn_id, proto::UnshareProject { project_id })
 404        });
 405        self.update_contacts_for_users(&mut *state, &project.authorized_user_ids);
 406        Ok(())
 407    }
 408
 409    fn join_project(
 410        self: Arc<Server>,
 411        state: &mut Store,
 412        request: TypedEnvelope<proto::JoinProject>,
 413    ) -> Result<proto::JoinProjectResponse> {
 414        let project_id = request.payload.project_id;
 415
 416        let user_id = state.user_id_for_connection(request.sender_id)?;
 417        let (response, connection_ids, contact_user_ids) = state
 418            .join_project(request.sender_id, user_id, project_id)
 419            .and_then(|joined| {
 420                let share = joined.project.share()?;
 421                let peer_count = share.guests.len();
 422                let mut collaborators = Vec::with_capacity(peer_count);
 423                collaborators.push(proto::Collaborator {
 424                    peer_id: joined.project.host_connection_id.0,
 425                    replica_id: 0,
 426                    user_id: joined.project.host_user_id.to_proto(),
 427                });
 428                let worktrees = share
 429                    .worktrees
 430                    .iter()
 431                    .filter_map(|(id, shared_worktree)| {
 432                        let worktree = joined.project.worktrees.get(&id)?;
 433                        Some(proto::Worktree {
 434                            id: *id,
 435                            root_name: worktree.root_name.clone(),
 436                            entries: shared_worktree.entries.values().cloned().collect(),
 437                            diagnostic_summaries: shared_worktree
 438                                .diagnostic_summaries
 439                                .values()
 440                                .cloned()
 441                                .collect(),
 442                            visible: worktree.visible,
 443                        })
 444                    })
 445                    .collect();
 446                for (peer_conn_id, (peer_replica_id, peer_user_id)) in &share.guests {
 447                    if *peer_conn_id != request.sender_id {
 448                        collaborators.push(proto::Collaborator {
 449                            peer_id: peer_conn_id.0,
 450                            replica_id: *peer_replica_id as u32,
 451                            user_id: peer_user_id.to_proto(),
 452                        });
 453                    }
 454                }
 455                let response = proto::JoinProjectResponse {
 456                    worktrees,
 457                    replica_id: joined.replica_id as u32,
 458                    collaborators,
 459                    language_servers: joined.project.language_servers.clone(),
 460                };
 461                let connection_ids = joined.project.connection_ids();
 462                let contact_user_ids = joined.project.authorized_user_ids();
 463                Ok((response, connection_ids, contact_user_ids))
 464            })?;
 465
 466        broadcast(request.sender_id, connection_ids, |conn_id| {
 467            self.peer.send(
 468                conn_id,
 469                proto::AddProjectCollaborator {
 470                    project_id,
 471                    collaborator: Some(proto::Collaborator {
 472                        peer_id: request.sender_id.0,
 473                        replica_id: response.replica_id,
 474                        user_id: user_id.to_proto(),
 475                    }),
 476                },
 477            )
 478        });
 479        self.update_contacts_for_users(state, &contact_user_ids);
 480        Ok(response)
 481    }
 482
 483    async fn leave_project(
 484        self: Arc<Server>,
 485        request: TypedEnvelope<proto::LeaveProject>,
 486    ) -> Result<()> {
 487        let sender_id = request.sender_id;
 488        let project_id = request.payload.project_id;
 489        let mut state = self.state_mut().await;
 490        let worktree = state.leave_project(sender_id, project_id)?;
 491        broadcast(sender_id, worktree.connection_ids, |conn_id| {
 492            self.peer.send(
 493                conn_id,
 494                proto::RemoveProjectCollaborator {
 495                    project_id,
 496                    peer_id: sender_id.0,
 497                },
 498            )
 499        });
 500        self.update_contacts_for_users(&*state, &worktree.authorized_user_ids);
 501        Ok(())
 502    }
 503
 504    async fn register_worktree(
 505        self: Arc<Server>,
 506        request: TypedEnvelope<proto::RegisterWorktree>,
 507    ) -> Result<proto::Ack> {
 508        let mut contact_user_ids = HashSet::default();
 509        for github_login in &request.payload.authorized_logins {
 510            let contact_user_id = self.app_state.db.create_user(github_login, false).await?;
 511            contact_user_ids.insert(contact_user_id);
 512        }
 513
 514        let mut state = self.state_mut().await;
 515        let host_user_id = state.user_id_for_connection(request.sender_id)?;
 516        contact_user_ids.insert(host_user_id);
 517
 518        let contact_user_ids = contact_user_ids.into_iter().collect::<Vec<_>>();
 519        let guest_connection_ids = state
 520            .read_project(request.payload.project_id, request.sender_id)?
 521            .guest_connection_ids();
 522        state.register_worktree(
 523            request.payload.project_id,
 524            request.payload.worktree_id,
 525            request.sender_id,
 526            Worktree {
 527                authorized_user_ids: contact_user_ids.clone(),
 528                root_name: request.payload.root_name.clone(),
 529                visible: request.payload.visible,
 530            },
 531        )?;
 532
 533        broadcast(request.sender_id, guest_connection_ids, |connection_id| {
 534            self.peer
 535                .forward_send(request.sender_id, connection_id, request.payload.clone())
 536        });
 537        self.update_contacts_for_users(&*state, &contact_user_ids);
 538        Ok(proto::Ack {})
 539    }
 540
 541    async fn unregister_worktree(
 542        self: Arc<Server>,
 543        request: TypedEnvelope<proto::UnregisterWorktree>,
 544    ) -> Result<()> {
 545        let project_id = request.payload.project_id;
 546        let worktree_id = request.payload.worktree_id;
 547        let mut state = self.state_mut().await;
 548        let (worktree, guest_connection_ids) =
 549            state.unregister_worktree(project_id, worktree_id, request.sender_id)?;
 550        broadcast(request.sender_id, guest_connection_ids, |conn_id| {
 551            self.peer.send(
 552                conn_id,
 553                proto::UnregisterWorktree {
 554                    project_id,
 555                    worktree_id,
 556                },
 557            )
 558        });
 559        self.update_contacts_for_users(&*state, &worktree.authorized_user_ids);
 560        Ok(())
 561    }
 562
 563    async fn update_worktree(
 564        self: Arc<Server>,
 565        request: TypedEnvelope<proto::UpdateWorktree>,
 566    ) -> Result<proto::Ack> {
 567        let connection_ids = self.state_mut().await.update_worktree(
 568            request.sender_id,
 569            request.payload.project_id,
 570            request.payload.worktree_id,
 571            &request.payload.removed_entries,
 572            &request.payload.updated_entries,
 573        )?;
 574
 575        broadcast(request.sender_id, connection_ids, |connection_id| {
 576            self.peer
 577                .forward_send(request.sender_id, connection_id, request.payload.clone())
 578        });
 579
 580        Ok(proto::Ack {})
 581    }
 582
 583    async fn update_diagnostic_summary(
 584        self: Arc<Server>,
 585        request: TypedEnvelope<proto::UpdateDiagnosticSummary>,
 586    ) -> Result<()> {
 587        let summary = request
 588            .payload
 589            .summary
 590            .clone()
 591            .ok_or_else(|| anyhow!("invalid summary"))?;
 592        let receiver_ids = self.state_mut().await.update_diagnostic_summary(
 593            request.payload.project_id,
 594            request.payload.worktree_id,
 595            request.sender_id,
 596            summary,
 597        )?;
 598
 599        broadcast(request.sender_id, receiver_ids, |connection_id| {
 600            self.peer
 601                .forward_send(request.sender_id, connection_id, request.payload.clone())
 602        });
 603        Ok(())
 604    }
 605
 606    async fn start_language_server(
 607        self: Arc<Server>,
 608        request: TypedEnvelope<proto::StartLanguageServer>,
 609    ) -> Result<()> {
 610        let receiver_ids = self.state_mut().await.start_language_server(
 611            request.payload.project_id,
 612            request.sender_id,
 613            request
 614                .payload
 615                .server
 616                .clone()
 617                .ok_or_else(|| anyhow!("invalid language server"))?,
 618        )?;
 619        broadcast(request.sender_id, receiver_ids, |connection_id| {
 620            self.peer
 621                .forward_send(request.sender_id, connection_id, request.payload.clone())
 622        });
 623        Ok(())
 624    }
 625
 626    async fn update_language_server(
 627        self: Arc<Server>,
 628        request: TypedEnvelope<proto::UpdateLanguageServer>,
 629    ) -> Result<()> {
 630        let receiver_ids = self
 631            .state()
 632            .await
 633            .project_connection_ids(request.payload.project_id, request.sender_id)?;
 634        broadcast(request.sender_id, receiver_ids, |connection_id| {
 635            self.peer
 636                .forward_send(request.sender_id, connection_id, request.payload.clone())
 637        });
 638        Ok(())
 639    }
 640
 641    async fn forward_project_request<T>(
 642        self: Arc<Server>,
 643        request: TypedEnvelope<T>,
 644    ) -> Result<T::Response>
 645    where
 646        T: EntityMessage + RequestMessage,
 647    {
 648        let host_connection_id = self
 649            .state()
 650            .await
 651            .read_project(request.payload.remote_entity_id(), request.sender_id)?
 652            .host_connection_id;
 653        Ok(self
 654            .peer
 655            .forward_request(request.sender_id, host_connection_id, request.payload)
 656            .await?)
 657    }
 658
 659    async fn save_buffer(
 660        self: Arc<Server>,
 661        request: TypedEnvelope<proto::SaveBuffer>,
 662    ) -> Result<proto::BufferSaved> {
 663        let host = self
 664            .state()
 665            .await
 666            .read_project(request.payload.project_id, request.sender_id)?
 667            .host_connection_id;
 668        let response = self
 669            .peer
 670            .forward_request(request.sender_id, host, request.payload.clone())
 671            .await?;
 672
 673        let mut guests = self
 674            .state()
 675            .await
 676            .read_project(request.payload.project_id, request.sender_id)?
 677            .connection_ids();
 678        guests.retain(|guest_connection_id| *guest_connection_id != request.sender_id);
 679        broadcast(host, guests, |conn_id| {
 680            self.peer.forward_send(host, conn_id, response.clone())
 681        });
 682
 683        Ok(response)
 684    }
 685
 686    async fn update_buffer(
 687        self: Arc<Server>,
 688        request: TypedEnvelope<proto::UpdateBuffer>,
 689    ) -> Result<proto::Ack> {
 690        let receiver_ids = self
 691            .state()
 692            .await
 693            .project_connection_ids(request.payload.project_id, request.sender_id)?;
 694        broadcast(request.sender_id, receiver_ids, |connection_id| {
 695            self.peer
 696                .forward_send(request.sender_id, connection_id, request.payload.clone())
 697        });
 698        Ok(proto::Ack {})
 699    }
 700
 701    async fn update_buffer_file(
 702        self: Arc<Server>,
 703        request: TypedEnvelope<proto::UpdateBufferFile>,
 704    ) -> Result<()> {
 705        let receiver_ids = self
 706            .state()
 707            .await
 708            .project_connection_ids(request.payload.project_id, request.sender_id)?;
 709        broadcast(request.sender_id, receiver_ids, |connection_id| {
 710            self.peer
 711                .forward_send(request.sender_id, connection_id, request.payload.clone())
 712        });
 713        Ok(())
 714    }
 715
 716    async fn buffer_reloaded(
 717        self: Arc<Server>,
 718        request: TypedEnvelope<proto::BufferReloaded>,
 719    ) -> Result<()> {
 720        let receiver_ids = self
 721            .state()
 722            .await
 723            .project_connection_ids(request.payload.project_id, request.sender_id)?;
 724        broadcast(request.sender_id, receiver_ids, |connection_id| {
 725            self.peer
 726                .forward_send(request.sender_id, connection_id, request.payload.clone())
 727        });
 728        Ok(())
 729    }
 730
 731    async fn buffer_saved(
 732        self: Arc<Server>,
 733        request: TypedEnvelope<proto::BufferSaved>,
 734    ) -> Result<()> {
 735        let receiver_ids = self
 736            .state()
 737            .await
 738            .project_connection_ids(request.payload.project_id, request.sender_id)?;
 739        broadcast(request.sender_id, receiver_ids, |connection_id| {
 740            self.peer
 741                .forward_send(request.sender_id, connection_id, request.payload.clone())
 742        });
 743        Ok(())
 744    }
 745
 746    async fn follow(
 747        self: Arc<Self>,
 748        request: TypedEnvelope<proto::Follow>,
 749    ) -> Result<proto::FollowResponse> {
 750        let leader_id = ConnectionId(request.payload.leader_id);
 751        let follower_id = request.sender_id;
 752        if !self
 753            .state()
 754            .await
 755            .project_connection_ids(request.payload.project_id, follower_id)?
 756            .contains(&leader_id)
 757        {
 758            Err(anyhow!("no such peer"))?;
 759        }
 760        let mut response = self
 761            .peer
 762            .forward_request(request.sender_id, leader_id, request.payload)
 763            .await?;
 764        response
 765            .views
 766            .retain(|view| view.leader_id != Some(follower_id.0));
 767        Ok(response)
 768    }
 769
 770    async fn unfollow(self: Arc<Self>, request: TypedEnvelope<proto::Unfollow>) -> Result<()> {
 771        let leader_id = ConnectionId(request.payload.leader_id);
 772        if !self
 773            .state()
 774            .await
 775            .project_connection_ids(request.payload.project_id, request.sender_id)?
 776            .contains(&leader_id)
 777        {
 778            Err(anyhow!("no such peer"))?;
 779        }
 780        self.peer
 781            .forward_send(request.sender_id, leader_id, request.payload)?;
 782        Ok(())
 783    }
 784
 785    async fn update_followers(
 786        self: Arc<Self>,
 787        request: TypedEnvelope<proto::UpdateFollowers>,
 788    ) -> Result<()> {
 789        let connection_ids = self
 790            .state()
 791            .await
 792            .project_connection_ids(request.payload.project_id, request.sender_id)?;
 793        let leader_id = request
 794            .payload
 795            .variant
 796            .as_ref()
 797            .and_then(|variant| match variant {
 798                proto::update_followers::Variant::CreateView(payload) => payload.leader_id,
 799                proto::update_followers::Variant::UpdateView(payload) => payload.leader_id,
 800                proto::update_followers::Variant::UpdateActiveView(payload) => payload.leader_id,
 801            });
 802        for follower_id in &request.payload.follower_ids {
 803            let follower_id = ConnectionId(*follower_id);
 804            if connection_ids.contains(&follower_id) && Some(follower_id.0) != leader_id {
 805                self.peer
 806                    .forward_send(request.sender_id, follower_id, request.payload.clone())?;
 807            }
 808        }
 809        Ok(())
 810    }
 811
 812    async fn get_channels(
 813        self: Arc<Server>,
 814        request: TypedEnvelope<proto::GetChannels>,
 815    ) -> Result<proto::GetChannelsResponse> {
 816        let user_id = self
 817            .state()
 818            .await
 819            .user_id_for_connection(request.sender_id)?;
 820        let channels = self.app_state.db.get_accessible_channels(user_id).await?;
 821        Ok(proto::GetChannelsResponse {
 822            channels: channels
 823                .into_iter()
 824                .map(|chan| proto::Channel {
 825                    id: chan.id.to_proto(),
 826                    name: chan.name,
 827                })
 828                .collect(),
 829        })
 830    }
 831
 832    async fn get_users(
 833        self: Arc<Server>,
 834        request: TypedEnvelope<proto::GetUsers>,
 835    ) -> Result<proto::GetUsersResponse> {
 836        let user_ids = request
 837            .payload
 838            .user_ids
 839            .into_iter()
 840            .map(UserId::from_proto)
 841            .collect();
 842        let users = self
 843            .app_state
 844            .db
 845            .get_users_by_ids(user_ids)
 846            .await?
 847            .into_iter()
 848            .map(|user| proto::User {
 849                id: user.id.to_proto(),
 850                avatar_url: format!("https://github.com/{}.png?size=128", user.github_login),
 851                github_login: user.github_login,
 852            })
 853            .collect();
 854        Ok(proto::GetUsersResponse { users })
 855    }
 856
 857    fn update_contacts_for_users<'a>(
 858        self: &Arc<Self>,
 859        state: &Store,
 860        user_ids: impl IntoIterator<Item = &'a UserId>,
 861    ) {
 862        for user_id in user_ids {
 863            let contacts = state.contacts_for_user(*user_id);
 864            for connection_id in state.connection_ids_for_user(*user_id) {
 865                self.peer
 866                    .send(
 867                        connection_id,
 868                        proto::UpdateContacts {
 869                            contacts: contacts.clone(),
 870                        },
 871                    )
 872                    .log_err();
 873            }
 874        }
 875    }
 876
 877    async fn join_channel(
 878        self: Arc<Self>,
 879        request: TypedEnvelope<proto::JoinChannel>,
 880    ) -> Result<proto::JoinChannelResponse> {
 881        let user_id = self
 882            .state()
 883            .await
 884            .user_id_for_connection(request.sender_id)?;
 885        let channel_id = ChannelId::from_proto(request.payload.channel_id);
 886        if !self
 887            .app_state
 888            .db
 889            .can_user_access_channel(user_id, channel_id)
 890            .await?
 891        {
 892            Err(anyhow!("access denied"))?;
 893        }
 894
 895        self.state_mut()
 896            .await
 897            .join_channel(request.sender_id, channel_id);
 898        let messages = self
 899            .app_state
 900            .db
 901            .get_channel_messages(channel_id, MESSAGE_COUNT_PER_PAGE, None)
 902            .await?
 903            .into_iter()
 904            .map(|msg| proto::ChannelMessage {
 905                id: msg.id.to_proto(),
 906                body: msg.body,
 907                timestamp: msg.sent_at.unix_timestamp() as u64,
 908                sender_id: msg.sender_id.to_proto(),
 909                nonce: Some(msg.nonce.as_u128().into()),
 910            })
 911            .collect::<Vec<_>>();
 912        Ok(proto::JoinChannelResponse {
 913            done: messages.len() < MESSAGE_COUNT_PER_PAGE,
 914            messages,
 915        })
 916    }
 917
 918    async fn leave_channel(
 919        self: Arc<Self>,
 920        request: TypedEnvelope<proto::LeaveChannel>,
 921    ) -> Result<()> {
 922        let user_id = self
 923            .state()
 924            .await
 925            .user_id_for_connection(request.sender_id)?;
 926        let channel_id = ChannelId::from_proto(request.payload.channel_id);
 927        if !self
 928            .app_state
 929            .db
 930            .can_user_access_channel(user_id, channel_id)
 931            .await?
 932        {
 933            Err(anyhow!("access denied"))?;
 934        }
 935
 936        self.state_mut()
 937            .await
 938            .leave_channel(request.sender_id, channel_id);
 939
 940        Ok(())
 941    }
 942
 943    async fn send_channel_message(
 944        self: Arc<Self>,
 945        request: TypedEnvelope<proto::SendChannelMessage>,
 946    ) -> Result<proto::SendChannelMessageResponse> {
 947        let channel_id = ChannelId::from_proto(request.payload.channel_id);
 948        let user_id;
 949        let connection_ids;
 950        {
 951            let state = self.state().await;
 952            user_id = state.user_id_for_connection(request.sender_id)?;
 953            connection_ids = state.channel_connection_ids(channel_id)?;
 954        }
 955
 956        // Validate the message body.
 957        let body = request.payload.body.trim().to_string();
 958        if body.len() > MAX_MESSAGE_LEN {
 959            return Err(anyhow!("message is too long"))?;
 960        }
 961        if body.is_empty() {
 962            return Err(anyhow!("message can't be blank"))?;
 963        }
 964
 965        let timestamp = OffsetDateTime::now_utc();
 966        let nonce = request
 967            .payload
 968            .nonce
 969            .ok_or_else(|| anyhow!("nonce can't be blank"))?;
 970
 971        let message_id = self
 972            .app_state
 973            .db
 974            .create_channel_message(channel_id, user_id, &body, timestamp, nonce.clone().into())
 975            .await?
 976            .to_proto();
 977        let message = proto::ChannelMessage {
 978            sender_id: user_id.to_proto(),
 979            id: message_id,
 980            body,
 981            timestamp: timestamp.unix_timestamp() as u64,
 982            nonce: Some(nonce),
 983        };
 984        broadcast(request.sender_id, connection_ids, |conn_id| {
 985            self.peer.send(
 986                conn_id,
 987                proto::ChannelMessageSent {
 988                    channel_id: channel_id.to_proto(),
 989                    message: Some(message.clone()),
 990                },
 991            )
 992        });
 993        Ok(proto::SendChannelMessageResponse {
 994            message: Some(message),
 995        })
 996    }
 997
 998    async fn get_channel_messages(
 999        self: Arc<Self>,
1000        request: TypedEnvelope<proto::GetChannelMessages>,
1001    ) -> Result<proto::GetChannelMessagesResponse> {
1002        let user_id = self
1003            .state()
1004            .await
1005            .user_id_for_connection(request.sender_id)?;
1006        let channel_id = ChannelId::from_proto(request.payload.channel_id);
1007        if !self
1008            .app_state
1009            .db
1010            .can_user_access_channel(user_id, channel_id)
1011            .await?
1012        {
1013            Err(anyhow!("access denied"))?;
1014        }
1015
1016        let messages = self
1017            .app_state
1018            .db
1019            .get_channel_messages(
1020                channel_id,
1021                MESSAGE_COUNT_PER_PAGE,
1022                Some(MessageId::from_proto(request.payload.before_message_id)),
1023            )
1024            .await?
1025            .into_iter()
1026            .map(|msg| proto::ChannelMessage {
1027                id: msg.id.to_proto(),
1028                body: msg.body,
1029                timestamp: msg.sent_at.unix_timestamp() as u64,
1030                sender_id: msg.sender_id.to_proto(),
1031                nonce: Some(msg.nonce.as_u128().into()),
1032            })
1033            .collect::<Vec<_>>();
1034
1035        Ok(proto::GetChannelMessagesResponse {
1036            done: messages.len() < MESSAGE_COUNT_PER_PAGE,
1037            messages,
1038        })
1039    }
1040
1041    async fn state<'a>(self: &'a Arc<Self>) -> StoreReadGuard<'a> {
1042        #[cfg(test)]
1043        tokio::task::yield_now().await;
1044        let guard = self.store.read().await;
1045        #[cfg(test)]
1046        tokio::task::yield_now().await;
1047        StoreReadGuard {
1048            guard,
1049            _not_send: PhantomData,
1050        }
1051    }
1052
1053    async fn state_mut<'a>(self: &'a Arc<Self>) -> StoreWriteGuard<'a> {
1054        #[cfg(test)]
1055        tokio::task::yield_now().await;
1056        let guard = self.store.write().await;
1057        #[cfg(test)]
1058        tokio::task::yield_now().await;
1059        StoreWriteGuard {
1060            guard,
1061            _not_send: PhantomData,
1062        }
1063    }
1064}
1065
1066impl<'a> Deref for StoreReadGuard<'a> {
1067    type Target = Store;
1068
1069    fn deref(&self) -> &Self::Target {
1070        &*self.guard
1071    }
1072}
1073
1074impl<'a> Deref for StoreWriteGuard<'a> {
1075    type Target = Store;
1076
1077    fn deref(&self) -> &Self::Target {
1078        &*self.guard
1079    }
1080}
1081
1082impl<'a> DerefMut for StoreWriteGuard<'a> {
1083    fn deref_mut(&mut self) -> &mut Self::Target {
1084        &mut *self.guard
1085    }
1086}
1087
1088impl<'a> Drop for StoreWriteGuard<'a> {
1089    fn drop(&mut self) {
1090        #[cfg(test)]
1091        self.check_invariants();
1092    }
1093}
1094
1095impl Executor for RealExecutor {
1096    type Sleep = Sleep;
1097
1098    fn spawn_detached<F: 'static + Send + Future<Output = ()>>(&self, future: F) {
1099        tokio::task::spawn(future);
1100    }
1101
1102    fn sleep(&self, duration: Duration) -> Self::Sleep {
1103        tokio::time::sleep(duration)
1104    }
1105}
1106
1107fn broadcast<F>(sender_id: ConnectionId, receiver_ids: Vec<ConnectionId>, mut f: F)
1108where
1109    F: FnMut(ConnectionId) -> anyhow::Result<()>,
1110{
1111    for receiver_id in receiver_ids {
1112        if receiver_id != sender_id {
1113            f(receiver_id).log_err();
1114        }
1115    }
1116}
1117
1118lazy_static! {
1119    static ref ZED_PROTOCOL_VERSION: HeaderName = HeaderName::from_static("x-zed-protocol-version");
1120}
1121
1122pub struct ProtocolVersion(u32);
1123
1124impl Header for ProtocolVersion {
1125    fn name() -> &'static HeaderName {
1126        &ZED_PROTOCOL_VERSION
1127    }
1128
1129    fn decode<'i, I>(values: &mut I) -> Result<Self, axum::headers::Error>
1130    where
1131        Self: Sized,
1132        I: Iterator<Item = &'i axum::http::HeaderValue>,
1133    {
1134        let version = values
1135            .next()
1136            .ok_or_else(|| axum::headers::Error::invalid())?
1137            .to_str()
1138            .map_err(|_| axum::headers::Error::invalid())?
1139            .parse()
1140            .map_err(|_| axum::headers::Error::invalid())?;
1141        Ok(Self(version))
1142    }
1143
1144    fn encode<E: Extend<axum::http::HeaderValue>>(&self, values: &mut E) {
1145        values.extend([self.0.to_string().parse().unwrap()]);
1146    }
1147}
1148
1149pub fn routes(app_state: Arc<AppState>) -> Router<Body> {
1150    let server = Server::new(app_state.clone(), None);
1151    Router::new()
1152        .route("/rpc", get(handle_websocket_request))
1153        .layer(
1154            ServiceBuilder::new()
1155                .layer(Extension(app_state))
1156                .layer(middleware::from_fn(auth::validate_header))
1157                .layer(Extension(server)),
1158        )
1159}
1160
1161pub async fn handle_websocket_request(
1162    TypedHeader(ProtocolVersion(protocol_version)): TypedHeader<ProtocolVersion>,
1163    ConnectInfo(socket_address): ConnectInfo<SocketAddr>,
1164    Extension(server): Extension<Arc<Server>>,
1165    Extension(user_id): Extension<UserId>,
1166    ws: WebSocketUpgrade,
1167) -> Response {
1168    if protocol_version != rpc::PROTOCOL_VERSION {
1169        return (
1170            StatusCode::UPGRADE_REQUIRED,
1171            "client must be upgraded".to_string(),
1172        )
1173            .into_response();
1174    }
1175    let socket_address = socket_address.to_string();
1176    ws.on_upgrade(move |socket| {
1177        let socket = socket
1178            .map_ok(to_tungstenite_message)
1179            .err_into()
1180            .with(|message| async move { Ok(to_axum_message(message)) });
1181        let connection = Connection::new(Box::pin(socket));
1182        server.handle_connection(connection, socket_address, user_id, None, RealExecutor)
1183    })
1184}
1185
1186fn to_axum_message(message: TungsteniteMessage) -> AxumMessage {
1187    match message {
1188        TungsteniteMessage::Text(payload) => AxumMessage::Text(payload),
1189        TungsteniteMessage::Binary(payload) => AxumMessage::Binary(payload),
1190        TungsteniteMessage::Ping(payload) => AxumMessage::Ping(payload),
1191        TungsteniteMessage::Pong(payload) => AxumMessage::Pong(payload),
1192        TungsteniteMessage::Close(frame) => AxumMessage::Close(frame.map(|frame| AxumCloseFrame {
1193            code: frame.code.into(),
1194            reason: frame.reason,
1195        })),
1196    }
1197}
1198
1199fn to_tungstenite_message(message: AxumMessage) -> TungsteniteMessage {
1200    match message {
1201        AxumMessage::Text(payload) => TungsteniteMessage::Text(payload),
1202        AxumMessage::Binary(payload) => TungsteniteMessage::Binary(payload),
1203        AxumMessage::Ping(payload) => TungsteniteMessage::Ping(payload),
1204        AxumMessage::Pong(payload) => TungsteniteMessage::Pong(payload),
1205        AxumMessage::Close(frame) => {
1206            TungsteniteMessage::Close(frame.map(|frame| TungsteniteCloseFrame {
1207                code: frame.code.into(),
1208                reason: frame.reason,
1209            }))
1210        }
1211    }
1212}
1213
1214#[cfg(test)]
1215mod tests {
1216    use super::*;
1217    use crate::{
1218        db::{tests::TestDb, UserId},
1219        AppState,
1220    };
1221    use ::rpc::Peer;
1222    use client::{
1223        self, test::FakeHttpClient, Channel, ChannelDetails, ChannelList, Client, Credentials,
1224        EstablishConnectionError, UserStore, RECEIVE_TIMEOUT,
1225    };
1226    use collections::BTreeMap;
1227    use editor::{
1228        self, ConfirmCodeAction, ConfirmCompletion, ConfirmRename, Editor, Input, Redo, Rename,
1229        ToOffset, ToggleCodeActions, Undo,
1230    };
1231    use gpui::{
1232        executor::{self, Deterministic},
1233        geometry::vector::vec2f,
1234        ModelHandle, TestAppContext, ViewHandle,
1235    };
1236    use language::{
1237        range_to_lsp, tree_sitter_rust, Diagnostic, DiagnosticEntry, FakeLspAdapter, Language,
1238        LanguageConfig, LanguageRegistry, OffsetRangeExt, Point, Rope,
1239    };
1240    use lsp::{self, FakeLanguageServer};
1241    use parking_lot::Mutex;
1242    use project::{
1243        fs::{FakeFs, Fs as _},
1244        search::SearchQuery,
1245        worktree::WorktreeHandle,
1246        DiagnosticSummary, Project, ProjectPath, WorktreeId,
1247    };
1248    use rand::prelude::*;
1249    use rpc::PeerId;
1250    use serde_json::json;
1251    use settings::Settings;
1252    use sqlx::types::time::OffsetDateTime;
1253    use std::{
1254        env,
1255        ops::Deref,
1256        path::{Path, PathBuf},
1257        rc::Rc,
1258        sync::{
1259            atomic::{AtomicBool, Ordering::SeqCst},
1260            Arc,
1261        },
1262        time::Duration,
1263    };
1264    use theme::ThemeRegistry;
1265    use workspace::{Item, SplitDirection, ToggleFollow, Workspace, WorkspaceParams};
1266
1267    #[cfg(test)]
1268    #[ctor::ctor]
1269    fn init_logger() {
1270        if std::env::var("RUST_LOG").is_ok() {
1271            env_logger::init();
1272        }
1273    }
1274
1275    #[gpui::test(iterations = 10)]
1276    async fn test_share_project(cx_a: &mut TestAppContext, cx_b: &mut TestAppContext) {
1277        let (window_b, _) = cx_b.add_window(|_| EmptyView);
1278        let lang_registry = Arc::new(LanguageRegistry::test());
1279        let fs = FakeFs::new(cx_a.background());
1280        cx_a.foreground().forbid_parking();
1281
1282        // Connect to a server as 2 clients.
1283        let mut server = TestServer::start(cx_a.foreground(), cx_a.background()).await;
1284        let client_a = server.create_client(cx_a, "user_a").await;
1285        let client_b = server.create_client(cx_b, "user_b").await;
1286
1287        // Share a project as client A
1288        fs.insert_tree(
1289            "/a",
1290            json!({
1291                ".zed.toml": r#"collaborators = ["user_b"]"#,
1292                "a.txt": "a-contents",
1293                "b.txt": "b-contents",
1294            }),
1295        )
1296        .await;
1297        let project_a = cx_a.update(|cx| {
1298            Project::local(
1299                client_a.clone(),
1300                client_a.user_store.clone(),
1301                lang_registry.clone(),
1302                fs.clone(),
1303                cx,
1304            )
1305        });
1306        let (worktree_a, _) = project_a
1307            .update(cx_a, |p, cx| {
1308                p.find_or_create_local_worktree("/a", true, cx)
1309            })
1310            .await
1311            .unwrap();
1312        let worktree_id = worktree_a.read_with(cx_a, |tree, _| tree.id());
1313        worktree_a
1314            .read_with(cx_a, |tree, _| tree.as_local().unwrap().scan_complete())
1315            .await;
1316        let project_id = project_a.update(cx_a, |p, _| p.next_remote_id()).await;
1317        project_a.update(cx_a, |p, cx| p.share(cx)).await.unwrap();
1318
1319        // Join that project as client B
1320        let project_b = Project::remote(
1321            project_id,
1322            client_b.clone(),
1323            client_b.user_store.clone(),
1324            lang_registry.clone(),
1325            fs.clone(),
1326            &mut cx_b.to_async(),
1327        )
1328        .await
1329        .unwrap();
1330
1331        let replica_id_b = project_b.read_with(cx_b, |project, _| {
1332            assert_eq!(
1333                project
1334                    .collaborators()
1335                    .get(&client_a.peer_id)
1336                    .unwrap()
1337                    .user
1338                    .github_login,
1339                "user_a"
1340            );
1341            project.replica_id()
1342        });
1343        project_a
1344            .condition(&cx_a, |tree, _| {
1345                tree.collaborators()
1346                    .get(&client_b.peer_id)
1347                    .map_or(false, |collaborator| {
1348                        collaborator.replica_id == replica_id_b
1349                            && collaborator.user.github_login == "user_b"
1350                    })
1351            })
1352            .await;
1353
1354        // Open the same file as client B and client A.
1355        let buffer_b = project_b
1356            .update(cx_b, |p, cx| p.open_buffer((worktree_id, "b.txt"), cx))
1357            .await
1358            .unwrap();
1359        buffer_b.read_with(cx_b, |buf, _| assert_eq!(buf.text(), "b-contents"));
1360        project_a.read_with(cx_a, |project, cx| {
1361            assert!(project.has_open_buffer((worktree_id, "b.txt"), cx))
1362        });
1363        let buffer_a = project_a
1364            .update(cx_a, |p, cx| p.open_buffer((worktree_id, "b.txt"), cx))
1365            .await
1366            .unwrap();
1367
1368        let editor_b = cx_b.add_view(window_b, |cx| Editor::for_buffer(buffer_b, None, cx));
1369
1370        // TODO
1371        // // Create a selection set as client B and see that selection set as client A.
1372        // buffer_a
1373        //     .condition(&cx_a, |buffer, _| buffer.selection_sets().count() == 1)
1374        //     .await;
1375
1376        // Edit the buffer as client B and see that edit as client A.
1377        editor_b.update(cx_b, |editor, cx| {
1378            editor.handle_input(&Input("ok, ".into()), cx)
1379        });
1380        buffer_a
1381            .condition(&cx_a, |buffer, _| buffer.text() == "ok, b-contents")
1382            .await;
1383
1384        // TODO
1385        // // Remove the selection set as client B, see those selections disappear as client A.
1386        cx_b.update(move |_| drop(editor_b));
1387        // buffer_a
1388        //     .condition(&cx_a, |buffer, _| buffer.selection_sets().count() == 0)
1389        //     .await;
1390
1391        // Dropping the client B's project removes client B from client A's collaborators.
1392        cx_b.update(move |_| drop(project_b));
1393        project_a
1394            .condition(&cx_a, |project, _| project.collaborators().is_empty())
1395            .await;
1396    }
1397
1398    #[gpui::test(iterations = 10)]
1399    async fn test_unshare_project(cx_a: &mut TestAppContext, cx_b: &mut TestAppContext) {
1400        let lang_registry = Arc::new(LanguageRegistry::test());
1401        let fs = FakeFs::new(cx_a.background());
1402        cx_a.foreground().forbid_parking();
1403
1404        // Connect to a server as 2 clients.
1405        let mut server = TestServer::start(cx_a.foreground(), cx_a.background()).await;
1406        let client_a = server.create_client(cx_a, "user_a").await;
1407        let client_b = server.create_client(cx_b, "user_b").await;
1408
1409        // Share a project as client A
1410        fs.insert_tree(
1411            "/a",
1412            json!({
1413                ".zed.toml": r#"collaborators = ["user_b"]"#,
1414                "a.txt": "a-contents",
1415                "b.txt": "b-contents",
1416            }),
1417        )
1418        .await;
1419        let project_a = cx_a.update(|cx| {
1420            Project::local(
1421                client_a.clone(),
1422                client_a.user_store.clone(),
1423                lang_registry.clone(),
1424                fs.clone(),
1425                cx,
1426            )
1427        });
1428        let (worktree_a, _) = project_a
1429            .update(cx_a, |p, cx| {
1430                p.find_or_create_local_worktree("/a", true, cx)
1431            })
1432            .await
1433            .unwrap();
1434        worktree_a
1435            .read_with(cx_a, |tree, _| tree.as_local().unwrap().scan_complete())
1436            .await;
1437        let project_id = project_a.update(cx_a, |p, _| p.next_remote_id()).await;
1438        let worktree_id = worktree_a.read_with(cx_a, |tree, _| tree.id());
1439        project_a.update(cx_a, |p, cx| p.share(cx)).await.unwrap();
1440        assert!(worktree_a.read_with(cx_a, |tree, _| tree.as_local().unwrap().is_shared()));
1441
1442        // Join that project as client B
1443        let project_b = Project::remote(
1444            project_id,
1445            client_b.clone(),
1446            client_b.user_store.clone(),
1447            lang_registry.clone(),
1448            fs.clone(),
1449            &mut cx_b.to_async(),
1450        )
1451        .await
1452        .unwrap();
1453        project_b
1454            .update(cx_b, |p, cx| p.open_buffer((worktree_id, "a.txt"), cx))
1455            .await
1456            .unwrap();
1457
1458        // Unshare the project as client A
1459        project_a.update(cx_a, |project, cx| project.unshare(cx));
1460        project_b
1461            .condition(cx_b, |project, _| project.is_read_only())
1462            .await;
1463        assert!(worktree_a.read_with(cx_a, |tree, _| !tree.as_local().unwrap().is_shared()));
1464        cx_b.update(|_| {
1465            drop(project_b);
1466        });
1467
1468        // Share the project again and ensure guests can still join.
1469        project_a
1470            .update(cx_a, |project, cx| project.share(cx))
1471            .await
1472            .unwrap();
1473        assert!(worktree_a.read_with(cx_a, |tree, _| tree.as_local().unwrap().is_shared()));
1474
1475        let project_b2 = Project::remote(
1476            project_id,
1477            client_b.clone(),
1478            client_b.user_store.clone(),
1479            lang_registry.clone(),
1480            fs.clone(),
1481            &mut cx_b.to_async(),
1482        )
1483        .await
1484        .unwrap();
1485        project_b2
1486            .update(cx_b, |p, cx| p.open_buffer((worktree_id, "a.txt"), cx))
1487            .await
1488            .unwrap();
1489    }
1490
1491    #[gpui::test(iterations = 10)]
1492    async fn test_host_disconnect(cx_a: &mut TestAppContext, cx_b: &mut TestAppContext) {
1493        let lang_registry = Arc::new(LanguageRegistry::test());
1494        let fs = FakeFs::new(cx_a.background());
1495        cx_a.foreground().forbid_parking();
1496
1497        // Connect to a server as 2 clients.
1498        let mut server = TestServer::start(cx_a.foreground(), cx_a.background()).await;
1499        let client_a = server.create_client(cx_a, "user_a").await;
1500        let client_b = server.create_client(cx_b, "user_b").await;
1501
1502        // Share a project as client A
1503        fs.insert_tree(
1504            "/a",
1505            json!({
1506                ".zed.toml": r#"collaborators = ["user_b"]"#,
1507                "a.txt": "a-contents",
1508                "b.txt": "b-contents",
1509            }),
1510        )
1511        .await;
1512        let project_a = cx_a.update(|cx| {
1513            Project::local(
1514                client_a.clone(),
1515                client_a.user_store.clone(),
1516                lang_registry.clone(),
1517                fs.clone(),
1518                cx,
1519            )
1520        });
1521        let (worktree_a, _) = project_a
1522            .update(cx_a, |p, cx| {
1523                p.find_or_create_local_worktree("/a", true, cx)
1524            })
1525            .await
1526            .unwrap();
1527        worktree_a
1528            .read_with(cx_a, |tree, _| tree.as_local().unwrap().scan_complete())
1529            .await;
1530        let project_id = project_a.update(cx_a, |p, _| p.next_remote_id()).await;
1531        let worktree_id = worktree_a.read_with(cx_a, |tree, _| tree.id());
1532        project_a.update(cx_a, |p, cx| p.share(cx)).await.unwrap();
1533        assert!(worktree_a.read_with(cx_a, |tree, _| tree.as_local().unwrap().is_shared()));
1534
1535        // Join that project as client B
1536        let project_b = Project::remote(
1537            project_id,
1538            client_b.clone(),
1539            client_b.user_store.clone(),
1540            lang_registry.clone(),
1541            fs.clone(),
1542            &mut cx_b.to_async(),
1543        )
1544        .await
1545        .unwrap();
1546        project_b
1547            .update(cx_b, |p, cx| p.open_buffer((worktree_id, "a.txt"), cx))
1548            .await
1549            .unwrap();
1550
1551        // Drop client A's connection. Collaborators should disappear and the project should not be shown as shared.
1552        server.disconnect_client(client_a.current_user_id(cx_a));
1553        cx_a.foreground().advance_clock(rpc::RECEIVE_TIMEOUT);
1554        project_a
1555            .condition(cx_a, |project, _| project.collaborators().is_empty())
1556            .await;
1557        project_a.read_with(cx_a, |project, _| assert!(!project.is_shared()));
1558        project_b
1559            .condition(cx_b, |project, _| project.is_read_only())
1560            .await;
1561        assert!(worktree_a.read_with(cx_a, |tree, _| !tree.as_local().unwrap().is_shared()));
1562        cx_b.update(|_| {
1563            drop(project_b);
1564        });
1565
1566        // Await reconnection
1567        let project_id = project_a.update(cx_a, |p, _| p.next_remote_id()).await;
1568
1569        // Share the project again and ensure guests can still join.
1570        project_a
1571            .update(cx_a, |project, cx| project.share(cx))
1572            .await
1573            .unwrap();
1574        assert!(worktree_a.read_with(cx_a, |tree, _| tree.as_local().unwrap().is_shared()));
1575
1576        let project_b2 = Project::remote(
1577            project_id,
1578            client_b.clone(),
1579            client_b.user_store.clone(),
1580            lang_registry.clone(),
1581            fs.clone(),
1582            &mut cx_b.to_async(),
1583        )
1584        .await
1585        .unwrap();
1586        project_b2
1587            .update(cx_b, |p, cx| p.open_buffer((worktree_id, "a.txt"), cx))
1588            .await
1589            .unwrap();
1590    }
1591
1592    #[gpui::test(iterations = 10)]
1593    async fn test_propagate_saves_and_fs_changes(
1594        cx_a: &mut TestAppContext,
1595        cx_b: &mut TestAppContext,
1596        cx_c: &mut TestAppContext,
1597    ) {
1598        let lang_registry = Arc::new(LanguageRegistry::test());
1599        let fs = FakeFs::new(cx_a.background());
1600        cx_a.foreground().forbid_parking();
1601
1602        // Connect to a server as 3 clients.
1603        let mut server = TestServer::start(cx_a.foreground(), cx_a.background()).await;
1604        let client_a = server.create_client(cx_a, "user_a").await;
1605        let client_b = server.create_client(cx_b, "user_b").await;
1606        let client_c = server.create_client(cx_c, "user_c").await;
1607
1608        // Share a worktree as client A.
1609        fs.insert_tree(
1610            "/a",
1611            json!({
1612                ".zed.toml": r#"collaborators = ["user_b", "user_c"]"#,
1613                "file1": "",
1614                "file2": ""
1615            }),
1616        )
1617        .await;
1618        let project_a = cx_a.update(|cx| {
1619            Project::local(
1620                client_a.clone(),
1621                client_a.user_store.clone(),
1622                lang_registry.clone(),
1623                fs.clone(),
1624                cx,
1625            )
1626        });
1627        let (worktree_a, _) = project_a
1628            .update(cx_a, |p, cx| {
1629                p.find_or_create_local_worktree("/a", true, cx)
1630            })
1631            .await
1632            .unwrap();
1633        worktree_a
1634            .read_with(cx_a, |tree, _| tree.as_local().unwrap().scan_complete())
1635            .await;
1636        let project_id = project_a.update(cx_a, |p, _| p.next_remote_id()).await;
1637        let worktree_id = worktree_a.read_with(cx_a, |tree, _| tree.id());
1638        project_a.update(cx_a, |p, cx| p.share(cx)).await.unwrap();
1639
1640        // Join that worktree as clients B and C.
1641        let project_b = Project::remote(
1642            project_id,
1643            client_b.clone(),
1644            client_b.user_store.clone(),
1645            lang_registry.clone(),
1646            fs.clone(),
1647            &mut cx_b.to_async(),
1648        )
1649        .await
1650        .unwrap();
1651        let project_c = Project::remote(
1652            project_id,
1653            client_c.clone(),
1654            client_c.user_store.clone(),
1655            lang_registry.clone(),
1656            fs.clone(),
1657            &mut cx_c.to_async(),
1658        )
1659        .await
1660        .unwrap();
1661        let worktree_b = project_b.read_with(cx_b, |p, cx| p.worktrees(cx).next().unwrap());
1662        let worktree_c = project_c.read_with(cx_c, |p, cx| p.worktrees(cx).next().unwrap());
1663
1664        // Open and edit a buffer as both guests B and C.
1665        let buffer_b = project_b
1666            .update(cx_b, |p, cx| p.open_buffer((worktree_id, "file1"), cx))
1667            .await
1668            .unwrap();
1669        let buffer_c = project_c
1670            .update(cx_c, |p, cx| p.open_buffer((worktree_id, "file1"), cx))
1671            .await
1672            .unwrap();
1673        buffer_b.update(cx_b, |buf, cx| buf.edit([0..0], "i-am-b, ", cx));
1674        buffer_c.update(cx_c, |buf, cx| buf.edit([0..0], "i-am-c, ", cx));
1675
1676        // Open and edit that buffer as the host.
1677        let buffer_a = project_a
1678            .update(cx_a, |p, cx| p.open_buffer((worktree_id, "file1"), cx))
1679            .await
1680            .unwrap();
1681
1682        buffer_a
1683            .condition(cx_a, |buf, _| buf.text() == "i-am-c, i-am-b, ")
1684            .await;
1685        buffer_a.update(cx_a, |buf, cx| {
1686            buf.edit([buf.len()..buf.len()], "i-am-a", cx)
1687        });
1688
1689        // Wait for edits to propagate
1690        buffer_a
1691            .condition(cx_a, |buf, _| buf.text() == "i-am-c, i-am-b, i-am-a")
1692            .await;
1693        buffer_b
1694            .condition(cx_b, |buf, _| buf.text() == "i-am-c, i-am-b, i-am-a")
1695            .await;
1696        buffer_c
1697            .condition(cx_c, |buf, _| buf.text() == "i-am-c, i-am-b, i-am-a")
1698            .await;
1699
1700        // Edit the buffer as the host and concurrently save as guest B.
1701        let save_b = buffer_b.update(cx_b, |buf, cx| buf.save(cx));
1702        buffer_a.update(cx_a, |buf, cx| buf.edit([0..0], "hi-a, ", cx));
1703        save_b.await.unwrap();
1704        assert_eq!(
1705            fs.load("/a/file1".as_ref()).await.unwrap(),
1706            "hi-a, i-am-c, i-am-b, i-am-a"
1707        );
1708        buffer_a.read_with(cx_a, |buf, _| assert!(!buf.is_dirty()));
1709        buffer_b.read_with(cx_b, |buf, _| assert!(!buf.is_dirty()));
1710        buffer_c.condition(cx_c, |buf, _| !buf.is_dirty()).await;
1711
1712        worktree_a.flush_fs_events(cx_a).await;
1713
1714        // Make changes on host's file system, see those changes on guest worktrees.
1715        fs.rename(
1716            "/a/file1".as_ref(),
1717            "/a/file1-renamed".as_ref(),
1718            Default::default(),
1719        )
1720        .await
1721        .unwrap();
1722
1723        fs.rename("/a/file2".as_ref(), "/a/file3".as_ref(), Default::default())
1724            .await
1725            .unwrap();
1726        fs.insert_file(Path::new("/a/file4"), "4".into()).await;
1727
1728        worktree_a
1729            .condition(&cx_a, |tree, _| {
1730                tree.paths()
1731                    .map(|p| p.to_string_lossy())
1732                    .collect::<Vec<_>>()
1733                    == [".zed.toml", "file1-renamed", "file3", "file4"]
1734            })
1735            .await;
1736        worktree_b
1737            .condition(&cx_b, |tree, _| {
1738                tree.paths()
1739                    .map(|p| p.to_string_lossy())
1740                    .collect::<Vec<_>>()
1741                    == [".zed.toml", "file1-renamed", "file3", "file4"]
1742            })
1743            .await;
1744        worktree_c
1745            .condition(&cx_c, |tree, _| {
1746                tree.paths()
1747                    .map(|p| p.to_string_lossy())
1748                    .collect::<Vec<_>>()
1749                    == [".zed.toml", "file1-renamed", "file3", "file4"]
1750            })
1751            .await;
1752
1753        // Ensure buffer files are updated as well.
1754        buffer_a
1755            .condition(&cx_a, |buf, _| {
1756                buf.file().unwrap().path().to_str() == Some("file1-renamed")
1757            })
1758            .await;
1759        buffer_b
1760            .condition(&cx_b, |buf, _| {
1761                buf.file().unwrap().path().to_str() == Some("file1-renamed")
1762            })
1763            .await;
1764        buffer_c
1765            .condition(&cx_c, |buf, _| {
1766                buf.file().unwrap().path().to_str() == Some("file1-renamed")
1767            })
1768            .await;
1769    }
1770
1771    #[gpui::test(iterations = 10)]
1772    async fn test_buffer_conflict_after_save(cx_a: &mut TestAppContext, cx_b: &mut TestAppContext) {
1773        cx_a.foreground().forbid_parking();
1774        let lang_registry = Arc::new(LanguageRegistry::test());
1775        let fs = FakeFs::new(cx_a.background());
1776
1777        // Connect to a server as 2 clients.
1778        let mut server = TestServer::start(cx_a.foreground(), cx_a.background()).await;
1779        let client_a = server.create_client(cx_a, "user_a").await;
1780        let client_b = server.create_client(cx_b, "user_b").await;
1781
1782        // Share a project as client A
1783        fs.insert_tree(
1784            "/dir",
1785            json!({
1786                ".zed.toml": r#"collaborators = ["user_b", "user_c"]"#,
1787                "a.txt": "a-contents",
1788            }),
1789        )
1790        .await;
1791
1792        let project_a = cx_a.update(|cx| {
1793            Project::local(
1794                client_a.clone(),
1795                client_a.user_store.clone(),
1796                lang_registry.clone(),
1797                fs.clone(),
1798                cx,
1799            )
1800        });
1801        let (worktree_a, _) = project_a
1802            .update(cx_a, |p, cx| {
1803                p.find_or_create_local_worktree("/dir", true, cx)
1804            })
1805            .await
1806            .unwrap();
1807        worktree_a
1808            .read_with(cx_a, |tree, _| tree.as_local().unwrap().scan_complete())
1809            .await;
1810        let project_id = project_a.update(cx_a, |p, _| p.next_remote_id()).await;
1811        let worktree_id = worktree_a.read_with(cx_a, |tree, _| tree.id());
1812        project_a.update(cx_a, |p, cx| p.share(cx)).await.unwrap();
1813
1814        // Join that project as client B
1815        let project_b = Project::remote(
1816            project_id,
1817            client_b.clone(),
1818            client_b.user_store.clone(),
1819            lang_registry.clone(),
1820            fs.clone(),
1821            &mut cx_b.to_async(),
1822        )
1823        .await
1824        .unwrap();
1825
1826        // Open a buffer as client B
1827        let buffer_b = project_b
1828            .update(cx_b, |p, cx| p.open_buffer((worktree_id, "a.txt"), cx))
1829            .await
1830            .unwrap();
1831
1832        buffer_b.update(cx_b, |buf, cx| buf.edit([0..0], "world ", cx));
1833        buffer_b.read_with(cx_b, |buf, _| {
1834            assert!(buf.is_dirty());
1835            assert!(!buf.has_conflict());
1836        });
1837
1838        buffer_b.update(cx_b, |buf, cx| buf.save(cx)).await.unwrap();
1839        buffer_b
1840            .condition(&cx_b, |buffer_b, _| !buffer_b.is_dirty())
1841            .await;
1842        buffer_b.read_with(cx_b, |buf, _| {
1843            assert!(!buf.has_conflict());
1844        });
1845
1846        buffer_b.update(cx_b, |buf, cx| buf.edit([0..0], "hello ", cx));
1847        buffer_b.read_with(cx_b, |buf, _| {
1848            assert!(buf.is_dirty());
1849            assert!(!buf.has_conflict());
1850        });
1851    }
1852
1853    #[gpui::test(iterations = 10)]
1854    async fn test_buffer_reloading(cx_a: &mut TestAppContext, cx_b: &mut TestAppContext) {
1855        cx_a.foreground().forbid_parking();
1856        let lang_registry = Arc::new(LanguageRegistry::test());
1857        let fs = FakeFs::new(cx_a.background());
1858
1859        // Connect to a server as 2 clients.
1860        let mut server = TestServer::start(cx_a.foreground(), cx_a.background()).await;
1861        let client_a = server.create_client(cx_a, "user_a").await;
1862        let client_b = server.create_client(cx_b, "user_b").await;
1863
1864        // Share a project as client A
1865        fs.insert_tree(
1866            "/dir",
1867            json!({
1868                ".zed.toml": r#"collaborators = ["user_b", "user_c"]"#,
1869                "a.txt": "a-contents",
1870            }),
1871        )
1872        .await;
1873
1874        let project_a = cx_a.update(|cx| {
1875            Project::local(
1876                client_a.clone(),
1877                client_a.user_store.clone(),
1878                lang_registry.clone(),
1879                fs.clone(),
1880                cx,
1881            )
1882        });
1883        let (worktree_a, _) = project_a
1884            .update(cx_a, |p, cx| {
1885                p.find_or_create_local_worktree("/dir", true, cx)
1886            })
1887            .await
1888            .unwrap();
1889        worktree_a
1890            .read_with(cx_a, |tree, _| tree.as_local().unwrap().scan_complete())
1891            .await;
1892        let project_id = project_a.update(cx_a, |p, _| p.next_remote_id()).await;
1893        let worktree_id = worktree_a.read_with(cx_a, |tree, _| tree.id());
1894        project_a.update(cx_a, |p, cx| p.share(cx)).await.unwrap();
1895
1896        // Join that project as client B
1897        let project_b = Project::remote(
1898            project_id,
1899            client_b.clone(),
1900            client_b.user_store.clone(),
1901            lang_registry.clone(),
1902            fs.clone(),
1903            &mut cx_b.to_async(),
1904        )
1905        .await
1906        .unwrap();
1907        let _worktree_b = project_b.update(cx_b, |p, cx| p.worktrees(cx).next().unwrap());
1908
1909        // Open a buffer as client B
1910        let buffer_b = project_b
1911            .update(cx_b, |p, cx| p.open_buffer((worktree_id, "a.txt"), cx))
1912            .await
1913            .unwrap();
1914        buffer_b.read_with(cx_b, |buf, _| {
1915            assert!(!buf.is_dirty());
1916            assert!(!buf.has_conflict());
1917        });
1918
1919        fs.save(Path::new("/dir/a.txt"), &"new contents".into())
1920            .await
1921            .unwrap();
1922        buffer_b
1923            .condition(&cx_b, |buf, _| {
1924                buf.text() == "new contents" && !buf.is_dirty()
1925            })
1926            .await;
1927        buffer_b.read_with(cx_b, |buf, _| {
1928            assert!(!buf.has_conflict());
1929        });
1930    }
1931
1932    #[gpui::test(iterations = 10)]
1933    async fn test_editing_while_guest_opens_buffer(
1934        cx_a: &mut TestAppContext,
1935        cx_b: &mut TestAppContext,
1936    ) {
1937        cx_a.foreground().forbid_parking();
1938        let lang_registry = Arc::new(LanguageRegistry::test());
1939        let fs = FakeFs::new(cx_a.background());
1940
1941        // Connect to a server as 2 clients.
1942        let mut server = TestServer::start(cx_a.foreground(), cx_a.background()).await;
1943        let client_a = server.create_client(cx_a, "user_a").await;
1944        let client_b = server.create_client(cx_b, "user_b").await;
1945
1946        // Share a project as client A
1947        fs.insert_tree(
1948            "/dir",
1949            json!({
1950                ".zed.toml": r#"collaborators = ["user_b"]"#,
1951                "a.txt": "a-contents",
1952            }),
1953        )
1954        .await;
1955        let project_a = cx_a.update(|cx| {
1956            Project::local(
1957                client_a.clone(),
1958                client_a.user_store.clone(),
1959                lang_registry.clone(),
1960                fs.clone(),
1961                cx,
1962            )
1963        });
1964        let (worktree_a, _) = project_a
1965            .update(cx_a, |p, cx| {
1966                p.find_or_create_local_worktree("/dir", true, cx)
1967            })
1968            .await
1969            .unwrap();
1970        worktree_a
1971            .read_with(cx_a, |tree, _| tree.as_local().unwrap().scan_complete())
1972            .await;
1973        let project_id = project_a.update(cx_a, |p, _| p.next_remote_id()).await;
1974        let worktree_id = worktree_a.read_with(cx_a, |tree, _| tree.id());
1975        project_a.update(cx_a, |p, cx| p.share(cx)).await.unwrap();
1976
1977        // Join that project as client B
1978        let project_b = Project::remote(
1979            project_id,
1980            client_b.clone(),
1981            client_b.user_store.clone(),
1982            lang_registry.clone(),
1983            fs.clone(),
1984            &mut cx_b.to_async(),
1985        )
1986        .await
1987        .unwrap();
1988
1989        // Open a buffer as client A
1990        let buffer_a = project_a
1991            .update(cx_a, |p, cx| p.open_buffer((worktree_id, "a.txt"), cx))
1992            .await
1993            .unwrap();
1994
1995        // Start opening the same buffer as client B
1996        let buffer_b = cx_b
1997            .background()
1998            .spawn(project_b.update(cx_b, |p, cx| p.open_buffer((worktree_id, "a.txt"), cx)));
1999
2000        // Edit the buffer as client A while client B is still opening it.
2001        cx_b.background().simulate_random_delay().await;
2002        buffer_a.update(cx_a, |buf, cx| buf.edit([0..0], "X", cx));
2003        cx_b.background().simulate_random_delay().await;
2004        buffer_a.update(cx_a, |buf, cx| buf.edit([1..1], "Y", cx));
2005
2006        let text = buffer_a.read_with(cx_a, |buf, _| buf.text());
2007        let buffer_b = buffer_b.await.unwrap();
2008        buffer_b.condition(&cx_b, |buf, _| buf.text() == text).await;
2009    }
2010
2011    #[gpui::test(iterations = 10)]
2012    async fn test_leaving_worktree_while_opening_buffer(
2013        cx_a: &mut TestAppContext,
2014        cx_b: &mut TestAppContext,
2015    ) {
2016        cx_a.foreground().forbid_parking();
2017        let lang_registry = Arc::new(LanguageRegistry::test());
2018        let fs = FakeFs::new(cx_a.background());
2019
2020        // Connect to a server as 2 clients.
2021        let mut server = TestServer::start(cx_a.foreground(), cx_a.background()).await;
2022        let client_a = server.create_client(cx_a, "user_a").await;
2023        let client_b = server.create_client(cx_b, "user_b").await;
2024
2025        // Share a project as client A
2026        fs.insert_tree(
2027            "/dir",
2028            json!({
2029                ".zed.toml": r#"collaborators = ["user_b"]"#,
2030                "a.txt": "a-contents",
2031            }),
2032        )
2033        .await;
2034        let project_a = cx_a.update(|cx| {
2035            Project::local(
2036                client_a.clone(),
2037                client_a.user_store.clone(),
2038                lang_registry.clone(),
2039                fs.clone(),
2040                cx,
2041            )
2042        });
2043        let (worktree_a, _) = project_a
2044            .update(cx_a, |p, cx| {
2045                p.find_or_create_local_worktree("/dir", true, cx)
2046            })
2047            .await
2048            .unwrap();
2049        worktree_a
2050            .read_with(cx_a, |tree, _| tree.as_local().unwrap().scan_complete())
2051            .await;
2052        let project_id = project_a.update(cx_a, |p, _| p.next_remote_id()).await;
2053        let worktree_id = worktree_a.read_with(cx_a, |tree, _| tree.id());
2054        project_a.update(cx_a, |p, cx| p.share(cx)).await.unwrap();
2055
2056        // Join that project as client B
2057        let project_b = Project::remote(
2058            project_id,
2059            client_b.clone(),
2060            client_b.user_store.clone(),
2061            lang_registry.clone(),
2062            fs.clone(),
2063            &mut cx_b.to_async(),
2064        )
2065        .await
2066        .unwrap();
2067
2068        // See that a guest has joined as client A.
2069        project_a
2070            .condition(&cx_a, |p, _| p.collaborators().len() == 1)
2071            .await;
2072
2073        // Begin opening a buffer as client B, but leave the project before the open completes.
2074        let buffer_b = cx_b
2075            .background()
2076            .spawn(project_b.update(cx_b, |p, cx| p.open_buffer((worktree_id, "a.txt"), cx)));
2077        cx_b.update(|_| drop(project_b));
2078        drop(buffer_b);
2079
2080        // See that the guest has left.
2081        project_a
2082            .condition(&cx_a, |p, _| p.collaborators().len() == 0)
2083            .await;
2084    }
2085
2086    #[gpui::test(iterations = 10)]
2087    async fn test_leaving_project(cx_a: &mut TestAppContext, cx_b: &mut TestAppContext) {
2088        cx_a.foreground().forbid_parking();
2089        let lang_registry = Arc::new(LanguageRegistry::test());
2090        let fs = FakeFs::new(cx_a.background());
2091
2092        // Connect to a server as 2 clients.
2093        let mut server = TestServer::start(cx_a.foreground(), cx_a.background()).await;
2094        let client_a = server.create_client(cx_a, "user_a").await;
2095        let client_b = server.create_client(cx_b, "user_b").await;
2096
2097        // Share a project as client A
2098        fs.insert_tree(
2099            "/a",
2100            json!({
2101                ".zed.toml": r#"collaborators = ["user_b"]"#,
2102                "a.txt": "a-contents",
2103                "b.txt": "b-contents",
2104            }),
2105        )
2106        .await;
2107        let project_a = cx_a.update(|cx| {
2108            Project::local(
2109                client_a.clone(),
2110                client_a.user_store.clone(),
2111                lang_registry.clone(),
2112                fs.clone(),
2113                cx,
2114            )
2115        });
2116        let (worktree_a, _) = project_a
2117            .update(cx_a, |p, cx| {
2118                p.find_or_create_local_worktree("/a", true, cx)
2119            })
2120            .await
2121            .unwrap();
2122        worktree_a
2123            .read_with(cx_a, |tree, _| tree.as_local().unwrap().scan_complete())
2124            .await;
2125        let project_id = project_a
2126            .update(cx_a, |project, _| project.next_remote_id())
2127            .await;
2128        project_a
2129            .update(cx_a, |project, cx| project.share(cx))
2130            .await
2131            .unwrap();
2132
2133        // Join that project as client B
2134        let _project_b = Project::remote(
2135            project_id,
2136            client_b.clone(),
2137            client_b.user_store.clone(),
2138            lang_registry.clone(),
2139            fs.clone(),
2140            &mut cx_b.to_async(),
2141        )
2142        .await
2143        .unwrap();
2144
2145        // Client A sees that a guest has joined.
2146        project_a
2147            .condition(cx_a, |p, _| p.collaborators().len() == 1)
2148            .await;
2149
2150        // Drop client B's connection and ensure client A observes client B leaving the project.
2151        client_b.disconnect(&cx_b.to_async()).unwrap();
2152        project_a
2153            .condition(cx_a, |p, _| p.collaborators().len() == 0)
2154            .await;
2155
2156        // Rejoin the project as client B
2157        let _project_b = Project::remote(
2158            project_id,
2159            client_b.clone(),
2160            client_b.user_store.clone(),
2161            lang_registry.clone(),
2162            fs.clone(),
2163            &mut cx_b.to_async(),
2164        )
2165        .await
2166        .unwrap();
2167
2168        // Client A sees that a guest has re-joined.
2169        project_a
2170            .condition(cx_a, |p, _| p.collaborators().len() == 1)
2171            .await;
2172
2173        // Simulate connection loss for client B and ensure client A observes client B leaving the project.
2174        client_b.wait_for_current_user(cx_b).await;
2175        server.disconnect_client(client_b.current_user_id(cx_b));
2176        cx_a.foreground().advance_clock(Duration::from_secs(3));
2177        project_a
2178            .condition(cx_a, |p, _| p.collaborators().len() == 0)
2179            .await;
2180    }
2181
2182    #[gpui::test(iterations = 10)]
2183    async fn test_collaborating_with_diagnostics(
2184        cx_a: &mut TestAppContext,
2185        cx_b: &mut TestAppContext,
2186    ) {
2187        cx_a.foreground().forbid_parking();
2188        let lang_registry = Arc::new(LanguageRegistry::test());
2189        let fs = FakeFs::new(cx_a.background());
2190
2191        // Set up a fake language server.
2192        let mut language = Language::new(
2193            LanguageConfig {
2194                name: "Rust".into(),
2195                path_suffixes: vec!["rs".to_string()],
2196                ..Default::default()
2197            },
2198            Some(tree_sitter_rust::language()),
2199        );
2200        let mut fake_language_servers = language.set_fake_lsp_adapter(Default::default());
2201        lang_registry.add(Arc::new(language));
2202
2203        // Connect to a server as 2 clients.
2204        let mut server = TestServer::start(cx_a.foreground(), cx_a.background()).await;
2205        let client_a = server.create_client(cx_a, "user_a").await;
2206        let client_b = server.create_client(cx_b, "user_b").await;
2207
2208        // Share a project as client A
2209        fs.insert_tree(
2210            "/a",
2211            json!({
2212                ".zed.toml": r#"collaborators = ["user_b"]"#,
2213                "a.rs": "let one = two",
2214                "other.rs": "",
2215            }),
2216        )
2217        .await;
2218        let project_a = cx_a.update(|cx| {
2219            Project::local(
2220                client_a.clone(),
2221                client_a.user_store.clone(),
2222                lang_registry.clone(),
2223                fs.clone(),
2224                cx,
2225            )
2226        });
2227        let (worktree_a, _) = project_a
2228            .update(cx_a, |p, cx| {
2229                p.find_or_create_local_worktree("/a", true, cx)
2230            })
2231            .await
2232            .unwrap();
2233        worktree_a
2234            .read_with(cx_a, |tree, _| tree.as_local().unwrap().scan_complete())
2235            .await;
2236        let project_id = project_a.update(cx_a, |p, _| p.next_remote_id()).await;
2237        let worktree_id = worktree_a.read_with(cx_a, |tree, _| tree.id());
2238        project_a.update(cx_a, |p, cx| p.share(cx)).await.unwrap();
2239
2240        // Cause the language server to start.
2241        let _ = cx_a
2242            .background()
2243            .spawn(project_a.update(cx_a, |project, cx| {
2244                project.open_buffer(
2245                    ProjectPath {
2246                        worktree_id,
2247                        path: Path::new("other.rs").into(),
2248                    },
2249                    cx,
2250                )
2251            }))
2252            .await
2253            .unwrap();
2254
2255        // Simulate a language server reporting errors for a file.
2256        let mut fake_language_server = fake_language_servers.next().await.unwrap();
2257        fake_language_server
2258            .receive_notification::<lsp::notification::DidOpenTextDocument>()
2259            .await;
2260        fake_language_server.notify::<lsp::notification::PublishDiagnostics>(
2261            lsp::PublishDiagnosticsParams {
2262                uri: lsp::Url::from_file_path("/a/a.rs").unwrap(),
2263                version: None,
2264                diagnostics: vec![lsp::Diagnostic {
2265                    severity: Some(lsp::DiagnosticSeverity::ERROR),
2266                    range: lsp::Range::new(lsp::Position::new(0, 4), lsp::Position::new(0, 7)),
2267                    message: "message 1".to_string(),
2268                    ..Default::default()
2269                }],
2270            },
2271        );
2272
2273        // Wait for server to see the diagnostics update.
2274        server
2275            .condition(|store| {
2276                let worktree = store
2277                    .project(project_id)
2278                    .unwrap()
2279                    .share
2280                    .as_ref()
2281                    .unwrap()
2282                    .worktrees
2283                    .get(&worktree_id.to_proto())
2284                    .unwrap();
2285
2286                !worktree.diagnostic_summaries.is_empty()
2287            })
2288            .await;
2289
2290        // Join the worktree as client B.
2291        let project_b = Project::remote(
2292            project_id,
2293            client_b.clone(),
2294            client_b.user_store.clone(),
2295            lang_registry.clone(),
2296            fs.clone(),
2297            &mut cx_b.to_async(),
2298        )
2299        .await
2300        .unwrap();
2301
2302        project_b.read_with(cx_b, |project, cx| {
2303            assert_eq!(
2304                project.diagnostic_summaries(cx).collect::<Vec<_>>(),
2305                &[(
2306                    ProjectPath {
2307                        worktree_id,
2308                        path: Arc::from(Path::new("a.rs")),
2309                    },
2310                    DiagnosticSummary {
2311                        error_count: 1,
2312                        warning_count: 0,
2313                        ..Default::default()
2314                    },
2315                )]
2316            )
2317        });
2318
2319        // Simulate a language server reporting more errors for a file.
2320        fake_language_server.notify::<lsp::notification::PublishDiagnostics>(
2321            lsp::PublishDiagnosticsParams {
2322                uri: lsp::Url::from_file_path("/a/a.rs").unwrap(),
2323                version: None,
2324                diagnostics: vec![
2325                    lsp::Diagnostic {
2326                        severity: Some(lsp::DiagnosticSeverity::ERROR),
2327                        range: lsp::Range::new(lsp::Position::new(0, 4), lsp::Position::new(0, 7)),
2328                        message: "message 1".to_string(),
2329                        ..Default::default()
2330                    },
2331                    lsp::Diagnostic {
2332                        severity: Some(lsp::DiagnosticSeverity::WARNING),
2333                        range: lsp::Range::new(
2334                            lsp::Position::new(0, 10),
2335                            lsp::Position::new(0, 13),
2336                        ),
2337                        message: "message 2".to_string(),
2338                        ..Default::default()
2339                    },
2340                ],
2341            },
2342        );
2343
2344        // Client b gets the updated summaries
2345        project_b
2346            .condition(&cx_b, |project, cx| {
2347                project.diagnostic_summaries(cx).collect::<Vec<_>>()
2348                    == &[(
2349                        ProjectPath {
2350                            worktree_id,
2351                            path: Arc::from(Path::new("a.rs")),
2352                        },
2353                        DiagnosticSummary {
2354                            error_count: 1,
2355                            warning_count: 1,
2356                            ..Default::default()
2357                        },
2358                    )]
2359            })
2360            .await;
2361
2362        // Open the file with the errors on client B. They should be present.
2363        let buffer_b = cx_b
2364            .background()
2365            .spawn(project_b.update(cx_b, |p, cx| p.open_buffer((worktree_id, "a.rs"), cx)))
2366            .await
2367            .unwrap();
2368
2369        buffer_b.read_with(cx_b, |buffer, _| {
2370            assert_eq!(
2371                buffer
2372                    .snapshot()
2373                    .diagnostics_in_range::<_, Point>(0..buffer.len(), false)
2374                    .map(|entry| entry)
2375                    .collect::<Vec<_>>(),
2376                &[
2377                    DiagnosticEntry {
2378                        range: Point::new(0, 4)..Point::new(0, 7),
2379                        diagnostic: Diagnostic {
2380                            group_id: 0,
2381                            message: "message 1".to_string(),
2382                            severity: lsp::DiagnosticSeverity::ERROR,
2383                            is_primary: true,
2384                            ..Default::default()
2385                        }
2386                    },
2387                    DiagnosticEntry {
2388                        range: Point::new(0, 10)..Point::new(0, 13),
2389                        diagnostic: Diagnostic {
2390                            group_id: 1,
2391                            severity: lsp::DiagnosticSeverity::WARNING,
2392                            message: "message 2".to_string(),
2393                            is_primary: true,
2394                            ..Default::default()
2395                        }
2396                    }
2397                ]
2398            );
2399        });
2400
2401        // Simulate a language server reporting no errors for a file.
2402        fake_language_server.notify::<lsp::notification::PublishDiagnostics>(
2403            lsp::PublishDiagnosticsParams {
2404                uri: lsp::Url::from_file_path("/a/a.rs").unwrap(),
2405                version: None,
2406                diagnostics: vec![],
2407            },
2408        );
2409        project_a
2410            .condition(cx_a, |project, cx| {
2411                project.diagnostic_summaries(cx).collect::<Vec<_>>() == &[]
2412            })
2413            .await;
2414        project_b
2415            .condition(cx_b, |project, cx| {
2416                project.diagnostic_summaries(cx).collect::<Vec<_>>() == &[]
2417            })
2418            .await;
2419    }
2420
2421    #[gpui::test(iterations = 10)]
2422    async fn test_collaborating_with_completion(
2423        cx_a: &mut TestAppContext,
2424        cx_b: &mut TestAppContext,
2425    ) {
2426        cx_a.foreground().forbid_parking();
2427        let lang_registry = Arc::new(LanguageRegistry::test());
2428        let fs = FakeFs::new(cx_a.background());
2429
2430        // Set up a fake language server.
2431        let mut language = Language::new(
2432            LanguageConfig {
2433                name: "Rust".into(),
2434                path_suffixes: vec!["rs".to_string()],
2435                ..Default::default()
2436            },
2437            Some(tree_sitter_rust::language()),
2438        );
2439        let mut fake_language_servers = language.set_fake_lsp_adapter(FakeLspAdapter {
2440            capabilities: lsp::ServerCapabilities {
2441                completion_provider: Some(lsp::CompletionOptions {
2442                    trigger_characters: Some(vec![".".to_string()]),
2443                    ..Default::default()
2444                }),
2445                ..Default::default()
2446            },
2447            ..Default::default()
2448        });
2449        lang_registry.add(Arc::new(language));
2450
2451        // Connect to a server as 2 clients.
2452        let mut server = TestServer::start(cx_a.foreground(), cx_a.background()).await;
2453        let client_a = server.create_client(cx_a, "user_a").await;
2454        let client_b = server.create_client(cx_b, "user_b").await;
2455
2456        // Share a project as client A
2457        fs.insert_tree(
2458            "/a",
2459            json!({
2460                ".zed.toml": r#"collaborators = ["user_b"]"#,
2461                "main.rs": "fn main() { a }",
2462                "other.rs": "",
2463            }),
2464        )
2465        .await;
2466        let project_a = cx_a.update(|cx| {
2467            Project::local(
2468                client_a.clone(),
2469                client_a.user_store.clone(),
2470                lang_registry.clone(),
2471                fs.clone(),
2472                cx,
2473            )
2474        });
2475        let (worktree_a, _) = project_a
2476            .update(cx_a, |p, cx| {
2477                p.find_or_create_local_worktree("/a", true, cx)
2478            })
2479            .await
2480            .unwrap();
2481        worktree_a
2482            .read_with(cx_a, |tree, _| tree.as_local().unwrap().scan_complete())
2483            .await;
2484        let project_id = project_a.update(cx_a, |p, _| p.next_remote_id()).await;
2485        let worktree_id = worktree_a.read_with(cx_a, |tree, _| tree.id());
2486        project_a.update(cx_a, |p, cx| p.share(cx)).await.unwrap();
2487
2488        // Join the worktree as client B.
2489        let project_b = Project::remote(
2490            project_id,
2491            client_b.clone(),
2492            client_b.user_store.clone(),
2493            lang_registry.clone(),
2494            fs.clone(),
2495            &mut cx_b.to_async(),
2496        )
2497        .await
2498        .unwrap();
2499
2500        // Open a file in an editor as the guest.
2501        let buffer_b = project_b
2502            .update(cx_b, |p, cx| p.open_buffer((worktree_id, "main.rs"), cx))
2503            .await
2504            .unwrap();
2505        let (window_b, _) = cx_b.add_window(|_| EmptyView);
2506        let editor_b = cx_b.add_view(window_b, |cx| {
2507            Editor::for_buffer(buffer_b.clone(), Some(project_b.clone()), cx)
2508        });
2509
2510        let fake_language_server = fake_language_servers.next().await.unwrap();
2511        buffer_b
2512            .condition(&cx_b, |buffer, _| !buffer.completion_triggers().is_empty())
2513            .await;
2514
2515        // Type a completion trigger character as the guest.
2516        editor_b.update(cx_b, |editor, cx| {
2517            editor.select_ranges([13..13], None, cx);
2518            editor.handle_input(&Input(".".into()), cx);
2519            cx.focus(&editor_b);
2520        });
2521
2522        // Receive a completion request as the host's language server.
2523        // Return some completions from the host's language server.
2524        cx_a.foreground().start_waiting();
2525        fake_language_server
2526            .handle_request::<lsp::request::Completion, _, _>(|params, _| async move {
2527                assert_eq!(
2528                    params.text_document_position.text_document.uri,
2529                    lsp::Url::from_file_path("/a/main.rs").unwrap(),
2530                );
2531                assert_eq!(
2532                    params.text_document_position.position,
2533                    lsp::Position::new(0, 14),
2534                );
2535
2536                Ok(Some(lsp::CompletionResponse::Array(vec![
2537                    lsp::CompletionItem {
2538                        label: "first_method(…)".into(),
2539                        detail: Some("fn(&mut self, B) -> C".into()),
2540                        text_edit: Some(lsp::CompletionTextEdit::Edit(lsp::TextEdit {
2541                            new_text: "first_method($1)".to_string(),
2542                            range: lsp::Range::new(
2543                                lsp::Position::new(0, 14),
2544                                lsp::Position::new(0, 14),
2545                            ),
2546                        })),
2547                        insert_text_format: Some(lsp::InsertTextFormat::SNIPPET),
2548                        ..Default::default()
2549                    },
2550                    lsp::CompletionItem {
2551                        label: "second_method(…)".into(),
2552                        detail: Some("fn(&mut self, C) -> D<E>".into()),
2553                        text_edit: Some(lsp::CompletionTextEdit::Edit(lsp::TextEdit {
2554                            new_text: "second_method()".to_string(),
2555                            range: lsp::Range::new(
2556                                lsp::Position::new(0, 14),
2557                                lsp::Position::new(0, 14),
2558                            ),
2559                        })),
2560                        insert_text_format: Some(lsp::InsertTextFormat::SNIPPET),
2561                        ..Default::default()
2562                    },
2563                ])))
2564            })
2565            .next()
2566            .await
2567            .unwrap();
2568        cx_a.foreground().finish_waiting();
2569
2570        // Open the buffer on the host.
2571        let buffer_a = project_a
2572            .update(cx_a, |p, cx| p.open_buffer((worktree_id, "main.rs"), cx))
2573            .await
2574            .unwrap();
2575        buffer_a
2576            .condition(&cx_a, |buffer, _| buffer.text() == "fn main() { a. }")
2577            .await;
2578
2579        // Confirm a completion on the guest.
2580        editor_b
2581            .condition(&cx_b, |editor, _| editor.context_menu_visible())
2582            .await;
2583        editor_b.update(cx_b, |editor, cx| {
2584            editor.confirm_completion(&ConfirmCompletion { item_ix: Some(0) }, cx);
2585            assert_eq!(editor.text(cx), "fn main() { a.first_method() }");
2586        });
2587
2588        // Return a resolved completion from the host's language server.
2589        // The resolved completion has an additional text edit.
2590        fake_language_server.handle_request::<lsp::request::ResolveCompletionItem, _, _>(
2591            |params, _| async move {
2592                assert_eq!(params.label, "first_method(…)");
2593                Ok(lsp::CompletionItem {
2594                    label: "first_method(…)".into(),
2595                    detail: Some("fn(&mut self, B) -> C".into()),
2596                    text_edit: Some(lsp::CompletionTextEdit::Edit(lsp::TextEdit {
2597                        new_text: "first_method($1)".to_string(),
2598                        range: lsp::Range::new(
2599                            lsp::Position::new(0, 14),
2600                            lsp::Position::new(0, 14),
2601                        ),
2602                    })),
2603                    additional_text_edits: Some(vec![lsp::TextEdit {
2604                        new_text: "use d::SomeTrait;\n".to_string(),
2605                        range: lsp::Range::new(lsp::Position::new(0, 0), lsp::Position::new(0, 0)),
2606                    }]),
2607                    insert_text_format: Some(lsp::InsertTextFormat::SNIPPET),
2608                    ..Default::default()
2609                })
2610            },
2611        );
2612
2613        // The additional edit is applied.
2614        buffer_a
2615            .condition(&cx_a, |buffer, _| {
2616                buffer.text() == "use d::SomeTrait;\nfn main() { a.first_method() }"
2617            })
2618            .await;
2619        buffer_b
2620            .condition(&cx_b, |buffer, _| {
2621                buffer.text() == "use d::SomeTrait;\nfn main() { a.first_method() }"
2622            })
2623            .await;
2624    }
2625
2626    #[gpui::test(iterations = 10)]
2627    async fn test_reloading_buffer_manually(cx_a: &mut TestAppContext, cx_b: &mut TestAppContext) {
2628        cx_a.foreground().forbid_parking();
2629        let lang_registry = Arc::new(LanguageRegistry::test());
2630        let fs = FakeFs::new(cx_a.background());
2631
2632        // Connect to a server as 2 clients.
2633        let mut server = TestServer::start(cx_a.foreground(), cx_a.background()).await;
2634        let client_a = server.create_client(cx_a, "user_a").await;
2635        let client_b = server.create_client(cx_b, "user_b").await;
2636
2637        // Share a project as client A
2638        fs.insert_tree(
2639            "/a",
2640            json!({
2641                ".zed.toml": r#"collaborators = ["user_b"]"#,
2642                "a.rs": "let one = 1;",
2643            }),
2644        )
2645        .await;
2646        let project_a = cx_a.update(|cx| {
2647            Project::local(
2648                client_a.clone(),
2649                client_a.user_store.clone(),
2650                lang_registry.clone(),
2651                fs.clone(),
2652                cx,
2653            )
2654        });
2655        let (worktree_a, _) = project_a
2656            .update(cx_a, |p, cx| {
2657                p.find_or_create_local_worktree("/a", true, cx)
2658            })
2659            .await
2660            .unwrap();
2661        worktree_a
2662            .read_with(cx_a, |tree, _| tree.as_local().unwrap().scan_complete())
2663            .await;
2664        let project_id = project_a.update(cx_a, |p, _| p.next_remote_id()).await;
2665        let worktree_id = worktree_a.read_with(cx_a, |tree, _| tree.id());
2666        project_a.update(cx_a, |p, cx| p.share(cx)).await.unwrap();
2667        let buffer_a = project_a
2668            .update(cx_a, |p, cx| p.open_buffer((worktree_id, "a.rs"), cx))
2669            .await
2670            .unwrap();
2671
2672        // Join the worktree as client B.
2673        let project_b = Project::remote(
2674            project_id,
2675            client_b.clone(),
2676            client_b.user_store.clone(),
2677            lang_registry.clone(),
2678            fs.clone(),
2679            &mut cx_b.to_async(),
2680        )
2681        .await
2682        .unwrap();
2683
2684        let buffer_b = cx_b
2685            .background()
2686            .spawn(project_b.update(cx_b, |p, cx| p.open_buffer((worktree_id, "a.rs"), cx)))
2687            .await
2688            .unwrap();
2689        buffer_b.update(cx_b, |buffer, cx| {
2690            buffer.edit([4..7], "six", cx);
2691            buffer.edit([10..11], "6", cx);
2692            assert_eq!(buffer.text(), "let six = 6;");
2693            assert!(buffer.is_dirty());
2694            assert!(!buffer.has_conflict());
2695        });
2696        buffer_a
2697            .condition(cx_a, |buffer, _| buffer.text() == "let six = 6;")
2698            .await;
2699
2700        fs.save(Path::new("/a/a.rs"), &Rope::from("let seven = 7;"))
2701            .await
2702            .unwrap();
2703        buffer_a
2704            .condition(cx_a, |buffer, _| buffer.has_conflict())
2705            .await;
2706        buffer_b
2707            .condition(cx_b, |buffer, _| buffer.has_conflict())
2708            .await;
2709
2710        project_b
2711            .update(cx_b, |project, cx| {
2712                project.reload_buffers(HashSet::from_iter([buffer_b.clone()]), true, cx)
2713            })
2714            .await
2715            .unwrap();
2716        buffer_a.read_with(cx_a, |buffer, _| {
2717            assert_eq!(buffer.text(), "let seven = 7;");
2718            assert!(!buffer.is_dirty());
2719            assert!(!buffer.has_conflict());
2720        });
2721        buffer_b.read_with(cx_b, |buffer, _| {
2722            assert_eq!(buffer.text(), "let seven = 7;");
2723            assert!(!buffer.is_dirty());
2724            assert!(!buffer.has_conflict());
2725        });
2726
2727        buffer_a.update(cx_a, |buffer, cx| {
2728            // Undoing on the host is a no-op when the reload was initiated by the guest.
2729            buffer.undo(cx);
2730            assert_eq!(buffer.text(), "let seven = 7;");
2731            assert!(!buffer.is_dirty());
2732            assert!(!buffer.has_conflict());
2733        });
2734        buffer_b.update(cx_b, |buffer, cx| {
2735            // Undoing on the guest rolls back the buffer to before it was reloaded but the conflict gets cleared.
2736            buffer.undo(cx);
2737            assert_eq!(buffer.text(), "let six = 6;");
2738            assert!(buffer.is_dirty());
2739            assert!(!buffer.has_conflict());
2740        });
2741    }
2742
2743    #[gpui::test(iterations = 10)]
2744    async fn test_formatting_buffer(cx_a: &mut TestAppContext, cx_b: &mut TestAppContext) {
2745        cx_a.foreground().forbid_parking();
2746        let lang_registry = Arc::new(LanguageRegistry::test());
2747        let fs = FakeFs::new(cx_a.background());
2748
2749        // Set up a fake language server.
2750        let mut language = Language::new(
2751            LanguageConfig {
2752                name: "Rust".into(),
2753                path_suffixes: vec!["rs".to_string()],
2754                ..Default::default()
2755            },
2756            Some(tree_sitter_rust::language()),
2757        );
2758        let mut fake_language_servers = language.set_fake_lsp_adapter(Default::default());
2759        lang_registry.add(Arc::new(language));
2760
2761        // Connect to a server as 2 clients.
2762        let mut server = TestServer::start(cx_a.foreground(), cx_a.background()).await;
2763        let client_a = server.create_client(cx_a, "user_a").await;
2764        let client_b = server.create_client(cx_b, "user_b").await;
2765
2766        // Share a project as client A
2767        fs.insert_tree(
2768            "/a",
2769            json!({
2770                ".zed.toml": r#"collaborators = ["user_b"]"#,
2771                "a.rs": "let one = two",
2772            }),
2773        )
2774        .await;
2775        let project_a = cx_a.update(|cx| {
2776            Project::local(
2777                client_a.clone(),
2778                client_a.user_store.clone(),
2779                lang_registry.clone(),
2780                fs.clone(),
2781                cx,
2782            )
2783        });
2784        let (worktree_a, _) = project_a
2785            .update(cx_a, |p, cx| {
2786                p.find_or_create_local_worktree("/a", true, cx)
2787            })
2788            .await
2789            .unwrap();
2790        worktree_a
2791            .read_with(cx_a, |tree, _| tree.as_local().unwrap().scan_complete())
2792            .await;
2793        let project_id = project_a.update(cx_a, |p, _| p.next_remote_id()).await;
2794        let worktree_id = worktree_a.read_with(cx_a, |tree, _| tree.id());
2795        project_a.update(cx_a, |p, cx| p.share(cx)).await.unwrap();
2796
2797        // Join the worktree as client B.
2798        let project_b = Project::remote(
2799            project_id,
2800            client_b.clone(),
2801            client_b.user_store.clone(),
2802            lang_registry.clone(),
2803            fs.clone(),
2804            &mut cx_b.to_async(),
2805        )
2806        .await
2807        .unwrap();
2808
2809        let buffer_b = cx_b
2810            .background()
2811            .spawn(project_b.update(cx_b, |p, cx| p.open_buffer((worktree_id, "a.rs"), cx)))
2812            .await
2813            .unwrap();
2814
2815        let fake_language_server = fake_language_servers.next().await.unwrap();
2816        fake_language_server.handle_request::<lsp::request::Formatting, _, _>(|_, _| async move {
2817            Ok(Some(vec![
2818                lsp::TextEdit {
2819                    range: lsp::Range::new(lsp::Position::new(0, 4), lsp::Position::new(0, 4)),
2820                    new_text: "h".to_string(),
2821                },
2822                lsp::TextEdit {
2823                    range: lsp::Range::new(lsp::Position::new(0, 7), lsp::Position::new(0, 7)),
2824                    new_text: "y".to_string(),
2825                },
2826            ]))
2827        });
2828
2829        project_b
2830            .update(cx_b, |project, cx| {
2831                project.format(HashSet::from_iter([buffer_b.clone()]), true, cx)
2832            })
2833            .await
2834            .unwrap();
2835        assert_eq!(
2836            buffer_b.read_with(cx_b, |buffer, _| buffer.text()),
2837            "let honey = two"
2838        );
2839    }
2840
2841    #[gpui::test(iterations = 10)]
2842    async fn test_definition(cx_a: &mut TestAppContext, cx_b: &mut TestAppContext) {
2843        cx_a.foreground().forbid_parking();
2844        let lang_registry = Arc::new(LanguageRegistry::test());
2845        let fs = FakeFs::new(cx_a.background());
2846        fs.insert_tree(
2847            "/root-1",
2848            json!({
2849                ".zed.toml": r#"collaborators = ["user_b"]"#,
2850                "a.rs": "const ONE: usize = b::TWO + b::THREE;",
2851            }),
2852        )
2853        .await;
2854        fs.insert_tree(
2855            "/root-2",
2856            json!({
2857                "b.rs": "const TWO: usize = 2;\nconst THREE: usize = 3;",
2858            }),
2859        )
2860        .await;
2861
2862        // Set up a fake language server.
2863        let mut language = Language::new(
2864            LanguageConfig {
2865                name: "Rust".into(),
2866                path_suffixes: vec!["rs".to_string()],
2867                ..Default::default()
2868            },
2869            Some(tree_sitter_rust::language()),
2870        );
2871        let mut fake_language_servers = language.set_fake_lsp_adapter(Default::default());
2872        lang_registry.add(Arc::new(language));
2873
2874        // Connect to a server as 2 clients.
2875        let mut server = TestServer::start(cx_a.foreground(), cx_a.background()).await;
2876        let client_a = server.create_client(cx_a, "user_a").await;
2877        let client_b = server.create_client(cx_b, "user_b").await;
2878
2879        // Share a project as client A
2880        let project_a = cx_a.update(|cx| {
2881            Project::local(
2882                client_a.clone(),
2883                client_a.user_store.clone(),
2884                lang_registry.clone(),
2885                fs.clone(),
2886                cx,
2887            )
2888        });
2889        let (worktree_a, _) = project_a
2890            .update(cx_a, |p, cx| {
2891                p.find_or_create_local_worktree("/root-1", true, cx)
2892            })
2893            .await
2894            .unwrap();
2895        worktree_a
2896            .read_with(cx_a, |tree, _| tree.as_local().unwrap().scan_complete())
2897            .await;
2898        let project_id = project_a.update(cx_a, |p, _| p.next_remote_id()).await;
2899        let worktree_id = worktree_a.read_with(cx_a, |tree, _| tree.id());
2900        project_a.update(cx_a, |p, cx| p.share(cx)).await.unwrap();
2901
2902        // Join the worktree as client B.
2903        let project_b = Project::remote(
2904            project_id,
2905            client_b.clone(),
2906            client_b.user_store.clone(),
2907            lang_registry.clone(),
2908            fs.clone(),
2909            &mut cx_b.to_async(),
2910        )
2911        .await
2912        .unwrap();
2913
2914        // Open the file on client B.
2915        let buffer_b = cx_b
2916            .background()
2917            .spawn(project_b.update(cx_b, |p, cx| p.open_buffer((worktree_id, "a.rs"), cx)))
2918            .await
2919            .unwrap();
2920
2921        // Request the definition of a symbol as the guest.
2922        let fake_language_server = fake_language_servers.next().await.unwrap();
2923        fake_language_server.handle_request::<lsp::request::GotoDefinition, _, _>(
2924            |_, _| async move {
2925                Ok(Some(lsp::GotoDefinitionResponse::Scalar(
2926                    lsp::Location::new(
2927                        lsp::Url::from_file_path("/root-2/b.rs").unwrap(),
2928                        lsp::Range::new(lsp::Position::new(0, 6), lsp::Position::new(0, 9)),
2929                    ),
2930                )))
2931            },
2932        );
2933
2934        let definitions_1 = project_b
2935            .update(cx_b, |p, cx| p.definition(&buffer_b, 23, cx))
2936            .await
2937            .unwrap();
2938        cx_b.read(|cx| {
2939            assert_eq!(definitions_1.len(), 1);
2940            assert_eq!(project_b.read(cx).worktrees(cx).count(), 2);
2941            let target_buffer = definitions_1[0].buffer.read(cx);
2942            assert_eq!(
2943                target_buffer.text(),
2944                "const TWO: usize = 2;\nconst THREE: usize = 3;"
2945            );
2946            assert_eq!(
2947                definitions_1[0].range.to_point(target_buffer),
2948                Point::new(0, 6)..Point::new(0, 9)
2949            );
2950        });
2951
2952        // Try getting more definitions for the same buffer, ensuring the buffer gets reused from
2953        // the previous call to `definition`.
2954        fake_language_server.handle_request::<lsp::request::GotoDefinition, _, _>(
2955            |_, _| async move {
2956                Ok(Some(lsp::GotoDefinitionResponse::Scalar(
2957                    lsp::Location::new(
2958                        lsp::Url::from_file_path("/root-2/b.rs").unwrap(),
2959                        lsp::Range::new(lsp::Position::new(1, 6), lsp::Position::new(1, 11)),
2960                    ),
2961                )))
2962            },
2963        );
2964
2965        let definitions_2 = project_b
2966            .update(cx_b, |p, cx| p.definition(&buffer_b, 33, cx))
2967            .await
2968            .unwrap();
2969        cx_b.read(|cx| {
2970            assert_eq!(definitions_2.len(), 1);
2971            assert_eq!(project_b.read(cx).worktrees(cx).count(), 2);
2972            let target_buffer = definitions_2[0].buffer.read(cx);
2973            assert_eq!(
2974                target_buffer.text(),
2975                "const TWO: usize = 2;\nconst THREE: usize = 3;"
2976            );
2977            assert_eq!(
2978                definitions_2[0].range.to_point(target_buffer),
2979                Point::new(1, 6)..Point::new(1, 11)
2980            );
2981        });
2982        assert_eq!(definitions_1[0].buffer, definitions_2[0].buffer);
2983    }
2984
2985    #[gpui::test(iterations = 10)]
2986    async fn test_references(cx_a: &mut TestAppContext, cx_b: &mut TestAppContext) {
2987        cx_a.foreground().forbid_parking();
2988        let lang_registry = Arc::new(LanguageRegistry::test());
2989        let fs = FakeFs::new(cx_a.background());
2990        fs.insert_tree(
2991            "/root-1",
2992            json!({
2993                ".zed.toml": r#"collaborators = ["user_b"]"#,
2994                "one.rs": "const ONE: usize = 1;",
2995                "two.rs": "const TWO: usize = one::ONE + one::ONE;",
2996            }),
2997        )
2998        .await;
2999        fs.insert_tree(
3000            "/root-2",
3001            json!({
3002                "three.rs": "const THREE: usize = two::TWO + one::ONE;",
3003            }),
3004        )
3005        .await;
3006
3007        // Set up a fake language server.
3008        let mut language = Language::new(
3009            LanguageConfig {
3010                name: "Rust".into(),
3011                path_suffixes: vec!["rs".to_string()],
3012                ..Default::default()
3013            },
3014            Some(tree_sitter_rust::language()),
3015        );
3016        let mut fake_language_servers = language.set_fake_lsp_adapter(Default::default());
3017        lang_registry.add(Arc::new(language));
3018
3019        // Connect to a server as 2 clients.
3020        let mut server = TestServer::start(cx_a.foreground(), cx_a.background()).await;
3021        let client_a = server.create_client(cx_a, "user_a").await;
3022        let client_b = server.create_client(cx_b, "user_b").await;
3023
3024        // Share a project as client A
3025        let project_a = cx_a.update(|cx| {
3026            Project::local(
3027                client_a.clone(),
3028                client_a.user_store.clone(),
3029                lang_registry.clone(),
3030                fs.clone(),
3031                cx,
3032            )
3033        });
3034        let (worktree_a, _) = project_a
3035            .update(cx_a, |p, cx| {
3036                p.find_or_create_local_worktree("/root-1", true, cx)
3037            })
3038            .await
3039            .unwrap();
3040        worktree_a
3041            .read_with(cx_a, |tree, _| tree.as_local().unwrap().scan_complete())
3042            .await;
3043        let project_id = project_a.update(cx_a, |p, _| p.next_remote_id()).await;
3044        let worktree_id = worktree_a.read_with(cx_a, |tree, _| tree.id());
3045        project_a.update(cx_a, |p, cx| p.share(cx)).await.unwrap();
3046
3047        // Join the worktree as client B.
3048        let project_b = Project::remote(
3049            project_id,
3050            client_b.clone(),
3051            client_b.user_store.clone(),
3052            lang_registry.clone(),
3053            fs.clone(),
3054            &mut cx_b.to_async(),
3055        )
3056        .await
3057        .unwrap();
3058
3059        // Open the file on client B.
3060        let buffer_b = cx_b
3061            .background()
3062            .spawn(project_b.update(cx_b, |p, cx| p.open_buffer((worktree_id, "one.rs"), cx)))
3063            .await
3064            .unwrap();
3065
3066        // Request references to a symbol as the guest.
3067        let fake_language_server = fake_language_servers.next().await.unwrap();
3068        fake_language_server.handle_request::<lsp::request::References, _, _>(
3069            |params, _| async move {
3070                assert_eq!(
3071                    params.text_document_position.text_document.uri.as_str(),
3072                    "file:///root-1/one.rs"
3073                );
3074                Ok(Some(vec![
3075                    lsp::Location {
3076                        uri: lsp::Url::from_file_path("/root-1/two.rs").unwrap(),
3077                        range: lsp::Range::new(
3078                            lsp::Position::new(0, 24),
3079                            lsp::Position::new(0, 27),
3080                        ),
3081                    },
3082                    lsp::Location {
3083                        uri: lsp::Url::from_file_path("/root-1/two.rs").unwrap(),
3084                        range: lsp::Range::new(
3085                            lsp::Position::new(0, 35),
3086                            lsp::Position::new(0, 38),
3087                        ),
3088                    },
3089                    lsp::Location {
3090                        uri: lsp::Url::from_file_path("/root-2/three.rs").unwrap(),
3091                        range: lsp::Range::new(
3092                            lsp::Position::new(0, 37),
3093                            lsp::Position::new(0, 40),
3094                        ),
3095                    },
3096                ]))
3097            },
3098        );
3099
3100        let references = project_b
3101            .update(cx_b, |p, cx| p.references(&buffer_b, 7, cx))
3102            .await
3103            .unwrap();
3104        cx_b.read(|cx| {
3105            assert_eq!(references.len(), 3);
3106            assert_eq!(project_b.read(cx).worktrees(cx).count(), 2);
3107
3108            let two_buffer = references[0].buffer.read(cx);
3109            let three_buffer = references[2].buffer.read(cx);
3110            assert_eq!(
3111                two_buffer.file().unwrap().path().as_ref(),
3112                Path::new("two.rs")
3113            );
3114            assert_eq!(references[1].buffer, references[0].buffer);
3115            assert_eq!(
3116                three_buffer.file().unwrap().full_path(cx),
3117                Path::new("three.rs")
3118            );
3119
3120            assert_eq!(references[0].range.to_offset(&two_buffer), 24..27);
3121            assert_eq!(references[1].range.to_offset(&two_buffer), 35..38);
3122            assert_eq!(references[2].range.to_offset(&three_buffer), 37..40);
3123        });
3124    }
3125
3126    #[gpui::test(iterations = 10)]
3127    async fn test_project_search(cx_a: &mut TestAppContext, cx_b: &mut TestAppContext) {
3128        cx_a.foreground().forbid_parking();
3129        let lang_registry = Arc::new(LanguageRegistry::test());
3130        let fs = FakeFs::new(cx_a.background());
3131        fs.insert_tree(
3132            "/root-1",
3133            json!({
3134                ".zed.toml": r#"collaborators = ["user_b"]"#,
3135                "a": "hello world",
3136                "b": "goodnight moon",
3137                "c": "a world of goo",
3138                "d": "world champion of clown world",
3139            }),
3140        )
3141        .await;
3142        fs.insert_tree(
3143            "/root-2",
3144            json!({
3145                "e": "disney world is fun",
3146            }),
3147        )
3148        .await;
3149
3150        // Connect to a server as 2 clients.
3151        let mut server = TestServer::start(cx_a.foreground(), cx_a.background()).await;
3152        let client_a = server.create_client(cx_a, "user_a").await;
3153        let client_b = server.create_client(cx_b, "user_b").await;
3154
3155        // Share a project as client A
3156        let project_a = cx_a.update(|cx| {
3157            Project::local(
3158                client_a.clone(),
3159                client_a.user_store.clone(),
3160                lang_registry.clone(),
3161                fs.clone(),
3162                cx,
3163            )
3164        });
3165        let project_id = project_a.update(cx_a, |p, _| p.next_remote_id()).await;
3166
3167        let (worktree_1, _) = project_a
3168            .update(cx_a, |p, cx| {
3169                p.find_or_create_local_worktree("/root-1", true, cx)
3170            })
3171            .await
3172            .unwrap();
3173        worktree_1
3174            .read_with(cx_a, |tree, _| tree.as_local().unwrap().scan_complete())
3175            .await;
3176        let (worktree_2, _) = project_a
3177            .update(cx_a, |p, cx| {
3178                p.find_or_create_local_worktree("/root-2", true, cx)
3179            })
3180            .await
3181            .unwrap();
3182        worktree_2
3183            .read_with(cx_a, |tree, _| tree.as_local().unwrap().scan_complete())
3184            .await;
3185
3186        project_a.update(cx_a, |p, cx| p.share(cx)).await.unwrap();
3187
3188        // Join the worktree as client B.
3189        let project_b = Project::remote(
3190            project_id,
3191            client_b.clone(),
3192            client_b.user_store.clone(),
3193            lang_registry.clone(),
3194            fs.clone(),
3195            &mut cx_b.to_async(),
3196        )
3197        .await
3198        .unwrap();
3199
3200        let results = project_b
3201            .update(cx_b, |project, cx| {
3202                project.search(SearchQuery::text("world", false, false), cx)
3203            })
3204            .await
3205            .unwrap();
3206
3207        let mut ranges_by_path = results
3208            .into_iter()
3209            .map(|(buffer, ranges)| {
3210                buffer.read_with(cx_b, |buffer, cx| {
3211                    let path = buffer.file().unwrap().full_path(cx);
3212                    let offset_ranges = ranges
3213                        .into_iter()
3214                        .map(|range| range.to_offset(buffer))
3215                        .collect::<Vec<_>>();
3216                    (path, offset_ranges)
3217                })
3218            })
3219            .collect::<Vec<_>>();
3220        ranges_by_path.sort_by_key(|(path, _)| path.clone());
3221
3222        assert_eq!(
3223            ranges_by_path,
3224            &[
3225                (PathBuf::from("root-1/a"), vec![6..11]),
3226                (PathBuf::from("root-1/c"), vec![2..7]),
3227                (PathBuf::from("root-1/d"), vec![0..5, 24..29]),
3228                (PathBuf::from("root-2/e"), vec![7..12]),
3229            ]
3230        );
3231    }
3232
3233    #[gpui::test(iterations = 10)]
3234    async fn test_document_highlights(cx_a: &mut TestAppContext, cx_b: &mut TestAppContext) {
3235        cx_a.foreground().forbid_parking();
3236        let lang_registry = Arc::new(LanguageRegistry::test());
3237        let fs = FakeFs::new(cx_a.background());
3238        fs.insert_tree(
3239            "/root-1",
3240            json!({
3241                ".zed.toml": r#"collaborators = ["user_b"]"#,
3242                "main.rs": "fn double(number: i32) -> i32 { number + number }",
3243            }),
3244        )
3245        .await;
3246
3247        // Set up a fake language server.
3248        let mut language = Language::new(
3249            LanguageConfig {
3250                name: "Rust".into(),
3251                path_suffixes: vec!["rs".to_string()],
3252                ..Default::default()
3253            },
3254            Some(tree_sitter_rust::language()),
3255        );
3256        let mut fake_language_servers = language.set_fake_lsp_adapter(Default::default());
3257        lang_registry.add(Arc::new(language));
3258
3259        // Connect to a server as 2 clients.
3260        let mut server = TestServer::start(cx_a.foreground(), cx_a.background()).await;
3261        let client_a = server.create_client(cx_a, "user_a").await;
3262        let client_b = server.create_client(cx_b, "user_b").await;
3263
3264        // Share a project as client A
3265        let project_a = cx_a.update(|cx| {
3266            Project::local(
3267                client_a.clone(),
3268                client_a.user_store.clone(),
3269                lang_registry.clone(),
3270                fs.clone(),
3271                cx,
3272            )
3273        });
3274        let (worktree_a, _) = project_a
3275            .update(cx_a, |p, cx| {
3276                p.find_or_create_local_worktree("/root-1", true, cx)
3277            })
3278            .await
3279            .unwrap();
3280        worktree_a
3281            .read_with(cx_a, |tree, _| tree.as_local().unwrap().scan_complete())
3282            .await;
3283        let project_id = project_a.update(cx_a, |p, _| p.next_remote_id()).await;
3284        let worktree_id = worktree_a.read_with(cx_a, |tree, _| tree.id());
3285        project_a.update(cx_a, |p, cx| p.share(cx)).await.unwrap();
3286
3287        // Join the worktree as client B.
3288        let project_b = Project::remote(
3289            project_id,
3290            client_b.clone(),
3291            client_b.user_store.clone(),
3292            lang_registry.clone(),
3293            fs.clone(),
3294            &mut cx_b.to_async(),
3295        )
3296        .await
3297        .unwrap();
3298
3299        // Open the file on client B.
3300        let buffer_b = cx_b
3301            .background()
3302            .spawn(project_b.update(cx_b, |p, cx| p.open_buffer((worktree_id, "main.rs"), cx)))
3303            .await
3304            .unwrap();
3305
3306        // Request document highlights as the guest.
3307        let fake_language_server = fake_language_servers.next().await.unwrap();
3308        fake_language_server.handle_request::<lsp::request::DocumentHighlightRequest, _, _>(
3309            |params, _| async move {
3310                assert_eq!(
3311                    params
3312                        .text_document_position_params
3313                        .text_document
3314                        .uri
3315                        .as_str(),
3316                    "file:///root-1/main.rs"
3317                );
3318                assert_eq!(
3319                    params.text_document_position_params.position,
3320                    lsp::Position::new(0, 34)
3321                );
3322                Ok(Some(vec![
3323                    lsp::DocumentHighlight {
3324                        kind: Some(lsp::DocumentHighlightKind::WRITE),
3325                        range: lsp::Range::new(
3326                            lsp::Position::new(0, 10),
3327                            lsp::Position::new(0, 16),
3328                        ),
3329                    },
3330                    lsp::DocumentHighlight {
3331                        kind: Some(lsp::DocumentHighlightKind::READ),
3332                        range: lsp::Range::new(
3333                            lsp::Position::new(0, 32),
3334                            lsp::Position::new(0, 38),
3335                        ),
3336                    },
3337                    lsp::DocumentHighlight {
3338                        kind: Some(lsp::DocumentHighlightKind::READ),
3339                        range: lsp::Range::new(
3340                            lsp::Position::new(0, 41),
3341                            lsp::Position::new(0, 47),
3342                        ),
3343                    },
3344                ]))
3345            },
3346        );
3347
3348        let highlights = project_b
3349            .update(cx_b, |p, cx| p.document_highlights(&buffer_b, 34, cx))
3350            .await
3351            .unwrap();
3352        buffer_b.read_with(cx_b, |buffer, _| {
3353            let snapshot = buffer.snapshot();
3354
3355            let highlights = highlights
3356                .into_iter()
3357                .map(|highlight| (highlight.kind, highlight.range.to_offset(&snapshot)))
3358                .collect::<Vec<_>>();
3359            assert_eq!(
3360                highlights,
3361                &[
3362                    (lsp::DocumentHighlightKind::WRITE, 10..16),
3363                    (lsp::DocumentHighlightKind::READ, 32..38),
3364                    (lsp::DocumentHighlightKind::READ, 41..47)
3365                ]
3366            )
3367        });
3368    }
3369
3370    #[gpui::test(iterations = 10)]
3371    async fn test_project_symbols(cx_a: &mut TestAppContext, cx_b: &mut TestAppContext) {
3372        cx_a.foreground().forbid_parking();
3373        let lang_registry = Arc::new(LanguageRegistry::test());
3374        let fs = FakeFs::new(cx_a.background());
3375        fs.insert_tree(
3376            "/code",
3377            json!({
3378                "crate-1": {
3379                    ".zed.toml": r#"collaborators = ["user_b"]"#,
3380                    "one.rs": "const ONE: usize = 1;",
3381                },
3382                "crate-2": {
3383                    "two.rs": "const TWO: usize = 2; const THREE: usize = 3;",
3384                },
3385                "private": {
3386                    "passwords.txt": "the-password",
3387                }
3388            }),
3389        )
3390        .await;
3391
3392        // Set up a fake language server.
3393        let mut language = Language::new(
3394            LanguageConfig {
3395                name: "Rust".into(),
3396                path_suffixes: vec!["rs".to_string()],
3397                ..Default::default()
3398            },
3399            Some(tree_sitter_rust::language()),
3400        );
3401        let mut fake_language_servers = language.set_fake_lsp_adapter(Default::default());
3402        lang_registry.add(Arc::new(language));
3403
3404        // Connect to a server as 2 clients.
3405        let mut server = TestServer::start(cx_a.foreground(), cx_a.background()).await;
3406        let client_a = server.create_client(cx_a, "user_a").await;
3407        let client_b = server.create_client(cx_b, "user_b").await;
3408
3409        // Share a project as client A
3410        let project_a = cx_a.update(|cx| {
3411            Project::local(
3412                client_a.clone(),
3413                client_a.user_store.clone(),
3414                lang_registry.clone(),
3415                fs.clone(),
3416                cx,
3417            )
3418        });
3419        let (worktree_a, _) = project_a
3420            .update(cx_a, |p, cx| {
3421                p.find_or_create_local_worktree("/code/crate-1", true, cx)
3422            })
3423            .await
3424            .unwrap();
3425        worktree_a
3426            .read_with(cx_a, |tree, _| tree.as_local().unwrap().scan_complete())
3427            .await;
3428        let project_id = project_a.update(cx_a, |p, _| p.next_remote_id()).await;
3429        let worktree_id = worktree_a.read_with(cx_a, |tree, _| tree.id());
3430        project_a.update(cx_a, |p, cx| p.share(cx)).await.unwrap();
3431
3432        // Join the worktree as client B.
3433        let project_b = Project::remote(
3434            project_id,
3435            client_b.clone(),
3436            client_b.user_store.clone(),
3437            lang_registry.clone(),
3438            fs.clone(),
3439            &mut cx_b.to_async(),
3440        )
3441        .await
3442        .unwrap();
3443
3444        // Cause the language server to start.
3445        let _buffer = cx_b
3446            .background()
3447            .spawn(project_b.update(cx_b, |p, cx| p.open_buffer((worktree_id, "one.rs"), cx)))
3448            .await
3449            .unwrap();
3450
3451        let fake_language_server = fake_language_servers.next().await.unwrap();
3452        fake_language_server.handle_request::<lsp::request::WorkspaceSymbol, _, _>(
3453            |_, _| async move {
3454                #[allow(deprecated)]
3455                Ok(Some(vec![lsp::SymbolInformation {
3456                    name: "TWO".into(),
3457                    location: lsp::Location {
3458                        uri: lsp::Url::from_file_path("/code/crate-2/two.rs").unwrap(),
3459                        range: lsp::Range::new(lsp::Position::new(0, 6), lsp::Position::new(0, 9)),
3460                    },
3461                    kind: lsp::SymbolKind::CONSTANT,
3462                    tags: None,
3463                    container_name: None,
3464                    deprecated: None,
3465                }]))
3466            },
3467        );
3468
3469        // Request the definition of a symbol as the guest.
3470        let symbols = project_b
3471            .update(cx_b, |p, cx| p.symbols("two", cx))
3472            .await
3473            .unwrap();
3474        assert_eq!(symbols.len(), 1);
3475        assert_eq!(symbols[0].name, "TWO");
3476
3477        // Open one of the returned symbols.
3478        let buffer_b_2 = project_b
3479            .update(cx_b, |project, cx| {
3480                project.open_buffer_for_symbol(&symbols[0], cx)
3481            })
3482            .await
3483            .unwrap();
3484        buffer_b_2.read_with(cx_b, |buffer, _| {
3485            assert_eq!(
3486                buffer.file().unwrap().path().as_ref(),
3487                Path::new("../crate-2/two.rs")
3488            );
3489        });
3490
3491        // Attempt to craft a symbol and violate host's privacy by opening an arbitrary file.
3492        let mut fake_symbol = symbols[0].clone();
3493        fake_symbol.path = Path::new("/code/secrets").into();
3494        let error = project_b
3495            .update(cx_b, |project, cx| {
3496                project.open_buffer_for_symbol(&fake_symbol, cx)
3497            })
3498            .await
3499            .unwrap_err();
3500        assert!(error.to_string().contains("invalid symbol signature"));
3501    }
3502
3503    #[gpui::test(iterations = 10)]
3504    async fn test_open_buffer_while_getting_definition_pointing_to_it(
3505        cx_a: &mut TestAppContext,
3506        cx_b: &mut TestAppContext,
3507        mut rng: StdRng,
3508    ) {
3509        cx_a.foreground().forbid_parking();
3510        let lang_registry = Arc::new(LanguageRegistry::test());
3511        let fs = FakeFs::new(cx_a.background());
3512        fs.insert_tree(
3513            "/root",
3514            json!({
3515                ".zed.toml": r#"collaborators = ["user_b"]"#,
3516                "a.rs": "const ONE: usize = b::TWO;",
3517                "b.rs": "const TWO: usize = 2",
3518            }),
3519        )
3520        .await;
3521
3522        // Set up a fake language server.
3523        let mut language = Language::new(
3524            LanguageConfig {
3525                name: "Rust".into(),
3526                path_suffixes: vec!["rs".to_string()],
3527                ..Default::default()
3528            },
3529            Some(tree_sitter_rust::language()),
3530        );
3531        let mut fake_language_servers = language.set_fake_lsp_adapter(Default::default());
3532        lang_registry.add(Arc::new(language));
3533
3534        // Connect to a server as 2 clients.
3535        let mut server = TestServer::start(cx_a.foreground(), cx_a.background()).await;
3536        let client_a = server.create_client(cx_a, "user_a").await;
3537        let client_b = server.create_client(cx_b, "user_b").await;
3538
3539        // Share a project as client A
3540        let project_a = cx_a.update(|cx| {
3541            Project::local(
3542                client_a.clone(),
3543                client_a.user_store.clone(),
3544                lang_registry.clone(),
3545                fs.clone(),
3546                cx,
3547            )
3548        });
3549
3550        let (worktree_a, _) = project_a
3551            .update(cx_a, |p, cx| {
3552                p.find_or_create_local_worktree("/root", true, cx)
3553            })
3554            .await
3555            .unwrap();
3556        worktree_a
3557            .read_with(cx_a, |tree, _| tree.as_local().unwrap().scan_complete())
3558            .await;
3559        let project_id = project_a.update(cx_a, |p, _| p.next_remote_id()).await;
3560        let worktree_id = worktree_a.read_with(cx_a, |tree, _| tree.id());
3561        project_a.update(cx_a, |p, cx| p.share(cx)).await.unwrap();
3562
3563        // Join the worktree as client B.
3564        let project_b = Project::remote(
3565            project_id,
3566            client_b.clone(),
3567            client_b.user_store.clone(),
3568            lang_registry.clone(),
3569            fs.clone(),
3570            &mut cx_b.to_async(),
3571        )
3572        .await
3573        .unwrap();
3574
3575        let buffer_b1 = cx_b
3576            .background()
3577            .spawn(project_b.update(cx_b, |p, cx| p.open_buffer((worktree_id, "a.rs"), cx)))
3578            .await
3579            .unwrap();
3580
3581        let fake_language_server = fake_language_servers.next().await.unwrap();
3582        fake_language_server.handle_request::<lsp::request::GotoDefinition, _, _>(
3583            |_, _| async move {
3584                Ok(Some(lsp::GotoDefinitionResponse::Scalar(
3585                    lsp::Location::new(
3586                        lsp::Url::from_file_path("/root/b.rs").unwrap(),
3587                        lsp::Range::new(lsp::Position::new(0, 6), lsp::Position::new(0, 9)),
3588                    ),
3589                )))
3590            },
3591        );
3592
3593        let definitions;
3594        let buffer_b2;
3595        if rng.gen() {
3596            definitions = project_b.update(cx_b, |p, cx| p.definition(&buffer_b1, 23, cx));
3597            buffer_b2 = project_b.update(cx_b, |p, cx| p.open_buffer((worktree_id, "b.rs"), cx));
3598        } else {
3599            buffer_b2 = project_b.update(cx_b, |p, cx| p.open_buffer((worktree_id, "b.rs"), cx));
3600            definitions = project_b.update(cx_b, |p, cx| p.definition(&buffer_b1, 23, cx));
3601        }
3602
3603        let buffer_b2 = buffer_b2.await.unwrap();
3604        let definitions = definitions.await.unwrap();
3605        assert_eq!(definitions.len(), 1);
3606        assert_eq!(definitions[0].buffer, buffer_b2);
3607    }
3608
3609    #[gpui::test(iterations = 10)]
3610    async fn test_collaborating_with_code_actions(
3611        cx_a: &mut TestAppContext,
3612        cx_b: &mut TestAppContext,
3613    ) {
3614        cx_a.foreground().forbid_parking();
3615        let lang_registry = Arc::new(LanguageRegistry::test());
3616        let fs = FakeFs::new(cx_a.background());
3617        cx_b.update(|cx| editor::init(cx));
3618
3619        // Set up a fake language server.
3620        let mut language = Language::new(
3621            LanguageConfig {
3622                name: "Rust".into(),
3623                path_suffixes: vec!["rs".to_string()],
3624                ..Default::default()
3625            },
3626            Some(tree_sitter_rust::language()),
3627        );
3628        let mut fake_language_servers = language.set_fake_lsp_adapter(Default::default());
3629        lang_registry.add(Arc::new(language));
3630
3631        // Connect to a server as 2 clients.
3632        let mut server = TestServer::start(cx_a.foreground(), cx_a.background()).await;
3633        let client_a = server.create_client(cx_a, "user_a").await;
3634        let client_b = server.create_client(cx_b, "user_b").await;
3635
3636        // Share a project as client A
3637        fs.insert_tree(
3638            "/a",
3639            json!({
3640                ".zed.toml": r#"collaborators = ["user_b"]"#,
3641                "main.rs": "mod other;\nfn main() { let foo = other::foo(); }",
3642                "other.rs": "pub fn foo() -> usize { 4 }",
3643            }),
3644        )
3645        .await;
3646        let project_a = cx_a.update(|cx| {
3647            Project::local(
3648                client_a.clone(),
3649                client_a.user_store.clone(),
3650                lang_registry.clone(),
3651                fs.clone(),
3652                cx,
3653            )
3654        });
3655        let (worktree_a, _) = project_a
3656            .update(cx_a, |p, cx| {
3657                p.find_or_create_local_worktree("/a", true, cx)
3658            })
3659            .await
3660            .unwrap();
3661        worktree_a
3662            .read_with(cx_a, |tree, _| tree.as_local().unwrap().scan_complete())
3663            .await;
3664        let project_id = project_a.update(cx_a, |p, _| p.next_remote_id()).await;
3665        let worktree_id = worktree_a.read_with(cx_a, |tree, _| tree.id());
3666        project_a.update(cx_a, |p, cx| p.share(cx)).await.unwrap();
3667
3668        // Join the worktree as client B.
3669        let project_b = Project::remote(
3670            project_id,
3671            client_b.clone(),
3672            client_b.user_store.clone(),
3673            lang_registry.clone(),
3674            fs.clone(),
3675            &mut cx_b.to_async(),
3676        )
3677        .await
3678        .unwrap();
3679        let mut params = cx_b.update(WorkspaceParams::test);
3680        params.languages = lang_registry.clone();
3681        params.client = client_b.client.clone();
3682        params.user_store = client_b.user_store.clone();
3683        params.project = project_b;
3684
3685        let (_window_b, workspace_b) = cx_b.add_window(|cx| Workspace::new(&params, cx));
3686        let editor_b = workspace_b
3687            .update(cx_b, |workspace, cx| {
3688                workspace.open_path((worktree_id, "main.rs"), cx)
3689            })
3690            .await
3691            .unwrap()
3692            .downcast::<Editor>()
3693            .unwrap();
3694
3695        let mut fake_language_server = fake_language_servers.next().await.unwrap();
3696        fake_language_server
3697            .handle_request::<lsp::request::CodeActionRequest, _, _>(|params, _| async move {
3698                assert_eq!(
3699                    params.text_document.uri,
3700                    lsp::Url::from_file_path("/a/main.rs").unwrap(),
3701                );
3702                assert_eq!(params.range.start, lsp::Position::new(0, 0));
3703                assert_eq!(params.range.end, lsp::Position::new(0, 0));
3704                Ok(None)
3705            })
3706            .next()
3707            .await;
3708
3709        // Move cursor to a location that contains code actions.
3710        editor_b.update(cx_b, |editor, cx| {
3711            editor.select_ranges([Point::new(1, 31)..Point::new(1, 31)], None, cx);
3712            cx.focus(&editor_b);
3713        });
3714
3715        fake_language_server
3716            .handle_request::<lsp::request::CodeActionRequest, _, _>(|params, _| async move {
3717                assert_eq!(
3718                    params.text_document.uri,
3719                    lsp::Url::from_file_path("/a/main.rs").unwrap(),
3720                );
3721                assert_eq!(params.range.start, lsp::Position::new(1, 31));
3722                assert_eq!(params.range.end, lsp::Position::new(1, 31));
3723
3724                Ok(Some(vec![lsp::CodeActionOrCommand::CodeAction(
3725                    lsp::CodeAction {
3726                        title: "Inline into all callers".to_string(),
3727                        edit: Some(lsp::WorkspaceEdit {
3728                            changes: Some(
3729                                [
3730                                    (
3731                                        lsp::Url::from_file_path("/a/main.rs").unwrap(),
3732                                        vec![lsp::TextEdit::new(
3733                                            lsp::Range::new(
3734                                                lsp::Position::new(1, 22),
3735                                                lsp::Position::new(1, 34),
3736                                            ),
3737                                            "4".to_string(),
3738                                        )],
3739                                    ),
3740                                    (
3741                                        lsp::Url::from_file_path("/a/other.rs").unwrap(),
3742                                        vec![lsp::TextEdit::new(
3743                                            lsp::Range::new(
3744                                                lsp::Position::new(0, 0),
3745                                                lsp::Position::new(0, 27),
3746                                            ),
3747                                            "".to_string(),
3748                                        )],
3749                                    ),
3750                                ]
3751                                .into_iter()
3752                                .collect(),
3753                            ),
3754                            ..Default::default()
3755                        }),
3756                        data: Some(json!({
3757                            "codeActionParams": {
3758                                "range": {
3759                                    "start": {"line": 1, "column": 31},
3760                                    "end": {"line": 1, "column": 31},
3761                                }
3762                            }
3763                        })),
3764                        ..Default::default()
3765                    },
3766                )]))
3767            })
3768            .next()
3769            .await;
3770
3771        // Toggle code actions and wait for them to display.
3772        editor_b.update(cx_b, |editor, cx| {
3773            editor.toggle_code_actions(
3774                &ToggleCodeActions {
3775                    deployed_from_indicator: false,
3776                },
3777                cx,
3778            );
3779        });
3780        editor_b
3781            .condition(&cx_b, |editor, _| editor.context_menu_visible())
3782            .await;
3783
3784        fake_language_server.remove_request_handler::<lsp::request::CodeActionRequest>();
3785
3786        // Confirming the code action will trigger a resolve request.
3787        let confirm_action = workspace_b
3788            .update(cx_b, |workspace, cx| {
3789                Editor::confirm_code_action(workspace, &ConfirmCodeAction { item_ix: Some(0) }, cx)
3790            })
3791            .unwrap();
3792        fake_language_server.handle_request::<lsp::request::CodeActionResolveRequest, _, _>(
3793            |_, _| async move {
3794                Ok(lsp::CodeAction {
3795                    title: "Inline into all callers".to_string(),
3796                    edit: Some(lsp::WorkspaceEdit {
3797                        changes: Some(
3798                            [
3799                                (
3800                                    lsp::Url::from_file_path("/a/main.rs").unwrap(),
3801                                    vec![lsp::TextEdit::new(
3802                                        lsp::Range::new(
3803                                            lsp::Position::new(1, 22),
3804                                            lsp::Position::new(1, 34),
3805                                        ),
3806                                        "4".to_string(),
3807                                    )],
3808                                ),
3809                                (
3810                                    lsp::Url::from_file_path("/a/other.rs").unwrap(),
3811                                    vec![lsp::TextEdit::new(
3812                                        lsp::Range::new(
3813                                            lsp::Position::new(0, 0),
3814                                            lsp::Position::new(0, 27),
3815                                        ),
3816                                        "".to_string(),
3817                                    )],
3818                                ),
3819                            ]
3820                            .into_iter()
3821                            .collect(),
3822                        ),
3823                        ..Default::default()
3824                    }),
3825                    ..Default::default()
3826                })
3827            },
3828        );
3829
3830        // After the action is confirmed, an editor containing both modified files is opened.
3831        confirm_action.await.unwrap();
3832        let code_action_editor = workspace_b.read_with(cx_b, |workspace, cx| {
3833            workspace
3834                .active_item(cx)
3835                .unwrap()
3836                .downcast::<Editor>()
3837                .unwrap()
3838        });
3839        code_action_editor.update(cx_b, |editor, cx| {
3840            assert_eq!(editor.text(cx), "\nmod other;\nfn main() { let foo = 4; }");
3841            editor.undo(&Undo, cx);
3842            assert_eq!(
3843                editor.text(cx),
3844                "pub fn foo() -> usize { 4 }\nmod other;\nfn main() { let foo = other::foo(); }"
3845            );
3846            editor.redo(&Redo, cx);
3847            assert_eq!(editor.text(cx), "\nmod other;\nfn main() { let foo = 4; }");
3848        });
3849    }
3850
3851    #[gpui::test(iterations = 10)]
3852    async fn test_collaborating_with_renames(cx_a: &mut TestAppContext, cx_b: &mut TestAppContext) {
3853        cx_a.foreground().forbid_parking();
3854        let lang_registry = Arc::new(LanguageRegistry::test());
3855        let fs = FakeFs::new(cx_a.background());
3856        cx_b.update(|cx| editor::init(cx));
3857
3858        // Set up a fake language server.
3859        let mut language = Language::new(
3860            LanguageConfig {
3861                name: "Rust".into(),
3862                path_suffixes: vec!["rs".to_string()],
3863                ..Default::default()
3864            },
3865            Some(tree_sitter_rust::language()),
3866        );
3867        let mut fake_language_servers = language.set_fake_lsp_adapter(FakeLspAdapter {
3868            capabilities: lsp::ServerCapabilities {
3869                rename_provider: Some(lsp::OneOf::Right(lsp::RenameOptions {
3870                    prepare_provider: Some(true),
3871                    work_done_progress_options: Default::default(),
3872                })),
3873                ..Default::default()
3874            },
3875            ..Default::default()
3876        });
3877        lang_registry.add(Arc::new(language));
3878
3879        // Connect to a server as 2 clients.
3880        let mut server = TestServer::start(cx_a.foreground(), cx_a.background()).await;
3881        let client_a = server.create_client(cx_a, "user_a").await;
3882        let client_b = server.create_client(cx_b, "user_b").await;
3883
3884        // Share a project as client A
3885        fs.insert_tree(
3886            "/dir",
3887            json!({
3888                ".zed.toml": r#"collaborators = ["user_b"]"#,
3889                "one.rs": "const ONE: usize = 1;",
3890                "two.rs": "const TWO: usize = one::ONE + one::ONE;"
3891            }),
3892        )
3893        .await;
3894        let project_a = cx_a.update(|cx| {
3895            Project::local(
3896                client_a.clone(),
3897                client_a.user_store.clone(),
3898                lang_registry.clone(),
3899                fs.clone(),
3900                cx,
3901            )
3902        });
3903        let (worktree_a, _) = project_a
3904            .update(cx_a, |p, cx| {
3905                p.find_or_create_local_worktree("/dir", true, cx)
3906            })
3907            .await
3908            .unwrap();
3909        worktree_a
3910            .read_with(cx_a, |tree, _| tree.as_local().unwrap().scan_complete())
3911            .await;
3912        let project_id = project_a.update(cx_a, |p, _| p.next_remote_id()).await;
3913        let worktree_id = worktree_a.read_with(cx_a, |tree, _| tree.id());
3914        project_a.update(cx_a, |p, cx| p.share(cx)).await.unwrap();
3915
3916        // Join the worktree as client B.
3917        let project_b = Project::remote(
3918            project_id,
3919            client_b.clone(),
3920            client_b.user_store.clone(),
3921            lang_registry.clone(),
3922            fs.clone(),
3923            &mut cx_b.to_async(),
3924        )
3925        .await
3926        .unwrap();
3927        let mut params = cx_b.update(WorkspaceParams::test);
3928        params.languages = lang_registry.clone();
3929        params.client = client_b.client.clone();
3930        params.user_store = client_b.user_store.clone();
3931        params.project = project_b;
3932
3933        let (_window_b, workspace_b) = cx_b.add_window(|cx| Workspace::new(&params, cx));
3934        let editor_b = workspace_b
3935            .update(cx_b, |workspace, cx| {
3936                workspace.open_path((worktree_id, "one.rs"), cx)
3937            })
3938            .await
3939            .unwrap()
3940            .downcast::<Editor>()
3941            .unwrap();
3942        let fake_language_server = fake_language_servers.next().await.unwrap();
3943
3944        // Move cursor to a location that can be renamed.
3945        let prepare_rename = editor_b.update(cx_b, |editor, cx| {
3946            editor.select_ranges([7..7], None, cx);
3947            editor.rename(&Rename, cx).unwrap()
3948        });
3949
3950        fake_language_server
3951            .handle_request::<lsp::request::PrepareRenameRequest, _, _>(|params, _| async move {
3952                assert_eq!(params.text_document.uri.as_str(), "file:///dir/one.rs");
3953                assert_eq!(params.position, lsp::Position::new(0, 7));
3954                Ok(Some(lsp::PrepareRenameResponse::Range(lsp::Range::new(
3955                    lsp::Position::new(0, 6),
3956                    lsp::Position::new(0, 9),
3957                ))))
3958            })
3959            .next()
3960            .await
3961            .unwrap();
3962        prepare_rename.await.unwrap();
3963        editor_b.update(cx_b, |editor, cx| {
3964            let rename = editor.pending_rename().unwrap();
3965            let buffer = editor.buffer().read(cx).snapshot(cx);
3966            assert_eq!(
3967                rename.range.start.to_offset(&buffer)..rename.range.end.to_offset(&buffer),
3968                6..9
3969            );
3970            rename.editor.update(cx, |rename_editor, cx| {
3971                rename_editor.buffer().update(cx, |rename_buffer, cx| {
3972                    rename_buffer.edit([0..3], "THREE", cx);
3973                });
3974            });
3975        });
3976
3977        let confirm_rename = workspace_b.update(cx_b, |workspace, cx| {
3978            Editor::confirm_rename(workspace, &ConfirmRename, cx).unwrap()
3979        });
3980        fake_language_server
3981            .handle_request::<lsp::request::Rename, _, _>(|params, _| async move {
3982                assert_eq!(
3983                    params.text_document_position.text_document.uri.as_str(),
3984                    "file:///dir/one.rs"
3985                );
3986                assert_eq!(
3987                    params.text_document_position.position,
3988                    lsp::Position::new(0, 6)
3989                );
3990                assert_eq!(params.new_name, "THREE");
3991                Ok(Some(lsp::WorkspaceEdit {
3992                    changes: Some(
3993                        [
3994                            (
3995                                lsp::Url::from_file_path("/dir/one.rs").unwrap(),
3996                                vec![lsp::TextEdit::new(
3997                                    lsp::Range::new(
3998                                        lsp::Position::new(0, 6),
3999                                        lsp::Position::new(0, 9),
4000                                    ),
4001                                    "THREE".to_string(),
4002                                )],
4003                            ),
4004                            (
4005                                lsp::Url::from_file_path("/dir/two.rs").unwrap(),
4006                                vec![
4007                                    lsp::TextEdit::new(
4008                                        lsp::Range::new(
4009                                            lsp::Position::new(0, 24),
4010                                            lsp::Position::new(0, 27),
4011                                        ),
4012                                        "THREE".to_string(),
4013                                    ),
4014                                    lsp::TextEdit::new(
4015                                        lsp::Range::new(
4016                                            lsp::Position::new(0, 35),
4017                                            lsp::Position::new(0, 38),
4018                                        ),
4019                                        "THREE".to_string(),
4020                                    ),
4021                                ],
4022                            ),
4023                        ]
4024                        .into_iter()
4025                        .collect(),
4026                    ),
4027                    ..Default::default()
4028                }))
4029            })
4030            .next()
4031            .await
4032            .unwrap();
4033        confirm_rename.await.unwrap();
4034
4035        let rename_editor = workspace_b.read_with(cx_b, |workspace, cx| {
4036            workspace
4037                .active_item(cx)
4038                .unwrap()
4039                .downcast::<Editor>()
4040                .unwrap()
4041        });
4042        rename_editor.update(cx_b, |editor, cx| {
4043            assert_eq!(
4044                editor.text(cx),
4045                "const TWO: usize = one::THREE + one::THREE;\nconst THREE: usize = 1;"
4046            );
4047            editor.undo(&Undo, cx);
4048            assert_eq!(
4049                editor.text(cx),
4050                "const TWO: usize = one::ONE + one::ONE;\nconst ONE: usize = 1;"
4051            );
4052            editor.redo(&Redo, cx);
4053            assert_eq!(
4054                editor.text(cx),
4055                "const TWO: usize = one::THREE + one::THREE;\nconst THREE: usize = 1;"
4056            );
4057        });
4058
4059        // Ensure temporary rename edits cannot be undone/redone.
4060        editor_b.update(cx_b, |editor, cx| {
4061            editor.undo(&Undo, cx);
4062            assert_eq!(editor.text(cx), "const ONE: usize = 1;");
4063            editor.undo(&Undo, cx);
4064            assert_eq!(editor.text(cx), "const ONE: usize = 1;");
4065            editor.redo(&Redo, cx);
4066            assert_eq!(editor.text(cx), "const THREE: usize = 1;");
4067        })
4068    }
4069
4070    #[gpui::test(iterations = 10)]
4071    async fn test_basic_chat(cx_a: &mut TestAppContext, cx_b: &mut TestAppContext) {
4072        cx_a.foreground().forbid_parking();
4073
4074        // Connect to a server as 2 clients.
4075        let mut server = TestServer::start(cx_a.foreground(), cx_a.background()).await;
4076        let client_a = server.create_client(cx_a, "user_a").await;
4077        let client_b = server.create_client(cx_b, "user_b").await;
4078
4079        // Create an org that includes these 2 users.
4080        let db = &server.app_state.db;
4081        let org_id = db.create_org("Test Org", "test-org").await.unwrap();
4082        db.add_org_member(org_id, client_a.current_user_id(&cx_a), false)
4083            .await
4084            .unwrap();
4085        db.add_org_member(org_id, client_b.current_user_id(&cx_b), false)
4086            .await
4087            .unwrap();
4088
4089        // Create a channel that includes all the users.
4090        let channel_id = db.create_org_channel(org_id, "test-channel").await.unwrap();
4091        db.add_channel_member(channel_id, client_a.current_user_id(&cx_a), false)
4092            .await
4093            .unwrap();
4094        db.add_channel_member(channel_id, client_b.current_user_id(&cx_b), false)
4095            .await
4096            .unwrap();
4097        db.create_channel_message(
4098            channel_id,
4099            client_b.current_user_id(&cx_b),
4100            "hello A, it's B.",
4101            OffsetDateTime::now_utc(),
4102            1,
4103        )
4104        .await
4105        .unwrap();
4106
4107        let channels_a = cx_a
4108            .add_model(|cx| ChannelList::new(client_a.user_store.clone(), client_a.clone(), cx));
4109        channels_a
4110            .condition(cx_a, |list, _| list.available_channels().is_some())
4111            .await;
4112        channels_a.read_with(cx_a, |list, _| {
4113            assert_eq!(
4114                list.available_channels().unwrap(),
4115                &[ChannelDetails {
4116                    id: channel_id.to_proto(),
4117                    name: "test-channel".to_string()
4118                }]
4119            )
4120        });
4121        let channel_a = channels_a.update(cx_a, |this, cx| {
4122            this.get_channel(channel_id.to_proto(), cx).unwrap()
4123        });
4124        channel_a.read_with(cx_a, |channel, _| assert!(channel.messages().is_empty()));
4125        channel_a
4126            .condition(&cx_a, |channel, _| {
4127                channel_messages(channel)
4128                    == [("user_b".to_string(), "hello A, it's B.".to_string(), false)]
4129            })
4130            .await;
4131
4132        let channels_b = cx_b
4133            .add_model(|cx| ChannelList::new(client_b.user_store.clone(), client_b.clone(), cx));
4134        channels_b
4135            .condition(cx_b, |list, _| list.available_channels().is_some())
4136            .await;
4137        channels_b.read_with(cx_b, |list, _| {
4138            assert_eq!(
4139                list.available_channels().unwrap(),
4140                &[ChannelDetails {
4141                    id: channel_id.to_proto(),
4142                    name: "test-channel".to_string()
4143                }]
4144            )
4145        });
4146
4147        let channel_b = channels_b.update(cx_b, |this, cx| {
4148            this.get_channel(channel_id.to_proto(), cx).unwrap()
4149        });
4150        channel_b.read_with(cx_b, |channel, _| assert!(channel.messages().is_empty()));
4151        channel_b
4152            .condition(&cx_b, |channel, _| {
4153                channel_messages(channel)
4154                    == [("user_b".to_string(), "hello A, it's B.".to_string(), false)]
4155            })
4156            .await;
4157
4158        channel_a
4159            .update(cx_a, |channel, cx| {
4160                channel
4161                    .send_message("oh, hi B.".to_string(), cx)
4162                    .unwrap()
4163                    .detach();
4164                let task = channel.send_message("sup".to_string(), cx).unwrap();
4165                assert_eq!(
4166                    channel_messages(channel),
4167                    &[
4168                        ("user_b".to_string(), "hello A, it's B.".to_string(), false),
4169                        ("user_a".to_string(), "oh, hi B.".to_string(), true),
4170                        ("user_a".to_string(), "sup".to_string(), true)
4171                    ]
4172                );
4173                task
4174            })
4175            .await
4176            .unwrap();
4177
4178        channel_b
4179            .condition(&cx_b, |channel, _| {
4180                channel_messages(channel)
4181                    == [
4182                        ("user_b".to_string(), "hello A, it's B.".to_string(), false),
4183                        ("user_a".to_string(), "oh, hi B.".to_string(), false),
4184                        ("user_a".to_string(), "sup".to_string(), false),
4185                    ]
4186            })
4187            .await;
4188
4189        assert_eq!(
4190            server
4191                .state()
4192                .await
4193                .channel(channel_id)
4194                .unwrap()
4195                .connection_ids
4196                .len(),
4197            2
4198        );
4199        cx_b.update(|_| drop(channel_b));
4200        server
4201            .condition(|state| state.channel(channel_id).unwrap().connection_ids.len() == 1)
4202            .await;
4203
4204        cx_a.update(|_| drop(channel_a));
4205        server
4206            .condition(|state| state.channel(channel_id).is_none())
4207            .await;
4208    }
4209
4210    #[gpui::test(iterations = 10)]
4211    async fn test_chat_message_validation(cx_a: &mut TestAppContext) {
4212        cx_a.foreground().forbid_parking();
4213
4214        let mut server = TestServer::start(cx_a.foreground(), cx_a.background()).await;
4215        let client_a = server.create_client(cx_a, "user_a").await;
4216
4217        let db = &server.app_state.db;
4218        let org_id = db.create_org("Test Org", "test-org").await.unwrap();
4219        let channel_id = db.create_org_channel(org_id, "test-channel").await.unwrap();
4220        db.add_org_member(org_id, client_a.current_user_id(&cx_a), false)
4221            .await
4222            .unwrap();
4223        db.add_channel_member(channel_id, client_a.current_user_id(&cx_a), false)
4224            .await
4225            .unwrap();
4226
4227        let channels_a = cx_a
4228            .add_model(|cx| ChannelList::new(client_a.user_store.clone(), client_a.clone(), cx));
4229        channels_a
4230            .condition(cx_a, |list, _| list.available_channels().is_some())
4231            .await;
4232        let channel_a = channels_a.update(cx_a, |this, cx| {
4233            this.get_channel(channel_id.to_proto(), cx).unwrap()
4234        });
4235
4236        // Messages aren't allowed to be too long.
4237        channel_a
4238            .update(cx_a, |channel, cx| {
4239                let long_body = "this is long.\n".repeat(1024);
4240                channel.send_message(long_body, cx).unwrap()
4241            })
4242            .await
4243            .unwrap_err();
4244
4245        // Messages aren't allowed to be blank.
4246        channel_a.update(cx_a, |channel, cx| {
4247            channel.send_message(String::new(), cx).unwrap_err()
4248        });
4249
4250        // Leading and trailing whitespace are trimmed.
4251        channel_a
4252            .update(cx_a, |channel, cx| {
4253                channel
4254                    .send_message("\n surrounded by whitespace  \n".to_string(), cx)
4255                    .unwrap()
4256            })
4257            .await
4258            .unwrap();
4259        assert_eq!(
4260            db.get_channel_messages(channel_id, 10, None)
4261                .await
4262                .unwrap()
4263                .iter()
4264                .map(|m| &m.body)
4265                .collect::<Vec<_>>(),
4266            &["surrounded by whitespace"]
4267        );
4268    }
4269
4270    #[gpui::test(iterations = 10)]
4271    async fn test_chat_reconnection(cx_a: &mut TestAppContext, cx_b: &mut TestAppContext) {
4272        cx_a.foreground().forbid_parking();
4273
4274        // Connect to a server as 2 clients.
4275        let mut server = TestServer::start(cx_a.foreground(), cx_a.background()).await;
4276        let client_a = server.create_client(cx_a, "user_a").await;
4277        let client_b = server.create_client(cx_b, "user_b").await;
4278        let mut status_b = client_b.status();
4279
4280        // Create an org that includes these 2 users.
4281        let db = &server.app_state.db;
4282        let org_id = db.create_org("Test Org", "test-org").await.unwrap();
4283        db.add_org_member(org_id, client_a.current_user_id(&cx_a), false)
4284            .await
4285            .unwrap();
4286        db.add_org_member(org_id, client_b.current_user_id(&cx_b), false)
4287            .await
4288            .unwrap();
4289
4290        // Create a channel that includes all the users.
4291        let channel_id = db.create_org_channel(org_id, "test-channel").await.unwrap();
4292        db.add_channel_member(channel_id, client_a.current_user_id(&cx_a), false)
4293            .await
4294            .unwrap();
4295        db.add_channel_member(channel_id, client_b.current_user_id(&cx_b), false)
4296            .await
4297            .unwrap();
4298        db.create_channel_message(
4299            channel_id,
4300            client_b.current_user_id(&cx_b),
4301            "hello A, it's B.",
4302            OffsetDateTime::now_utc(),
4303            2,
4304        )
4305        .await
4306        .unwrap();
4307
4308        let channels_a = cx_a
4309            .add_model(|cx| ChannelList::new(client_a.user_store.clone(), client_a.clone(), cx));
4310        channels_a
4311            .condition(cx_a, |list, _| list.available_channels().is_some())
4312            .await;
4313
4314        channels_a.read_with(cx_a, |list, _| {
4315            assert_eq!(
4316                list.available_channels().unwrap(),
4317                &[ChannelDetails {
4318                    id: channel_id.to_proto(),
4319                    name: "test-channel".to_string()
4320                }]
4321            )
4322        });
4323        let channel_a = channels_a.update(cx_a, |this, cx| {
4324            this.get_channel(channel_id.to_proto(), cx).unwrap()
4325        });
4326        channel_a.read_with(cx_a, |channel, _| assert!(channel.messages().is_empty()));
4327        channel_a
4328            .condition(&cx_a, |channel, _| {
4329                channel_messages(channel)
4330                    == [("user_b".to_string(), "hello A, it's B.".to_string(), false)]
4331            })
4332            .await;
4333
4334        let channels_b = cx_b
4335            .add_model(|cx| ChannelList::new(client_b.user_store.clone(), client_b.clone(), cx));
4336        channels_b
4337            .condition(cx_b, |list, _| list.available_channels().is_some())
4338            .await;
4339        channels_b.read_with(cx_b, |list, _| {
4340            assert_eq!(
4341                list.available_channels().unwrap(),
4342                &[ChannelDetails {
4343                    id: channel_id.to_proto(),
4344                    name: "test-channel".to_string()
4345                }]
4346            )
4347        });
4348
4349        let channel_b = channels_b.update(cx_b, |this, cx| {
4350            this.get_channel(channel_id.to_proto(), cx).unwrap()
4351        });
4352        channel_b.read_with(cx_b, |channel, _| assert!(channel.messages().is_empty()));
4353        channel_b
4354            .condition(&cx_b, |channel, _| {
4355                channel_messages(channel)
4356                    == [("user_b".to_string(), "hello A, it's B.".to_string(), false)]
4357            })
4358            .await;
4359
4360        // Disconnect client B, ensuring we can still access its cached channel data.
4361        server.forbid_connections();
4362        server.disconnect_client(client_b.current_user_id(&cx_b));
4363        cx_b.foreground().advance_clock(Duration::from_secs(3));
4364        while !matches!(
4365            status_b.next().await,
4366            Some(client::Status::ReconnectionError { .. })
4367        ) {}
4368
4369        channels_b.read_with(cx_b, |channels, _| {
4370            assert_eq!(
4371                channels.available_channels().unwrap(),
4372                [ChannelDetails {
4373                    id: channel_id.to_proto(),
4374                    name: "test-channel".to_string()
4375                }]
4376            )
4377        });
4378        channel_b.read_with(cx_b, |channel, _| {
4379            assert_eq!(
4380                channel_messages(channel),
4381                [("user_b".to_string(), "hello A, it's B.".to_string(), false)]
4382            )
4383        });
4384
4385        // Send a message from client B while it is disconnected.
4386        channel_b
4387            .update(cx_b, |channel, cx| {
4388                let task = channel
4389                    .send_message("can you see this?".to_string(), cx)
4390                    .unwrap();
4391                assert_eq!(
4392                    channel_messages(channel),
4393                    &[
4394                        ("user_b".to_string(), "hello A, it's B.".to_string(), false),
4395                        ("user_b".to_string(), "can you see this?".to_string(), true)
4396                    ]
4397                );
4398                task
4399            })
4400            .await
4401            .unwrap_err();
4402
4403        // Send a message from client A while B is disconnected.
4404        channel_a
4405            .update(cx_a, |channel, cx| {
4406                channel
4407                    .send_message("oh, hi B.".to_string(), cx)
4408                    .unwrap()
4409                    .detach();
4410                let task = channel.send_message("sup".to_string(), cx).unwrap();
4411                assert_eq!(
4412                    channel_messages(channel),
4413                    &[
4414                        ("user_b".to_string(), "hello A, it's B.".to_string(), false),
4415                        ("user_a".to_string(), "oh, hi B.".to_string(), true),
4416                        ("user_a".to_string(), "sup".to_string(), true)
4417                    ]
4418                );
4419                task
4420            })
4421            .await
4422            .unwrap();
4423
4424        // Give client B a chance to reconnect.
4425        server.allow_connections();
4426        cx_b.foreground().advance_clock(Duration::from_secs(10));
4427
4428        // Verify that B sees the new messages upon reconnection, as well as the message client B
4429        // sent while offline.
4430        channel_b
4431            .condition(&cx_b, |channel, _| {
4432                channel_messages(channel)
4433                    == [
4434                        ("user_b".to_string(), "hello A, it's B.".to_string(), false),
4435                        ("user_a".to_string(), "oh, hi B.".to_string(), false),
4436                        ("user_a".to_string(), "sup".to_string(), false),
4437                        ("user_b".to_string(), "can you see this?".to_string(), false),
4438                    ]
4439            })
4440            .await;
4441
4442        // Ensure client A and B can communicate normally after reconnection.
4443        channel_a
4444            .update(cx_a, |channel, cx| {
4445                channel.send_message("you online?".to_string(), cx).unwrap()
4446            })
4447            .await
4448            .unwrap();
4449        channel_b
4450            .condition(&cx_b, |channel, _| {
4451                channel_messages(channel)
4452                    == [
4453                        ("user_b".to_string(), "hello A, it's B.".to_string(), false),
4454                        ("user_a".to_string(), "oh, hi B.".to_string(), false),
4455                        ("user_a".to_string(), "sup".to_string(), false),
4456                        ("user_b".to_string(), "can you see this?".to_string(), false),
4457                        ("user_a".to_string(), "you online?".to_string(), false),
4458                    ]
4459            })
4460            .await;
4461
4462        channel_b
4463            .update(cx_b, |channel, cx| {
4464                channel.send_message("yep".to_string(), cx).unwrap()
4465            })
4466            .await
4467            .unwrap();
4468        channel_a
4469            .condition(&cx_a, |channel, _| {
4470                channel_messages(channel)
4471                    == [
4472                        ("user_b".to_string(), "hello A, it's B.".to_string(), false),
4473                        ("user_a".to_string(), "oh, hi B.".to_string(), false),
4474                        ("user_a".to_string(), "sup".to_string(), false),
4475                        ("user_b".to_string(), "can you see this?".to_string(), false),
4476                        ("user_a".to_string(), "you online?".to_string(), false),
4477                        ("user_b".to_string(), "yep".to_string(), false),
4478                    ]
4479            })
4480            .await;
4481    }
4482
4483    #[gpui::test(iterations = 10)]
4484    async fn test_contacts(
4485        cx_a: &mut TestAppContext,
4486        cx_b: &mut TestAppContext,
4487        cx_c: &mut TestAppContext,
4488    ) {
4489        cx_a.foreground().forbid_parking();
4490        let lang_registry = Arc::new(LanguageRegistry::test());
4491        let fs = FakeFs::new(cx_a.background());
4492
4493        // Connect to a server as 3 clients.
4494        let mut server = TestServer::start(cx_a.foreground(), cx_a.background()).await;
4495        let client_a = server.create_client(cx_a, "user_a").await;
4496        let client_b = server.create_client(cx_b, "user_b").await;
4497        let client_c = server.create_client(cx_c, "user_c").await;
4498
4499        // Share a worktree as client A.
4500        fs.insert_tree(
4501            "/a",
4502            json!({
4503                ".zed.toml": r#"collaborators = ["user_b", "user_c"]"#,
4504            }),
4505        )
4506        .await;
4507
4508        let project_a = cx_a.update(|cx| {
4509            Project::local(
4510                client_a.clone(),
4511                client_a.user_store.clone(),
4512                lang_registry.clone(),
4513                fs.clone(),
4514                cx,
4515            )
4516        });
4517        let (worktree_a, _) = project_a
4518            .update(cx_a, |p, cx| {
4519                p.find_or_create_local_worktree("/a", true, cx)
4520            })
4521            .await
4522            .unwrap();
4523        worktree_a
4524            .read_with(cx_a, |tree, _| tree.as_local().unwrap().scan_complete())
4525            .await;
4526
4527        client_a
4528            .user_store
4529            .condition(&cx_a, |user_store, _| {
4530                contacts(user_store) == vec![("user_a", vec![("a", false, vec![])])]
4531            })
4532            .await;
4533        client_b
4534            .user_store
4535            .condition(&cx_b, |user_store, _| {
4536                contacts(user_store) == vec![("user_a", vec![("a", false, vec![])])]
4537            })
4538            .await;
4539        client_c
4540            .user_store
4541            .condition(&cx_c, |user_store, _| {
4542                contacts(user_store) == vec![("user_a", vec![("a", false, vec![])])]
4543            })
4544            .await;
4545
4546        let project_id = project_a
4547            .update(cx_a, |project, _| project.next_remote_id())
4548            .await;
4549        project_a
4550            .update(cx_a, |project, cx| project.share(cx))
4551            .await
4552            .unwrap();
4553        client_a
4554            .user_store
4555            .condition(&cx_a, |user_store, _| {
4556                contacts(user_store) == vec![("user_a", vec![("a", true, vec![])])]
4557            })
4558            .await;
4559        client_b
4560            .user_store
4561            .condition(&cx_b, |user_store, _| {
4562                contacts(user_store) == vec![("user_a", vec![("a", true, vec![])])]
4563            })
4564            .await;
4565        client_c
4566            .user_store
4567            .condition(&cx_c, |user_store, _| {
4568                contacts(user_store) == vec![("user_a", vec![("a", true, vec![])])]
4569            })
4570            .await;
4571
4572        let _project_b = Project::remote(
4573            project_id,
4574            client_b.clone(),
4575            client_b.user_store.clone(),
4576            lang_registry.clone(),
4577            fs.clone(),
4578            &mut cx_b.to_async(),
4579        )
4580        .await
4581        .unwrap();
4582
4583        client_a
4584            .user_store
4585            .condition(&cx_a, |user_store, _| {
4586                contacts(user_store) == vec![("user_a", vec![("a", true, vec!["user_b"])])]
4587            })
4588            .await;
4589        client_b
4590            .user_store
4591            .condition(&cx_b, |user_store, _| {
4592                contacts(user_store) == vec![("user_a", vec![("a", true, vec!["user_b"])])]
4593            })
4594            .await;
4595        client_c
4596            .user_store
4597            .condition(&cx_c, |user_store, _| {
4598                contacts(user_store) == vec![("user_a", vec![("a", true, vec!["user_b"])])]
4599            })
4600            .await;
4601
4602        project_a
4603            .condition(&cx_a, |project, _| {
4604                project.collaborators().contains_key(&client_b.peer_id)
4605            })
4606            .await;
4607
4608        cx_a.update(move |_| drop(project_a));
4609        client_a
4610            .user_store
4611            .condition(&cx_a, |user_store, _| contacts(user_store) == vec![])
4612            .await;
4613        client_b
4614            .user_store
4615            .condition(&cx_b, |user_store, _| contacts(user_store) == vec![])
4616            .await;
4617        client_c
4618            .user_store
4619            .condition(&cx_c, |user_store, _| contacts(user_store) == vec![])
4620            .await;
4621
4622        fn contacts(user_store: &UserStore) -> Vec<(&str, Vec<(&str, bool, Vec<&str>)>)> {
4623            user_store
4624                .contacts()
4625                .iter()
4626                .map(|contact| {
4627                    let worktrees = contact
4628                        .projects
4629                        .iter()
4630                        .map(|p| {
4631                            (
4632                                p.worktree_root_names[0].as_str(),
4633                                p.is_shared,
4634                                p.guests.iter().map(|p| p.github_login.as_str()).collect(),
4635                            )
4636                        })
4637                        .collect();
4638                    (contact.user.github_login.as_str(), worktrees)
4639                })
4640                .collect()
4641        }
4642    }
4643
4644    #[gpui::test(iterations = 10)]
4645    async fn test_following(cx_a: &mut TestAppContext, cx_b: &mut TestAppContext) {
4646        cx_a.foreground().forbid_parking();
4647        let fs = FakeFs::new(cx_a.background());
4648
4649        // 2 clients connect to a server.
4650        let mut server = TestServer::start(cx_a.foreground(), cx_a.background()).await;
4651        let mut client_a = server.create_client(cx_a, "user_a").await;
4652        let mut client_b = server.create_client(cx_b, "user_b").await;
4653        cx_a.update(editor::init);
4654        cx_b.update(editor::init);
4655
4656        // Client A shares a project.
4657        fs.insert_tree(
4658            "/a",
4659            json!({
4660                ".zed.toml": r#"collaborators = ["user_b"]"#,
4661                "1.txt": "one",
4662                "2.txt": "two",
4663                "3.txt": "three",
4664            }),
4665        )
4666        .await;
4667        let (project_a, worktree_id) = client_a.build_local_project(fs.clone(), "/a", cx_a).await;
4668        project_a
4669            .update(cx_a, |project, cx| project.share(cx))
4670            .await
4671            .unwrap();
4672
4673        // Client B joins the project.
4674        let project_b = client_b
4675            .build_remote_project(
4676                project_a
4677                    .read_with(cx_a, |project, _| project.remote_id())
4678                    .unwrap(),
4679                cx_b,
4680            )
4681            .await;
4682
4683        // Client A opens some editors.
4684        let workspace_a = client_a.build_workspace(&project_a, cx_a);
4685        let pane_a = workspace_a.read_with(cx_a, |workspace, _| workspace.active_pane().clone());
4686        let editor_a1 = workspace_a
4687            .update(cx_a, |workspace, cx| {
4688                workspace.open_path((worktree_id, "1.txt"), cx)
4689            })
4690            .await
4691            .unwrap()
4692            .downcast::<Editor>()
4693            .unwrap();
4694        let editor_a2 = workspace_a
4695            .update(cx_a, |workspace, cx| {
4696                workspace.open_path((worktree_id, "2.txt"), cx)
4697            })
4698            .await
4699            .unwrap()
4700            .downcast::<Editor>()
4701            .unwrap();
4702
4703        // Client B opens an editor.
4704        let workspace_b = client_b.build_workspace(&project_b, cx_b);
4705        let editor_b1 = workspace_b
4706            .update(cx_b, |workspace, cx| {
4707                workspace.open_path((worktree_id, "1.txt"), cx)
4708            })
4709            .await
4710            .unwrap()
4711            .downcast::<Editor>()
4712            .unwrap();
4713
4714        let client_a_id = project_b.read_with(cx_b, |project, _| {
4715            project.collaborators().values().next().unwrap().peer_id
4716        });
4717        let client_b_id = project_a.read_with(cx_a, |project, _| {
4718            project.collaborators().values().next().unwrap().peer_id
4719        });
4720
4721        // When client B starts following client A, all visible view states are replicated to client B.
4722        editor_a1.update(cx_a, |editor, cx| editor.select_ranges([0..1], None, cx));
4723        editor_a2.update(cx_a, |editor, cx| editor.select_ranges([2..3], None, cx));
4724        workspace_b
4725            .update(cx_b, |workspace, cx| {
4726                workspace
4727                    .toggle_follow(&ToggleFollow(client_a_id), cx)
4728                    .unwrap()
4729            })
4730            .await
4731            .unwrap();
4732        let editor_b2 = workspace_b.read_with(cx_b, |workspace, cx| {
4733            workspace
4734                .active_item(cx)
4735                .unwrap()
4736                .downcast::<Editor>()
4737                .unwrap()
4738        });
4739        assert!(cx_b.read(|cx| editor_b2.is_focused(cx)));
4740        assert_eq!(
4741            editor_b2.read_with(cx_b, |editor, cx| editor.project_path(cx)),
4742            Some((worktree_id, "2.txt").into())
4743        );
4744        assert_eq!(
4745            editor_b2.read_with(cx_b, |editor, cx| editor.selected_ranges(cx)),
4746            vec![2..3]
4747        );
4748        assert_eq!(
4749            editor_b1.read_with(cx_b, |editor, cx| editor.selected_ranges(cx)),
4750            vec![0..1]
4751        );
4752
4753        // When client A activates a different editor, client B does so as well.
4754        workspace_a.update(cx_a, |workspace, cx| {
4755            workspace.activate_item(&editor_a1, cx)
4756        });
4757        workspace_b
4758            .condition(cx_b, |workspace, cx| {
4759                workspace.active_item(cx).unwrap().id() == editor_b1.id()
4760            })
4761            .await;
4762
4763        // When client A navigates back and forth, client B does so as well.
4764        workspace_a
4765            .update(cx_a, |workspace, cx| {
4766                workspace::Pane::go_back(workspace, None, cx)
4767            })
4768            .await;
4769        workspace_b
4770            .condition(cx_b, |workspace, cx| {
4771                workspace.active_item(cx).unwrap().id() == editor_b2.id()
4772            })
4773            .await;
4774
4775        workspace_a
4776            .update(cx_a, |workspace, cx| {
4777                workspace::Pane::go_forward(workspace, None, cx)
4778            })
4779            .await;
4780        workspace_b
4781            .condition(cx_b, |workspace, cx| {
4782                workspace.active_item(cx).unwrap().id() == editor_b1.id()
4783            })
4784            .await;
4785
4786        // Changes to client A's editor are reflected on client B.
4787        editor_a1.update(cx_a, |editor, cx| {
4788            editor.select_ranges([1..1, 2..2], None, cx);
4789        });
4790        editor_b1
4791            .condition(cx_b, |editor, cx| {
4792                editor.selected_ranges(cx) == vec![1..1, 2..2]
4793            })
4794            .await;
4795
4796        editor_a1.update(cx_a, |editor, cx| editor.set_text("TWO", cx));
4797        editor_b1
4798            .condition(cx_b, |editor, cx| editor.text(cx) == "TWO")
4799            .await;
4800
4801        editor_a1.update(cx_a, |editor, cx| {
4802            editor.select_ranges([3..3], None, cx);
4803            editor.set_scroll_position(vec2f(0., 100.), cx);
4804        });
4805        editor_b1
4806            .condition(cx_b, |editor, cx| editor.selected_ranges(cx) == vec![3..3])
4807            .await;
4808
4809        // After unfollowing, client B stops receiving updates from client A.
4810        workspace_b.update(cx_b, |workspace, cx| {
4811            workspace.unfollow(&workspace.active_pane().clone(), cx)
4812        });
4813        workspace_a.update(cx_a, |workspace, cx| {
4814            workspace.activate_item(&editor_a2, cx)
4815        });
4816        cx_a.foreground().run_until_parked();
4817        assert_eq!(
4818            workspace_b.read_with(cx_b, |workspace, cx| workspace
4819                .active_item(cx)
4820                .unwrap()
4821                .id()),
4822            editor_b1.id()
4823        );
4824
4825        // Client A starts following client B.
4826        workspace_a
4827            .update(cx_a, |workspace, cx| {
4828                workspace
4829                    .toggle_follow(&ToggleFollow(client_b_id), cx)
4830                    .unwrap()
4831            })
4832            .await
4833            .unwrap();
4834        assert_eq!(
4835            workspace_a.read_with(cx_a, |workspace, _| workspace.leader_for_pane(&pane_a)),
4836            Some(client_b_id)
4837        );
4838        assert_eq!(
4839            workspace_a.read_with(cx_a, |workspace, cx| workspace
4840                .active_item(cx)
4841                .unwrap()
4842                .id()),
4843            editor_a1.id()
4844        );
4845
4846        // Following interrupts when client B disconnects.
4847        client_b.disconnect(&cx_b.to_async()).unwrap();
4848        cx_a.foreground().run_until_parked();
4849        assert_eq!(
4850            workspace_a.read_with(cx_a, |workspace, _| workspace.leader_for_pane(&pane_a)),
4851            None
4852        );
4853    }
4854
4855    #[gpui::test(iterations = 10)]
4856    async fn test_peers_following_each_other(cx_a: &mut TestAppContext, cx_b: &mut TestAppContext) {
4857        cx_a.foreground().forbid_parking();
4858        let fs = FakeFs::new(cx_a.background());
4859
4860        // 2 clients connect to a server.
4861        let mut server = TestServer::start(cx_a.foreground(), cx_a.background()).await;
4862        let mut client_a = server.create_client(cx_a, "user_a").await;
4863        let mut client_b = server.create_client(cx_b, "user_b").await;
4864        cx_a.update(editor::init);
4865        cx_b.update(editor::init);
4866
4867        // Client A shares a project.
4868        fs.insert_tree(
4869            "/a",
4870            json!({
4871                ".zed.toml": r#"collaborators = ["user_b"]"#,
4872                "1.txt": "one",
4873                "2.txt": "two",
4874                "3.txt": "three",
4875                "4.txt": "four",
4876            }),
4877        )
4878        .await;
4879        let (project_a, worktree_id) = client_a.build_local_project(fs.clone(), "/a", cx_a).await;
4880        project_a
4881            .update(cx_a, |project, cx| project.share(cx))
4882            .await
4883            .unwrap();
4884
4885        // Client B joins the project.
4886        let project_b = client_b
4887            .build_remote_project(
4888                project_a
4889                    .read_with(cx_a, |project, _| project.remote_id())
4890                    .unwrap(),
4891                cx_b,
4892            )
4893            .await;
4894
4895        // Client A opens some editors.
4896        let workspace_a = client_a.build_workspace(&project_a, cx_a);
4897        let pane_a1 = workspace_a.read_with(cx_a, |workspace, _| workspace.active_pane().clone());
4898        let _editor_a1 = workspace_a
4899            .update(cx_a, |workspace, cx| {
4900                workspace.open_path((worktree_id, "1.txt"), cx)
4901            })
4902            .await
4903            .unwrap()
4904            .downcast::<Editor>()
4905            .unwrap();
4906
4907        // Client B opens an editor.
4908        let workspace_b = client_b.build_workspace(&project_b, cx_b);
4909        let pane_b1 = workspace_b.read_with(cx_b, |workspace, _| workspace.active_pane().clone());
4910        let _editor_b1 = workspace_b
4911            .update(cx_b, |workspace, cx| {
4912                workspace.open_path((worktree_id, "2.txt"), cx)
4913            })
4914            .await
4915            .unwrap()
4916            .downcast::<Editor>()
4917            .unwrap();
4918
4919        // Clients A and B follow each other in split panes
4920        workspace_a
4921            .update(cx_a, |workspace, cx| {
4922                workspace.split_pane(workspace.active_pane().clone(), SplitDirection::Right, cx);
4923                assert_ne!(*workspace.active_pane(), pane_a1);
4924                let leader_id = *project_a.read(cx).collaborators().keys().next().unwrap();
4925                workspace
4926                    .toggle_follow(&workspace::ToggleFollow(leader_id), cx)
4927                    .unwrap()
4928            })
4929            .await
4930            .unwrap();
4931        workspace_b
4932            .update(cx_b, |workspace, cx| {
4933                workspace.split_pane(workspace.active_pane().clone(), SplitDirection::Right, cx);
4934                assert_ne!(*workspace.active_pane(), pane_b1);
4935                let leader_id = *project_b.read(cx).collaborators().keys().next().unwrap();
4936                workspace
4937                    .toggle_follow(&workspace::ToggleFollow(leader_id), cx)
4938                    .unwrap()
4939            })
4940            .await
4941            .unwrap();
4942
4943        workspace_a
4944            .update(cx_a, |workspace, cx| {
4945                workspace.activate_next_pane(cx);
4946                assert_eq!(*workspace.active_pane(), pane_a1);
4947                workspace.open_path((worktree_id, "3.txt"), cx)
4948            })
4949            .await
4950            .unwrap();
4951        workspace_b
4952            .update(cx_b, |workspace, cx| {
4953                workspace.activate_next_pane(cx);
4954                assert_eq!(*workspace.active_pane(), pane_b1);
4955                workspace.open_path((worktree_id, "4.txt"), cx)
4956            })
4957            .await
4958            .unwrap();
4959        cx_a.foreground().run_until_parked();
4960
4961        // Ensure leader updates don't change the active pane of followers
4962        workspace_a.read_with(cx_a, |workspace, _| {
4963            assert_eq!(*workspace.active_pane(), pane_a1);
4964        });
4965        workspace_b.read_with(cx_b, |workspace, _| {
4966            assert_eq!(*workspace.active_pane(), pane_b1);
4967        });
4968
4969        // Ensure peers following each other doesn't cause an infinite loop.
4970        assert_eq!(
4971            workspace_a.read_with(cx_a, |workspace, cx| workspace
4972                .active_item(cx)
4973                .unwrap()
4974                .project_path(cx)),
4975            Some((worktree_id, "3.txt").into())
4976        );
4977        workspace_a.update(cx_a, |workspace, cx| {
4978            assert_eq!(
4979                workspace.active_item(cx).unwrap().project_path(cx),
4980                Some((worktree_id, "3.txt").into())
4981            );
4982            workspace.activate_next_pane(cx);
4983            assert_eq!(
4984                workspace.active_item(cx).unwrap().project_path(cx),
4985                Some((worktree_id, "4.txt").into())
4986            );
4987        });
4988        workspace_b.update(cx_b, |workspace, cx| {
4989            assert_eq!(
4990                workspace.active_item(cx).unwrap().project_path(cx),
4991                Some((worktree_id, "4.txt").into())
4992            );
4993            workspace.activate_next_pane(cx);
4994            assert_eq!(
4995                workspace.active_item(cx).unwrap().project_path(cx),
4996                Some((worktree_id, "3.txt").into())
4997            );
4998        });
4999    }
5000
5001    #[gpui::test(iterations = 10)]
5002    async fn test_auto_unfollowing(cx_a: &mut TestAppContext, cx_b: &mut TestAppContext) {
5003        cx_a.foreground().forbid_parking();
5004        let fs = FakeFs::new(cx_a.background());
5005
5006        // 2 clients connect to a server.
5007        let mut server = TestServer::start(cx_a.foreground(), cx_a.background()).await;
5008        let mut client_a = server.create_client(cx_a, "user_a").await;
5009        let mut client_b = server.create_client(cx_b, "user_b").await;
5010        cx_a.update(editor::init);
5011        cx_b.update(editor::init);
5012
5013        // Client A shares a project.
5014        fs.insert_tree(
5015            "/a",
5016            json!({
5017                ".zed.toml": r#"collaborators = ["user_b"]"#,
5018                "1.txt": "one",
5019                "2.txt": "two",
5020                "3.txt": "three",
5021            }),
5022        )
5023        .await;
5024        let (project_a, worktree_id) = client_a.build_local_project(fs.clone(), "/a", cx_a).await;
5025        project_a
5026            .update(cx_a, |project, cx| project.share(cx))
5027            .await
5028            .unwrap();
5029
5030        // Client B joins the project.
5031        let project_b = client_b
5032            .build_remote_project(
5033                project_a
5034                    .read_with(cx_a, |project, _| project.remote_id())
5035                    .unwrap(),
5036                cx_b,
5037            )
5038            .await;
5039
5040        // Client A opens some editors.
5041        let workspace_a = client_a.build_workspace(&project_a, cx_a);
5042        let _editor_a1 = workspace_a
5043            .update(cx_a, |workspace, cx| {
5044                workspace.open_path((worktree_id, "1.txt"), cx)
5045            })
5046            .await
5047            .unwrap()
5048            .downcast::<Editor>()
5049            .unwrap();
5050
5051        // Client B starts following client A.
5052        let workspace_b = client_b.build_workspace(&project_b, cx_b);
5053        let pane_b = workspace_b.read_with(cx_b, |workspace, _| workspace.active_pane().clone());
5054        let leader_id = project_b.read_with(cx_b, |project, _| {
5055            project.collaborators().values().next().unwrap().peer_id
5056        });
5057        workspace_b
5058            .update(cx_b, |workspace, cx| {
5059                workspace
5060                    .toggle_follow(&ToggleFollow(leader_id), cx)
5061                    .unwrap()
5062            })
5063            .await
5064            .unwrap();
5065        assert_eq!(
5066            workspace_b.read_with(cx_b, |workspace, _| workspace.leader_for_pane(&pane_b)),
5067            Some(leader_id)
5068        );
5069        let editor_b2 = workspace_b.read_with(cx_b, |workspace, cx| {
5070            workspace
5071                .active_item(cx)
5072                .unwrap()
5073                .downcast::<Editor>()
5074                .unwrap()
5075        });
5076
5077        // When client B moves, it automatically stops following client A.
5078        editor_b2.update(cx_b, |editor, cx| editor.move_right(&editor::MoveRight, cx));
5079        assert_eq!(
5080            workspace_b.read_with(cx_b, |workspace, _| workspace.leader_for_pane(&pane_b)),
5081            None
5082        );
5083
5084        workspace_b
5085            .update(cx_b, |workspace, cx| {
5086                workspace
5087                    .toggle_follow(&ToggleFollow(leader_id), cx)
5088                    .unwrap()
5089            })
5090            .await
5091            .unwrap();
5092        assert_eq!(
5093            workspace_b.read_with(cx_b, |workspace, _| workspace.leader_for_pane(&pane_b)),
5094            Some(leader_id)
5095        );
5096
5097        // When client B edits, it automatically stops following client A.
5098        editor_b2.update(cx_b, |editor, cx| editor.insert("X", cx));
5099        assert_eq!(
5100            workspace_b.read_with(cx_b, |workspace, _| workspace.leader_for_pane(&pane_b)),
5101            None
5102        );
5103
5104        workspace_b
5105            .update(cx_b, |workspace, cx| {
5106                workspace
5107                    .toggle_follow(&ToggleFollow(leader_id), cx)
5108                    .unwrap()
5109            })
5110            .await
5111            .unwrap();
5112        assert_eq!(
5113            workspace_b.read_with(cx_b, |workspace, _| workspace.leader_for_pane(&pane_b)),
5114            Some(leader_id)
5115        );
5116
5117        // When client B scrolls, it automatically stops following client A.
5118        editor_b2.update(cx_b, |editor, cx| {
5119            editor.set_scroll_position(vec2f(0., 3.), cx)
5120        });
5121        assert_eq!(
5122            workspace_b.read_with(cx_b, |workspace, _| workspace.leader_for_pane(&pane_b)),
5123            None
5124        );
5125
5126        workspace_b
5127            .update(cx_b, |workspace, cx| {
5128                workspace
5129                    .toggle_follow(&ToggleFollow(leader_id), cx)
5130                    .unwrap()
5131            })
5132            .await
5133            .unwrap();
5134        assert_eq!(
5135            workspace_b.read_with(cx_b, |workspace, _| workspace.leader_for_pane(&pane_b)),
5136            Some(leader_id)
5137        );
5138
5139        // When client B activates a different pane, it continues following client A in the original pane.
5140        workspace_b.update(cx_b, |workspace, cx| {
5141            workspace.split_pane(pane_b.clone(), SplitDirection::Right, cx)
5142        });
5143        assert_eq!(
5144            workspace_b.read_with(cx_b, |workspace, _| workspace.leader_for_pane(&pane_b)),
5145            Some(leader_id)
5146        );
5147
5148        workspace_b.update(cx_b, |workspace, cx| workspace.activate_next_pane(cx));
5149        assert_eq!(
5150            workspace_b.read_with(cx_b, |workspace, _| workspace.leader_for_pane(&pane_b)),
5151            Some(leader_id)
5152        );
5153
5154        // When client B activates a different item in the original pane, it automatically stops following client A.
5155        workspace_b
5156            .update(cx_b, |workspace, cx| {
5157                workspace.open_path((worktree_id, "2.txt"), cx)
5158            })
5159            .await
5160            .unwrap();
5161        assert_eq!(
5162            workspace_b.read_with(cx_b, |workspace, _| workspace.leader_for_pane(&pane_b)),
5163            None
5164        );
5165    }
5166
5167    #[gpui::test(iterations = 100)]
5168    async fn test_random_collaboration(
5169        cx: &mut TestAppContext,
5170        deterministic: Arc<Deterministic>,
5171        rng: StdRng,
5172    ) {
5173        cx.foreground().forbid_parking();
5174        let max_peers = env::var("MAX_PEERS")
5175            .map(|i| i.parse().expect("invalid `MAX_PEERS` variable"))
5176            .unwrap_or(5);
5177        assert!(max_peers <= 5);
5178
5179        let max_operations = env::var("OPERATIONS")
5180            .map(|i| i.parse().expect("invalid `OPERATIONS` variable"))
5181            .unwrap_or(10);
5182
5183        let rng = Arc::new(Mutex::new(rng));
5184
5185        let guest_lang_registry = Arc::new(LanguageRegistry::test());
5186        let host_language_registry = Arc::new(LanguageRegistry::test());
5187
5188        let fs = FakeFs::new(cx.background());
5189        fs.insert_tree(
5190            "/_collab",
5191            json!({
5192                ".zed.toml": r#"collaborators = ["guest-1", "guest-2", "guest-3", "guest-4"]"#
5193            }),
5194        )
5195        .await;
5196
5197        let mut server = TestServer::start(cx.foreground(), cx.background()).await;
5198        let mut clients = Vec::new();
5199        let mut user_ids = Vec::new();
5200        let mut op_start_signals = Vec::new();
5201        let files = Arc::new(Mutex::new(Vec::new()));
5202
5203        let mut next_entity_id = 100000;
5204        let mut host_cx = TestAppContext::new(
5205            cx.foreground_platform(),
5206            cx.platform(),
5207            deterministic.build_foreground(next_entity_id),
5208            deterministic.build_background(),
5209            cx.font_cache(),
5210            cx.leak_detector(),
5211            next_entity_id,
5212        );
5213        let host = server.create_client(&mut host_cx, "host").await;
5214        let host_project = host_cx.update(|cx| {
5215            Project::local(
5216                host.client.clone(),
5217                host.user_store.clone(),
5218                host_language_registry.clone(),
5219                fs.clone(),
5220                cx,
5221            )
5222        });
5223        let host_project_id = host_project
5224            .update(&mut host_cx, |p, _| p.next_remote_id())
5225            .await;
5226
5227        let (collab_worktree, _) = host_project
5228            .update(&mut host_cx, |project, cx| {
5229                project.find_or_create_local_worktree("/_collab", true, cx)
5230            })
5231            .await
5232            .unwrap();
5233        collab_worktree
5234            .read_with(&host_cx, |tree, _| tree.as_local().unwrap().scan_complete())
5235            .await;
5236        host_project
5237            .update(&mut host_cx, |project, cx| project.share(cx))
5238            .await
5239            .unwrap();
5240
5241        // Set up fake language servers.
5242        let mut language = Language::new(
5243            LanguageConfig {
5244                name: "Rust".into(),
5245                path_suffixes: vec!["rs".to_string()],
5246                ..Default::default()
5247            },
5248            None,
5249        );
5250        let _fake_servers = language.set_fake_lsp_adapter(FakeLspAdapter {
5251            name: "the-fake-language-server",
5252            capabilities: lsp::LanguageServer::full_capabilities(),
5253            initializer: Some(Box::new({
5254                let rng = rng.clone();
5255                let files = files.clone();
5256                let project = host_project.downgrade();
5257                move |fake_server: &mut FakeLanguageServer| {
5258                    fake_server.handle_request::<lsp::request::Completion, _, _>(
5259                        |_, _| async move {
5260                            Ok(Some(lsp::CompletionResponse::Array(vec![
5261                                lsp::CompletionItem {
5262                                    text_edit: Some(lsp::CompletionTextEdit::Edit(lsp::TextEdit {
5263                                        range: lsp::Range::new(
5264                                            lsp::Position::new(0, 0),
5265                                            lsp::Position::new(0, 0),
5266                                        ),
5267                                        new_text: "the-new-text".to_string(),
5268                                    })),
5269                                    ..Default::default()
5270                                },
5271                            ])))
5272                        },
5273                    );
5274
5275                    fake_server.handle_request::<lsp::request::CodeActionRequest, _, _>(
5276                        |_, _| async move {
5277                            Ok(Some(vec![lsp::CodeActionOrCommand::CodeAction(
5278                                lsp::CodeAction {
5279                                    title: "the-code-action".to_string(),
5280                                    ..Default::default()
5281                                },
5282                            )]))
5283                        },
5284                    );
5285
5286                    fake_server.handle_request::<lsp::request::PrepareRenameRequest, _, _>(
5287                        |params, _| async move {
5288                            Ok(Some(lsp::PrepareRenameResponse::Range(lsp::Range::new(
5289                                params.position,
5290                                params.position,
5291                            ))))
5292                        },
5293                    );
5294
5295                    fake_server.handle_request::<lsp::request::GotoDefinition, _, _>({
5296                        let files = files.clone();
5297                        let rng = rng.clone();
5298                        move |_, _| {
5299                            let files = files.clone();
5300                            let rng = rng.clone();
5301                            async move {
5302                                let files = files.lock();
5303                                let mut rng = rng.lock();
5304                                let count = rng.gen_range::<usize, _>(1..3);
5305                                let files = (0..count)
5306                                    .map(|_| files.choose(&mut *rng).unwrap())
5307                                    .collect::<Vec<_>>();
5308                                log::info!("LSP: Returning definitions in files {:?}", &files);
5309                                Ok(Some(lsp::GotoDefinitionResponse::Array(
5310                                    files
5311                                        .into_iter()
5312                                        .map(|file| lsp::Location {
5313                                            uri: lsp::Url::from_file_path(file).unwrap(),
5314                                            range: Default::default(),
5315                                        })
5316                                        .collect(),
5317                                )))
5318                            }
5319                        }
5320                    });
5321
5322                    fake_server.handle_request::<lsp::request::DocumentHighlightRequest, _, _>({
5323                        let rng = rng.clone();
5324                        let project = project.clone();
5325                        move |params, mut cx| {
5326                            let highlights = if let Some(project) = project.upgrade(&cx) {
5327                                project.update(&mut cx, |project, cx| {
5328                                    let path = params
5329                                        .text_document_position_params
5330                                        .text_document
5331                                        .uri
5332                                        .to_file_path()
5333                                        .unwrap();
5334                                    let (worktree, relative_path) =
5335                                        project.find_local_worktree(&path, cx)?;
5336                                    let project_path =
5337                                        ProjectPath::from((worktree.read(cx).id(), relative_path));
5338                                    let buffer =
5339                                        project.get_open_buffer(&project_path, cx)?.read(cx);
5340
5341                                    let mut highlights = Vec::new();
5342                                    let highlight_count = rng.lock().gen_range(1..=5);
5343                                    let mut prev_end = 0;
5344                                    for _ in 0..highlight_count {
5345                                        let range =
5346                                            buffer.random_byte_range(prev_end, &mut *rng.lock());
5347
5348                                        highlights.push(lsp::DocumentHighlight {
5349                                            range: range_to_lsp(range.to_point_utf16(buffer)),
5350                                            kind: Some(lsp::DocumentHighlightKind::READ),
5351                                        });
5352                                        prev_end = range.end;
5353                                    }
5354                                    Some(highlights)
5355                                })
5356                            } else {
5357                                None
5358                            };
5359                            async move { Ok(highlights) }
5360                        }
5361                    });
5362                }
5363            })),
5364            ..Default::default()
5365        });
5366        host_language_registry.add(Arc::new(language));
5367
5368        let op_start_signal = futures::channel::mpsc::unbounded();
5369        user_ids.push(host.current_user_id(&host_cx));
5370        op_start_signals.push(op_start_signal.0);
5371        clients.push(host_cx.foreground().spawn(host.simulate_host(
5372            host_project,
5373            files,
5374            op_start_signal.1,
5375            rng.clone(),
5376            host_cx,
5377        )));
5378
5379        let disconnect_host_at = if rng.lock().gen_bool(0.2) {
5380            rng.lock().gen_range(0..max_operations)
5381        } else {
5382            max_operations
5383        };
5384        let mut available_guests = vec![
5385            "guest-1".to_string(),
5386            "guest-2".to_string(),
5387            "guest-3".to_string(),
5388            "guest-4".to_string(),
5389        ];
5390        let mut operations = 0;
5391        while operations < max_operations {
5392            if operations == disconnect_host_at {
5393                server.disconnect_client(user_ids[0]);
5394                cx.foreground().advance_clock(RECEIVE_TIMEOUT);
5395                drop(op_start_signals);
5396                let mut clients = futures::future::join_all(clients).await;
5397                cx.foreground().run_until_parked();
5398
5399                let (host, mut host_cx, host_err) = clients.remove(0);
5400                if let Some(host_err) = host_err {
5401                    log::error!("host error - {}", host_err);
5402                }
5403                host.project
5404                    .as_ref()
5405                    .unwrap()
5406                    .read_with(&host_cx, |project, _| assert!(!project.is_shared()));
5407                for (guest, mut guest_cx, guest_err) in clients {
5408                    if let Some(guest_err) = guest_err {
5409                        log::error!("{} error - {}", guest.username, guest_err);
5410                    }
5411                    let contacts = server
5412                        .store
5413                        .read()
5414                        .await
5415                        .contacts_for_user(guest.current_user_id(&guest_cx));
5416                    assert!(!contacts
5417                        .iter()
5418                        .flat_map(|contact| &contact.projects)
5419                        .any(|project| project.id == host_project_id));
5420                    guest
5421                        .project
5422                        .as_ref()
5423                        .unwrap()
5424                        .read_with(&guest_cx, |project, _| assert!(project.is_read_only()));
5425                    guest_cx.update(|_| drop(guest));
5426                }
5427                host_cx.update(|_| drop(host));
5428
5429                return;
5430            }
5431
5432            let distribution = rng.lock().gen_range(0..100);
5433            match distribution {
5434                0..=19 if !available_guests.is_empty() => {
5435                    let guest_ix = rng.lock().gen_range(0..available_guests.len());
5436                    let guest_username = available_guests.remove(guest_ix);
5437                    log::info!("Adding new connection for {}", guest_username);
5438                    next_entity_id += 100000;
5439                    let mut guest_cx = TestAppContext::new(
5440                        cx.foreground_platform(),
5441                        cx.platform(),
5442                        deterministic.build_foreground(next_entity_id),
5443                        deterministic.build_background(),
5444                        cx.font_cache(),
5445                        cx.leak_detector(),
5446                        next_entity_id,
5447                    );
5448                    let guest = server.create_client(&mut guest_cx, &guest_username).await;
5449                    let guest_project = Project::remote(
5450                        host_project_id,
5451                        guest.client.clone(),
5452                        guest.user_store.clone(),
5453                        guest_lang_registry.clone(),
5454                        FakeFs::new(cx.background()),
5455                        &mut guest_cx.to_async(),
5456                    )
5457                    .await
5458                    .unwrap();
5459                    let op_start_signal = futures::channel::mpsc::unbounded();
5460                    user_ids.push(guest.current_user_id(&guest_cx));
5461                    op_start_signals.push(op_start_signal.0);
5462                    clients.push(guest_cx.foreground().spawn(guest.simulate_guest(
5463                        guest_username.clone(),
5464                        guest_project,
5465                        op_start_signal.1,
5466                        rng.clone(),
5467                        guest_cx,
5468                    )));
5469
5470                    log::info!("Added connection for {}", guest_username);
5471                    operations += 1;
5472                }
5473                20..=29 if clients.len() > 1 => {
5474                    log::info!("Removing guest");
5475                    let guest_ix = rng.lock().gen_range(1..clients.len());
5476                    let removed_guest_id = user_ids.remove(guest_ix);
5477                    let guest = clients.remove(guest_ix);
5478                    op_start_signals.remove(guest_ix);
5479                    server.disconnect_client(removed_guest_id);
5480                    cx.foreground().advance_clock(RECEIVE_TIMEOUT);
5481                    let (guest, mut guest_cx, guest_err) = guest.await;
5482                    if let Some(guest_err) = guest_err {
5483                        log::error!("{} error - {}", guest.username, guest_err);
5484                    }
5485                    guest
5486                        .project
5487                        .as_ref()
5488                        .unwrap()
5489                        .read_with(&guest_cx, |project, _| assert!(project.is_read_only()));
5490                    for user_id in &user_ids {
5491                        for contact in server.store.read().await.contacts_for_user(*user_id) {
5492                            assert_ne!(
5493                                contact.user_id, removed_guest_id.0 as u64,
5494                                "removed guest is still a contact of another peer"
5495                            );
5496                            for project in contact.projects {
5497                                for project_guest_id in project.guests {
5498                                    assert_ne!(
5499                                        project_guest_id, removed_guest_id.0 as u64,
5500                                        "removed guest appears as still participating on a project"
5501                                    );
5502                                }
5503                            }
5504                        }
5505                    }
5506
5507                    log::info!("{} removed", guest.username);
5508                    available_guests.push(guest.username.clone());
5509                    guest_cx.update(|_| drop(guest));
5510
5511                    operations += 1;
5512                }
5513                _ => {
5514                    while operations < max_operations && rng.lock().gen_bool(0.7) {
5515                        op_start_signals
5516                            .choose(&mut *rng.lock())
5517                            .unwrap()
5518                            .unbounded_send(())
5519                            .unwrap();
5520                        operations += 1;
5521                    }
5522
5523                    if rng.lock().gen_bool(0.8) {
5524                        cx.foreground().run_until_parked();
5525                    }
5526                }
5527            }
5528        }
5529
5530        drop(op_start_signals);
5531        let mut clients = futures::future::join_all(clients).await;
5532        cx.foreground().run_until_parked();
5533
5534        let (host_client, mut host_cx, host_err) = clients.remove(0);
5535        if let Some(host_err) = host_err {
5536            panic!("host error - {}", host_err);
5537        }
5538        let host_project = host_client.project.as_ref().unwrap();
5539        let host_worktree_snapshots = host_project.read_with(&host_cx, |project, cx| {
5540            project
5541                .worktrees(cx)
5542                .map(|worktree| {
5543                    let snapshot = worktree.read(cx).snapshot();
5544                    (snapshot.id(), snapshot)
5545                })
5546                .collect::<BTreeMap<_, _>>()
5547        });
5548
5549        host_client
5550            .project
5551            .as_ref()
5552            .unwrap()
5553            .read_with(&host_cx, |project, cx| project.check_invariants(cx));
5554
5555        for (guest_client, mut guest_cx, guest_err) in clients.into_iter() {
5556            if let Some(guest_err) = guest_err {
5557                panic!("{} error - {}", guest_client.username, guest_err);
5558            }
5559            let worktree_snapshots =
5560                guest_client
5561                    .project
5562                    .as_ref()
5563                    .unwrap()
5564                    .read_with(&guest_cx, |project, cx| {
5565                        project
5566                            .worktrees(cx)
5567                            .map(|worktree| {
5568                                let worktree = worktree.read(cx);
5569                                (worktree.id(), worktree.snapshot())
5570                            })
5571                            .collect::<BTreeMap<_, _>>()
5572                    });
5573
5574            assert_eq!(
5575                worktree_snapshots.keys().collect::<Vec<_>>(),
5576                host_worktree_snapshots.keys().collect::<Vec<_>>(),
5577                "{} has different worktrees than the host",
5578                guest_client.username
5579            );
5580            for (id, host_snapshot) in &host_worktree_snapshots {
5581                let guest_snapshot = &worktree_snapshots[id];
5582                assert_eq!(
5583                    guest_snapshot.root_name(),
5584                    host_snapshot.root_name(),
5585                    "{} has different root name than the host for worktree {}",
5586                    guest_client.username,
5587                    id
5588                );
5589                assert_eq!(
5590                    guest_snapshot.entries(false).collect::<Vec<_>>(),
5591                    host_snapshot.entries(false).collect::<Vec<_>>(),
5592                    "{} has different snapshot than the host for worktree {}",
5593                    guest_client.username,
5594                    id
5595                );
5596            }
5597
5598            guest_client
5599                .project
5600                .as_ref()
5601                .unwrap()
5602                .read_with(&guest_cx, |project, cx| project.check_invariants(cx));
5603
5604            for guest_buffer in &guest_client.buffers {
5605                let buffer_id = guest_buffer.read_with(&guest_cx, |buffer, _| buffer.remote_id());
5606                let host_buffer = host_project.read_with(&host_cx, |project, cx| {
5607                    project.buffer_for_id(buffer_id, cx).expect(&format!(
5608                        "host does not have buffer for guest:{}, peer:{}, id:{}",
5609                        guest_client.username, guest_client.peer_id, buffer_id
5610                    ))
5611                });
5612                let path = host_buffer
5613                    .read_with(&host_cx, |buffer, cx| buffer.file().unwrap().full_path(cx));
5614
5615                assert_eq!(
5616                    guest_buffer.read_with(&guest_cx, |buffer, _| buffer.deferred_ops_len()),
5617                    0,
5618                    "{}, buffer {}, path {:?} has deferred operations",
5619                    guest_client.username,
5620                    buffer_id,
5621                    path,
5622                );
5623                assert_eq!(
5624                    guest_buffer.read_with(&guest_cx, |buffer, _| buffer.text()),
5625                    host_buffer.read_with(&host_cx, |buffer, _| buffer.text()),
5626                    "{}, buffer {}, path {:?}, differs from the host's buffer",
5627                    guest_client.username,
5628                    buffer_id,
5629                    path
5630                );
5631            }
5632
5633            guest_cx.update(|_| drop(guest_client));
5634        }
5635
5636        host_cx.update(|_| drop(host_client));
5637    }
5638
5639    struct TestServer {
5640        peer: Arc<Peer>,
5641        app_state: Arc<AppState>,
5642        server: Arc<Server>,
5643        foreground: Rc<executor::Foreground>,
5644        notifications: mpsc::UnboundedReceiver<()>,
5645        connection_killers: Arc<Mutex<HashMap<UserId, Arc<AtomicBool>>>>,
5646        forbid_connections: Arc<AtomicBool>,
5647        _test_db: TestDb,
5648    }
5649
5650    impl TestServer {
5651        async fn start(
5652            foreground: Rc<executor::Foreground>,
5653            background: Arc<executor::Background>,
5654        ) -> Self {
5655            let test_db = TestDb::fake(background);
5656            let app_state = Self::build_app_state(&test_db).await;
5657            let peer = Peer::new();
5658            let notifications = mpsc::unbounded();
5659            let server = Server::new(app_state.clone(), Some(notifications.0));
5660            Self {
5661                peer,
5662                app_state,
5663                server,
5664                foreground,
5665                notifications: notifications.1,
5666                connection_killers: Default::default(),
5667                forbid_connections: Default::default(),
5668                _test_db: test_db,
5669            }
5670        }
5671
5672        async fn create_client(&mut self, cx: &mut TestAppContext, name: &str) -> TestClient {
5673            cx.update(|cx| {
5674                let settings = Settings::test(cx);
5675                cx.set_global(settings);
5676            });
5677
5678            let http = FakeHttpClient::with_404_response();
5679            let user_id = self.app_state.db.create_user(name, false).await.unwrap();
5680            let client_name = name.to_string();
5681            let mut client = Client::new(http.clone());
5682            let server = self.server.clone();
5683            let connection_killers = self.connection_killers.clone();
5684            let forbid_connections = self.forbid_connections.clone();
5685            let (connection_id_tx, mut connection_id_rx) = mpsc::channel(16);
5686
5687            Arc::get_mut(&mut client)
5688                .unwrap()
5689                .override_authenticate(move |cx| {
5690                    cx.spawn(|_| async move {
5691                        let access_token = "the-token".to_string();
5692                        Ok(Credentials {
5693                            user_id: user_id.0 as u64,
5694                            access_token,
5695                        })
5696                    })
5697                })
5698                .override_establish_connection(move |credentials, cx| {
5699                    assert_eq!(credentials.user_id, user_id.0 as u64);
5700                    assert_eq!(credentials.access_token, "the-token");
5701
5702                    let server = server.clone();
5703                    let connection_killers = connection_killers.clone();
5704                    let forbid_connections = forbid_connections.clone();
5705                    let client_name = client_name.clone();
5706                    let connection_id_tx = connection_id_tx.clone();
5707                    cx.spawn(move |cx| async move {
5708                        if forbid_connections.load(SeqCst) {
5709                            Err(EstablishConnectionError::other(anyhow!(
5710                                "server is forbidding connections"
5711                            )))
5712                        } else {
5713                            let (client_conn, server_conn, killed) =
5714                                Connection::in_memory(cx.background());
5715                            connection_killers.lock().insert(user_id, killed);
5716                            cx.background()
5717                                .spawn(server.handle_connection(
5718                                    server_conn,
5719                                    client_name,
5720                                    user_id,
5721                                    Some(connection_id_tx),
5722                                    cx.background(),
5723                                ))
5724                                .detach();
5725                            Ok(client_conn)
5726                        }
5727                    })
5728                });
5729
5730            client
5731                .authenticate_and_connect(false, &cx.to_async())
5732                .await
5733                .unwrap();
5734
5735            Channel::init(&client);
5736            Project::init(&client);
5737            cx.update(|cx| {
5738                workspace::init(&client, cx);
5739            });
5740
5741            let peer_id = PeerId(connection_id_rx.next().await.unwrap().0);
5742            let user_store = cx.add_model(|cx| UserStore::new(client.clone(), http, cx));
5743
5744            let client = TestClient {
5745                client,
5746                peer_id,
5747                username: name.to_string(),
5748                user_store,
5749                language_registry: Arc::new(LanguageRegistry::test()),
5750                project: Default::default(),
5751                buffers: Default::default(),
5752            };
5753            client.wait_for_current_user(cx).await;
5754            client
5755        }
5756
5757        fn disconnect_client(&self, user_id: UserId) {
5758            self.connection_killers
5759                .lock()
5760                .remove(&user_id)
5761                .unwrap()
5762                .store(true, SeqCst);
5763        }
5764
5765        fn forbid_connections(&self) {
5766            self.forbid_connections.store(true, SeqCst);
5767        }
5768
5769        fn allow_connections(&self) {
5770            self.forbid_connections.store(false, SeqCst);
5771        }
5772
5773        async fn build_app_state(test_db: &TestDb) -> Arc<AppState> {
5774            Arc::new(AppState {
5775                db: test_db.db().clone(),
5776                api_token: Default::default(),
5777            })
5778        }
5779
5780        async fn state<'a>(&'a self) -> RwLockReadGuard<'a, Store> {
5781            self.server.store.read().await
5782        }
5783
5784        async fn condition<F>(&mut self, mut predicate: F)
5785        where
5786            F: FnMut(&Store) -> bool,
5787        {
5788            assert!(
5789                self.foreground.parking_forbidden(),
5790                "you must call forbid_parking to use server conditions so we don't block indefinitely"
5791            );
5792            while !(predicate)(&*self.server.store.read().await) {
5793                self.foreground.start_waiting();
5794                self.notifications.next().await;
5795                self.foreground.finish_waiting();
5796            }
5797        }
5798    }
5799
5800    impl Deref for TestServer {
5801        type Target = Server;
5802
5803        fn deref(&self) -> &Self::Target {
5804            &self.server
5805        }
5806    }
5807
5808    impl Drop for TestServer {
5809        fn drop(&mut self) {
5810            self.peer.reset();
5811        }
5812    }
5813
5814    struct TestClient {
5815        client: Arc<Client>,
5816        username: String,
5817        pub peer_id: PeerId,
5818        pub user_store: ModelHandle<UserStore>,
5819        language_registry: Arc<LanguageRegistry>,
5820        project: Option<ModelHandle<Project>>,
5821        buffers: HashSet<ModelHandle<language::Buffer>>,
5822    }
5823
5824    impl Deref for TestClient {
5825        type Target = Arc<Client>;
5826
5827        fn deref(&self) -> &Self::Target {
5828            &self.client
5829        }
5830    }
5831
5832    impl TestClient {
5833        pub fn current_user_id(&self, cx: &TestAppContext) -> UserId {
5834            UserId::from_proto(
5835                self.user_store
5836                    .read_with(cx, |user_store, _| user_store.current_user().unwrap().id),
5837            )
5838        }
5839
5840        async fn wait_for_current_user(&self, cx: &TestAppContext) {
5841            let mut authed_user = self
5842                .user_store
5843                .read_with(cx, |user_store, _| user_store.watch_current_user());
5844            while authed_user.next().await.unwrap().is_none() {}
5845        }
5846
5847        async fn build_local_project(
5848            &mut self,
5849            fs: Arc<FakeFs>,
5850            root_path: impl AsRef<Path>,
5851            cx: &mut TestAppContext,
5852        ) -> (ModelHandle<Project>, WorktreeId) {
5853            let project = cx.update(|cx| {
5854                Project::local(
5855                    self.client.clone(),
5856                    self.user_store.clone(),
5857                    self.language_registry.clone(),
5858                    fs,
5859                    cx,
5860                )
5861            });
5862            self.project = Some(project.clone());
5863            let (worktree, _) = project
5864                .update(cx, |p, cx| {
5865                    p.find_or_create_local_worktree(root_path, true, cx)
5866                })
5867                .await
5868                .unwrap();
5869            worktree
5870                .read_with(cx, |tree, _| tree.as_local().unwrap().scan_complete())
5871                .await;
5872            project
5873                .update(cx, |project, _| project.next_remote_id())
5874                .await;
5875            (project, worktree.read_with(cx, |tree, _| tree.id()))
5876        }
5877
5878        async fn build_remote_project(
5879            &mut self,
5880            project_id: u64,
5881            cx: &mut TestAppContext,
5882        ) -> ModelHandle<Project> {
5883            let project = Project::remote(
5884                project_id,
5885                self.client.clone(),
5886                self.user_store.clone(),
5887                self.language_registry.clone(),
5888                FakeFs::new(cx.background()),
5889                &mut cx.to_async(),
5890            )
5891            .await
5892            .unwrap();
5893            self.project = Some(project.clone());
5894            project
5895        }
5896
5897        fn build_workspace(
5898            &self,
5899            project: &ModelHandle<Project>,
5900            cx: &mut TestAppContext,
5901        ) -> ViewHandle<Workspace> {
5902            let (window_id, _) = cx.add_window(|_| EmptyView);
5903            cx.add_view(window_id, |cx| {
5904                let fs = project.read(cx).fs().clone();
5905                Workspace::new(
5906                    &WorkspaceParams {
5907                        fs,
5908                        project: project.clone(),
5909                        user_store: self.user_store.clone(),
5910                        languages: self.language_registry.clone(),
5911                        themes: ThemeRegistry::new((), cx.font_cache().clone()),
5912                        channel_list: cx.add_model(|cx| {
5913                            ChannelList::new(self.user_store.clone(), self.client.clone(), cx)
5914                        }),
5915                        client: self.client.clone(),
5916                    },
5917                    cx,
5918                )
5919            })
5920        }
5921
5922        async fn simulate_host(
5923            mut self,
5924            project: ModelHandle<Project>,
5925            files: Arc<Mutex<Vec<PathBuf>>>,
5926            op_start_signal: futures::channel::mpsc::UnboundedReceiver<()>,
5927            rng: Arc<Mutex<StdRng>>,
5928            mut cx: TestAppContext,
5929        ) -> (Self, TestAppContext, Option<anyhow::Error>) {
5930            async fn simulate_host_internal(
5931                client: &mut TestClient,
5932                project: ModelHandle<Project>,
5933                files: Arc<Mutex<Vec<PathBuf>>>,
5934                mut op_start_signal: futures::channel::mpsc::UnboundedReceiver<()>,
5935                rng: Arc<Mutex<StdRng>>,
5936                cx: &mut TestAppContext,
5937            ) -> anyhow::Result<()> {
5938                let fs = project.read_with(cx, |project, _| project.fs().clone());
5939
5940                while op_start_signal.next().await.is_some() {
5941                    let distribution = rng.lock().gen_range::<usize, _>(0..100);
5942                    match distribution {
5943                        0..=20 if !files.lock().is_empty() => {
5944                            let path = files.lock().choose(&mut *rng.lock()).unwrap().clone();
5945                            let mut path = path.as_path();
5946                            while let Some(parent_path) = path.parent() {
5947                                path = parent_path;
5948                                if rng.lock().gen() {
5949                                    break;
5950                                }
5951                            }
5952
5953                            log::info!("Host: find/create local worktree {:?}", path);
5954                            let find_or_create_worktree = project.update(cx, |project, cx| {
5955                                project.find_or_create_local_worktree(path, true, cx)
5956                            });
5957                            if rng.lock().gen() {
5958                                cx.background().spawn(find_or_create_worktree).detach();
5959                            } else {
5960                                find_or_create_worktree.await?;
5961                            }
5962                        }
5963                        10..=80 if !files.lock().is_empty() => {
5964                            let buffer = if client.buffers.is_empty() || rng.lock().gen() {
5965                                let file = files.lock().choose(&mut *rng.lock()).unwrap().clone();
5966                                let (worktree, path) = project
5967                                    .update(cx, |project, cx| {
5968                                        project.find_or_create_local_worktree(
5969                                            file.clone(),
5970                                            true,
5971                                            cx,
5972                                        )
5973                                    })
5974                                    .await?;
5975                                let project_path =
5976                                    worktree.read_with(cx, |worktree, _| (worktree.id(), path));
5977                                log::info!(
5978                                    "Host: opening path {:?}, worktree {}, relative_path {:?}",
5979                                    file,
5980                                    project_path.0,
5981                                    project_path.1
5982                                );
5983                                let buffer = project
5984                                    .update(cx, |project, cx| project.open_buffer(project_path, cx))
5985                                    .await
5986                                    .unwrap();
5987                                client.buffers.insert(buffer.clone());
5988                                buffer
5989                            } else {
5990                                client
5991                                    .buffers
5992                                    .iter()
5993                                    .choose(&mut *rng.lock())
5994                                    .unwrap()
5995                                    .clone()
5996                            };
5997
5998                            if rng.lock().gen_bool(0.1) {
5999                                cx.update(|cx| {
6000                                    log::info!(
6001                                        "Host: dropping buffer {:?}",
6002                                        buffer.read(cx).file().unwrap().full_path(cx)
6003                                    );
6004                                    client.buffers.remove(&buffer);
6005                                    drop(buffer);
6006                                });
6007                            } else {
6008                                buffer.update(cx, |buffer, cx| {
6009                                    log::info!(
6010                                        "Host: updating buffer {:?} ({})",
6011                                        buffer.file().unwrap().full_path(cx),
6012                                        buffer.remote_id()
6013                                    );
6014
6015                                    if rng.lock().gen_bool(0.7) {
6016                                        buffer.randomly_edit(&mut *rng.lock(), 5, cx);
6017                                    } else {
6018                                        buffer.randomly_undo_redo(&mut *rng.lock(), cx);
6019                                    }
6020                                });
6021                            }
6022                        }
6023                        _ => loop {
6024                            let path_component_count = rng.lock().gen_range::<usize, _>(1..=5);
6025                            let mut path = PathBuf::new();
6026                            path.push("/");
6027                            for _ in 0..path_component_count {
6028                                let letter = rng.lock().gen_range(b'a'..=b'z');
6029                                path.push(std::str::from_utf8(&[letter]).unwrap());
6030                            }
6031                            path.set_extension("rs");
6032                            let parent_path = path.parent().unwrap();
6033
6034                            log::info!("Host: creating file {:?}", path,);
6035
6036                            if fs.create_dir(&parent_path).await.is_ok()
6037                                && fs.create_file(&path, Default::default()).await.is_ok()
6038                            {
6039                                files.lock().push(path);
6040                                break;
6041                            } else {
6042                                log::info!("Host: cannot create file");
6043                            }
6044                        },
6045                    }
6046
6047                    cx.background().simulate_random_delay().await;
6048                }
6049
6050                Ok(())
6051            }
6052
6053            let result = simulate_host_internal(
6054                &mut self,
6055                project.clone(),
6056                files,
6057                op_start_signal,
6058                rng,
6059                &mut cx,
6060            )
6061            .await;
6062            log::info!("Host done");
6063            self.project = Some(project);
6064            (self, cx, result.err())
6065        }
6066
6067        pub async fn simulate_guest(
6068            mut self,
6069            guest_username: String,
6070            project: ModelHandle<Project>,
6071            op_start_signal: futures::channel::mpsc::UnboundedReceiver<()>,
6072            rng: Arc<Mutex<StdRng>>,
6073            mut cx: TestAppContext,
6074        ) -> (Self, TestAppContext, Option<anyhow::Error>) {
6075            async fn simulate_guest_internal(
6076                client: &mut TestClient,
6077                guest_username: &str,
6078                project: ModelHandle<Project>,
6079                mut op_start_signal: futures::channel::mpsc::UnboundedReceiver<()>,
6080                rng: Arc<Mutex<StdRng>>,
6081                cx: &mut TestAppContext,
6082            ) -> anyhow::Result<()> {
6083                while op_start_signal.next().await.is_some() {
6084                    let buffer = if client.buffers.is_empty() || rng.lock().gen() {
6085                        let worktree = if let Some(worktree) =
6086                            project.read_with(cx, |project, cx| {
6087                                project
6088                                    .worktrees(&cx)
6089                                    .filter(|worktree| {
6090                                        let worktree = worktree.read(cx);
6091                                        worktree.is_visible()
6092                                            && worktree.entries(false).any(|e| e.is_file())
6093                                    })
6094                                    .choose(&mut *rng.lock())
6095                            }) {
6096                            worktree
6097                        } else {
6098                            cx.background().simulate_random_delay().await;
6099                            continue;
6100                        };
6101
6102                        let (worktree_root_name, project_path) =
6103                            worktree.read_with(cx, |worktree, _| {
6104                                let entry = worktree
6105                                    .entries(false)
6106                                    .filter(|e| e.is_file())
6107                                    .choose(&mut *rng.lock())
6108                                    .unwrap();
6109                                (
6110                                    worktree.root_name().to_string(),
6111                                    (worktree.id(), entry.path.clone()),
6112                                )
6113                            });
6114                        log::info!(
6115                            "{}: opening path {:?} in worktree {} ({})",
6116                            guest_username,
6117                            project_path.1,
6118                            project_path.0,
6119                            worktree_root_name,
6120                        );
6121                        let buffer = project
6122                            .update(cx, |project, cx| {
6123                                project.open_buffer(project_path.clone(), cx)
6124                            })
6125                            .await?;
6126                        log::info!(
6127                            "{}: opened path {:?} in worktree {} ({}) with buffer id {}",
6128                            guest_username,
6129                            project_path.1,
6130                            project_path.0,
6131                            worktree_root_name,
6132                            buffer.read_with(cx, |buffer, _| buffer.remote_id())
6133                        );
6134                        client.buffers.insert(buffer.clone());
6135                        buffer
6136                    } else {
6137                        client
6138                            .buffers
6139                            .iter()
6140                            .choose(&mut *rng.lock())
6141                            .unwrap()
6142                            .clone()
6143                    };
6144
6145                    let choice = rng.lock().gen_range(0..100);
6146                    match choice {
6147                        0..=9 => {
6148                            cx.update(|cx| {
6149                                log::info!(
6150                                    "{}: dropping buffer {:?}",
6151                                    guest_username,
6152                                    buffer.read(cx).file().unwrap().full_path(cx)
6153                                );
6154                                client.buffers.remove(&buffer);
6155                                drop(buffer);
6156                            });
6157                        }
6158                        10..=19 => {
6159                            let completions = project.update(cx, |project, cx| {
6160                                log::info!(
6161                                    "{}: requesting completions for buffer {} ({:?})",
6162                                    guest_username,
6163                                    buffer.read(cx).remote_id(),
6164                                    buffer.read(cx).file().unwrap().full_path(cx)
6165                                );
6166                                let offset = rng.lock().gen_range(0..=buffer.read(cx).len());
6167                                project.completions(&buffer, offset, cx)
6168                            });
6169                            let completions = cx.background().spawn(async move {
6170                                completions
6171                                    .await
6172                                    .map_err(|err| anyhow!("completions request failed: {:?}", err))
6173                            });
6174                            if rng.lock().gen_bool(0.3) {
6175                                log::info!("{}: detaching completions request", guest_username);
6176                                cx.update(|cx| completions.detach_and_log_err(cx));
6177                            } else {
6178                                completions.await?;
6179                            }
6180                        }
6181                        20..=29 => {
6182                            let code_actions = project.update(cx, |project, cx| {
6183                                log::info!(
6184                                    "{}: requesting code actions for buffer {} ({:?})",
6185                                    guest_username,
6186                                    buffer.read(cx).remote_id(),
6187                                    buffer.read(cx).file().unwrap().full_path(cx)
6188                                );
6189                                let range = buffer.read(cx).random_byte_range(0, &mut *rng.lock());
6190                                project.code_actions(&buffer, range, cx)
6191                            });
6192                            let code_actions = cx.background().spawn(async move {
6193                                code_actions.await.map_err(|err| {
6194                                    anyhow!("code actions request failed: {:?}", err)
6195                                })
6196                            });
6197                            if rng.lock().gen_bool(0.3) {
6198                                log::info!("{}: detaching code actions request", guest_username);
6199                                cx.update(|cx| code_actions.detach_and_log_err(cx));
6200                            } else {
6201                                code_actions.await?;
6202                            }
6203                        }
6204                        30..=39 if buffer.read_with(cx, |buffer, _| buffer.is_dirty()) => {
6205                            let (requested_version, save) = buffer.update(cx, |buffer, cx| {
6206                                log::info!(
6207                                    "{}: saving buffer {} ({:?})",
6208                                    guest_username,
6209                                    buffer.remote_id(),
6210                                    buffer.file().unwrap().full_path(cx)
6211                                );
6212                                (buffer.version(), buffer.save(cx))
6213                            });
6214                            let save = cx.background().spawn(async move {
6215                                let (saved_version, _) = save
6216                                    .await
6217                                    .map_err(|err| anyhow!("save request failed: {:?}", err))?;
6218                                assert!(saved_version.observed_all(&requested_version));
6219                                Ok::<_, anyhow::Error>(())
6220                            });
6221                            if rng.lock().gen_bool(0.3) {
6222                                log::info!("{}: detaching save request", guest_username);
6223                                cx.update(|cx| save.detach_and_log_err(cx));
6224                            } else {
6225                                save.await?;
6226                            }
6227                        }
6228                        40..=44 => {
6229                            let prepare_rename = project.update(cx, |project, cx| {
6230                                log::info!(
6231                                    "{}: preparing rename for buffer {} ({:?})",
6232                                    guest_username,
6233                                    buffer.read(cx).remote_id(),
6234                                    buffer.read(cx).file().unwrap().full_path(cx)
6235                                );
6236                                let offset = rng.lock().gen_range(0..=buffer.read(cx).len());
6237                                project.prepare_rename(buffer, offset, cx)
6238                            });
6239                            let prepare_rename = cx.background().spawn(async move {
6240                                prepare_rename.await.map_err(|err| {
6241                                    anyhow!("prepare rename request failed: {:?}", err)
6242                                })
6243                            });
6244                            if rng.lock().gen_bool(0.3) {
6245                                log::info!("{}: detaching prepare rename request", guest_username);
6246                                cx.update(|cx| prepare_rename.detach_and_log_err(cx));
6247                            } else {
6248                                prepare_rename.await?;
6249                            }
6250                        }
6251                        45..=49 => {
6252                            let definitions = project.update(cx, |project, cx| {
6253                                log::info!(
6254                                    "{}: requesting definitions for buffer {} ({:?})",
6255                                    guest_username,
6256                                    buffer.read(cx).remote_id(),
6257                                    buffer.read(cx).file().unwrap().full_path(cx)
6258                                );
6259                                let offset = rng.lock().gen_range(0..=buffer.read(cx).len());
6260                                project.definition(&buffer, offset, cx)
6261                            });
6262                            let definitions = cx.background().spawn(async move {
6263                                definitions
6264                                    .await
6265                                    .map_err(|err| anyhow!("definitions request failed: {:?}", err))
6266                            });
6267                            if rng.lock().gen_bool(0.3) {
6268                                log::info!("{}: detaching definitions request", guest_username);
6269                                cx.update(|cx| definitions.detach_and_log_err(cx));
6270                            } else {
6271                                client
6272                                    .buffers
6273                                    .extend(definitions.await?.into_iter().map(|loc| loc.buffer));
6274                            }
6275                        }
6276                        50..=54 => {
6277                            let highlights = project.update(cx, |project, cx| {
6278                                log::info!(
6279                                    "{}: requesting highlights for buffer {} ({:?})",
6280                                    guest_username,
6281                                    buffer.read(cx).remote_id(),
6282                                    buffer.read(cx).file().unwrap().full_path(cx)
6283                                );
6284                                let offset = rng.lock().gen_range(0..=buffer.read(cx).len());
6285                                project.document_highlights(&buffer, offset, cx)
6286                            });
6287                            let highlights = cx.background().spawn(async move {
6288                                highlights
6289                                    .await
6290                                    .map_err(|err| anyhow!("highlights request failed: {:?}", err))
6291                            });
6292                            if rng.lock().gen_bool(0.3) {
6293                                log::info!("{}: detaching highlights request", guest_username);
6294                                cx.update(|cx| highlights.detach_and_log_err(cx));
6295                            } else {
6296                                highlights.await?;
6297                            }
6298                        }
6299                        55..=59 => {
6300                            let search = project.update(cx, |project, cx| {
6301                                let query = rng.lock().gen_range('a'..='z');
6302                                log::info!("{}: project-wide search {:?}", guest_username, query);
6303                                project.search(SearchQuery::text(query, false, false), cx)
6304                            });
6305                            let search = cx.background().spawn(async move {
6306                                search
6307                                    .await
6308                                    .map_err(|err| anyhow!("search request failed: {:?}", err))
6309                            });
6310                            if rng.lock().gen_bool(0.3) {
6311                                log::info!("{}: detaching search request", guest_username);
6312                                cx.update(|cx| search.detach_and_log_err(cx));
6313                            } else {
6314                                client.buffers.extend(search.await?.into_keys());
6315                            }
6316                        }
6317                        _ => {
6318                            buffer.update(cx, |buffer, cx| {
6319                                log::info!(
6320                                    "{}: updating buffer {} ({:?})",
6321                                    guest_username,
6322                                    buffer.remote_id(),
6323                                    buffer.file().unwrap().full_path(cx)
6324                                );
6325                                if rng.lock().gen_bool(0.7) {
6326                                    buffer.randomly_edit(&mut *rng.lock(), 5, cx);
6327                                } else {
6328                                    buffer.randomly_undo_redo(&mut *rng.lock(), cx);
6329                                }
6330                            });
6331                        }
6332                    }
6333                    cx.background().simulate_random_delay().await;
6334                }
6335                Ok(())
6336            }
6337
6338            let result = simulate_guest_internal(
6339                &mut self,
6340                &guest_username,
6341                project.clone(),
6342                op_start_signal,
6343                rng,
6344                &mut cx,
6345            )
6346            .await;
6347            log::info!("{}: done", guest_username);
6348
6349            self.project = Some(project);
6350            (self, cx, result.err())
6351        }
6352    }
6353
6354    impl Drop for TestClient {
6355        fn drop(&mut self) {
6356            self.client.tear_down();
6357        }
6358    }
6359
6360    impl Executor for Arc<gpui::executor::Background> {
6361        type Sleep = gpui::executor::Timer;
6362
6363        fn spawn_detached<F: 'static + Send + Future<Output = ()>>(&self, future: F) {
6364            self.spawn(future).detach();
6365        }
6366
6367        fn sleep(&self, duration: Duration) -> Self::Sleep {
6368            self.as_ref().timer(duration)
6369        }
6370    }
6371
6372    fn channel_messages(channel: &Channel) -> Vec<(String, String, bool)> {
6373        channel
6374            .messages()
6375            .cursor::<()>()
6376            .map(|m| {
6377                (
6378                    m.sender.github_login.clone(),
6379                    m.body.clone(),
6380                    m.is_pending(),
6381                )
6382            })
6383            .collect()
6384    }
6385
6386    struct EmptyView;
6387
6388    impl gpui::Entity for EmptyView {
6389        type Event = ();
6390    }
6391
6392    impl gpui::View for EmptyView {
6393        fn ui_name() -> &'static str {
6394            "empty view"
6395        }
6396
6397        fn render(&mut self, _: &mut gpui::RenderContext<Self>) -> gpui::ElementBox {
6398            gpui::Element::boxed(gpui::elements::Empty)
6399        }
6400    }
6401}