rpc.rs

   1mod store;
   2
   3use crate::{
   4    auth,
   5    db::{self, ChannelId, MessageId, UserId},
   6    AppState, Result,
   7};
   8use anyhow::anyhow;
   9use async_tungstenite::tungstenite::{
  10    protocol::CloseFrame as TungsteniteCloseFrame, Message as TungsteniteMessage,
  11};
  12use axum::{
  13    body::Body,
  14    extract::{
  15        ws::{CloseFrame as AxumCloseFrame, Message as AxumMessage},
  16        ConnectInfo, WebSocketUpgrade,
  17    },
  18    headers::{Header, HeaderName},
  19    http::StatusCode,
  20    middleware,
  21    response::IntoResponse,
  22    routing::get,
  23    Extension, Router, TypedHeader,
  24};
  25use collections::HashMap;
  26use futures::{channel::mpsc, future::BoxFuture, FutureExt, SinkExt, StreamExt, TryStreamExt};
  27use lazy_static::lazy_static;
  28use rpc::{
  29    proto::{self, AnyTypedEnvelope, EntityMessage, EnvelopedMessage, RequestMessage},
  30    Connection, ConnectionId, Peer, Receipt, TypedEnvelope,
  31};
  32use std::{
  33    any::TypeId,
  34    future::Future,
  35    marker::PhantomData,
  36    net::SocketAddr,
  37    ops::{Deref, DerefMut},
  38    rc::Rc,
  39    sync::{
  40        atomic::{AtomicBool, Ordering::SeqCst},
  41        Arc,
  42    },
  43    time::Duration,
  44};
  45use store::{Store, Worktree};
  46use time::OffsetDateTime;
  47use tokio::{
  48    sync::{RwLock, RwLockReadGuard, RwLockWriteGuard},
  49    time::Sleep,
  50};
  51use tower::ServiceBuilder;
  52use tracing::{info_span, Instrument};
  53
  54type MessageHandler =
  55    Box<dyn Send + Sync + Fn(Arc<Server>, Box<dyn AnyTypedEnvelope>) -> BoxFuture<'static, ()>>;
  56
  57struct Response<R> {
  58    server: Arc<Server>,
  59    receipt: Receipt<R>,
  60    responded: Arc<AtomicBool>,
  61}
  62
  63impl<R: RequestMessage> Response<R> {
  64    fn send(self, payload: R::Response) -> Result<()> {
  65        self.responded.store(true, SeqCst);
  66        self.server.peer.respond(self.receipt, payload)?;
  67        Ok(())
  68    }
  69
  70    fn into_receipt(self) -> Receipt<R> {
  71        self.responded.store(true, SeqCst);
  72        self.receipt
  73    }
  74}
  75
  76pub struct Server {
  77    peer: Arc<Peer>,
  78    store: RwLock<Store>,
  79    app_state: Arc<AppState>,
  80    handlers: HashMap<TypeId, MessageHandler>,
  81    notifications: Option<mpsc::UnboundedSender<()>>,
  82}
  83
  84pub trait Executor: Send + Clone {
  85    type Sleep: Send + Future;
  86    fn spawn_detached<F: 'static + Send + Future<Output = ()>>(&self, future: F);
  87    fn sleep(&self, duration: Duration) -> Self::Sleep;
  88}
  89
  90#[derive(Clone)]
  91pub struct RealExecutor;
  92
  93const MESSAGE_COUNT_PER_PAGE: usize = 100;
  94const MAX_MESSAGE_LEN: usize = 1024;
  95
  96struct StoreReadGuard<'a> {
  97    guard: RwLockReadGuard<'a, Store>,
  98    _not_send: PhantomData<Rc<()>>,
  99}
 100
 101struct StoreWriteGuard<'a> {
 102    guard: RwLockWriteGuard<'a, Store>,
 103    _not_send: PhantomData<Rc<()>>,
 104}
 105
 106impl Server {
 107    pub fn new(
 108        app_state: Arc<AppState>,
 109        notifications: Option<mpsc::UnboundedSender<()>>,
 110    ) -> Arc<Self> {
 111        let mut server = Self {
 112            peer: Peer::new(),
 113            app_state,
 114            store: Default::default(),
 115            handlers: Default::default(),
 116            notifications,
 117        };
 118
 119        server
 120            .add_request_handler(Server::ping)
 121            .add_request_handler(Server::register_project)
 122            .add_message_handler(Server::unregister_project)
 123            .add_request_handler(Server::join_project)
 124            .add_message_handler(Server::leave_project)
 125            .add_message_handler(Server::respond_to_join_project_request)
 126            .add_request_handler(Server::register_worktree)
 127            .add_message_handler(Server::unregister_worktree)
 128            .add_request_handler(Server::update_worktree)
 129            .add_message_handler(Server::start_language_server)
 130            .add_message_handler(Server::update_language_server)
 131            .add_message_handler(Server::update_diagnostic_summary)
 132            .add_request_handler(Server::forward_project_request::<proto::GetDefinition>)
 133            .add_request_handler(Server::forward_project_request::<proto::GetReferences>)
 134            .add_request_handler(Server::forward_project_request::<proto::SearchProject>)
 135            .add_request_handler(Server::forward_project_request::<proto::GetDocumentHighlights>)
 136            .add_request_handler(Server::forward_project_request::<proto::GetProjectSymbols>)
 137            .add_request_handler(Server::forward_project_request::<proto::OpenBufferForSymbol>)
 138            .add_request_handler(Server::forward_project_request::<proto::OpenBufferById>)
 139            .add_request_handler(Server::forward_project_request::<proto::OpenBufferByPath>)
 140            .add_request_handler(Server::forward_project_request::<proto::GetCompletions>)
 141            .add_request_handler(
 142                Server::forward_project_request::<proto::ApplyCompletionAdditionalEdits>,
 143            )
 144            .add_request_handler(Server::forward_project_request::<proto::GetCodeActions>)
 145            .add_request_handler(Server::forward_project_request::<proto::ApplyCodeAction>)
 146            .add_request_handler(Server::forward_project_request::<proto::PrepareRename>)
 147            .add_request_handler(Server::forward_project_request::<proto::PerformRename>)
 148            .add_request_handler(Server::forward_project_request::<proto::ReloadBuffers>)
 149            .add_request_handler(Server::forward_project_request::<proto::FormatBuffers>)
 150            .add_request_handler(Server::forward_project_request::<proto::CreateProjectEntry>)
 151            .add_request_handler(Server::forward_project_request::<proto::RenameProjectEntry>)
 152            .add_request_handler(Server::forward_project_request::<proto::DeleteProjectEntry>)
 153            .add_request_handler(Server::update_buffer)
 154            .add_message_handler(Server::update_buffer_file)
 155            .add_message_handler(Server::buffer_reloaded)
 156            .add_message_handler(Server::buffer_saved)
 157            .add_request_handler(Server::save_buffer)
 158            .add_request_handler(Server::get_channels)
 159            .add_request_handler(Server::get_users)
 160            .add_request_handler(Server::fuzzy_search_users)
 161            .add_request_handler(Server::request_contact)
 162            .add_request_handler(Server::remove_contact)
 163            .add_request_handler(Server::respond_to_contact_request)
 164            .add_request_handler(Server::join_channel)
 165            .add_message_handler(Server::leave_channel)
 166            .add_request_handler(Server::send_channel_message)
 167            .add_request_handler(Server::follow)
 168            .add_message_handler(Server::unfollow)
 169            .add_message_handler(Server::update_followers)
 170            .add_request_handler(Server::get_channel_messages);
 171
 172        Arc::new(server)
 173    }
 174
 175    fn add_message_handler<F, Fut, M>(&mut self, handler: F) -> &mut Self
 176    where
 177        F: 'static + Send + Sync + Fn(Arc<Self>, TypedEnvelope<M>) -> Fut,
 178        Fut: 'static + Send + Future<Output = Result<()>>,
 179        M: EnvelopedMessage,
 180    {
 181        let prev_handler = self.handlers.insert(
 182            TypeId::of::<M>(),
 183            Box::new(move |server, envelope| {
 184                let envelope = envelope.into_any().downcast::<TypedEnvelope<M>>().unwrap();
 185                let span = info_span!(
 186                    "handle message",
 187                    payload_type = envelope.payload_type_name(),
 188                    payload = format!("{:?}", envelope.payload).as_str(),
 189                );
 190                let future = (handler)(server, *envelope);
 191                async move {
 192                    if let Err(error) = future.await {
 193                        tracing::error!(%error, "error handling message");
 194                    }
 195                }
 196                .instrument(span)
 197                .boxed()
 198            }),
 199        );
 200        if prev_handler.is_some() {
 201            panic!("registered a handler for the same message twice");
 202        }
 203        self
 204    }
 205
 206    /// Handle a request while holding a lock to the store. This is useful when we're registering
 207    /// a connection but we want to respond on the connection before anybody else can send on it.
 208    fn add_request_handler<F, Fut, M>(&mut self, handler: F) -> &mut Self
 209    where
 210        F: 'static + Send + Sync + Fn(Arc<Self>, TypedEnvelope<M>, Response<M>) -> Fut,
 211        Fut: Send + Future<Output = Result<()>>,
 212        M: RequestMessage,
 213    {
 214        let handler = Arc::new(handler);
 215        self.add_message_handler(move |server, envelope| {
 216            let receipt = envelope.receipt();
 217            let handler = handler.clone();
 218            async move {
 219                let responded = Arc::new(AtomicBool::default());
 220                let response = Response {
 221                    server: server.clone(),
 222                    responded: responded.clone(),
 223                    receipt: envelope.receipt(),
 224                };
 225                match (handler)(server.clone(), envelope, response).await {
 226                    Ok(()) => {
 227                        if responded.load(std::sync::atomic::Ordering::SeqCst) {
 228                            Ok(())
 229                        } else {
 230                            Err(anyhow!("handler did not send a response"))?
 231                        }
 232                    }
 233                    Err(error) => {
 234                        server.peer.respond_with_error(
 235                            receipt,
 236                            proto::Error {
 237                                message: error.to_string(),
 238                            },
 239                        )?;
 240                        Err(error)
 241                    }
 242                }
 243            }
 244        })
 245    }
 246
 247    pub fn handle_connection<E: Executor>(
 248        self: &Arc<Self>,
 249        connection: Connection,
 250        address: String,
 251        user_id: UserId,
 252        mut send_connection_id: Option<mpsc::Sender<ConnectionId>>,
 253        executor: E,
 254    ) -> impl Future<Output = Result<()>> {
 255        let mut this = self.clone();
 256        let span = info_span!("handle connection", %user_id, %address);
 257        async move {
 258            let (connection_id, handle_io, mut incoming_rx) = this
 259                .peer
 260                .add_connection(connection, {
 261                    let executor = executor.clone();
 262                    move |duration| {
 263                        let timer = executor.sleep(duration);
 264                        async move {
 265                            timer.await;
 266                        }
 267                    }
 268                })
 269                .await;
 270
 271            tracing::info!(%user_id, %connection_id, %address, "connection opened");
 272
 273            if let Some(send_connection_id) = send_connection_id.as_mut() {
 274                let _ = send_connection_id.send(connection_id).await;
 275            }
 276
 277            let contacts = this.app_state.db.get_contacts(user_id).await?;
 278
 279            {
 280                let mut store = this.store_mut().await;
 281                store.add_connection(connection_id, user_id);
 282                this.peer.send(connection_id, store.build_initial_contacts_update(contacts))?;
 283            }
 284            this.update_user_contacts(user_id).await?;
 285
 286            let handle_io = handle_io.fuse();
 287            futures::pin_mut!(handle_io);
 288            loop {
 289                let next_message = incoming_rx.next().fuse();
 290                futures::pin_mut!(next_message);
 291                futures::select_biased! {
 292                    result = handle_io => {
 293                        if let Err(error) = result {
 294                            tracing::error!(%error, "error handling I/O");
 295                        }
 296                        break;
 297                    }
 298                    message = next_message => {
 299                        if let Some(message) = message {
 300                            let type_name = message.payload_type_name();
 301                            let span = tracing::info_span!("receive message", %user_id, %connection_id, %address, type_name);
 302                            async {
 303                                if let Some(handler) = this.handlers.get(&message.payload_type_id()) {
 304                                    let notifications = this.notifications.clone();
 305                                    let is_background = message.is_background();
 306                                    let handle_message = (handler)(this.clone(), message);
 307                                    let handle_message = async move {
 308                                        handle_message.await;
 309                                        if let Some(mut notifications) = notifications {
 310                                            let _ = notifications.send(()).await;
 311                                        }
 312                                    };
 313                                    if is_background {
 314                                        executor.spawn_detached(handle_message);
 315                                    } else {
 316                                        handle_message.await;
 317                                    }
 318                                } else {
 319                                    tracing::error!("no message handler");
 320                                }
 321                            }.instrument(span).await;
 322                        } else {
 323                            tracing::info!(%user_id, %connection_id, %address, "connection closed");
 324                            break;
 325                        }
 326                    }
 327                }
 328            }
 329
 330            if let Err(error) = this.sign_out(connection_id).await {
 331                tracing::error!(%error, "error signing out");
 332            }
 333
 334            Ok(())
 335        }.instrument(span)
 336    }
 337
 338    async fn sign_out(self: &mut Arc<Self>, connection_id: ConnectionId) -> Result<()> {
 339        self.peer.disconnect(connection_id);
 340
 341        let removed_user_id = {
 342            let mut store = self.store_mut().await;
 343            let removed_connection = store.remove_connection(connection_id)?;
 344
 345            for (project_id, project) in removed_connection.hosted_projects {
 346                broadcast(connection_id, project.guests.keys().copied(), |conn_id| {
 347                    self.peer
 348                        .send(conn_id, proto::UnregisterProject { project_id })
 349                });
 350
 351                for (_, receipts) in project.join_requests {
 352                    for receipt in receipts {
 353                        self.peer.respond(
 354                            receipt,
 355                            proto::JoinProjectResponse {
 356                                variant: Some(proto::join_project_response::Variant::Decline(
 357                                    proto::join_project_response::Decline {
 358                                        reason: proto::join_project_response::decline::Reason::WentOffline as i32
 359                                    },
 360                                )),
 361                            },
 362                        )?;
 363                    }
 364                }
 365            }
 366
 367            for project_id in removed_connection.guest_project_ids {
 368                if let Some(project) = store.project(project_id).trace_err() {
 369                    broadcast(connection_id, project.connection_ids(), |conn_id| {
 370                        self.peer.send(
 371                            conn_id,
 372                            proto::RemoveProjectCollaborator {
 373                                project_id,
 374                                peer_id: connection_id.0,
 375                            },
 376                        )
 377                    });
 378                    if project.guests.is_empty() {
 379                        self.peer
 380                            .send(
 381                                project.host_connection_id,
 382                                proto::ProjectUnshared { project_id },
 383                            )
 384                            .trace_err();
 385                    }
 386                }
 387            }
 388
 389            removed_connection.user_id
 390        };
 391
 392        self.update_user_contacts(removed_user_id).await?;
 393
 394        Ok(())
 395    }
 396
 397    async fn ping(
 398        self: Arc<Server>,
 399        _: TypedEnvelope<proto::Ping>,
 400        response: Response<proto::Ping>,
 401    ) -> Result<()> {
 402        response.send(proto::Ack {})?;
 403        Ok(())
 404    }
 405
 406    async fn register_project(
 407        self: Arc<Server>,
 408        request: TypedEnvelope<proto::RegisterProject>,
 409        response: Response<proto::RegisterProject>,
 410    ) -> Result<()> {
 411        let user_id;
 412        let project_id;
 413        {
 414            let mut state = self.store_mut().await;
 415            user_id = state.user_id_for_connection(request.sender_id)?;
 416            project_id = state.register_project(request.sender_id, user_id);
 417        };
 418        self.update_user_contacts(user_id).await?;
 419        response.send(proto::RegisterProjectResponse { project_id })?;
 420        Ok(())
 421    }
 422
 423    async fn unregister_project(
 424        self: Arc<Server>,
 425        request: TypedEnvelope<proto::UnregisterProject>,
 426    ) -> Result<()> {
 427        let (user_id, project) = {
 428            let mut state = self.store_mut().await;
 429            let project =
 430                state.unregister_project(request.payload.project_id, request.sender_id)?;
 431            (state.user_id_for_connection(request.sender_id)?, project)
 432        };
 433        for (_, receipts) in project.join_requests {
 434            for receipt in receipts {
 435                self.peer.respond(
 436                    receipt,
 437                    proto::JoinProjectResponse {
 438                        variant: Some(proto::join_project_response::Variant::Decline(
 439                            proto::join_project_response::Decline {
 440                                reason: proto::join_project_response::decline::Reason::Closed
 441                                    as i32,
 442                            },
 443                        )),
 444                    },
 445                )?;
 446            }
 447        }
 448
 449        self.update_user_contacts(user_id).await?;
 450        Ok(())
 451    }
 452
 453    async fn update_user_contacts(self: &Arc<Server>, user_id: UserId) -> Result<()> {
 454        let contacts = self.app_state.db.get_contacts(user_id).await?;
 455        let store = self.store().await;
 456        let updated_contact = store.contact_for_user(user_id, false);
 457        for contact in contacts {
 458            if let db::Contact::Accepted {
 459                user_id: contact_user_id,
 460                ..
 461            } = contact
 462            {
 463                for contact_conn_id in store.connection_ids_for_user(contact_user_id) {
 464                    self.peer
 465                        .send(
 466                            contact_conn_id,
 467                            proto::UpdateContacts {
 468                                contacts: vec![updated_contact.clone()],
 469                                remove_contacts: Default::default(),
 470                                incoming_requests: Default::default(),
 471                                remove_incoming_requests: Default::default(),
 472                                outgoing_requests: Default::default(),
 473                                remove_outgoing_requests: Default::default(),
 474                            },
 475                        )
 476                        .trace_err();
 477                }
 478            }
 479        }
 480        Ok(())
 481    }
 482
 483    async fn join_project(
 484        self: Arc<Server>,
 485        request: TypedEnvelope<proto::JoinProject>,
 486        response: Response<proto::JoinProject>,
 487    ) -> Result<()> {
 488        let project_id = request.payload.project_id;
 489        let host_user_id;
 490        let guest_user_id;
 491        let host_connection_id;
 492        {
 493            let state = self.store().await;
 494            let project = state.project(project_id)?;
 495            host_user_id = project.host_user_id;
 496            host_connection_id = project.host_connection_id;
 497            guest_user_id = state.user_id_for_connection(request.sender_id)?;
 498        };
 499
 500        let has_contact = self
 501            .app_state
 502            .db
 503            .has_contact(guest_user_id, host_user_id)
 504            .await?;
 505        if !has_contact {
 506            return Err(anyhow!("no such project"))?;
 507        }
 508
 509        self.store_mut().await.request_join_project(
 510            guest_user_id,
 511            project_id,
 512            response.into_receipt(),
 513        )?;
 514        self.peer.send(
 515            host_connection_id,
 516            proto::RequestJoinProject {
 517                project_id,
 518                requester_id: guest_user_id.to_proto(),
 519            },
 520        )?;
 521        Ok(())
 522    }
 523
 524    async fn respond_to_join_project_request(
 525        self: Arc<Server>,
 526        request: TypedEnvelope<proto::RespondToJoinProjectRequest>,
 527    ) -> Result<()> {
 528        let host_user_id;
 529
 530        {
 531            let mut state = self.store_mut().await;
 532            let project_id = request.payload.project_id;
 533            let project = state.project(project_id)?;
 534            if project.host_connection_id != request.sender_id {
 535                Err(anyhow!("no such connection"))?;
 536            }
 537
 538            host_user_id = project.host_user_id;
 539            let guest_user_id = UserId::from_proto(request.payload.requester_id);
 540
 541            if !request.payload.allow {
 542                let receipts = state
 543                    .deny_join_project_request(request.sender_id, guest_user_id, project_id)
 544                    .ok_or_else(|| anyhow!("no such request"))?;
 545                for receipt in receipts {
 546                    self.peer.respond(
 547                        receipt,
 548                        proto::JoinProjectResponse {
 549                            variant: Some(proto::join_project_response::Variant::Decline(
 550                                proto::join_project_response::Decline {
 551                                    reason: proto::join_project_response::decline::Reason::Declined
 552                                        as i32,
 553                                },
 554                            )),
 555                        },
 556                    )?;
 557                }
 558                return Ok(());
 559            }
 560
 561            let (receipts_with_replica_ids, project) = state
 562                .accept_join_project_request(request.sender_id, guest_user_id, project_id)
 563                .ok_or_else(|| anyhow!("no such request"))?;
 564
 565            let peer_count = project.guests.len();
 566            let mut collaborators = Vec::with_capacity(peer_count);
 567            collaborators.push(proto::Collaborator {
 568                peer_id: project.host_connection_id.0,
 569                replica_id: 0,
 570                user_id: project.host_user_id.to_proto(),
 571            });
 572            let worktrees = project
 573                .worktrees
 574                .iter()
 575                .filter_map(|(id, shared_worktree)| {
 576                    let worktree = project.worktrees.get(&id)?;
 577                    Some(proto::Worktree {
 578                        id: *id,
 579                        root_name: worktree.root_name.clone(),
 580                        entries: shared_worktree.entries.values().cloned().collect(),
 581                        diagnostic_summaries: shared_worktree
 582                            .diagnostic_summaries
 583                            .values()
 584                            .cloned()
 585                            .collect(),
 586                        visible: worktree.visible,
 587                        scan_id: shared_worktree.scan_id,
 588                    })
 589                })
 590                .collect::<Vec<_>>();
 591
 592            // Add all guests other than the requesting user's own connections as collaborators
 593            for (peer_conn_id, (peer_replica_id, peer_user_id)) in &project.guests {
 594                if receipts_with_replica_ids
 595                    .iter()
 596                    .all(|(receipt, _)| receipt.sender_id != *peer_conn_id)
 597                {
 598                    collaborators.push(proto::Collaborator {
 599                        peer_id: peer_conn_id.0,
 600                        replica_id: *peer_replica_id as u32,
 601                        user_id: peer_user_id.to_proto(),
 602                    });
 603                }
 604            }
 605
 606            for conn_id in project.connection_ids() {
 607                for (receipt, replica_id) in &receipts_with_replica_ids {
 608                    if conn_id != receipt.sender_id {
 609                        self.peer.send(
 610                            conn_id,
 611                            proto::AddProjectCollaborator {
 612                                project_id,
 613                                collaborator: Some(proto::Collaborator {
 614                                    peer_id: receipt.sender_id.0,
 615                                    replica_id: *replica_id as u32,
 616                                    user_id: guest_user_id.to_proto(),
 617                                }),
 618                            },
 619                        )?;
 620                    }
 621                }
 622            }
 623
 624            for (receipt, replica_id) in receipts_with_replica_ids {
 625                self.peer.respond(
 626                    receipt,
 627                    proto::JoinProjectResponse {
 628                        variant: Some(proto::join_project_response::Variant::Accept(
 629                            proto::join_project_response::Accept {
 630                                worktrees: worktrees.clone(),
 631                                replica_id: replica_id as u32,
 632                                collaborators: collaborators.clone(),
 633                                language_servers: project.language_servers.clone(),
 634                            },
 635                        )),
 636                    },
 637                )?;
 638            }
 639        }
 640
 641        self.update_user_contacts(host_user_id).await?;
 642        Ok(())
 643    }
 644
 645    async fn leave_project(
 646        self: Arc<Server>,
 647        request: TypedEnvelope<proto::LeaveProject>,
 648    ) -> Result<()> {
 649        let sender_id = request.sender_id;
 650        let project_id = request.payload.project_id;
 651        let project;
 652        {
 653            let mut store = self.store_mut().await;
 654            project = store.leave_project(sender_id, project_id)?;
 655
 656            if project.remove_collaborator {
 657                broadcast(sender_id, project.connection_ids, |conn_id| {
 658                    self.peer.send(
 659                        conn_id,
 660                        proto::RemoveProjectCollaborator {
 661                            project_id,
 662                            peer_id: sender_id.0,
 663                        },
 664                    )
 665                });
 666            }
 667
 668            if let Some(requester_id) = project.cancel_request {
 669                self.peer.send(
 670                    project.host_connection_id,
 671                    proto::JoinProjectRequestCancelled {
 672                        project_id,
 673                        requester_id: requester_id.to_proto(),
 674                    },
 675                )?;
 676            }
 677
 678            if project.unshare {
 679                self.peer.send(
 680                    project.host_connection_id,
 681                    proto::ProjectUnshared { project_id },
 682                )?;
 683            }
 684        }
 685        self.update_user_contacts(project.host_user_id).await?;
 686        Ok(())
 687    }
 688
 689    async fn register_worktree(
 690        self: Arc<Server>,
 691        request: TypedEnvelope<proto::RegisterWorktree>,
 692        response: Response<proto::RegisterWorktree>,
 693    ) -> Result<()> {
 694        let host_user_id;
 695        {
 696            let mut state = self.store_mut().await;
 697            host_user_id = state.user_id_for_connection(request.sender_id)?;
 698
 699            let guest_connection_ids = state
 700                .read_project(request.payload.project_id, request.sender_id)?
 701                .guest_connection_ids();
 702            state.register_worktree(
 703                request.payload.project_id,
 704                request.payload.worktree_id,
 705                request.sender_id,
 706                Worktree {
 707                    root_name: request.payload.root_name.clone(),
 708                    visible: request.payload.visible,
 709                    ..Default::default()
 710                },
 711            )?;
 712
 713            broadcast(request.sender_id, guest_connection_ids, |connection_id| {
 714                self.peer
 715                    .forward_send(request.sender_id, connection_id, request.payload.clone())
 716            });
 717        }
 718        self.update_user_contacts(host_user_id).await?;
 719        response.send(proto::Ack {})?;
 720        Ok(())
 721    }
 722
 723    async fn unregister_worktree(
 724        self: Arc<Server>,
 725        request: TypedEnvelope<proto::UnregisterWorktree>,
 726    ) -> Result<()> {
 727        let host_user_id;
 728        let project_id = request.payload.project_id;
 729        let worktree_id = request.payload.worktree_id;
 730        {
 731            let mut state = self.store_mut().await;
 732            let (_, guest_connection_ids) =
 733                state.unregister_worktree(project_id, worktree_id, request.sender_id)?;
 734            host_user_id = state.user_id_for_connection(request.sender_id)?;
 735            broadcast(request.sender_id, guest_connection_ids, |conn_id| {
 736                self.peer.send(
 737                    conn_id,
 738                    proto::UnregisterWorktree {
 739                        project_id,
 740                        worktree_id,
 741                    },
 742                )
 743            });
 744        }
 745        self.update_user_contacts(host_user_id).await?;
 746        Ok(())
 747    }
 748
 749    async fn update_worktree(
 750        self: Arc<Server>,
 751        request: TypedEnvelope<proto::UpdateWorktree>,
 752        response: Response<proto::UpdateWorktree>,
 753    ) -> Result<()> {
 754        let connection_ids = self.store_mut().await.update_worktree(
 755            request.sender_id,
 756            request.payload.project_id,
 757            request.payload.worktree_id,
 758            &request.payload.removed_entries,
 759            &request.payload.updated_entries,
 760            request.payload.scan_id,
 761        )?;
 762
 763        broadcast(request.sender_id, connection_ids, |connection_id| {
 764            self.peer
 765                .forward_send(request.sender_id, connection_id, request.payload.clone())
 766        });
 767        response.send(proto::Ack {})?;
 768        Ok(())
 769    }
 770
 771    async fn update_diagnostic_summary(
 772        self: Arc<Server>,
 773        request: TypedEnvelope<proto::UpdateDiagnosticSummary>,
 774    ) -> Result<()> {
 775        let summary = request
 776            .payload
 777            .summary
 778            .clone()
 779            .ok_or_else(|| anyhow!("invalid summary"))?;
 780        let receiver_ids = self.store_mut().await.update_diagnostic_summary(
 781            request.payload.project_id,
 782            request.payload.worktree_id,
 783            request.sender_id,
 784            summary,
 785        )?;
 786
 787        broadcast(request.sender_id, receiver_ids, |connection_id| {
 788            self.peer
 789                .forward_send(request.sender_id, connection_id, request.payload.clone())
 790        });
 791        Ok(())
 792    }
 793
 794    async fn start_language_server(
 795        self: Arc<Server>,
 796        request: TypedEnvelope<proto::StartLanguageServer>,
 797    ) -> Result<()> {
 798        let receiver_ids = self.store_mut().await.start_language_server(
 799            request.payload.project_id,
 800            request.sender_id,
 801            request
 802                .payload
 803                .server
 804                .clone()
 805                .ok_or_else(|| anyhow!("invalid language server"))?,
 806        )?;
 807        broadcast(request.sender_id, receiver_ids, |connection_id| {
 808            self.peer
 809                .forward_send(request.sender_id, connection_id, request.payload.clone())
 810        });
 811        Ok(())
 812    }
 813
 814    async fn update_language_server(
 815        self: Arc<Server>,
 816        request: TypedEnvelope<proto::UpdateLanguageServer>,
 817    ) -> Result<()> {
 818        let receiver_ids = self
 819            .store()
 820            .await
 821            .project_connection_ids(request.payload.project_id, request.sender_id)?;
 822        broadcast(request.sender_id, receiver_ids, |connection_id| {
 823            self.peer
 824                .forward_send(request.sender_id, connection_id, request.payload.clone())
 825        });
 826        Ok(())
 827    }
 828
 829    async fn forward_project_request<T>(
 830        self: Arc<Server>,
 831        request: TypedEnvelope<T>,
 832        response: Response<T>,
 833    ) -> Result<()>
 834    where
 835        T: EntityMessage + RequestMessage,
 836    {
 837        let host_connection_id = self
 838            .store()
 839            .await
 840            .read_project(request.payload.remote_entity_id(), request.sender_id)?
 841            .host_connection_id;
 842
 843        response.send(
 844            self.peer
 845                .forward_request(request.sender_id, host_connection_id, request.payload)
 846                .await?,
 847        )?;
 848        Ok(())
 849    }
 850
 851    async fn save_buffer(
 852        self: Arc<Server>,
 853        request: TypedEnvelope<proto::SaveBuffer>,
 854        response: Response<proto::SaveBuffer>,
 855    ) -> Result<()> {
 856        let host = self
 857            .store()
 858            .await
 859            .read_project(request.payload.project_id, request.sender_id)?
 860            .host_connection_id;
 861        let response_payload = self
 862            .peer
 863            .forward_request(request.sender_id, host, request.payload.clone())
 864            .await?;
 865
 866        let mut guests = self
 867            .store()
 868            .await
 869            .read_project(request.payload.project_id, request.sender_id)?
 870            .connection_ids();
 871        guests.retain(|guest_connection_id| *guest_connection_id != request.sender_id);
 872        broadcast(host, guests, |conn_id| {
 873            self.peer
 874                .forward_send(host, conn_id, response_payload.clone())
 875        });
 876        response.send(response_payload)?;
 877        Ok(())
 878    }
 879
 880    async fn update_buffer(
 881        self: Arc<Server>,
 882        request: TypedEnvelope<proto::UpdateBuffer>,
 883        response: Response<proto::UpdateBuffer>,
 884    ) -> Result<()> {
 885        let receiver_ids = self
 886            .store()
 887            .await
 888            .project_connection_ids(request.payload.project_id, request.sender_id)?;
 889        broadcast(request.sender_id, receiver_ids, |connection_id| {
 890            self.peer
 891                .forward_send(request.sender_id, connection_id, request.payload.clone())
 892        });
 893        response.send(proto::Ack {})?;
 894        Ok(())
 895    }
 896
 897    async fn update_buffer_file(
 898        self: Arc<Server>,
 899        request: TypedEnvelope<proto::UpdateBufferFile>,
 900    ) -> Result<()> {
 901        let receiver_ids = self
 902            .store()
 903            .await
 904            .project_connection_ids(request.payload.project_id, request.sender_id)?;
 905        broadcast(request.sender_id, receiver_ids, |connection_id| {
 906            self.peer
 907                .forward_send(request.sender_id, connection_id, request.payload.clone())
 908        });
 909        Ok(())
 910    }
 911
 912    async fn buffer_reloaded(
 913        self: Arc<Server>,
 914        request: TypedEnvelope<proto::BufferReloaded>,
 915    ) -> Result<()> {
 916        let receiver_ids = self
 917            .store()
 918            .await
 919            .project_connection_ids(request.payload.project_id, request.sender_id)?;
 920        broadcast(request.sender_id, receiver_ids, |connection_id| {
 921            self.peer
 922                .forward_send(request.sender_id, connection_id, request.payload.clone())
 923        });
 924        Ok(())
 925    }
 926
 927    async fn buffer_saved(
 928        self: Arc<Server>,
 929        request: TypedEnvelope<proto::BufferSaved>,
 930    ) -> Result<()> {
 931        let receiver_ids = self
 932            .store()
 933            .await
 934            .project_connection_ids(request.payload.project_id, request.sender_id)?;
 935        broadcast(request.sender_id, receiver_ids, |connection_id| {
 936            self.peer
 937                .forward_send(request.sender_id, connection_id, request.payload.clone())
 938        });
 939        Ok(())
 940    }
 941
 942    async fn follow(
 943        self: Arc<Self>,
 944        request: TypedEnvelope<proto::Follow>,
 945        response: Response<proto::Follow>,
 946    ) -> Result<()> {
 947        let leader_id = ConnectionId(request.payload.leader_id);
 948        let follower_id = request.sender_id;
 949        if !self
 950            .store()
 951            .await
 952            .project_connection_ids(request.payload.project_id, follower_id)?
 953            .contains(&leader_id)
 954        {
 955            Err(anyhow!("no such peer"))?;
 956        }
 957        let mut response_payload = self
 958            .peer
 959            .forward_request(request.sender_id, leader_id, request.payload)
 960            .await?;
 961        response_payload
 962            .views
 963            .retain(|view| view.leader_id != Some(follower_id.0));
 964        response.send(response_payload)?;
 965        Ok(())
 966    }
 967
 968    async fn unfollow(self: Arc<Self>, request: TypedEnvelope<proto::Unfollow>) -> Result<()> {
 969        let leader_id = ConnectionId(request.payload.leader_id);
 970        if !self
 971            .store()
 972            .await
 973            .project_connection_ids(request.payload.project_id, request.sender_id)?
 974            .contains(&leader_id)
 975        {
 976            Err(anyhow!("no such peer"))?;
 977        }
 978        self.peer
 979            .forward_send(request.sender_id, leader_id, request.payload)?;
 980        Ok(())
 981    }
 982
 983    async fn update_followers(
 984        self: Arc<Self>,
 985        request: TypedEnvelope<proto::UpdateFollowers>,
 986    ) -> Result<()> {
 987        let connection_ids = self
 988            .store()
 989            .await
 990            .project_connection_ids(request.payload.project_id, request.sender_id)?;
 991        let leader_id = request
 992            .payload
 993            .variant
 994            .as_ref()
 995            .and_then(|variant| match variant {
 996                proto::update_followers::Variant::CreateView(payload) => payload.leader_id,
 997                proto::update_followers::Variant::UpdateView(payload) => payload.leader_id,
 998                proto::update_followers::Variant::UpdateActiveView(payload) => payload.leader_id,
 999            });
1000        for follower_id in &request.payload.follower_ids {
1001            let follower_id = ConnectionId(*follower_id);
1002            if connection_ids.contains(&follower_id) && Some(follower_id.0) != leader_id {
1003                self.peer
1004                    .forward_send(request.sender_id, follower_id, request.payload.clone())?;
1005            }
1006        }
1007        Ok(())
1008    }
1009
1010    async fn get_channels(
1011        self: Arc<Server>,
1012        request: TypedEnvelope<proto::GetChannels>,
1013        response: Response<proto::GetChannels>,
1014    ) -> Result<()> {
1015        let user_id = self
1016            .store()
1017            .await
1018            .user_id_for_connection(request.sender_id)?;
1019        let channels = self.app_state.db.get_accessible_channels(user_id).await?;
1020        response.send(proto::GetChannelsResponse {
1021            channels: channels
1022                .into_iter()
1023                .map(|chan| proto::Channel {
1024                    id: chan.id.to_proto(),
1025                    name: chan.name,
1026                })
1027                .collect(),
1028        })?;
1029        Ok(())
1030    }
1031
1032    async fn get_users(
1033        self: Arc<Server>,
1034        request: TypedEnvelope<proto::GetUsers>,
1035        response: Response<proto::GetUsers>,
1036    ) -> Result<()> {
1037        let user_ids = request
1038            .payload
1039            .user_ids
1040            .into_iter()
1041            .map(UserId::from_proto)
1042            .collect();
1043        let users = self
1044            .app_state
1045            .db
1046            .get_users_by_ids(user_ids)
1047            .await?
1048            .into_iter()
1049            .map(|user| proto::User {
1050                id: user.id.to_proto(),
1051                avatar_url: format!("https://github.com/{}.png?size=128", user.github_login),
1052                github_login: user.github_login,
1053            })
1054            .collect();
1055        response.send(proto::UsersResponse { users })?;
1056        Ok(())
1057    }
1058
1059    async fn fuzzy_search_users(
1060        self: Arc<Server>,
1061        request: TypedEnvelope<proto::FuzzySearchUsers>,
1062        response: Response<proto::FuzzySearchUsers>,
1063    ) -> Result<()> {
1064        let user_id = self
1065            .store()
1066            .await
1067            .user_id_for_connection(request.sender_id)?;
1068        let query = request.payload.query;
1069        let db = &self.app_state.db;
1070        let users = match query.len() {
1071            0 => vec![],
1072            1 | 2 => db
1073                .get_user_by_github_login(&query)
1074                .await?
1075                .into_iter()
1076                .collect(),
1077            _ => db.fuzzy_search_users(&query, 10).await?,
1078        };
1079        let users = users
1080            .into_iter()
1081            .filter(|user| user.id != user_id)
1082            .map(|user| proto::User {
1083                id: user.id.to_proto(),
1084                avatar_url: format!("https://github.com/{}.png?size=128", user.github_login),
1085                github_login: user.github_login,
1086            })
1087            .collect();
1088        response.send(proto::UsersResponse { users })?;
1089        Ok(())
1090    }
1091
1092    async fn request_contact(
1093        self: Arc<Server>,
1094        request: TypedEnvelope<proto::RequestContact>,
1095        response: Response<proto::RequestContact>,
1096    ) -> Result<()> {
1097        let requester_id = self
1098            .store()
1099            .await
1100            .user_id_for_connection(request.sender_id)?;
1101        let responder_id = UserId::from_proto(request.payload.responder_id);
1102        if requester_id == responder_id {
1103            return Err(anyhow!("cannot add yourself as a contact"))?;
1104        }
1105
1106        self.app_state
1107            .db
1108            .send_contact_request(requester_id, responder_id)
1109            .await?;
1110
1111        // Update outgoing contact requests of requester
1112        let mut update = proto::UpdateContacts::default();
1113        update.outgoing_requests.push(responder_id.to_proto());
1114        for connection_id in self.store().await.connection_ids_for_user(requester_id) {
1115            self.peer.send(connection_id, update.clone())?;
1116        }
1117
1118        // Update incoming contact requests of responder
1119        let mut update = proto::UpdateContacts::default();
1120        update
1121            .incoming_requests
1122            .push(proto::IncomingContactRequest {
1123                requester_id: requester_id.to_proto(),
1124                should_notify: true,
1125            });
1126        for connection_id in self.store().await.connection_ids_for_user(responder_id) {
1127            self.peer.send(connection_id, update.clone())?;
1128        }
1129
1130        response.send(proto::Ack {})?;
1131        Ok(())
1132    }
1133
1134    async fn respond_to_contact_request(
1135        self: Arc<Server>,
1136        request: TypedEnvelope<proto::RespondToContactRequest>,
1137        response: Response<proto::RespondToContactRequest>,
1138    ) -> Result<()> {
1139        let responder_id = self
1140            .store()
1141            .await
1142            .user_id_for_connection(request.sender_id)?;
1143        let requester_id = UserId::from_proto(request.payload.requester_id);
1144        if request.payload.response == proto::ContactRequestResponse::Dismiss as i32 {
1145            self.app_state
1146                .db
1147                .dismiss_contact_notification(responder_id, requester_id)
1148                .await?;
1149        } else {
1150            let accept = request.payload.response == proto::ContactRequestResponse::Accept as i32;
1151            self.app_state
1152                .db
1153                .respond_to_contact_request(responder_id, requester_id, accept)
1154                .await?;
1155
1156            let store = self.store().await;
1157            // Update responder with new contact
1158            let mut update = proto::UpdateContacts::default();
1159            if accept {
1160                update
1161                    .contacts
1162                    .push(store.contact_for_user(requester_id, false));
1163            }
1164            update
1165                .remove_incoming_requests
1166                .push(requester_id.to_proto());
1167            for connection_id in store.connection_ids_for_user(responder_id) {
1168                self.peer.send(connection_id, update.clone())?;
1169            }
1170
1171            // Update requester with new contact
1172            let mut update = proto::UpdateContacts::default();
1173            if accept {
1174                update
1175                    .contacts
1176                    .push(store.contact_for_user(responder_id, true));
1177            }
1178            update
1179                .remove_outgoing_requests
1180                .push(responder_id.to_proto());
1181            for connection_id in store.connection_ids_for_user(requester_id) {
1182                self.peer.send(connection_id, update.clone())?;
1183            }
1184        }
1185
1186        response.send(proto::Ack {})?;
1187        Ok(())
1188    }
1189
1190    async fn remove_contact(
1191        self: Arc<Server>,
1192        request: TypedEnvelope<proto::RemoveContact>,
1193        response: Response<proto::RemoveContact>,
1194    ) -> Result<()> {
1195        let requester_id = self
1196            .store()
1197            .await
1198            .user_id_for_connection(request.sender_id)?;
1199        let responder_id = UserId::from_proto(request.payload.user_id);
1200        self.app_state
1201            .db
1202            .remove_contact(requester_id, responder_id)
1203            .await?;
1204
1205        // Update outgoing contact requests of requester
1206        let mut update = proto::UpdateContacts::default();
1207        update
1208            .remove_outgoing_requests
1209            .push(responder_id.to_proto());
1210        for connection_id in self.store().await.connection_ids_for_user(requester_id) {
1211            self.peer.send(connection_id, update.clone())?;
1212        }
1213
1214        // Update incoming contact requests of responder
1215        let mut update = proto::UpdateContacts::default();
1216        update
1217            .remove_incoming_requests
1218            .push(requester_id.to_proto());
1219        for connection_id in self.store().await.connection_ids_for_user(responder_id) {
1220            self.peer.send(connection_id, update.clone())?;
1221        }
1222
1223        response.send(proto::Ack {})?;
1224        Ok(())
1225    }
1226
1227    async fn join_channel(
1228        self: Arc<Self>,
1229        request: TypedEnvelope<proto::JoinChannel>,
1230        response: Response<proto::JoinChannel>,
1231    ) -> Result<()> {
1232        let user_id = self
1233            .store()
1234            .await
1235            .user_id_for_connection(request.sender_id)?;
1236        let channel_id = ChannelId::from_proto(request.payload.channel_id);
1237        if !self
1238            .app_state
1239            .db
1240            .can_user_access_channel(user_id, channel_id)
1241            .await?
1242        {
1243            Err(anyhow!("access denied"))?;
1244        }
1245
1246        self.store_mut()
1247            .await
1248            .join_channel(request.sender_id, channel_id);
1249        let messages = self
1250            .app_state
1251            .db
1252            .get_channel_messages(channel_id, MESSAGE_COUNT_PER_PAGE, None)
1253            .await?
1254            .into_iter()
1255            .map(|msg| proto::ChannelMessage {
1256                id: msg.id.to_proto(),
1257                body: msg.body,
1258                timestamp: msg.sent_at.unix_timestamp() as u64,
1259                sender_id: msg.sender_id.to_proto(),
1260                nonce: Some(msg.nonce.as_u128().into()),
1261            })
1262            .collect::<Vec<_>>();
1263        response.send(proto::JoinChannelResponse {
1264            done: messages.len() < MESSAGE_COUNT_PER_PAGE,
1265            messages,
1266        })?;
1267        Ok(())
1268    }
1269
1270    async fn leave_channel(
1271        self: Arc<Self>,
1272        request: TypedEnvelope<proto::LeaveChannel>,
1273    ) -> Result<()> {
1274        let user_id = self
1275            .store()
1276            .await
1277            .user_id_for_connection(request.sender_id)?;
1278        let channel_id = ChannelId::from_proto(request.payload.channel_id);
1279        if !self
1280            .app_state
1281            .db
1282            .can_user_access_channel(user_id, channel_id)
1283            .await?
1284        {
1285            Err(anyhow!("access denied"))?;
1286        }
1287
1288        self.store_mut()
1289            .await
1290            .leave_channel(request.sender_id, channel_id);
1291
1292        Ok(())
1293    }
1294
1295    async fn send_channel_message(
1296        self: Arc<Self>,
1297        request: TypedEnvelope<proto::SendChannelMessage>,
1298        response: Response<proto::SendChannelMessage>,
1299    ) -> Result<()> {
1300        let channel_id = ChannelId::from_proto(request.payload.channel_id);
1301        let user_id;
1302        let connection_ids;
1303        {
1304            let state = self.store().await;
1305            user_id = state.user_id_for_connection(request.sender_id)?;
1306            connection_ids = state.channel_connection_ids(channel_id)?;
1307        }
1308
1309        // Validate the message body.
1310        let body = request.payload.body.trim().to_string();
1311        if body.len() > MAX_MESSAGE_LEN {
1312            return Err(anyhow!("message is too long"))?;
1313        }
1314        if body.is_empty() {
1315            return Err(anyhow!("message can't be blank"))?;
1316        }
1317
1318        let timestamp = OffsetDateTime::now_utc();
1319        let nonce = request
1320            .payload
1321            .nonce
1322            .ok_or_else(|| anyhow!("nonce can't be blank"))?;
1323
1324        let message_id = self
1325            .app_state
1326            .db
1327            .create_channel_message(channel_id, user_id, &body, timestamp, nonce.clone().into())
1328            .await?
1329            .to_proto();
1330        let message = proto::ChannelMessage {
1331            sender_id: user_id.to_proto(),
1332            id: message_id,
1333            body,
1334            timestamp: timestamp.unix_timestamp() as u64,
1335            nonce: Some(nonce),
1336        };
1337        broadcast(request.sender_id, connection_ids, |conn_id| {
1338            self.peer.send(
1339                conn_id,
1340                proto::ChannelMessageSent {
1341                    channel_id: channel_id.to_proto(),
1342                    message: Some(message.clone()),
1343                },
1344            )
1345        });
1346        response.send(proto::SendChannelMessageResponse {
1347            message: Some(message),
1348        })?;
1349        Ok(())
1350    }
1351
1352    async fn get_channel_messages(
1353        self: Arc<Self>,
1354        request: TypedEnvelope<proto::GetChannelMessages>,
1355        response: Response<proto::GetChannelMessages>,
1356    ) -> Result<()> {
1357        let user_id = self
1358            .store()
1359            .await
1360            .user_id_for_connection(request.sender_id)?;
1361        let channel_id = ChannelId::from_proto(request.payload.channel_id);
1362        if !self
1363            .app_state
1364            .db
1365            .can_user_access_channel(user_id, channel_id)
1366            .await?
1367        {
1368            Err(anyhow!("access denied"))?;
1369        }
1370
1371        let messages = self
1372            .app_state
1373            .db
1374            .get_channel_messages(
1375                channel_id,
1376                MESSAGE_COUNT_PER_PAGE,
1377                Some(MessageId::from_proto(request.payload.before_message_id)),
1378            )
1379            .await?
1380            .into_iter()
1381            .map(|msg| proto::ChannelMessage {
1382                id: msg.id.to_proto(),
1383                body: msg.body,
1384                timestamp: msg.sent_at.unix_timestamp() as u64,
1385                sender_id: msg.sender_id.to_proto(),
1386                nonce: Some(msg.nonce.as_u128().into()),
1387            })
1388            .collect::<Vec<_>>();
1389        response.send(proto::GetChannelMessagesResponse {
1390            done: messages.len() < MESSAGE_COUNT_PER_PAGE,
1391            messages,
1392        })?;
1393        Ok(())
1394    }
1395
1396    async fn store<'a>(self: &'a Arc<Self>) -> StoreReadGuard<'a> {
1397        #[cfg(test)]
1398        tokio::task::yield_now().await;
1399        let guard = self.store.read().await;
1400        #[cfg(test)]
1401        tokio::task::yield_now().await;
1402        StoreReadGuard {
1403            guard,
1404            _not_send: PhantomData,
1405        }
1406    }
1407
1408    async fn store_mut<'a>(self: &'a Arc<Self>) -> StoreWriteGuard<'a> {
1409        #[cfg(test)]
1410        tokio::task::yield_now().await;
1411        let guard = self.store.write().await;
1412        #[cfg(test)]
1413        tokio::task::yield_now().await;
1414        StoreWriteGuard {
1415            guard,
1416            _not_send: PhantomData,
1417        }
1418    }
1419}
1420
1421impl<'a> Deref for StoreReadGuard<'a> {
1422    type Target = Store;
1423
1424    fn deref(&self) -> &Self::Target {
1425        &*self.guard
1426    }
1427}
1428
1429impl<'a> Deref for StoreWriteGuard<'a> {
1430    type Target = Store;
1431
1432    fn deref(&self) -> &Self::Target {
1433        &*self.guard
1434    }
1435}
1436
1437impl<'a> DerefMut for StoreWriteGuard<'a> {
1438    fn deref_mut(&mut self) -> &mut Self::Target {
1439        &mut *self.guard
1440    }
1441}
1442
1443impl<'a> Drop for StoreWriteGuard<'a> {
1444    fn drop(&mut self) {
1445        #[cfg(test)]
1446        self.check_invariants();
1447
1448        let metrics = self.metrics();
1449        tracing::info!(
1450            connections = metrics.connections,
1451            registered_projects = metrics.registered_projects,
1452            shared_projects = metrics.shared_projects,
1453            "metrics"
1454        );
1455    }
1456}
1457
1458impl Executor for RealExecutor {
1459    type Sleep = Sleep;
1460
1461    fn spawn_detached<F: 'static + Send + Future<Output = ()>>(&self, future: F) {
1462        tokio::task::spawn(future);
1463    }
1464
1465    fn sleep(&self, duration: Duration) -> Self::Sleep {
1466        tokio::time::sleep(duration)
1467    }
1468}
1469
1470fn broadcast<F>(
1471    sender_id: ConnectionId,
1472    receiver_ids: impl IntoIterator<Item = ConnectionId>,
1473    mut f: F,
1474) where
1475    F: FnMut(ConnectionId) -> anyhow::Result<()>,
1476{
1477    for receiver_id in receiver_ids {
1478        if receiver_id != sender_id {
1479            f(receiver_id).trace_err();
1480        }
1481    }
1482}
1483
1484lazy_static! {
1485    static ref ZED_PROTOCOL_VERSION: HeaderName = HeaderName::from_static("x-zed-protocol-version");
1486}
1487
1488pub struct ProtocolVersion(u32);
1489
1490impl Header for ProtocolVersion {
1491    fn name() -> &'static HeaderName {
1492        &ZED_PROTOCOL_VERSION
1493    }
1494
1495    fn decode<'i, I>(values: &mut I) -> Result<Self, axum::headers::Error>
1496    where
1497        Self: Sized,
1498        I: Iterator<Item = &'i axum::http::HeaderValue>,
1499    {
1500        let version = values
1501            .next()
1502            .ok_or_else(|| axum::headers::Error::invalid())?
1503            .to_str()
1504            .map_err(|_| axum::headers::Error::invalid())?
1505            .parse()
1506            .map_err(|_| axum::headers::Error::invalid())?;
1507        Ok(Self(version))
1508    }
1509
1510    fn encode<E: Extend<axum::http::HeaderValue>>(&self, values: &mut E) {
1511        values.extend([self.0.to_string().parse().unwrap()]);
1512    }
1513}
1514
1515pub fn routes(app_state: Arc<AppState>) -> Router<Body> {
1516    let server = Server::new(app_state.clone(), None);
1517    Router::new()
1518        .route("/rpc", get(handle_websocket_request))
1519        .layer(
1520            ServiceBuilder::new()
1521                .layer(Extension(app_state))
1522                .layer(middleware::from_fn(auth::validate_header))
1523                .layer(Extension(server)),
1524        )
1525}
1526
1527pub async fn handle_websocket_request(
1528    TypedHeader(ProtocolVersion(protocol_version)): TypedHeader<ProtocolVersion>,
1529    ConnectInfo(socket_address): ConnectInfo<SocketAddr>,
1530    Extension(server): Extension<Arc<Server>>,
1531    Extension(user_id): Extension<UserId>,
1532    ws: WebSocketUpgrade,
1533) -> axum::response::Response {
1534    if protocol_version != rpc::PROTOCOL_VERSION {
1535        return (
1536            StatusCode::UPGRADE_REQUIRED,
1537            "client must be upgraded".to_string(),
1538        )
1539            .into_response();
1540    }
1541    let socket_address = socket_address.to_string();
1542    ws.on_upgrade(move |socket| {
1543        use util::ResultExt;
1544        let socket = socket
1545            .map_ok(to_tungstenite_message)
1546            .err_into()
1547            .with(|message| async move { Ok(to_axum_message(message)) });
1548        let connection = Connection::new(Box::pin(socket));
1549        async move {
1550            server
1551                .handle_connection(connection, socket_address, user_id, None, RealExecutor)
1552                .await
1553                .log_err();
1554        }
1555    })
1556}
1557
1558fn to_axum_message(message: TungsteniteMessage) -> AxumMessage {
1559    match message {
1560        TungsteniteMessage::Text(payload) => AxumMessage::Text(payload),
1561        TungsteniteMessage::Binary(payload) => AxumMessage::Binary(payload),
1562        TungsteniteMessage::Ping(payload) => AxumMessage::Ping(payload),
1563        TungsteniteMessage::Pong(payload) => AxumMessage::Pong(payload),
1564        TungsteniteMessage::Close(frame) => AxumMessage::Close(frame.map(|frame| AxumCloseFrame {
1565            code: frame.code.into(),
1566            reason: frame.reason,
1567        })),
1568    }
1569}
1570
1571fn to_tungstenite_message(message: AxumMessage) -> TungsteniteMessage {
1572    match message {
1573        AxumMessage::Text(payload) => TungsteniteMessage::Text(payload),
1574        AxumMessage::Binary(payload) => TungsteniteMessage::Binary(payload),
1575        AxumMessage::Ping(payload) => TungsteniteMessage::Ping(payload),
1576        AxumMessage::Pong(payload) => TungsteniteMessage::Pong(payload),
1577        AxumMessage::Close(frame) => {
1578            TungsteniteMessage::Close(frame.map(|frame| TungsteniteCloseFrame {
1579                code: frame.code.into(),
1580                reason: frame.reason,
1581            }))
1582        }
1583    }
1584}
1585
1586pub trait ResultExt {
1587    type Ok;
1588
1589    fn trace_err(self) -> Option<Self::Ok>;
1590}
1591
1592impl<T, E> ResultExt for Result<T, E>
1593where
1594    E: std::fmt::Debug,
1595{
1596    type Ok = T;
1597
1598    fn trace_err(self) -> Option<T> {
1599        match self {
1600            Ok(value) => Some(value),
1601            Err(error) => {
1602                tracing::error!("{:?}", error);
1603                None
1604            }
1605        }
1606    }
1607}
1608
1609#[cfg(test)]
1610mod tests {
1611    use super::*;
1612    use crate::{
1613        db::{tests::TestDb, UserId},
1614        AppState,
1615    };
1616    use ::rpc::Peer;
1617    use client::{
1618        self, test::FakeHttpClient, Channel, ChannelDetails, ChannelList, Client, Credentials,
1619        EstablishConnectionError, UserStore, RECEIVE_TIMEOUT,
1620    };
1621    use collections::{BTreeMap, HashSet};
1622    use editor::{
1623        self, ConfirmCodeAction, ConfirmCompletion, ConfirmRename, Editor, Input, Redo, Rename,
1624        ToOffset, ToggleCodeActions, Undo,
1625    };
1626    use gpui::{
1627        executor::{self, Deterministic},
1628        geometry::vector::vec2f,
1629        ModelHandle, TestAppContext, ViewHandle,
1630    };
1631    use language::{
1632        range_to_lsp, tree_sitter_rust, Diagnostic, DiagnosticEntry, FakeLspAdapter, Language,
1633        LanguageConfig, LanguageRegistry, OffsetRangeExt, Point, Rope,
1634    };
1635    use lsp::{self, FakeLanguageServer};
1636    use parking_lot::Mutex;
1637    use project::{
1638        fs::{FakeFs, Fs as _},
1639        search::SearchQuery,
1640        worktree::WorktreeHandle,
1641        DiagnosticSummary, Project, ProjectPath, WorktreeId,
1642    };
1643    use rand::prelude::*;
1644    use rpc::PeerId;
1645    use serde_json::json;
1646    use settings::Settings;
1647    use sqlx::types::time::OffsetDateTime;
1648    use std::{
1649        cell::RefCell,
1650        env,
1651        ops::Deref,
1652        path::{Path, PathBuf},
1653        rc::Rc,
1654        sync::{
1655            atomic::{AtomicBool, Ordering::SeqCst},
1656            Arc,
1657        },
1658        time::Duration,
1659    };
1660    use theme::ThemeRegistry;
1661    use workspace::{Item, SplitDirection, ToggleFollow, Workspace, WorkspaceParams};
1662
1663    #[cfg(test)]
1664    #[ctor::ctor]
1665    fn init_logger() {
1666        if std::env::var("RUST_LOG").is_ok() {
1667            env_logger::init();
1668        }
1669    }
1670
1671    #[gpui::test(iterations = 10)]
1672    async fn test_share_project(cx_a: &mut TestAppContext, cx_b: &mut TestAppContext) {
1673        let (window_b, _) = cx_b.add_window(|_| EmptyView);
1674        let lang_registry = Arc::new(LanguageRegistry::test());
1675        let fs = FakeFs::new(cx_a.background());
1676        cx_a.foreground().forbid_parking();
1677
1678        // Connect to a server as 2 clients.
1679        let mut server = TestServer::start(cx_a.foreground(), cx_a.background()).await;
1680        let client_a = server.create_client(cx_a, "user_a").await;
1681        let mut client_b = server.create_client(cx_b, "user_b").await;
1682        server
1683            .make_contacts(vec![(&client_a, cx_a), (&client_b, cx_b)])
1684            .await;
1685
1686        // Share a project as client A
1687        fs.insert_tree(
1688            "/a",
1689            json!({
1690                "a.txt": "a-contents",
1691                "b.txt": "b-contents",
1692            }),
1693        )
1694        .await;
1695        let project_a = cx_a.update(|cx| {
1696            Project::local(
1697                client_a.clone(),
1698                client_a.user_store.clone(),
1699                lang_registry.clone(),
1700                fs.clone(),
1701                cx,
1702            )
1703        });
1704        let (worktree_a, _) = project_a
1705            .update(cx_a, |p, cx| {
1706                p.find_or_create_local_worktree("/a", true, cx)
1707            })
1708            .await
1709            .unwrap();
1710        let worktree_id = worktree_a.read_with(cx_a, |tree, _| tree.id());
1711        worktree_a
1712            .read_with(cx_a, |tree, _| tree.as_local().unwrap().scan_complete())
1713            .await;
1714
1715        // Join that project as client B
1716        let client_b_peer_id = client_b.peer_id;
1717        let project_b = client_b.build_remote_project(&project_a, cx_a, cx_b).await;
1718
1719        let replica_id_b = project_b.read_with(cx_b, |project, _| {
1720            assert_eq!(
1721                project
1722                    .collaborators()
1723                    .get(&client_a.peer_id)
1724                    .unwrap()
1725                    .user
1726                    .github_login,
1727                "user_a"
1728            );
1729            project.replica_id()
1730        });
1731        project_a
1732            .condition(&cx_a, |tree, _| {
1733                tree.collaborators()
1734                    .get(&client_b_peer_id)
1735                    .map_or(false, |collaborator| {
1736                        collaborator.replica_id == replica_id_b
1737                            && collaborator.user.github_login == "user_b"
1738                    })
1739            })
1740            .await;
1741
1742        // Open the same file as client B and client A.
1743        let buffer_b = project_b
1744            .update(cx_b, |p, cx| p.open_buffer((worktree_id, "b.txt"), cx))
1745            .await
1746            .unwrap();
1747        buffer_b.read_with(cx_b, |buf, _| assert_eq!(buf.text(), "b-contents"));
1748        project_a.read_with(cx_a, |project, cx| {
1749            assert!(project.has_open_buffer((worktree_id, "b.txt"), cx))
1750        });
1751        let buffer_a = project_a
1752            .update(cx_a, |p, cx| p.open_buffer((worktree_id, "b.txt"), cx))
1753            .await
1754            .unwrap();
1755
1756        let editor_b = cx_b.add_view(window_b, |cx| Editor::for_buffer(buffer_b, None, cx));
1757
1758        // TODO
1759        // // Create a selection set as client B and see that selection set as client A.
1760        // buffer_a
1761        //     .condition(&cx_a, |buffer, _| buffer.selection_sets().count() == 1)
1762        //     .await;
1763
1764        // Edit the buffer as client B and see that edit as client A.
1765        editor_b.update(cx_b, |editor, cx| {
1766            editor.handle_input(&Input("ok, ".into()), cx)
1767        });
1768        buffer_a
1769            .condition(&cx_a, |buffer, _| buffer.text() == "ok, b-contents")
1770            .await;
1771
1772        // TODO
1773        // // Remove the selection set as client B, see those selections disappear as client A.
1774        cx_b.update(move |_| drop(editor_b));
1775        // buffer_a
1776        //     .condition(&cx_a, |buffer, _| buffer.selection_sets().count() == 0)
1777        //     .await;
1778
1779        // Dropping the client B's project removes client B from client A's collaborators.
1780        cx_b.update(move |_| {
1781            drop(client_b.project.take());
1782            drop(project_b);
1783        });
1784        project_a
1785            .condition(&cx_a, |project, _| project.collaborators().is_empty())
1786            .await;
1787    }
1788
1789    #[gpui::test(iterations = 10)]
1790    async fn test_unshare_project(
1791        deterministic: Arc<Deterministic>,
1792        cx_a: &mut TestAppContext,
1793        cx_b: &mut TestAppContext,
1794    ) {
1795        let lang_registry = Arc::new(LanguageRegistry::test());
1796        let fs = FakeFs::new(cx_a.background());
1797        cx_a.foreground().forbid_parking();
1798
1799        // Connect to a server as 2 clients.
1800        let mut server = TestServer::start(cx_a.foreground(), cx_a.background()).await;
1801        let client_a = server.create_client(cx_a, "user_a").await;
1802        let mut client_b = server.create_client(cx_b, "user_b").await;
1803        server
1804            .make_contacts(vec![(&client_a, cx_a), (&client_b, cx_b)])
1805            .await;
1806
1807        // Share a project as client A
1808        fs.insert_tree(
1809            "/a",
1810            json!({
1811                "a.txt": "a-contents",
1812                "b.txt": "b-contents",
1813            }),
1814        )
1815        .await;
1816        let project_a = cx_a.update(|cx| {
1817            Project::local(
1818                client_a.clone(),
1819                client_a.user_store.clone(),
1820                lang_registry.clone(),
1821                fs.clone(),
1822                cx,
1823            )
1824        });
1825        let (worktree_a, _) = project_a
1826            .update(cx_a, |p, cx| {
1827                p.find_or_create_local_worktree("/a", true, cx)
1828            })
1829            .await
1830            .unwrap();
1831        worktree_a
1832            .read_with(cx_a, |tree, _| tree.as_local().unwrap().scan_complete())
1833            .await;
1834        let worktree_id = worktree_a.read_with(cx_a, |tree, _| tree.id());
1835
1836        // Join that project as client B
1837        let project_b = client_b.build_remote_project(&project_a, cx_a, cx_b).await;
1838        assert!(worktree_a.read_with(cx_a, |tree, _| tree.as_local().unwrap().is_shared()));
1839        project_b
1840            .update(cx_b, |p, cx| p.open_buffer((worktree_id, "a.txt"), cx))
1841            .await
1842            .unwrap();
1843
1844        // When client B leaves the project, it gets automatically unshared.
1845        cx_b.update(|_| {
1846            drop(client_b.project.take());
1847            drop(project_b);
1848        });
1849        deterministic.run_until_parked();
1850        assert!(worktree_a.read_with(cx_a, |tree, _| !tree.as_local().unwrap().is_shared()));
1851
1852        // When client B joins again, the project gets re-shared.
1853        let project_b2 = client_b.build_remote_project(&project_a, cx_a, cx_b).await;
1854        assert!(worktree_a.read_with(cx_a, |tree, _| tree.as_local().unwrap().is_shared()));
1855        project_b2
1856            .update(cx_b, |p, cx| p.open_buffer((worktree_id, "a.txt"), cx))
1857            .await
1858            .unwrap();
1859    }
1860
1861    #[gpui::test(iterations = 10)]
1862    async fn test_host_disconnect(
1863        cx_a: &mut TestAppContext,
1864        cx_b: &mut TestAppContext,
1865        cx_c: &mut TestAppContext,
1866    ) {
1867        let lang_registry = Arc::new(LanguageRegistry::test());
1868        let fs = FakeFs::new(cx_a.background());
1869        cx_a.foreground().forbid_parking();
1870
1871        // Connect to a server as 3 clients.
1872        let mut server = TestServer::start(cx_a.foreground(), cx_a.background()).await;
1873        let client_a = server.create_client(cx_a, "user_a").await;
1874        let mut client_b = server.create_client(cx_b, "user_b").await;
1875        let client_c = server.create_client(cx_c, "user_c").await;
1876        server
1877            .make_contacts(vec![
1878                (&client_a, cx_a),
1879                (&client_b, cx_b),
1880                (&client_c, cx_c),
1881            ])
1882            .await;
1883
1884        // Share a project as client A
1885        fs.insert_tree(
1886            "/a",
1887            json!({
1888                "a.txt": "a-contents",
1889                "b.txt": "b-contents",
1890            }),
1891        )
1892        .await;
1893        let project_a = cx_a.update(|cx| {
1894            Project::local(
1895                client_a.clone(),
1896                client_a.user_store.clone(),
1897                lang_registry.clone(),
1898                fs.clone(),
1899                cx,
1900            )
1901        });
1902        let project_id = project_a
1903            .read_with(cx_a, |project, _| project.next_remote_id())
1904            .await;
1905        let (worktree_a, _) = project_a
1906            .update(cx_a, |p, cx| {
1907                p.find_or_create_local_worktree("/a", true, cx)
1908            })
1909            .await
1910            .unwrap();
1911        worktree_a
1912            .read_with(cx_a, |tree, _| tree.as_local().unwrap().scan_complete())
1913            .await;
1914        let worktree_id = worktree_a.read_with(cx_a, |tree, _| tree.id());
1915
1916        // Join that project as client B
1917        let project_b = client_b.build_remote_project(&project_a, cx_a, cx_b).await;
1918        assert!(worktree_a.read_with(cx_a, |tree, _| tree.as_local().unwrap().is_shared()));
1919        project_b
1920            .update(cx_b, |p, cx| p.open_buffer((worktree_id, "a.txt"), cx))
1921            .await
1922            .unwrap();
1923
1924        // Request to join that project as client C
1925        let project_c = cx_c.spawn(|mut cx| {
1926            let client = client_c.client.clone();
1927            let user_store = client_c.user_store.clone();
1928            let lang_registry = lang_registry.clone();
1929            async move {
1930                Project::remote(
1931                    project_id,
1932                    client,
1933                    user_store,
1934                    lang_registry.clone(),
1935                    FakeFs::new(cx.background()),
1936                    &mut cx,
1937                )
1938                .await
1939            }
1940        });
1941
1942        // Drop client A's connection. Collaborators should disappear and the project should not be shown as shared.
1943        server.disconnect_client(client_a.current_user_id(cx_a));
1944        cx_a.foreground().advance_clock(rpc::RECEIVE_TIMEOUT);
1945        project_a
1946            .condition(cx_a, |project, _| project.collaborators().is_empty())
1947            .await;
1948        project_a.read_with(cx_a, |project, _| assert!(!project.is_shared()));
1949        project_b
1950            .condition(cx_b, |project, _| project.is_read_only())
1951            .await;
1952        assert!(worktree_a.read_with(cx_a, |tree, _| !tree.as_local().unwrap().is_shared()));
1953        cx_b.update(|_| {
1954            drop(project_b);
1955        });
1956        assert!(matches!(
1957            project_c.await.unwrap_err(),
1958            project::JoinProjectError::HostWentOffline
1959        ));
1960
1961        // Ensure guests can still join.
1962        let project_b2 = client_b.build_remote_project(&project_a, cx_a, cx_b).await;
1963        assert!(worktree_a.read_with(cx_a, |tree, _| tree.as_local().unwrap().is_shared()));
1964        project_b2
1965            .update(cx_b, |p, cx| p.open_buffer((worktree_id, "a.txt"), cx))
1966            .await
1967            .unwrap();
1968    }
1969
1970    #[gpui::test(iterations = 10)]
1971    async fn test_decline_join_request(
1972        deterministic: Arc<Deterministic>,
1973        cx_a: &mut TestAppContext,
1974        cx_b: &mut TestAppContext,
1975    ) {
1976        let lang_registry = Arc::new(LanguageRegistry::test());
1977        let fs = FakeFs::new(cx_a.background());
1978        cx_a.foreground().forbid_parking();
1979
1980        // Connect to a server as 2 clients.
1981        let mut server = TestServer::start(cx_a.foreground(), cx_a.background()).await;
1982        let client_a = server.create_client(cx_a, "user_a").await;
1983        let client_b = server.create_client(cx_b, "user_b").await;
1984        server
1985            .make_contacts(vec![(&client_a, cx_a), (&client_b, cx_b)])
1986            .await;
1987
1988        // Share a project as client A
1989        fs.insert_tree("/a", json!({})).await;
1990        let project_a = cx_a.update(|cx| {
1991            Project::local(
1992                client_a.clone(),
1993                client_a.user_store.clone(),
1994                lang_registry.clone(),
1995                fs.clone(),
1996                cx,
1997            )
1998        });
1999        let project_id = project_a
2000            .read_with(cx_a, |project, _| project.next_remote_id())
2001            .await;
2002        let (worktree_a, _) = project_a
2003            .update(cx_a, |p, cx| {
2004                p.find_or_create_local_worktree("/a", true, cx)
2005            })
2006            .await
2007            .unwrap();
2008        worktree_a
2009            .read_with(cx_a, |tree, _| tree.as_local().unwrap().scan_complete())
2010            .await;
2011
2012        // Request to join that project as client B
2013        let project_b = cx_b.spawn(|mut cx| {
2014            let client = client_b.client.clone();
2015            let user_store = client_b.user_store.clone();
2016            let lang_registry = lang_registry.clone();
2017            async move {
2018                Project::remote(
2019                    project_id,
2020                    client,
2021                    user_store,
2022                    lang_registry.clone(),
2023                    FakeFs::new(cx.background()),
2024                    &mut cx,
2025                )
2026                .await
2027            }
2028        });
2029        deterministic.run_until_parked();
2030        project_a.update(cx_a, |project, cx| {
2031            project.respond_to_join_request(client_b.user_id().unwrap(), false, cx)
2032        });
2033        assert!(matches!(
2034            project_b.await.unwrap_err(),
2035            project::JoinProjectError::HostDeclined
2036        ));
2037
2038        // Request to join the project again as client B
2039        let project_b = cx_b.spawn(|mut cx| {
2040            let client = client_b.client.clone();
2041            let user_store = client_b.user_store.clone();
2042            let lang_registry = lang_registry.clone();
2043            async move {
2044                Project::remote(
2045                    project_id,
2046                    client,
2047                    user_store,
2048                    lang_registry.clone(),
2049                    FakeFs::new(cx.background()),
2050                    &mut cx,
2051                )
2052                .await
2053            }
2054        });
2055
2056        // Close the project on the host
2057        deterministic.run_until_parked();
2058        cx_a.update(|_| drop(project_a));
2059        deterministic.run_until_parked();
2060        assert!(matches!(
2061            project_b.await.unwrap_err(),
2062            project::JoinProjectError::HostClosedProject
2063        ));
2064    }
2065
2066    #[gpui::test(iterations = 10)]
2067    async fn test_cancel_join_request(
2068        deterministic: Arc<Deterministic>,
2069        cx_a: &mut TestAppContext,
2070        cx_b: &mut TestAppContext,
2071    ) {
2072        let lang_registry = Arc::new(LanguageRegistry::test());
2073        let fs = FakeFs::new(cx_a.background());
2074        cx_a.foreground().forbid_parking();
2075
2076        // Connect to a server as 2 clients.
2077        let mut server = TestServer::start(cx_a.foreground(), cx_a.background()).await;
2078        let client_a = server.create_client(cx_a, "user_a").await;
2079        let client_b = server.create_client(cx_b, "user_b").await;
2080        server
2081            .make_contacts(vec![(&client_a, cx_a), (&client_b, cx_b)])
2082            .await;
2083
2084        // Share a project as client A
2085        fs.insert_tree("/a", json!({})).await;
2086        let project_a = cx_a.update(|cx| {
2087            Project::local(
2088                client_a.clone(),
2089                client_a.user_store.clone(),
2090                lang_registry.clone(),
2091                fs.clone(),
2092                cx,
2093            )
2094        });
2095        let project_id = project_a
2096            .read_with(cx_a, |project, _| project.next_remote_id())
2097            .await;
2098
2099        let project_a_events = Rc::new(RefCell::new(Vec::new()));
2100        let user_b = client_a
2101            .user_store
2102            .update(cx_a, |store, cx| {
2103                store.fetch_user(client_b.user_id().unwrap(), cx)
2104            })
2105            .await
2106            .unwrap();
2107        project_a.update(cx_a, {
2108            let project_a_events = project_a_events.clone();
2109            move |_, cx| {
2110                cx.subscribe(&cx.handle(), move |_, _, event, _| {
2111                    project_a_events.borrow_mut().push(event.clone());
2112                })
2113                .detach();
2114            }
2115        });
2116
2117        let (worktree_a, _) = project_a
2118            .update(cx_a, |p, cx| {
2119                p.find_or_create_local_worktree("/a", true, cx)
2120            })
2121            .await
2122            .unwrap();
2123        worktree_a
2124            .read_with(cx_a, |tree, _| tree.as_local().unwrap().scan_complete())
2125            .await;
2126
2127        // Request to join that project as client B
2128        let project_b = cx_b.spawn(|mut cx| {
2129            let client = client_b.client.clone();
2130            let user_store = client_b.user_store.clone();
2131            let lang_registry = lang_registry.clone();
2132            async move {
2133                Project::remote(
2134                    project_id,
2135                    client,
2136                    user_store,
2137                    lang_registry.clone(),
2138                    FakeFs::new(cx.background()),
2139                    &mut cx,
2140                )
2141                .await
2142            }
2143        });
2144        deterministic.run_until_parked();
2145        assert_eq!(
2146            &*project_a_events.borrow(),
2147            &[project::Event::ContactRequestedJoin(user_b.clone())]
2148        );
2149        project_a_events.borrow_mut().clear();
2150
2151        // Cancel the join request by leaving the project
2152        client_b
2153            .client
2154            .send(proto::LeaveProject { project_id })
2155            .unwrap();
2156        drop(project_b);
2157
2158        deterministic.run_until_parked();
2159        assert_eq!(
2160            &*project_a_events.borrow(),
2161            &[project::Event::ContactCancelledJoinRequest(user_b.clone())]
2162        );
2163    }
2164
2165    #[gpui::test(iterations = 10)]
2166    async fn test_propagate_saves_and_fs_changes(
2167        cx_a: &mut TestAppContext,
2168        cx_b: &mut TestAppContext,
2169        cx_c: &mut TestAppContext,
2170    ) {
2171        let lang_registry = Arc::new(LanguageRegistry::test());
2172        let fs = FakeFs::new(cx_a.background());
2173        cx_a.foreground().forbid_parking();
2174
2175        // Connect to a server as 3 clients.
2176        let mut server = TestServer::start(cx_a.foreground(), cx_a.background()).await;
2177        let client_a = server.create_client(cx_a, "user_a").await;
2178        let mut client_b = server.create_client(cx_b, "user_b").await;
2179        let mut client_c = server.create_client(cx_c, "user_c").await;
2180        server
2181            .make_contacts(vec![
2182                (&client_a, cx_a),
2183                (&client_b, cx_b),
2184                (&client_c, cx_c),
2185            ])
2186            .await;
2187
2188        // Share a worktree as client A.
2189        fs.insert_tree(
2190            "/a",
2191            json!({
2192                "file1": "",
2193                "file2": ""
2194            }),
2195        )
2196        .await;
2197        let project_a = cx_a.update(|cx| {
2198            Project::local(
2199                client_a.clone(),
2200                client_a.user_store.clone(),
2201                lang_registry.clone(),
2202                fs.clone(),
2203                cx,
2204            )
2205        });
2206        let (worktree_a, _) = project_a
2207            .update(cx_a, |p, cx| {
2208                p.find_or_create_local_worktree("/a", true, cx)
2209            })
2210            .await
2211            .unwrap();
2212        worktree_a
2213            .read_with(cx_a, |tree, _| tree.as_local().unwrap().scan_complete())
2214            .await;
2215        let worktree_id = worktree_a.read_with(cx_a, |tree, _| tree.id());
2216
2217        // Join that worktree as clients B and C.
2218        let project_b = client_b.build_remote_project(&project_a, cx_a, cx_b).await;
2219        let project_c = client_c.build_remote_project(&project_a, cx_a, cx_c).await;
2220        let worktree_b = project_b.read_with(cx_b, |p, cx| p.worktrees(cx).next().unwrap());
2221        let worktree_c = project_c.read_with(cx_c, |p, cx| p.worktrees(cx).next().unwrap());
2222
2223        // Open and edit a buffer as both guests B and C.
2224        let buffer_b = project_b
2225            .update(cx_b, |p, cx| p.open_buffer((worktree_id, "file1"), cx))
2226            .await
2227            .unwrap();
2228        let buffer_c = project_c
2229            .update(cx_c, |p, cx| p.open_buffer((worktree_id, "file1"), cx))
2230            .await
2231            .unwrap();
2232        buffer_b.update(cx_b, |buf, cx| buf.edit([(0..0, "i-am-b, ")], cx));
2233        buffer_c.update(cx_c, |buf, cx| buf.edit([(0..0, "i-am-c, ")], cx));
2234
2235        // Open and edit that buffer as the host.
2236        let buffer_a = project_a
2237            .update(cx_a, |p, cx| p.open_buffer((worktree_id, "file1"), cx))
2238            .await
2239            .unwrap();
2240
2241        buffer_a
2242            .condition(cx_a, |buf, _| buf.text() == "i-am-c, i-am-b, ")
2243            .await;
2244        buffer_a.update(cx_a, |buf, cx| {
2245            buf.edit([(buf.len()..buf.len(), "i-am-a")], cx)
2246        });
2247
2248        // Wait for edits to propagate
2249        buffer_a
2250            .condition(cx_a, |buf, _| buf.text() == "i-am-c, i-am-b, i-am-a")
2251            .await;
2252        buffer_b
2253            .condition(cx_b, |buf, _| buf.text() == "i-am-c, i-am-b, i-am-a")
2254            .await;
2255        buffer_c
2256            .condition(cx_c, |buf, _| buf.text() == "i-am-c, i-am-b, i-am-a")
2257            .await;
2258
2259        // Edit the buffer as the host and concurrently save as guest B.
2260        let save_b = buffer_b.update(cx_b, |buf, cx| buf.save(cx));
2261        buffer_a.update(cx_a, |buf, cx| buf.edit([(0..0, "hi-a, ")], cx));
2262        save_b.await.unwrap();
2263        assert_eq!(
2264            fs.load("/a/file1".as_ref()).await.unwrap(),
2265            "hi-a, i-am-c, i-am-b, i-am-a"
2266        );
2267        buffer_a.read_with(cx_a, |buf, _| assert!(!buf.is_dirty()));
2268        buffer_b.read_with(cx_b, |buf, _| assert!(!buf.is_dirty()));
2269        buffer_c.condition(cx_c, |buf, _| !buf.is_dirty()).await;
2270
2271        worktree_a.flush_fs_events(cx_a).await;
2272
2273        // Make changes on host's file system, see those changes on guest worktrees.
2274        fs.rename(
2275            "/a/file1".as_ref(),
2276            "/a/file1-renamed".as_ref(),
2277            Default::default(),
2278        )
2279        .await
2280        .unwrap();
2281
2282        fs.rename("/a/file2".as_ref(), "/a/file3".as_ref(), Default::default())
2283            .await
2284            .unwrap();
2285        fs.insert_file(Path::new("/a/file4"), "4".into()).await;
2286
2287        worktree_a
2288            .condition(&cx_a, |tree, _| {
2289                tree.paths()
2290                    .map(|p| p.to_string_lossy())
2291                    .collect::<Vec<_>>()
2292                    == ["file1-renamed", "file3", "file4"]
2293            })
2294            .await;
2295        worktree_b
2296            .condition(&cx_b, |tree, _| {
2297                tree.paths()
2298                    .map(|p| p.to_string_lossy())
2299                    .collect::<Vec<_>>()
2300                    == ["file1-renamed", "file3", "file4"]
2301            })
2302            .await;
2303        worktree_c
2304            .condition(&cx_c, |tree, _| {
2305                tree.paths()
2306                    .map(|p| p.to_string_lossy())
2307                    .collect::<Vec<_>>()
2308                    == ["file1-renamed", "file3", "file4"]
2309            })
2310            .await;
2311
2312        // Ensure buffer files are updated as well.
2313        buffer_a
2314            .condition(&cx_a, |buf, _| {
2315                buf.file().unwrap().path().to_str() == Some("file1-renamed")
2316            })
2317            .await;
2318        buffer_b
2319            .condition(&cx_b, |buf, _| {
2320                buf.file().unwrap().path().to_str() == Some("file1-renamed")
2321            })
2322            .await;
2323        buffer_c
2324            .condition(&cx_c, |buf, _| {
2325                buf.file().unwrap().path().to_str() == Some("file1-renamed")
2326            })
2327            .await;
2328    }
2329
2330    #[gpui::test(iterations = 10)]
2331    async fn test_fs_operations(
2332        executor: Arc<Deterministic>,
2333        cx_a: &mut TestAppContext,
2334        cx_b: &mut TestAppContext,
2335    ) {
2336        executor.forbid_parking();
2337        let fs = FakeFs::new(cx_a.background());
2338
2339        // Connect to a server as 2 clients.
2340        let mut server = TestServer::start(cx_a.foreground(), cx_a.background()).await;
2341        let mut client_a = server.create_client(cx_a, "user_a").await;
2342        let mut client_b = server.create_client(cx_b, "user_b").await;
2343        server
2344            .make_contacts(vec![(&client_a, cx_a), (&client_b, cx_b)])
2345            .await;
2346
2347        // Share a project as client A
2348        fs.insert_tree(
2349            "/dir",
2350            json!({
2351                "a.txt": "a-contents",
2352                "b.txt": "b-contents",
2353            }),
2354        )
2355        .await;
2356
2357        let (project_a, worktree_id) = client_a.build_local_project(fs, "/dir", cx_a).await;
2358        let project_b = client_b.build_remote_project(&project_a, cx_a, cx_b).await;
2359
2360        let worktree_a =
2361            project_a.read_with(cx_a, |project, cx| project.worktrees(cx).next().unwrap());
2362        let worktree_b =
2363            project_b.read_with(cx_b, |project, cx| project.worktrees(cx).next().unwrap());
2364
2365        let entry = project_b
2366            .update(cx_b, |project, cx| {
2367                project
2368                    .create_entry((worktree_id, "c.txt"), false, cx)
2369                    .unwrap()
2370            })
2371            .await
2372            .unwrap();
2373        worktree_a.read_with(cx_a, |worktree, _| {
2374            assert_eq!(
2375                worktree
2376                    .paths()
2377                    .map(|p| p.to_string_lossy())
2378                    .collect::<Vec<_>>(),
2379                ["a.txt", "b.txt", "c.txt"]
2380            );
2381        });
2382        worktree_b.read_with(cx_b, |worktree, _| {
2383            assert_eq!(
2384                worktree
2385                    .paths()
2386                    .map(|p| p.to_string_lossy())
2387                    .collect::<Vec<_>>(),
2388                ["a.txt", "b.txt", "c.txt"]
2389            );
2390        });
2391
2392        project_b
2393            .update(cx_b, |project, cx| {
2394                project.rename_entry(entry.id, Path::new("d.txt"), cx)
2395            })
2396            .unwrap()
2397            .await
2398            .unwrap();
2399        worktree_a.read_with(cx_a, |worktree, _| {
2400            assert_eq!(
2401                worktree
2402                    .paths()
2403                    .map(|p| p.to_string_lossy())
2404                    .collect::<Vec<_>>(),
2405                ["a.txt", "b.txt", "d.txt"]
2406            );
2407        });
2408        worktree_b.read_with(cx_b, |worktree, _| {
2409            assert_eq!(
2410                worktree
2411                    .paths()
2412                    .map(|p| p.to_string_lossy())
2413                    .collect::<Vec<_>>(),
2414                ["a.txt", "b.txt", "d.txt"]
2415            );
2416        });
2417
2418        let dir_entry = project_b
2419            .update(cx_b, |project, cx| {
2420                project
2421                    .create_entry((worktree_id, "DIR"), true, cx)
2422                    .unwrap()
2423            })
2424            .await
2425            .unwrap();
2426        worktree_a.read_with(cx_a, |worktree, _| {
2427            assert_eq!(
2428                worktree
2429                    .paths()
2430                    .map(|p| p.to_string_lossy())
2431                    .collect::<Vec<_>>(),
2432                ["DIR", "a.txt", "b.txt", "d.txt"]
2433            );
2434        });
2435        worktree_b.read_with(cx_b, |worktree, _| {
2436            assert_eq!(
2437                worktree
2438                    .paths()
2439                    .map(|p| p.to_string_lossy())
2440                    .collect::<Vec<_>>(),
2441                ["DIR", "a.txt", "b.txt", "d.txt"]
2442            );
2443        });
2444
2445        project_b
2446            .update(cx_b, |project, cx| {
2447                project.delete_entry(dir_entry.id, cx).unwrap()
2448            })
2449            .await
2450            .unwrap();
2451        worktree_a.read_with(cx_a, |worktree, _| {
2452            assert_eq!(
2453                worktree
2454                    .paths()
2455                    .map(|p| p.to_string_lossy())
2456                    .collect::<Vec<_>>(),
2457                ["a.txt", "b.txt", "d.txt"]
2458            );
2459        });
2460        worktree_b.read_with(cx_b, |worktree, _| {
2461            assert_eq!(
2462                worktree
2463                    .paths()
2464                    .map(|p| p.to_string_lossy())
2465                    .collect::<Vec<_>>(),
2466                ["a.txt", "b.txt", "d.txt"]
2467            );
2468        });
2469
2470        project_b
2471            .update(cx_b, |project, cx| {
2472                project.delete_entry(entry.id, cx).unwrap()
2473            })
2474            .await
2475            .unwrap();
2476        worktree_a.read_with(cx_a, |worktree, _| {
2477            assert_eq!(
2478                worktree
2479                    .paths()
2480                    .map(|p| p.to_string_lossy())
2481                    .collect::<Vec<_>>(),
2482                ["a.txt", "b.txt"]
2483            );
2484        });
2485        worktree_b.read_with(cx_b, |worktree, _| {
2486            assert_eq!(
2487                worktree
2488                    .paths()
2489                    .map(|p| p.to_string_lossy())
2490                    .collect::<Vec<_>>(),
2491                ["a.txt", "b.txt"]
2492            );
2493        });
2494    }
2495
2496    #[gpui::test(iterations = 10)]
2497    async fn test_buffer_conflict_after_save(cx_a: &mut TestAppContext, cx_b: &mut TestAppContext) {
2498        cx_a.foreground().forbid_parking();
2499        let lang_registry = Arc::new(LanguageRegistry::test());
2500        let fs = FakeFs::new(cx_a.background());
2501
2502        // Connect to a server as 2 clients.
2503        let mut server = TestServer::start(cx_a.foreground(), cx_a.background()).await;
2504        let client_a = server.create_client(cx_a, "user_a").await;
2505        let mut client_b = server.create_client(cx_b, "user_b").await;
2506        server
2507            .make_contacts(vec![(&client_a, cx_a), (&client_b, cx_b)])
2508            .await;
2509
2510        // Share a project as client A
2511        fs.insert_tree(
2512            "/dir",
2513            json!({
2514                "a.txt": "a-contents",
2515            }),
2516        )
2517        .await;
2518
2519        let project_a = cx_a.update(|cx| {
2520            Project::local(
2521                client_a.clone(),
2522                client_a.user_store.clone(),
2523                lang_registry.clone(),
2524                fs.clone(),
2525                cx,
2526            )
2527        });
2528        let (worktree_a, _) = project_a
2529            .update(cx_a, |p, cx| {
2530                p.find_or_create_local_worktree("/dir", true, cx)
2531            })
2532            .await
2533            .unwrap();
2534        worktree_a
2535            .read_with(cx_a, |tree, _| tree.as_local().unwrap().scan_complete())
2536            .await;
2537        let worktree_id = worktree_a.read_with(cx_a, |tree, _| tree.id());
2538
2539        // Join that project as client B
2540        let project_b = client_b.build_remote_project(&project_a, cx_a, cx_b).await;
2541
2542        // Open a buffer as client B
2543        let buffer_b = project_b
2544            .update(cx_b, |p, cx| p.open_buffer((worktree_id, "a.txt"), cx))
2545            .await
2546            .unwrap();
2547
2548        buffer_b.update(cx_b, |buf, cx| buf.edit([(0..0, "world ")], cx));
2549        buffer_b.read_with(cx_b, |buf, _| {
2550            assert!(buf.is_dirty());
2551            assert!(!buf.has_conflict());
2552        });
2553
2554        buffer_b.update(cx_b, |buf, cx| buf.save(cx)).await.unwrap();
2555        buffer_b
2556            .condition(&cx_b, |buffer_b, _| !buffer_b.is_dirty())
2557            .await;
2558        buffer_b.read_with(cx_b, |buf, _| {
2559            assert!(!buf.has_conflict());
2560        });
2561
2562        buffer_b.update(cx_b, |buf, cx| buf.edit([(0..0, "hello ")], cx));
2563        buffer_b.read_with(cx_b, |buf, _| {
2564            assert!(buf.is_dirty());
2565            assert!(!buf.has_conflict());
2566        });
2567    }
2568
2569    #[gpui::test(iterations = 10)]
2570    async fn test_buffer_reloading(cx_a: &mut TestAppContext, cx_b: &mut TestAppContext) {
2571        cx_a.foreground().forbid_parking();
2572        let lang_registry = Arc::new(LanguageRegistry::test());
2573        let fs = FakeFs::new(cx_a.background());
2574
2575        // Connect to a server as 2 clients.
2576        let mut server = TestServer::start(cx_a.foreground(), cx_a.background()).await;
2577        let client_a = server.create_client(cx_a, "user_a").await;
2578        let mut client_b = server.create_client(cx_b, "user_b").await;
2579        server
2580            .make_contacts(vec![(&client_a, cx_a), (&client_b, cx_b)])
2581            .await;
2582
2583        // Share a project as client A
2584        fs.insert_tree(
2585            "/dir",
2586            json!({
2587                "a.txt": "a-contents",
2588            }),
2589        )
2590        .await;
2591
2592        let project_a = cx_a.update(|cx| {
2593            Project::local(
2594                client_a.clone(),
2595                client_a.user_store.clone(),
2596                lang_registry.clone(),
2597                fs.clone(),
2598                cx,
2599            )
2600        });
2601        let (worktree_a, _) = project_a
2602            .update(cx_a, |p, cx| {
2603                p.find_or_create_local_worktree("/dir", true, cx)
2604            })
2605            .await
2606            .unwrap();
2607        worktree_a
2608            .read_with(cx_a, |tree, _| tree.as_local().unwrap().scan_complete())
2609            .await;
2610        let worktree_id = worktree_a.read_with(cx_a, |tree, _| tree.id());
2611
2612        // Join that project as client B
2613        let project_b = client_b.build_remote_project(&project_a, cx_a, cx_b).await;
2614        let _worktree_b = project_b.update(cx_b, |p, cx| p.worktrees(cx).next().unwrap());
2615
2616        // Open a buffer as client B
2617        let buffer_b = project_b
2618            .update(cx_b, |p, cx| p.open_buffer((worktree_id, "a.txt"), cx))
2619            .await
2620            .unwrap();
2621        buffer_b.read_with(cx_b, |buf, _| {
2622            assert!(!buf.is_dirty());
2623            assert!(!buf.has_conflict());
2624        });
2625
2626        fs.save(Path::new("/dir/a.txt"), &"new contents".into())
2627            .await
2628            .unwrap();
2629        buffer_b
2630            .condition(&cx_b, |buf, _| {
2631                buf.text() == "new contents" && !buf.is_dirty()
2632            })
2633            .await;
2634        buffer_b.read_with(cx_b, |buf, _| {
2635            assert!(!buf.has_conflict());
2636        });
2637    }
2638
2639    #[gpui::test(iterations = 10)]
2640    async fn test_editing_while_guest_opens_buffer(
2641        cx_a: &mut TestAppContext,
2642        cx_b: &mut TestAppContext,
2643    ) {
2644        cx_a.foreground().forbid_parking();
2645        let lang_registry = Arc::new(LanguageRegistry::test());
2646        let fs = FakeFs::new(cx_a.background());
2647
2648        // Connect to a server as 2 clients.
2649        let mut server = TestServer::start(cx_a.foreground(), cx_a.background()).await;
2650        let client_a = server.create_client(cx_a, "user_a").await;
2651        let mut client_b = server.create_client(cx_b, "user_b").await;
2652        server
2653            .make_contacts(vec![(&client_a, cx_a), (&client_b, cx_b)])
2654            .await;
2655
2656        // Share a project as client A
2657        fs.insert_tree(
2658            "/dir",
2659            json!({
2660                "a.txt": "a-contents",
2661            }),
2662        )
2663        .await;
2664        let project_a = cx_a.update(|cx| {
2665            Project::local(
2666                client_a.clone(),
2667                client_a.user_store.clone(),
2668                lang_registry.clone(),
2669                fs.clone(),
2670                cx,
2671            )
2672        });
2673        let (worktree_a, _) = project_a
2674            .update(cx_a, |p, cx| {
2675                p.find_or_create_local_worktree("/dir", true, cx)
2676            })
2677            .await
2678            .unwrap();
2679        worktree_a
2680            .read_with(cx_a, |tree, _| tree.as_local().unwrap().scan_complete())
2681            .await;
2682        let worktree_id = worktree_a.read_with(cx_a, |tree, _| tree.id());
2683
2684        // Join that project as client B
2685        let project_b = client_b.build_remote_project(&project_a, cx_a, cx_b).await;
2686
2687        // Open a buffer as client A
2688        let buffer_a = project_a
2689            .update(cx_a, |p, cx| p.open_buffer((worktree_id, "a.txt"), cx))
2690            .await
2691            .unwrap();
2692
2693        // Start opening the same buffer as client B
2694        let buffer_b = cx_b
2695            .background()
2696            .spawn(project_b.update(cx_b, |p, cx| p.open_buffer((worktree_id, "a.txt"), cx)));
2697
2698        // Edit the buffer as client A while client B is still opening it.
2699        cx_b.background().simulate_random_delay().await;
2700        buffer_a.update(cx_a, |buf, cx| buf.edit([(0..0, "X")], cx));
2701        cx_b.background().simulate_random_delay().await;
2702        buffer_a.update(cx_a, |buf, cx| buf.edit([(1..1, "Y")], cx));
2703
2704        let text = buffer_a.read_with(cx_a, |buf, _| buf.text());
2705        let buffer_b = buffer_b.await.unwrap();
2706        buffer_b.condition(&cx_b, |buf, _| buf.text() == text).await;
2707    }
2708
2709    #[gpui::test(iterations = 10)]
2710    async fn test_leaving_worktree_while_opening_buffer(
2711        cx_a: &mut TestAppContext,
2712        cx_b: &mut TestAppContext,
2713    ) {
2714        cx_a.foreground().forbid_parking();
2715        let lang_registry = Arc::new(LanguageRegistry::test());
2716        let fs = FakeFs::new(cx_a.background());
2717
2718        // Connect to a server as 2 clients.
2719        let mut server = TestServer::start(cx_a.foreground(), cx_a.background()).await;
2720        let client_a = server.create_client(cx_a, "user_a").await;
2721        let mut client_b = server.create_client(cx_b, "user_b").await;
2722        server
2723            .make_contacts(vec![(&client_a, cx_a), (&client_b, cx_b)])
2724            .await;
2725
2726        // Share a project as client A
2727        fs.insert_tree(
2728            "/dir",
2729            json!({
2730                "a.txt": "a-contents",
2731            }),
2732        )
2733        .await;
2734        let project_a = cx_a.update(|cx| {
2735            Project::local(
2736                client_a.clone(),
2737                client_a.user_store.clone(),
2738                lang_registry.clone(),
2739                fs.clone(),
2740                cx,
2741            )
2742        });
2743        let (worktree_a, _) = project_a
2744            .update(cx_a, |p, cx| {
2745                p.find_or_create_local_worktree("/dir", true, cx)
2746            })
2747            .await
2748            .unwrap();
2749        worktree_a
2750            .read_with(cx_a, |tree, _| tree.as_local().unwrap().scan_complete())
2751            .await;
2752        let worktree_id = worktree_a.read_with(cx_a, |tree, _| tree.id());
2753
2754        // Join that project as client B
2755        let project_b = client_b.build_remote_project(&project_a, cx_a, cx_b).await;
2756
2757        // See that a guest has joined as client A.
2758        project_a
2759            .condition(&cx_a, |p, _| p.collaborators().len() == 1)
2760            .await;
2761
2762        // Begin opening a buffer as client B, but leave the project before the open completes.
2763        let buffer_b = cx_b
2764            .background()
2765            .spawn(project_b.update(cx_b, |p, cx| p.open_buffer((worktree_id, "a.txt"), cx)));
2766        cx_b.update(|_| {
2767            drop(client_b.project.take());
2768            drop(project_b);
2769        });
2770        drop(buffer_b);
2771
2772        // See that the guest has left.
2773        project_a
2774            .condition(&cx_a, |p, _| p.collaborators().len() == 0)
2775            .await;
2776    }
2777
2778    #[gpui::test(iterations = 10)]
2779    async fn test_leaving_project(cx_a: &mut TestAppContext, cx_b: &mut TestAppContext) {
2780        cx_a.foreground().forbid_parking();
2781        let lang_registry = Arc::new(LanguageRegistry::test());
2782        let fs = FakeFs::new(cx_a.background());
2783
2784        // Connect to a server as 2 clients.
2785        let mut server = TestServer::start(cx_a.foreground(), cx_a.background()).await;
2786        let client_a = server.create_client(cx_a, "user_a").await;
2787        let mut client_b = server.create_client(cx_b, "user_b").await;
2788        server
2789            .make_contacts(vec![(&client_a, cx_a), (&client_b, cx_b)])
2790            .await;
2791
2792        // Share a project as client A
2793        fs.insert_tree(
2794            "/a",
2795            json!({
2796                "a.txt": "a-contents",
2797                "b.txt": "b-contents",
2798            }),
2799        )
2800        .await;
2801        let project_a = cx_a.update(|cx| {
2802            Project::local(
2803                client_a.clone(),
2804                client_a.user_store.clone(),
2805                lang_registry.clone(),
2806                fs.clone(),
2807                cx,
2808            )
2809        });
2810        let (worktree_a, _) = project_a
2811            .update(cx_a, |p, cx| {
2812                p.find_or_create_local_worktree("/a", true, cx)
2813            })
2814            .await
2815            .unwrap();
2816        worktree_a
2817            .read_with(cx_a, |tree, _| tree.as_local().unwrap().scan_complete())
2818            .await;
2819
2820        // Join that project as client B
2821        let _project_b = client_b.build_remote_project(&project_a, cx_a, cx_b).await;
2822
2823        // Client A sees that a guest has joined.
2824        project_a
2825            .condition(cx_a, |p, _| p.collaborators().len() == 1)
2826            .await;
2827
2828        // Drop client B's connection and ensure client A observes client B leaving the project.
2829        client_b.disconnect(&cx_b.to_async()).unwrap();
2830        project_a
2831            .condition(cx_a, |p, _| p.collaborators().len() == 0)
2832            .await;
2833
2834        // Rejoin the project as client B
2835        let _project_b = client_b.build_remote_project(&project_a, cx_a, cx_b).await;
2836
2837        // Client A sees that a guest has re-joined.
2838        project_a
2839            .condition(cx_a, |p, _| p.collaborators().len() == 1)
2840            .await;
2841
2842        // Simulate connection loss for client B and ensure client A observes client B leaving the project.
2843        client_b.wait_for_current_user(cx_b).await;
2844        server.disconnect_client(client_b.current_user_id(cx_b));
2845        cx_a.foreground().advance_clock(rpc::RECEIVE_TIMEOUT);
2846        project_a
2847            .condition(cx_a, |p, _| p.collaborators().len() == 0)
2848            .await;
2849    }
2850
2851    #[gpui::test(iterations = 10)]
2852    async fn test_collaborating_with_diagnostics(
2853        deterministic: Arc<Deterministic>,
2854        cx_a: &mut TestAppContext,
2855        cx_b: &mut TestAppContext,
2856        cx_c: &mut TestAppContext,
2857    ) {
2858        deterministic.forbid_parking();
2859        let lang_registry = Arc::new(LanguageRegistry::test());
2860        let fs = FakeFs::new(cx_a.background());
2861
2862        // Set up a fake language server.
2863        let mut language = Language::new(
2864            LanguageConfig {
2865                name: "Rust".into(),
2866                path_suffixes: vec!["rs".to_string()],
2867                ..Default::default()
2868            },
2869            Some(tree_sitter_rust::language()),
2870        );
2871        let mut fake_language_servers = language.set_fake_lsp_adapter(Default::default());
2872        lang_registry.add(Arc::new(language));
2873
2874        // Connect to a server as 2 clients.
2875        let mut server = TestServer::start(cx_a.foreground(), cx_a.background()).await;
2876        let client_a = server.create_client(cx_a, "user_a").await;
2877        let mut client_b = server.create_client(cx_b, "user_b").await;
2878        let mut client_c = server.create_client(cx_c, "user_c").await;
2879        server
2880            .make_contacts(vec![
2881                (&client_a, cx_a),
2882                (&client_b, cx_b),
2883                (&client_c, cx_c),
2884            ])
2885            .await;
2886
2887        // Share a project as client A
2888        fs.insert_tree(
2889            "/a",
2890            json!({
2891                "a.rs": "let one = two",
2892                "other.rs": "",
2893            }),
2894        )
2895        .await;
2896        let project_a = cx_a.update(|cx| {
2897            Project::local(
2898                client_a.clone(),
2899                client_a.user_store.clone(),
2900                lang_registry.clone(),
2901                fs.clone(),
2902                cx,
2903            )
2904        });
2905        let (worktree_a, _) = project_a
2906            .update(cx_a, |p, cx| {
2907                p.find_or_create_local_worktree("/a", true, cx)
2908            })
2909            .await
2910            .unwrap();
2911        worktree_a
2912            .read_with(cx_a, |tree, _| tree.as_local().unwrap().scan_complete())
2913            .await;
2914        let project_id = project_a.update(cx_a, |p, _| p.next_remote_id()).await;
2915        let worktree_id = worktree_a.read_with(cx_a, |tree, _| tree.id());
2916
2917        // Cause the language server to start.
2918        let _buffer = cx_a
2919            .background()
2920            .spawn(project_a.update(cx_a, |project, cx| {
2921                project.open_buffer(
2922                    ProjectPath {
2923                        worktree_id,
2924                        path: Path::new("other.rs").into(),
2925                    },
2926                    cx,
2927                )
2928            }))
2929            .await
2930            .unwrap();
2931
2932        // Join the worktree as client B.
2933        let project_b = client_b.build_remote_project(&project_a, cx_a, cx_b).await;
2934
2935        // Simulate a language server reporting errors for a file.
2936        let mut fake_language_server = fake_language_servers.next().await.unwrap();
2937        fake_language_server
2938            .receive_notification::<lsp::notification::DidOpenTextDocument>()
2939            .await;
2940        fake_language_server.notify::<lsp::notification::PublishDiagnostics>(
2941            lsp::PublishDiagnosticsParams {
2942                uri: lsp::Url::from_file_path("/a/a.rs").unwrap(),
2943                version: None,
2944                diagnostics: vec![lsp::Diagnostic {
2945                    severity: Some(lsp::DiagnosticSeverity::ERROR),
2946                    range: lsp::Range::new(lsp::Position::new(0, 4), lsp::Position::new(0, 7)),
2947                    message: "message 1".to_string(),
2948                    ..Default::default()
2949                }],
2950            },
2951        );
2952
2953        // Wait for server to see the diagnostics update.
2954        deterministic.run_until_parked();
2955        {
2956            let store = server.store.read().await;
2957            let project = store.project(project_id).unwrap();
2958            let worktree = project.worktrees.get(&worktree_id.to_proto()).unwrap();
2959            assert!(!worktree.diagnostic_summaries.is_empty());
2960        }
2961
2962        // Ensure client B observes the new diagnostics.
2963        project_b.read_with(cx_b, |project, cx| {
2964            assert_eq!(
2965                project.diagnostic_summaries(cx).collect::<Vec<_>>(),
2966                &[(
2967                    ProjectPath {
2968                        worktree_id,
2969                        path: Arc::from(Path::new("a.rs")),
2970                    },
2971                    DiagnosticSummary {
2972                        error_count: 1,
2973                        warning_count: 0,
2974                        ..Default::default()
2975                    },
2976                )]
2977            )
2978        });
2979
2980        // Join project as client C and observe the diagnostics.
2981        let project_c = client_c.build_remote_project(&project_a, cx_a, cx_c).await;
2982        project_c.read_with(cx_c, |project, cx| {
2983            assert_eq!(
2984                project.diagnostic_summaries(cx).collect::<Vec<_>>(),
2985                &[(
2986                    ProjectPath {
2987                        worktree_id,
2988                        path: Arc::from(Path::new("a.rs")),
2989                    },
2990                    DiagnosticSummary {
2991                        error_count: 1,
2992                        warning_count: 0,
2993                        ..Default::default()
2994                    },
2995                )]
2996            )
2997        });
2998
2999        // Simulate a language server reporting more errors for a file.
3000        fake_language_server.notify::<lsp::notification::PublishDiagnostics>(
3001            lsp::PublishDiagnosticsParams {
3002                uri: lsp::Url::from_file_path("/a/a.rs").unwrap(),
3003                version: None,
3004                diagnostics: vec![
3005                    lsp::Diagnostic {
3006                        severity: Some(lsp::DiagnosticSeverity::ERROR),
3007                        range: lsp::Range::new(lsp::Position::new(0, 4), lsp::Position::new(0, 7)),
3008                        message: "message 1".to_string(),
3009                        ..Default::default()
3010                    },
3011                    lsp::Diagnostic {
3012                        severity: Some(lsp::DiagnosticSeverity::WARNING),
3013                        range: lsp::Range::new(
3014                            lsp::Position::new(0, 10),
3015                            lsp::Position::new(0, 13),
3016                        ),
3017                        message: "message 2".to_string(),
3018                        ..Default::default()
3019                    },
3020                ],
3021            },
3022        );
3023
3024        // Clients B and C get the updated summaries
3025        deterministic.run_until_parked();
3026        project_b.read_with(cx_b, |project, cx| {
3027            assert_eq!(
3028                project.diagnostic_summaries(cx).collect::<Vec<_>>(),
3029                [(
3030                    ProjectPath {
3031                        worktree_id,
3032                        path: Arc::from(Path::new("a.rs")),
3033                    },
3034                    DiagnosticSummary {
3035                        error_count: 1,
3036                        warning_count: 1,
3037                        ..Default::default()
3038                    },
3039                )]
3040            );
3041        });
3042        project_c.read_with(cx_c, |project, cx| {
3043            assert_eq!(
3044                project.diagnostic_summaries(cx).collect::<Vec<_>>(),
3045                [(
3046                    ProjectPath {
3047                        worktree_id,
3048                        path: Arc::from(Path::new("a.rs")),
3049                    },
3050                    DiagnosticSummary {
3051                        error_count: 1,
3052                        warning_count: 1,
3053                        ..Default::default()
3054                    },
3055                )]
3056            );
3057        });
3058
3059        // Open the file with the errors on client B. They should be present.
3060        let buffer_b = cx_b
3061            .background()
3062            .spawn(project_b.update(cx_b, |p, cx| p.open_buffer((worktree_id, "a.rs"), cx)))
3063            .await
3064            .unwrap();
3065
3066        buffer_b.read_with(cx_b, |buffer, _| {
3067            assert_eq!(
3068                buffer
3069                    .snapshot()
3070                    .diagnostics_in_range::<_, Point>(0..buffer.len(), false)
3071                    .map(|entry| entry)
3072                    .collect::<Vec<_>>(),
3073                &[
3074                    DiagnosticEntry {
3075                        range: Point::new(0, 4)..Point::new(0, 7),
3076                        diagnostic: Diagnostic {
3077                            group_id: 0,
3078                            message: "message 1".to_string(),
3079                            severity: lsp::DiagnosticSeverity::ERROR,
3080                            is_primary: true,
3081                            ..Default::default()
3082                        }
3083                    },
3084                    DiagnosticEntry {
3085                        range: Point::new(0, 10)..Point::new(0, 13),
3086                        diagnostic: Diagnostic {
3087                            group_id: 1,
3088                            severity: lsp::DiagnosticSeverity::WARNING,
3089                            message: "message 2".to_string(),
3090                            is_primary: true,
3091                            ..Default::default()
3092                        }
3093                    }
3094                ]
3095            );
3096        });
3097
3098        // Simulate a language server reporting no errors for a file.
3099        fake_language_server.notify::<lsp::notification::PublishDiagnostics>(
3100            lsp::PublishDiagnosticsParams {
3101                uri: lsp::Url::from_file_path("/a/a.rs").unwrap(),
3102                version: None,
3103                diagnostics: vec![],
3104            },
3105        );
3106        deterministic.run_until_parked();
3107        project_a.read_with(cx_a, |project, cx| {
3108            assert_eq!(project.diagnostic_summaries(cx).collect::<Vec<_>>(), [])
3109        });
3110        project_b.read_with(cx_b, |project, cx| {
3111            assert_eq!(project.diagnostic_summaries(cx).collect::<Vec<_>>(), [])
3112        });
3113        project_c.read_with(cx_c, |project, cx| {
3114            assert_eq!(project.diagnostic_summaries(cx).collect::<Vec<_>>(), [])
3115        });
3116    }
3117
3118    #[gpui::test(iterations = 10)]
3119    async fn test_collaborating_with_completion(
3120        cx_a: &mut TestAppContext,
3121        cx_b: &mut TestAppContext,
3122    ) {
3123        cx_a.foreground().forbid_parking();
3124        let lang_registry = Arc::new(LanguageRegistry::test());
3125        let fs = FakeFs::new(cx_a.background());
3126
3127        // Set up a fake language server.
3128        let mut language = Language::new(
3129            LanguageConfig {
3130                name: "Rust".into(),
3131                path_suffixes: vec!["rs".to_string()],
3132                ..Default::default()
3133            },
3134            Some(tree_sitter_rust::language()),
3135        );
3136        let mut fake_language_servers = language.set_fake_lsp_adapter(FakeLspAdapter {
3137            capabilities: lsp::ServerCapabilities {
3138                completion_provider: Some(lsp::CompletionOptions {
3139                    trigger_characters: Some(vec![".".to_string()]),
3140                    ..Default::default()
3141                }),
3142                ..Default::default()
3143            },
3144            ..Default::default()
3145        });
3146        lang_registry.add(Arc::new(language));
3147
3148        // Connect to a server as 2 clients.
3149        let mut server = TestServer::start(cx_a.foreground(), cx_a.background()).await;
3150        let client_a = server.create_client(cx_a, "user_a").await;
3151        let mut client_b = server.create_client(cx_b, "user_b").await;
3152        server
3153            .make_contacts(vec![(&client_a, cx_a), (&client_b, cx_b)])
3154            .await;
3155
3156        // Share a project as client A
3157        fs.insert_tree(
3158            "/a",
3159            json!({
3160                "main.rs": "fn main() { a }",
3161                "other.rs": "",
3162            }),
3163        )
3164        .await;
3165        let project_a = cx_a.update(|cx| {
3166            Project::local(
3167                client_a.clone(),
3168                client_a.user_store.clone(),
3169                lang_registry.clone(),
3170                fs.clone(),
3171                cx,
3172            )
3173        });
3174        let (worktree_a, _) = project_a
3175            .update(cx_a, |p, cx| {
3176                p.find_or_create_local_worktree("/a", true, cx)
3177            })
3178            .await
3179            .unwrap();
3180        worktree_a
3181            .read_with(cx_a, |tree, _| tree.as_local().unwrap().scan_complete())
3182            .await;
3183        let worktree_id = worktree_a.read_with(cx_a, |tree, _| tree.id());
3184
3185        // Join the worktree as client B.
3186        let project_b = client_b.build_remote_project(&project_a, cx_a, cx_b).await;
3187
3188        // Open a file in an editor as the guest.
3189        let buffer_b = project_b
3190            .update(cx_b, |p, cx| p.open_buffer((worktree_id, "main.rs"), cx))
3191            .await
3192            .unwrap();
3193        let (window_b, _) = cx_b.add_window(|_| EmptyView);
3194        let editor_b = cx_b.add_view(window_b, |cx| {
3195            Editor::for_buffer(buffer_b.clone(), Some(project_b.clone()), cx)
3196        });
3197
3198        let fake_language_server = fake_language_servers.next().await.unwrap();
3199        buffer_b
3200            .condition(&cx_b, |buffer, _| !buffer.completion_triggers().is_empty())
3201            .await;
3202
3203        // Type a completion trigger character as the guest.
3204        editor_b.update(cx_b, |editor, cx| {
3205            editor.select_ranges([13..13], None, cx);
3206            editor.handle_input(&Input(".".into()), cx);
3207            cx.focus(&editor_b);
3208        });
3209
3210        // Receive a completion request as the host's language server.
3211        // Return some completions from the host's language server.
3212        cx_a.foreground().start_waiting();
3213        fake_language_server
3214            .handle_request::<lsp::request::Completion, _, _>(|params, _| async move {
3215                assert_eq!(
3216                    params.text_document_position.text_document.uri,
3217                    lsp::Url::from_file_path("/a/main.rs").unwrap(),
3218                );
3219                assert_eq!(
3220                    params.text_document_position.position,
3221                    lsp::Position::new(0, 14),
3222                );
3223
3224                Ok(Some(lsp::CompletionResponse::Array(vec![
3225                    lsp::CompletionItem {
3226                        label: "first_method(…)".into(),
3227                        detail: Some("fn(&mut self, B) -> C".into()),
3228                        text_edit: Some(lsp::CompletionTextEdit::Edit(lsp::TextEdit {
3229                            new_text: "first_method($1)".to_string(),
3230                            range: lsp::Range::new(
3231                                lsp::Position::new(0, 14),
3232                                lsp::Position::new(0, 14),
3233                            ),
3234                        })),
3235                        insert_text_format: Some(lsp::InsertTextFormat::SNIPPET),
3236                        ..Default::default()
3237                    },
3238                    lsp::CompletionItem {
3239                        label: "second_method(…)".into(),
3240                        detail: Some("fn(&mut self, C) -> D<E>".into()),
3241                        text_edit: Some(lsp::CompletionTextEdit::Edit(lsp::TextEdit {
3242                            new_text: "second_method()".to_string(),
3243                            range: lsp::Range::new(
3244                                lsp::Position::new(0, 14),
3245                                lsp::Position::new(0, 14),
3246                            ),
3247                        })),
3248                        insert_text_format: Some(lsp::InsertTextFormat::SNIPPET),
3249                        ..Default::default()
3250                    },
3251                ])))
3252            })
3253            .next()
3254            .await
3255            .unwrap();
3256        cx_a.foreground().finish_waiting();
3257
3258        // Open the buffer on the host.
3259        let buffer_a = project_a
3260            .update(cx_a, |p, cx| p.open_buffer((worktree_id, "main.rs"), cx))
3261            .await
3262            .unwrap();
3263        buffer_a
3264            .condition(&cx_a, |buffer, _| buffer.text() == "fn main() { a. }")
3265            .await;
3266
3267        // Confirm a completion on the guest.
3268        editor_b
3269            .condition(&cx_b, |editor, _| editor.context_menu_visible())
3270            .await;
3271        editor_b.update(cx_b, |editor, cx| {
3272            editor.confirm_completion(&ConfirmCompletion { item_ix: Some(0) }, cx);
3273            assert_eq!(editor.text(cx), "fn main() { a.first_method() }");
3274        });
3275
3276        // Return a resolved completion from the host's language server.
3277        // The resolved completion has an additional text edit.
3278        fake_language_server.handle_request::<lsp::request::ResolveCompletionItem, _, _>(
3279            |params, _| async move {
3280                assert_eq!(params.label, "first_method(…)");
3281                Ok(lsp::CompletionItem {
3282                    label: "first_method(…)".into(),
3283                    detail: Some("fn(&mut self, B) -> C".into()),
3284                    text_edit: Some(lsp::CompletionTextEdit::Edit(lsp::TextEdit {
3285                        new_text: "first_method($1)".to_string(),
3286                        range: lsp::Range::new(
3287                            lsp::Position::new(0, 14),
3288                            lsp::Position::new(0, 14),
3289                        ),
3290                    })),
3291                    additional_text_edits: Some(vec![lsp::TextEdit {
3292                        new_text: "use d::SomeTrait;\n".to_string(),
3293                        range: lsp::Range::new(lsp::Position::new(0, 0), lsp::Position::new(0, 0)),
3294                    }]),
3295                    insert_text_format: Some(lsp::InsertTextFormat::SNIPPET),
3296                    ..Default::default()
3297                })
3298            },
3299        );
3300
3301        // The additional edit is applied.
3302        buffer_a
3303            .condition(&cx_a, |buffer, _| {
3304                buffer.text() == "use d::SomeTrait;\nfn main() { a.first_method() }"
3305            })
3306            .await;
3307        buffer_b
3308            .condition(&cx_b, |buffer, _| {
3309                buffer.text() == "use d::SomeTrait;\nfn main() { a.first_method() }"
3310            })
3311            .await;
3312    }
3313
3314    #[gpui::test(iterations = 10)]
3315    async fn test_reloading_buffer_manually(cx_a: &mut TestAppContext, cx_b: &mut TestAppContext) {
3316        cx_a.foreground().forbid_parking();
3317        let lang_registry = Arc::new(LanguageRegistry::test());
3318        let fs = FakeFs::new(cx_a.background());
3319
3320        // Connect to a server as 2 clients.
3321        let mut server = TestServer::start(cx_a.foreground(), cx_a.background()).await;
3322        let client_a = server.create_client(cx_a, "user_a").await;
3323        let mut client_b = server.create_client(cx_b, "user_b").await;
3324        server
3325            .make_contacts(vec![(&client_a, cx_a), (&client_b, cx_b)])
3326            .await;
3327
3328        // Share a project as client A
3329        fs.insert_tree(
3330            "/a",
3331            json!({
3332                "a.rs": "let one = 1;",
3333            }),
3334        )
3335        .await;
3336        let project_a = cx_a.update(|cx| {
3337            Project::local(
3338                client_a.clone(),
3339                client_a.user_store.clone(),
3340                lang_registry.clone(),
3341                fs.clone(),
3342                cx,
3343            )
3344        });
3345        let (worktree_a, _) = project_a
3346            .update(cx_a, |p, cx| {
3347                p.find_or_create_local_worktree("/a", true, cx)
3348            })
3349            .await
3350            .unwrap();
3351        worktree_a
3352            .read_with(cx_a, |tree, _| tree.as_local().unwrap().scan_complete())
3353            .await;
3354        let worktree_id = worktree_a.read_with(cx_a, |tree, _| tree.id());
3355        let buffer_a = project_a
3356            .update(cx_a, |p, cx| p.open_buffer((worktree_id, "a.rs"), cx))
3357            .await
3358            .unwrap();
3359
3360        // Join the worktree as client B.
3361        let project_b = client_b.build_remote_project(&project_a, cx_a, cx_b).await;
3362
3363        let buffer_b = cx_b
3364            .background()
3365            .spawn(project_b.update(cx_b, |p, cx| p.open_buffer((worktree_id, "a.rs"), cx)))
3366            .await
3367            .unwrap();
3368        buffer_b.update(cx_b, |buffer, cx| {
3369            buffer.edit([(4..7, "six")], cx);
3370            buffer.edit([(10..11, "6")], cx);
3371            assert_eq!(buffer.text(), "let six = 6;");
3372            assert!(buffer.is_dirty());
3373            assert!(!buffer.has_conflict());
3374        });
3375        buffer_a
3376            .condition(cx_a, |buffer, _| buffer.text() == "let six = 6;")
3377            .await;
3378
3379        fs.save(Path::new("/a/a.rs"), &Rope::from("let seven = 7;"))
3380            .await
3381            .unwrap();
3382        buffer_a
3383            .condition(cx_a, |buffer, _| buffer.has_conflict())
3384            .await;
3385        buffer_b
3386            .condition(cx_b, |buffer, _| buffer.has_conflict())
3387            .await;
3388
3389        project_b
3390            .update(cx_b, |project, cx| {
3391                project.reload_buffers(HashSet::from_iter([buffer_b.clone()]), true, cx)
3392            })
3393            .await
3394            .unwrap();
3395        buffer_a.read_with(cx_a, |buffer, _| {
3396            assert_eq!(buffer.text(), "let seven = 7;");
3397            assert!(!buffer.is_dirty());
3398            assert!(!buffer.has_conflict());
3399        });
3400        buffer_b.read_with(cx_b, |buffer, _| {
3401            assert_eq!(buffer.text(), "let seven = 7;");
3402            assert!(!buffer.is_dirty());
3403            assert!(!buffer.has_conflict());
3404        });
3405
3406        buffer_a.update(cx_a, |buffer, cx| {
3407            // Undoing on the host is a no-op when the reload was initiated by the guest.
3408            buffer.undo(cx);
3409            assert_eq!(buffer.text(), "let seven = 7;");
3410            assert!(!buffer.is_dirty());
3411            assert!(!buffer.has_conflict());
3412        });
3413        buffer_b.update(cx_b, |buffer, cx| {
3414            // Undoing on the guest rolls back the buffer to before it was reloaded but the conflict gets cleared.
3415            buffer.undo(cx);
3416            assert_eq!(buffer.text(), "let six = 6;");
3417            assert!(buffer.is_dirty());
3418            assert!(!buffer.has_conflict());
3419        });
3420    }
3421
3422    #[gpui::test(iterations = 10)]
3423    async fn test_formatting_buffer(cx_a: &mut TestAppContext, cx_b: &mut TestAppContext) {
3424        cx_a.foreground().forbid_parking();
3425        let lang_registry = Arc::new(LanguageRegistry::test());
3426        let fs = FakeFs::new(cx_a.background());
3427
3428        // Set up a fake language server.
3429        let mut language = Language::new(
3430            LanguageConfig {
3431                name: "Rust".into(),
3432                path_suffixes: vec!["rs".to_string()],
3433                ..Default::default()
3434            },
3435            Some(tree_sitter_rust::language()),
3436        );
3437        let mut fake_language_servers = language.set_fake_lsp_adapter(Default::default());
3438        lang_registry.add(Arc::new(language));
3439
3440        // Connect to a server as 2 clients.
3441        let mut server = TestServer::start(cx_a.foreground(), cx_a.background()).await;
3442        let client_a = server.create_client(cx_a, "user_a").await;
3443        let mut client_b = server.create_client(cx_b, "user_b").await;
3444        server
3445            .make_contacts(vec![(&client_a, cx_a), (&client_b, cx_b)])
3446            .await;
3447
3448        // Share a project as client A
3449        fs.insert_tree(
3450            "/a",
3451            json!({
3452                "a.rs": "let one = two",
3453            }),
3454        )
3455        .await;
3456        let project_a = cx_a.update(|cx| {
3457            Project::local(
3458                client_a.clone(),
3459                client_a.user_store.clone(),
3460                lang_registry.clone(),
3461                fs.clone(),
3462                cx,
3463            )
3464        });
3465        let (worktree_a, _) = project_a
3466            .update(cx_a, |p, cx| {
3467                p.find_or_create_local_worktree("/a", true, cx)
3468            })
3469            .await
3470            .unwrap();
3471        worktree_a
3472            .read_with(cx_a, |tree, _| tree.as_local().unwrap().scan_complete())
3473            .await;
3474        let worktree_id = worktree_a.read_with(cx_a, |tree, _| tree.id());
3475
3476        // Join the project as client B.
3477        let project_b = client_b.build_remote_project(&project_a, cx_a, cx_b).await;
3478
3479        let buffer_b = cx_b
3480            .background()
3481            .spawn(project_b.update(cx_b, |p, cx| p.open_buffer((worktree_id, "a.rs"), cx)))
3482            .await
3483            .unwrap();
3484
3485        let fake_language_server = fake_language_servers.next().await.unwrap();
3486        fake_language_server.handle_request::<lsp::request::Formatting, _, _>(|_, _| async move {
3487            Ok(Some(vec![
3488                lsp::TextEdit {
3489                    range: lsp::Range::new(lsp::Position::new(0, 4), lsp::Position::new(0, 4)),
3490                    new_text: "h".to_string(),
3491                },
3492                lsp::TextEdit {
3493                    range: lsp::Range::new(lsp::Position::new(0, 7), lsp::Position::new(0, 7)),
3494                    new_text: "y".to_string(),
3495                },
3496            ]))
3497        });
3498
3499        project_b
3500            .update(cx_b, |project, cx| {
3501                project.format(HashSet::from_iter([buffer_b.clone()]), true, cx)
3502            })
3503            .await
3504            .unwrap();
3505        assert_eq!(
3506            buffer_b.read_with(cx_b, |buffer, _| buffer.text()),
3507            "let honey = two"
3508        );
3509    }
3510
3511    #[gpui::test(iterations = 10)]
3512    async fn test_definition(cx_a: &mut TestAppContext, cx_b: &mut TestAppContext) {
3513        cx_a.foreground().forbid_parking();
3514        let lang_registry = Arc::new(LanguageRegistry::test());
3515        let fs = FakeFs::new(cx_a.background());
3516        fs.insert_tree(
3517            "/root-1",
3518            json!({
3519                "a.rs": "const ONE: usize = b::TWO + b::THREE;",
3520            }),
3521        )
3522        .await;
3523        fs.insert_tree(
3524            "/root-2",
3525            json!({
3526                "b.rs": "const TWO: usize = 2;\nconst THREE: usize = 3;",
3527            }),
3528        )
3529        .await;
3530
3531        // Set up a fake language server.
3532        let mut language = Language::new(
3533            LanguageConfig {
3534                name: "Rust".into(),
3535                path_suffixes: vec!["rs".to_string()],
3536                ..Default::default()
3537            },
3538            Some(tree_sitter_rust::language()),
3539        );
3540        let mut fake_language_servers = language.set_fake_lsp_adapter(Default::default());
3541        lang_registry.add(Arc::new(language));
3542
3543        // Connect to a server as 2 clients.
3544        let mut server = TestServer::start(cx_a.foreground(), cx_a.background()).await;
3545        let client_a = server.create_client(cx_a, "user_a").await;
3546        let mut client_b = server.create_client(cx_b, "user_b").await;
3547        server
3548            .make_contacts(vec![(&client_a, cx_a), (&client_b, cx_b)])
3549            .await;
3550
3551        // Share a project as client A
3552        let project_a = cx_a.update(|cx| {
3553            Project::local(
3554                client_a.clone(),
3555                client_a.user_store.clone(),
3556                lang_registry.clone(),
3557                fs.clone(),
3558                cx,
3559            )
3560        });
3561        let (worktree_a, _) = project_a
3562            .update(cx_a, |p, cx| {
3563                p.find_or_create_local_worktree("/root-1", true, cx)
3564            })
3565            .await
3566            .unwrap();
3567        worktree_a
3568            .read_with(cx_a, |tree, _| tree.as_local().unwrap().scan_complete())
3569            .await;
3570        let worktree_id = worktree_a.read_with(cx_a, |tree, _| tree.id());
3571
3572        // Join the project as client B.
3573        let project_b = client_b.build_remote_project(&project_a, cx_a, cx_b).await;
3574
3575        // Open the file on client B.
3576        let buffer_b = cx_b
3577            .background()
3578            .spawn(project_b.update(cx_b, |p, cx| p.open_buffer((worktree_id, "a.rs"), cx)))
3579            .await
3580            .unwrap();
3581
3582        // Request the definition of a symbol as the guest.
3583        let fake_language_server = fake_language_servers.next().await.unwrap();
3584        fake_language_server.handle_request::<lsp::request::GotoDefinition, _, _>(
3585            |_, _| async move {
3586                Ok(Some(lsp::GotoDefinitionResponse::Scalar(
3587                    lsp::Location::new(
3588                        lsp::Url::from_file_path("/root-2/b.rs").unwrap(),
3589                        lsp::Range::new(lsp::Position::new(0, 6), lsp::Position::new(0, 9)),
3590                    ),
3591                )))
3592            },
3593        );
3594
3595        let definitions_1 = project_b
3596            .update(cx_b, |p, cx| p.definition(&buffer_b, 23, cx))
3597            .await
3598            .unwrap();
3599        cx_b.read(|cx| {
3600            assert_eq!(definitions_1.len(), 1);
3601            assert_eq!(project_b.read(cx).worktrees(cx).count(), 2);
3602            let target_buffer = definitions_1[0].buffer.read(cx);
3603            assert_eq!(
3604                target_buffer.text(),
3605                "const TWO: usize = 2;\nconst THREE: usize = 3;"
3606            );
3607            assert_eq!(
3608                definitions_1[0].range.to_point(target_buffer),
3609                Point::new(0, 6)..Point::new(0, 9)
3610            );
3611        });
3612
3613        // Try getting more definitions for the same buffer, ensuring the buffer gets reused from
3614        // the previous call to `definition`.
3615        fake_language_server.handle_request::<lsp::request::GotoDefinition, _, _>(
3616            |_, _| async move {
3617                Ok(Some(lsp::GotoDefinitionResponse::Scalar(
3618                    lsp::Location::new(
3619                        lsp::Url::from_file_path("/root-2/b.rs").unwrap(),
3620                        lsp::Range::new(lsp::Position::new(1, 6), lsp::Position::new(1, 11)),
3621                    ),
3622                )))
3623            },
3624        );
3625
3626        let definitions_2 = project_b
3627            .update(cx_b, |p, cx| p.definition(&buffer_b, 33, cx))
3628            .await
3629            .unwrap();
3630        cx_b.read(|cx| {
3631            assert_eq!(definitions_2.len(), 1);
3632            assert_eq!(project_b.read(cx).worktrees(cx).count(), 2);
3633            let target_buffer = definitions_2[0].buffer.read(cx);
3634            assert_eq!(
3635                target_buffer.text(),
3636                "const TWO: usize = 2;\nconst THREE: usize = 3;"
3637            );
3638            assert_eq!(
3639                definitions_2[0].range.to_point(target_buffer),
3640                Point::new(1, 6)..Point::new(1, 11)
3641            );
3642        });
3643        assert_eq!(definitions_1[0].buffer, definitions_2[0].buffer);
3644    }
3645
3646    #[gpui::test(iterations = 10)]
3647    async fn test_references(cx_a: &mut TestAppContext, cx_b: &mut TestAppContext) {
3648        cx_a.foreground().forbid_parking();
3649        let lang_registry = Arc::new(LanguageRegistry::test());
3650        let fs = FakeFs::new(cx_a.background());
3651        fs.insert_tree(
3652            "/root-1",
3653            json!({
3654                "one.rs": "const ONE: usize = 1;",
3655                "two.rs": "const TWO: usize = one::ONE + one::ONE;",
3656            }),
3657        )
3658        .await;
3659        fs.insert_tree(
3660            "/root-2",
3661            json!({
3662                "three.rs": "const THREE: usize = two::TWO + one::ONE;",
3663            }),
3664        )
3665        .await;
3666
3667        // Set up a fake language server.
3668        let mut language = Language::new(
3669            LanguageConfig {
3670                name: "Rust".into(),
3671                path_suffixes: vec!["rs".to_string()],
3672                ..Default::default()
3673            },
3674            Some(tree_sitter_rust::language()),
3675        );
3676        let mut fake_language_servers = language.set_fake_lsp_adapter(Default::default());
3677        lang_registry.add(Arc::new(language));
3678
3679        // Connect to a server as 2 clients.
3680        let mut server = TestServer::start(cx_a.foreground(), cx_a.background()).await;
3681        let client_a = server.create_client(cx_a, "user_a").await;
3682        let mut client_b = server.create_client(cx_b, "user_b").await;
3683        server
3684            .make_contacts(vec![(&client_a, cx_a), (&client_b, cx_b)])
3685            .await;
3686
3687        // Share a project as client A
3688        let project_a = cx_a.update(|cx| {
3689            Project::local(
3690                client_a.clone(),
3691                client_a.user_store.clone(),
3692                lang_registry.clone(),
3693                fs.clone(),
3694                cx,
3695            )
3696        });
3697        let (worktree_a, _) = project_a
3698            .update(cx_a, |p, cx| {
3699                p.find_or_create_local_worktree("/root-1", true, cx)
3700            })
3701            .await
3702            .unwrap();
3703        worktree_a
3704            .read_with(cx_a, |tree, _| tree.as_local().unwrap().scan_complete())
3705            .await;
3706        let worktree_id = worktree_a.read_with(cx_a, |tree, _| tree.id());
3707
3708        // Join the worktree as client B.
3709        let project_b = client_b.build_remote_project(&project_a, cx_a, cx_b).await;
3710
3711        // Open the file on client B.
3712        let buffer_b = cx_b
3713            .background()
3714            .spawn(project_b.update(cx_b, |p, cx| p.open_buffer((worktree_id, "one.rs"), cx)))
3715            .await
3716            .unwrap();
3717
3718        // Request references to a symbol as the guest.
3719        let fake_language_server = fake_language_servers.next().await.unwrap();
3720        fake_language_server.handle_request::<lsp::request::References, _, _>(
3721            |params, _| async move {
3722                assert_eq!(
3723                    params.text_document_position.text_document.uri.as_str(),
3724                    "file:///root-1/one.rs"
3725                );
3726                Ok(Some(vec![
3727                    lsp::Location {
3728                        uri: lsp::Url::from_file_path("/root-1/two.rs").unwrap(),
3729                        range: lsp::Range::new(
3730                            lsp::Position::new(0, 24),
3731                            lsp::Position::new(0, 27),
3732                        ),
3733                    },
3734                    lsp::Location {
3735                        uri: lsp::Url::from_file_path("/root-1/two.rs").unwrap(),
3736                        range: lsp::Range::new(
3737                            lsp::Position::new(0, 35),
3738                            lsp::Position::new(0, 38),
3739                        ),
3740                    },
3741                    lsp::Location {
3742                        uri: lsp::Url::from_file_path("/root-2/three.rs").unwrap(),
3743                        range: lsp::Range::new(
3744                            lsp::Position::new(0, 37),
3745                            lsp::Position::new(0, 40),
3746                        ),
3747                    },
3748                ]))
3749            },
3750        );
3751
3752        let references = project_b
3753            .update(cx_b, |p, cx| p.references(&buffer_b, 7, cx))
3754            .await
3755            .unwrap();
3756        cx_b.read(|cx| {
3757            assert_eq!(references.len(), 3);
3758            assert_eq!(project_b.read(cx).worktrees(cx).count(), 2);
3759
3760            let two_buffer = references[0].buffer.read(cx);
3761            let three_buffer = references[2].buffer.read(cx);
3762            assert_eq!(
3763                two_buffer.file().unwrap().path().as_ref(),
3764                Path::new("two.rs")
3765            );
3766            assert_eq!(references[1].buffer, references[0].buffer);
3767            assert_eq!(
3768                three_buffer.file().unwrap().full_path(cx),
3769                Path::new("three.rs")
3770            );
3771
3772            assert_eq!(references[0].range.to_offset(&two_buffer), 24..27);
3773            assert_eq!(references[1].range.to_offset(&two_buffer), 35..38);
3774            assert_eq!(references[2].range.to_offset(&three_buffer), 37..40);
3775        });
3776    }
3777
3778    #[gpui::test(iterations = 10)]
3779    async fn test_project_search(cx_a: &mut TestAppContext, cx_b: &mut TestAppContext) {
3780        cx_a.foreground().forbid_parking();
3781        let lang_registry = Arc::new(LanguageRegistry::test());
3782        let fs = FakeFs::new(cx_a.background());
3783        fs.insert_tree(
3784            "/root-1",
3785            json!({
3786                "a": "hello world",
3787                "b": "goodnight moon",
3788                "c": "a world of goo",
3789                "d": "world champion of clown world",
3790            }),
3791        )
3792        .await;
3793        fs.insert_tree(
3794            "/root-2",
3795            json!({
3796                "e": "disney world is fun",
3797            }),
3798        )
3799        .await;
3800
3801        // Connect to a server as 2 clients.
3802        let mut server = TestServer::start(cx_a.foreground(), cx_a.background()).await;
3803        let client_a = server.create_client(cx_a, "user_a").await;
3804        let mut client_b = server.create_client(cx_b, "user_b").await;
3805        server
3806            .make_contacts(vec![(&client_a, cx_a), (&client_b, cx_b)])
3807            .await;
3808
3809        // Share a project as client A
3810        let project_a = cx_a.update(|cx| {
3811            Project::local(
3812                client_a.clone(),
3813                client_a.user_store.clone(),
3814                lang_registry.clone(),
3815                fs.clone(),
3816                cx,
3817            )
3818        });
3819
3820        let (worktree_1, _) = project_a
3821            .update(cx_a, |p, cx| {
3822                p.find_or_create_local_worktree("/root-1", true, cx)
3823            })
3824            .await
3825            .unwrap();
3826        worktree_1
3827            .read_with(cx_a, |tree, _| tree.as_local().unwrap().scan_complete())
3828            .await;
3829        let (worktree_2, _) = project_a
3830            .update(cx_a, |p, cx| {
3831                p.find_or_create_local_worktree("/root-2", true, cx)
3832            })
3833            .await
3834            .unwrap();
3835        worktree_2
3836            .read_with(cx_a, |tree, _| tree.as_local().unwrap().scan_complete())
3837            .await;
3838
3839        // Join the worktree as client B.
3840        let project_b = client_b.build_remote_project(&project_a, cx_a, cx_b).await;
3841        let results = project_b
3842            .update(cx_b, |project, cx| {
3843                project.search(SearchQuery::text("world", false, false), cx)
3844            })
3845            .await
3846            .unwrap();
3847
3848        let mut ranges_by_path = results
3849            .into_iter()
3850            .map(|(buffer, ranges)| {
3851                buffer.read_with(cx_b, |buffer, cx| {
3852                    let path = buffer.file().unwrap().full_path(cx);
3853                    let offset_ranges = ranges
3854                        .into_iter()
3855                        .map(|range| range.to_offset(buffer))
3856                        .collect::<Vec<_>>();
3857                    (path, offset_ranges)
3858                })
3859            })
3860            .collect::<Vec<_>>();
3861        ranges_by_path.sort_by_key(|(path, _)| path.clone());
3862
3863        assert_eq!(
3864            ranges_by_path,
3865            &[
3866                (PathBuf::from("root-1/a"), vec![6..11]),
3867                (PathBuf::from("root-1/c"), vec![2..7]),
3868                (PathBuf::from("root-1/d"), vec![0..5, 24..29]),
3869                (PathBuf::from("root-2/e"), vec![7..12]),
3870            ]
3871        );
3872    }
3873
3874    #[gpui::test(iterations = 10)]
3875    async fn test_document_highlights(cx_a: &mut TestAppContext, cx_b: &mut TestAppContext) {
3876        cx_a.foreground().forbid_parking();
3877        let lang_registry = Arc::new(LanguageRegistry::test());
3878        let fs = FakeFs::new(cx_a.background());
3879        fs.insert_tree(
3880            "/root-1",
3881            json!({
3882                "main.rs": "fn double(number: i32) -> i32 { number + number }",
3883            }),
3884        )
3885        .await;
3886
3887        // Set up a fake language server.
3888        let mut language = Language::new(
3889            LanguageConfig {
3890                name: "Rust".into(),
3891                path_suffixes: vec!["rs".to_string()],
3892                ..Default::default()
3893            },
3894            Some(tree_sitter_rust::language()),
3895        );
3896        let mut fake_language_servers = language.set_fake_lsp_adapter(Default::default());
3897        lang_registry.add(Arc::new(language));
3898
3899        // Connect to a server as 2 clients.
3900        let mut server = TestServer::start(cx_a.foreground(), cx_a.background()).await;
3901        let client_a = server.create_client(cx_a, "user_a").await;
3902        let mut client_b = server.create_client(cx_b, "user_b").await;
3903        server
3904            .make_contacts(vec![(&client_a, cx_a), (&client_b, cx_b)])
3905            .await;
3906
3907        // Share a project as client A
3908        let project_a = cx_a.update(|cx| {
3909            Project::local(
3910                client_a.clone(),
3911                client_a.user_store.clone(),
3912                lang_registry.clone(),
3913                fs.clone(),
3914                cx,
3915            )
3916        });
3917        let (worktree_a, _) = project_a
3918            .update(cx_a, |p, cx| {
3919                p.find_or_create_local_worktree("/root-1", true, cx)
3920            })
3921            .await
3922            .unwrap();
3923        worktree_a
3924            .read_with(cx_a, |tree, _| tree.as_local().unwrap().scan_complete())
3925            .await;
3926        let worktree_id = worktree_a.read_with(cx_a, |tree, _| tree.id());
3927
3928        // Join the worktree as client B.
3929        let project_b = client_b.build_remote_project(&project_a, cx_a, cx_b).await;
3930
3931        // Open the file on client B.
3932        let buffer_b = cx_b
3933            .background()
3934            .spawn(project_b.update(cx_b, |p, cx| p.open_buffer((worktree_id, "main.rs"), cx)))
3935            .await
3936            .unwrap();
3937
3938        // Request document highlights as the guest.
3939        let fake_language_server = fake_language_servers.next().await.unwrap();
3940        fake_language_server.handle_request::<lsp::request::DocumentHighlightRequest, _, _>(
3941            |params, _| async move {
3942                assert_eq!(
3943                    params
3944                        .text_document_position_params
3945                        .text_document
3946                        .uri
3947                        .as_str(),
3948                    "file:///root-1/main.rs"
3949                );
3950                assert_eq!(
3951                    params.text_document_position_params.position,
3952                    lsp::Position::new(0, 34)
3953                );
3954                Ok(Some(vec![
3955                    lsp::DocumentHighlight {
3956                        kind: Some(lsp::DocumentHighlightKind::WRITE),
3957                        range: lsp::Range::new(
3958                            lsp::Position::new(0, 10),
3959                            lsp::Position::new(0, 16),
3960                        ),
3961                    },
3962                    lsp::DocumentHighlight {
3963                        kind: Some(lsp::DocumentHighlightKind::READ),
3964                        range: lsp::Range::new(
3965                            lsp::Position::new(0, 32),
3966                            lsp::Position::new(0, 38),
3967                        ),
3968                    },
3969                    lsp::DocumentHighlight {
3970                        kind: Some(lsp::DocumentHighlightKind::READ),
3971                        range: lsp::Range::new(
3972                            lsp::Position::new(0, 41),
3973                            lsp::Position::new(0, 47),
3974                        ),
3975                    },
3976                ]))
3977            },
3978        );
3979
3980        let highlights = project_b
3981            .update(cx_b, |p, cx| p.document_highlights(&buffer_b, 34, cx))
3982            .await
3983            .unwrap();
3984        buffer_b.read_with(cx_b, |buffer, _| {
3985            let snapshot = buffer.snapshot();
3986
3987            let highlights = highlights
3988                .into_iter()
3989                .map(|highlight| (highlight.kind, highlight.range.to_offset(&snapshot)))
3990                .collect::<Vec<_>>();
3991            assert_eq!(
3992                highlights,
3993                &[
3994                    (lsp::DocumentHighlightKind::WRITE, 10..16),
3995                    (lsp::DocumentHighlightKind::READ, 32..38),
3996                    (lsp::DocumentHighlightKind::READ, 41..47)
3997                ]
3998            )
3999        });
4000    }
4001
4002    #[gpui::test(iterations = 10)]
4003    async fn test_project_symbols(cx_a: &mut TestAppContext, cx_b: &mut TestAppContext) {
4004        cx_a.foreground().forbid_parking();
4005        let lang_registry = Arc::new(LanguageRegistry::test());
4006        let fs = FakeFs::new(cx_a.background());
4007        fs.insert_tree(
4008            "/code",
4009            json!({
4010                "crate-1": {
4011                    "one.rs": "const ONE: usize = 1;",
4012                },
4013                "crate-2": {
4014                    "two.rs": "const TWO: usize = 2; const THREE: usize = 3;",
4015                },
4016                "private": {
4017                    "passwords.txt": "the-password",
4018                }
4019            }),
4020        )
4021        .await;
4022
4023        // Set up a fake language server.
4024        let mut language = Language::new(
4025            LanguageConfig {
4026                name: "Rust".into(),
4027                path_suffixes: vec!["rs".to_string()],
4028                ..Default::default()
4029            },
4030            Some(tree_sitter_rust::language()),
4031        );
4032        let mut fake_language_servers = language.set_fake_lsp_adapter(Default::default());
4033        lang_registry.add(Arc::new(language));
4034
4035        // Connect to a server as 2 clients.
4036        let mut server = TestServer::start(cx_a.foreground(), cx_a.background()).await;
4037        let client_a = server.create_client(cx_a, "user_a").await;
4038        let mut client_b = server.create_client(cx_b, "user_b").await;
4039        server
4040            .make_contacts(vec![(&client_a, cx_a), (&client_b, cx_b)])
4041            .await;
4042
4043        // Share a project as client A
4044        let project_a = cx_a.update(|cx| {
4045            Project::local(
4046                client_a.clone(),
4047                client_a.user_store.clone(),
4048                lang_registry.clone(),
4049                fs.clone(),
4050                cx,
4051            )
4052        });
4053        let (worktree_a, _) = project_a
4054            .update(cx_a, |p, cx| {
4055                p.find_or_create_local_worktree("/code/crate-1", true, cx)
4056            })
4057            .await
4058            .unwrap();
4059        worktree_a
4060            .read_with(cx_a, |tree, _| tree.as_local().unwrap().scan_complete())
4061            .await;
4062        let worktree_id = worktree_a.read_with(cx_a, |tree, _| tree.id());
4063
4064        // Join the worktree as client B.
4065        let project_b = client_b.build_remote_project(&project_a, cx_a, cx_b).await;
4066
4067        // Cause the language server to start.
4068        let _buffer = cx_b
4069            .background()
4070            .spawn(project_b.update(cx_b, |p, cx| p.open_buffer((worktree_id, "one.rs"), cx)))
4071            .await
4072            .unwrap();
4073
4074        let fake_language_server = fake_language_servers.next().await.unwrap();
4075        fake_language_server.handle_request::<lsp::request::WorkspaceSymbol, _, _>(
4076            |_, _| async move {
4077                #[allow(deprecated)]
4078                Ok(Some(vec![lsp::SymbolInformation {
4079                    name: "TWO".into(),
4080                    location: lsp::Location {
4081                        uri: lsp::Url::from_file_path("/code/crate-2/two.rs").unwrap(),
4082                        range: lsp::Range::new(lsp::Position::new(0, 6), lsp::Position::new(0, 9)),
4083                    },
4084                    kind: lsp::SymbolKind::CONSTANT,
4085                    tags: None,
4086                    container_name: None,
4087                    deprecated: None,
4088                }]))
4089            },
4090        );
4091
4092        // Request the definition of a symbol as the guest.
4093        let symbols = project_b
4094            .update(cx_b, |p, cx| p.symbols("two", cx))
4095            .await
4096            .unwrap();
4097        assert_eq!(symbols.len(), 1);
4098        assert_eq!(symbols[0].name, "TWO");
4099
4100        // Open one of the returned symbols.
4101        let buffer_b_2 = project_b
4102            .update(cx_b, |project, cx| {
4103                project.open_buffer_for_symbol(&symbols[0], cx)
4104            })
4105            .await
4106            .unwrap();
4107        buffer_b_2.read_with(cx_b, |buffer, _| {
4108            assert_eq!(
4109                buffer.file().unwrap().path().as_ref(),
4110                Path::new("../crate-2/two.rs")
4111            );
4112        });
4113
4114        // Attempt to craft a symbol and violate host's privacy by opening an arbitrary file.
4115        let mut fake_symbol = symbols[0].clone();
4116        fake_symbol.path = Path::new("/code/secrets").into();
4117        let error = project_b
4118            .update(cx_b, |project, cx| {
4119                project.open_buffer_for_symbol(&fake_symbol, cx)
4120            })
4121            .await
4122            .unwrap_err();
4123        assert!(error.to_string().contains("invalid symbol signature"));
4124    }
4125
4126    #[gpui::test(iterations = 10)]
4127    async fn test_open_buffer_while_getting_definition_pointing_to_it(
4128        cx_a: &mut TestAppContext,
4129        cx_b: &mut TestAppContext,
4130        mut rng: StdRng,
4131    ) {
4132        cx_a.foreground().forbid_parking();
4133        let lang_registry = Arc::new(LanguageRegistry::test());
4134        let fs = FakeFs::new(cx_a.background());
4135        fs.insert_tree(
4136            "/root",
4137            json!({
4138                "a.rs": "const ONE: usize = b::TWO;",
4139                "b.rs": "const TWO: usize = 2",
4140            }),
4141        )
4142        .await;
4143
4144        // Set up a fake language server.
4145        let mut language = Language::new(
4146            LanguageConfig {
4147                name: "Rust".into(),
4148                path_suffixes: vec!["rs".to_string()],
4149                ..Default::default()
4150            },
4151            Some(tree_sitter_rust::language()),
4152        );
4153        let mut fake_language_servers = language.set_fake_lsp_adapter(Default::default());
4154        lang_registry.add(Arc::new(language));
4155
4156        // Connect to a server as 2 clients.
4157        let mut server = TestServer::start(cx_a.foreground(), cx_a.background()).await;
4158        let client_a = server.create_client(cx_a, "user_a").await;
4159        let mut client_b = server.create_client(cx_b, "user_b").await;
4160        server
4161            .make_contacts(vec![(&client_a, cx_a), (&client_b, cx_b)])
4162            .await;
4163
4164        // Share a project as client A
4165        let project_a = cx_a.update(|cx| {
4166            Project::local(
4167                client_a.clone(),
4168                client_a.user_store.clone(),
4169                lang_registry.clone(),
4170                fs.clone(),
4171                cx,
4172            )
4173        });
4174
4175        let (worktree_a, _) = project_a
4176            .update(cx_a, |p, cx| {
4177                p.find_or_create_local_worktree("/root", true, cx)
4178            })
4179            .await
4180            .unwrap();
4181        worktree_a
4182            .read_with(cx_a, |tree, _| tree.as_local().unwrap().scan_complete())
4183            .await;
4184        let worktree_id = worktree_a.read_with(cx_a, |tree, _| tree.id());
4185
4186        // Join the project as client B.
4187        let project_b = client_b.build_remote_project(&project_a, cx_a, cx_b).await;
4188
4189        let buffer_b1 = cx_b
4190            .background()
4191            .spawn(project_b.update(cx_b, |p, cx| p.open_buffer((worktree_id, "a.rs"), cx)))
4192            .await
4193            .unwrap();
4194
4195        let fake_language_server = fake_language_servers.next().await.unwrap();
4196        fake_language_server.handle_request::<lsp::request::GotoDefinition, _, _>(
4197            |_, _| async move {
4198                Ok(Some(lsp::GotoDefinitionResponse::Scalar(
4199                    lsp::Location::new(
4200                        lsp::Url::from_file_path("/root/b.rs").unwrap(),
4201                        lsp::Range::new(lsp::Position::new(0, 6), lsp::Position::new(0, 9)),
4202                    ),
4203                )))
4204            },
4205        );
4206
4207        let definitions;
4208        let buffer_b2;
4209        if rng.gen() {
4210            definitions = project_b.update(cx_b, |p, cx| p.definition(&buffer_b1, 23, cx));
4211            buffer_b2 = project_b.update(cx_b, |p, cx| p.open_buffer((worktree_id, "b.rs"), cx));
4212        } else {
4213            buffer_b2 = project_b.update(cx_b, |p, cx| p.open_buffer((worktree_id, "b.rs"), cx));
4214            definitions = project_b.update(cx_b, |p, cx| p.definition(&buffer_b1, 23, cx));
4215        }
4216
4217        let buffer_b2 = buffer_b2.await.unwrap();
4218        let definitions = definitions.await.unwrap();
4219        assert_eq!(definitions.len(), 1);
4220        assert_eq!(definitions[0].buffer, buffer_b2);
4221    }
4222
4223    #[gpui::test(iterations = 10)]
4224    async fn test_collaborating_with_code_actions(
4225        cx_a: &mut TestAppContext,
4226        cx_b: &mut TestAppContext,
4227    ) {
4228        cx_a.foreground().forbid_parking();
4229        let lang_registry = Arc::new(LanguageRegistry::test());
4230        let fs = FakeFs::new(cx_a.background());
4231        cx_b.update(|cx| editor::init(cx));
4232
4233        // Set up a fake language server.
4234        let mut language = Language::new(
4235            LanguageConfig {
4236                name: "Rust".into(),
4237                path_suffixes: vec!["rs".to_string()],
4238                ..Default::default()
4239            },
4240            Some(tree_sitter_rust::language()),
4241        );
4242        let mut fake_language_servers = language.set_fake_lsp_adapter(Default::default());
4243        lang_registry.add(Arc::new(language));
4244
4245        // Connect to a server as 2 clients.
4246        let mut server = TestServer::start(cx_a.foreground(), cx_a.background()).await;
4247        let client_a = server.create_client(cx_a, "user_a").await;
4248        let mut client_b = server.create_client(cx_b, "user_b").await;
4249        server
4250            .make_contacts(vec![(&client_a, cx_a), (&client_b, cx_b)])
4251            .await;
4252
4253        // Share a project as client A
4254        fs.insert_tree(
4255            "/a",
4256            json!({
4257                "main.rs": "mod other;\nfn main() { let foo = other::foo(); }",
4258                "other.rs": "pub fn foo() -> usize { 4 }",
4259            }),
4260        )
4261        .await;
4262        let project_a = cx_a.update(|cx| {
4263            Project::local(
4264                client_a.clone(),
4265                client_a.user_store.clone(),
4266                lang_registry.clone(),
4267                fs.clone(),
4268                cx,
4269            )
4270        });
4271        let (worktree_a, _) = project_a
4272            .update(cx_a, |p, cx| {
4273                p.find_or_create_local_worktree("/a", true, cx)
4274            })
4275            .await
4276            .unwrap();
4277        worktree_a
4278            .read_with(cx_a, |tree, _| tree.as_local().unwrap().scan_complete())
4279            .await;
4280        let worktree_id = worktree_a.read_with(cx_a, |tree, _| tree.id());
4281
4282        // Join the project as client B.
4283        let project_b = client_b.build_remote_project(&project_a, cx_a, cx_b).await;
4284        let mut params = cx_b.update(WorkspaceParams::test);
4285        params.languages = lang_registry.clone();
4286        params.project = project_b.clone();
4287        params.client = client_b.client.clone();
4288        params.user_store = client_b.user_store.clone();
4289
4290        let (_window_b, workspace_b) = cx_b.add_window(|cx| Workspace::new(&params, cx));
4291        let editor_b = workspace_b
4292            .update(cx_b, |workspace, cx| {
4293                workspace.open_path((worktree_id, "main.rs"), true, cx)
4294            })
4295            .await
4296            .unwrap()
4297            .downcast::<Editor>()
4298            .unwrap();
4299
4300        let mut fake_language_server = fake_language_servers.next().await.unwrap();
4301        fake_language_server
4302            .handle_request::<lsp::request::CodeActionRequest, _, _>(|params, _| async move {
4303                assert_eq!(
4304                    params.text_document.uri,
4305                    lsp::Url::from_file_path("/a/main.rs").unwrap(),
4306                );
4307                assert_eq!(params.range.start, lsp::Position::new(0, 0));
4308                assert_eq!(params.range.end, lsp::Position::new(0, 0));
4309                Ok(None)
4310            })
4311            .next()
4312            .await;
4313
4314        // Move cursor to a location that contains code actions.
4315        editor_b.update(cx_b, |editor, cx| {
4316            editor.select_ranges([Point::new(1, 31)..Point::new(1, 31)], None, cx);
4317            cx.focus(&editor_b);
4318        });
4319
4320        fake_language_server
4321            .handle_request::<lsp::request::CodeActionRequest, _, _>(|params, _| async move {
4322                assert_eq!(
4323                    params.text_document.uri,
4324                    lsp::Url::from_file_path("/a/main.rs").unwrap(),
4325                );
4326                assert_eq!(params.range.start, lsp::Position::new(1, 31));
4327                assert_eq!(params.range.end, lsp::Position::new(1, 31));
4328
4329                Ok(Some(vec![lsp::CodeActionOrCommand::CodeAction(
4330                    lsp::CodeAction {
4331                        title: "Inline into all callers".to_string(),
4332                        edit: Some(lsp::WorkspaceEdit {
4333                            changes: Some(
4334                                [
4335                                    (
4336                                        lsp::Url::from_file_path("/a/main.rs").unwrap(),
4337                                        vec![lsp::TextEdit::new(
4338                                            lsp::Range::new(
4339                                                lsp::Position::new(1, 22),
4340                                                lsp::Position::new(1, 34),
4341                                            ),
4342                                            "4".to_string(),
4343                                        )],
4344                                    ),
4345                                    (
4346                                        lsp::Url::from_file_path("/a/other.rs").unwrap(),
4347                                        vec![lsp::TextEdit::new(
4348                                            lsp::Range::new(
4349                                                lsp::Position::new(0, 0),
4350                                                lsp::Position::new(0, 27),
4351                                            ),
4352                                            "".to_string(),
4353                                        )],
4354                                    ),
4355                                ]
4356                                .into_iter()
4357                                .collect(),
4358                            ),
4359                            ..Default::default()
4360                        }),
4361                        data: Some(json!({
4362                            "codeActionParams": {
4363                                "range": {
4364                                    "start": {"line": 1, "column": 31},
4365                                    "end": {"line": 1, "column": 31},
4366                                }
4367                            }
4368                        })),
4369                        ..Default::default()
4370                    },
4371                )]))
4372            })
4373            .next()
4374            .await;
4375
4376        // Toggle code actions and wait for them to display.
4377        editor_b.update(cx_b, |editor, cx| {
4378            editor.toggle_code_actions(
4379                &ToggleCodeActions {
4380                    deployed_from_indicator: false,
4381                },
4382                cx,
4383            );
4384        });
4385        editor_b
4386            .condition(&cx_b, |editor, _| editor.context_menu_visible())
4387            .await;
4388
4389        fake_language_server.remove_request_handler::<lsp::request::CodeActionRequest>();
4390
4391        // Confirming the code action will trigger a resolve request.
4392        let confirm_action = workspace_b
4393            .update(cx_b, |workspace, cx| {
4394                Editor::confirm_code_action(workspace, &ConfirmCodeAction { item_ix: Some(0) }, cx)
4395            })
4396            .unwrap();
4397        fake_language_server.handle_request::<lsp::request::CodeActionResolveRequest, _, _>(
4398            |_, _| async move {
4399                Ok(lsp::CodeAction {
4400                    title: "Inline into all callers".to_string(),
4401                    edit: Some(lsp::WorkspaceEdit {
4402                        changes: Some(
4403                            [
4404                                (
4405                                    lsp::Url::from_file_path("/a/main.rs").unwrap(),
4406                                    vec![lsp::TextEdit::new(
4407                                        lsp::Range::new(
4408                                            lsp::Position::new(1, 22),
4409                                            lsp::Position::new(1, 34),
4410                                        ),
4411                                        "4".to_string(),
4412                                    )],
4413                                ),
4414                                (
4415                                    lsp::Url::from_file_path("/a/other.rs").unwrap(),
4416                                    vec![lsp::TextEdit::new(
4417                                        lsp::Range::new(
4418                                            lsp::Position::new(0, 0),
4419                                            lsp::Position::new(0, 27),
4420                                        ),
4421                                        "".to_string(),
4422                                    )],
4423                                ),
4424                            ]
4425                            .into_iter()
4426                            .collect(),
4427                        ),
4428                        ..Default::default()
4429                    }),
4430                    ..Default::default()
4431                })
4432            },
4433        );
4434
4435        // After the action is confirmed, an editor containing both modified files is opened.
4436        confirm_action.await.unwrap();
4437        let code_action_editor = workspace_b.read_with(cx_b, |workspace, cx| {
4438            workspace
4439                .active_item(cx)
4440                .unwrap()
4441                .downcast::<Editor>()
4442                .unwrap()
4443        });
4444        code_action_editor.update(cx_b, |editor, cx| {
4445            assert_eq!(editor.text(cx), "mod other;\nfn main() { let foo = 4; }\n");
4446            editor.undo(&Undo, cx);
4447            assert_eq!(
4448                editor.text(cx),
4449                "mod other;\nfn main() { let foo = other::foo(); }\npub fn foo() -> usize { 4 }"
4450            );
4451            editor.redo(&Redo, cx);
4452            assert_eq!(editor.text(cx), "mod other;\nfn main() { let foo = 4; }\n");
4453        });
4454    }
4455
4456    #[gpui::test(iterations = 10)]
4457    async fn test_collaborating_with_renames(cx_a: &mut TestAppContext, cx_b: &mut TestAppContext) {
4458        cx_a.foreground().forbid_parking();
4459        let lang_registry = Arc::new(LanguageRegistry::test());
4460        let fs = FakeFs::new(cx_a.background());
4461        cx_b.update(|cx| editor::init(cx));
4462
4463        // Set up a fake language server.
4464        let mut language = Language::new(
4465            LanguageConfig {
4466                name: "Rust".into(),
4467                path_suffixes: vec!["rs".to_string()],
4468                ..Default::default()
4469            },
4470            Some(tree_sitter_rust::language()),
4471        );
4472        let mut fake_language_servers = language.set_fake_lsp_adapter(FakeLspAdapter {
4473            capabilities: lsp::ServerCapabilities {
4474                rename_provider: Some(lsp::OneOf::Right(lsp::RenameOptions {
4475                    prepare_provider: Some(true),
4476                    work_done_progress_options: Default::default(),
4477                })),
4478                ..Default::default()
4479            },
4480            ..Default::default()
4481        });
4482        lang_registry.add(Arc::new(language));
4483
4484        // Connect to a server as 2 clients.
4485        let mut server = TestServer::start(cx_a.foreground(), cx_a.background()).await;
4486        let client_a = server.create_client(cx_a, "user_a").await;
4487        let mut client_b = server.create_client(cx_b, "user_b").await;
4488        server
4489            .make_contacts(vec![(&client_a, cx_a), (&client_b, cx_b)])
4490            .await;
4491
4492        // Share a project as client A
4493        fs.insert_tree(
4494            "/dir",
4495            json!({
4496                "one.rs": "const ONE: usize = 1;",
4497                "two.rs": "const TWO: usize = one::ONE + one::ONE;"
4498            }),
4499        )
4500        .await;
4501        let project_a = cx_a.update(|cx| {
4502            Project::local(
4503                client_a.clone(),
4504                client_a.user_store.clone(),
4505                lang_registry.clone(),
4506                fs.clone(),
4507                cx,
4508            )
4509        });
4510        let (worktree_a, _) = project_a
4511            .update(cx_a, |p, cx| {
4512                p.find_or_create_local_worktree("/dir", true, cx)
4513            })
4514            .await
4515            .unwrap();
4516        worktree_a
4517            .read_with(cx_a, |tree, _| tree.as_local().unwrap().scan_complete())
4518            .await;
4519        let worktree_id = worktree_a.read_with(cx_a, |tree, _| tree.id());
4520
4521        // Join the worktree as client B.
4522        let project_b = client_b.build_remote_project(&project_a, cx_a, cx_b).await;
4523        let mut params = cx_b.update(WorkspaceParams::test);
4524        params.languages = lang_registry.clone();
4525        params.project = project_b.clone();
4526        params.client = client_b.client.clone();
4527        params.user_store = client_b.user_store.clone();
4528
4529        let (_window_b, workspace_b) = cx_b.add_window(|cx| Workspace::new(&params, cx));
4530        let editor_b = workspace_b
4531            .update(cx_b, |workspace, cx| {
4532                workspace.open_path((worktree_id, "one.rs"), true, cx)
4533            })
4534            .await
4535            .unwrap()
4536            .downcast::<Editor>()
4537            .unwrap();
4538        let fake_language_server = fake_language_servers.next().await.unwrap();
4539
4540        // Move cursor to a location that can be renamed.
4541        let prepare_rename = editor_b.update(cx_b, |editor, cx| {
4542            editor.select_ranges([7..7], None, cx);
4543            editor.rename(&Rename, cx).unwrap()
4544        });
4545
4546        fake_language_server
4547            .handle_request::<lsp::request::PrepareRenameRequest, _, _>(|params, _| async move {
4548                assert_eq!(params.text_document.uri.as_str(), "file:///dir/one.rs");
4549                assert_eq!(params.position, lsp::Position::new(0, 7));
4550                Ok(Some(lsp::PrepareRenameResponse::Range(lsp::Range::new(
4551                    lsp::Position::new(0, 6),
4552                    lsp::Position::new(0, 9),
4553                ))))
4554            })
4555            .next()
4556            .await
4557            .unwrap();
4558        prepare_rename.await.unwrap();
4559        editor_b.update(cx_b, |editor, cx| {
4560            let rename = editor.pending_rename().unwrap();
4561            let buffer = editor.buffer().read(cx).snapshot(cx);
4562            assert_eq!(
4563                rename.range.start.to_offset(&buffer)..rename.range.end.to_offset(&buffer),
4564                6..9
4565            );
4566            rename.editor.update(cx, |rename_editor, cx| {
4567                rename_editor.buffer().update(cx, |rename_buffer, cx| {
4568                    rename_buffer.edit([(0..3, "THREE")], cx);
4569                });
4570            });
4571        });
4572
4573        let confirm_rename = workspace_b.update(cx_b, |workspace, cx| {
4574            Editor::confirm_rename(workspace, &ConfirmRename, cx).unwrap()
4575        });
4576        fake_language_server
4577            .handle_request::<lsp::request::Rename, _, _>(|params, _| async move {
4578                assert_eq!(
4579                    params.text_document_position.text_document.uri.as_str(),
4580                    "file:///dir/one.rs"
4581                );
4582                assert_eq!(
4583                    params.text_document_position.position,
4584                    lsp::Position::new(0, 6)
4585                );
4586                assert_eq!(params.new_name, "THREE");
4587                Ok(Some(lsp::WorkspaceEdit {
4588                    changes: Some(
4589                        [
4590                            (
4591                                lsp::Url::from_file_path("/dir/one.rs").unwrap(),
4592                                vec![lsp::TextEdit::new(
4593                                    lsp::Range::new(
4594                                        lsp::Position::new(0, 6),
4595                                        lsp::Position::new(0, 9),
4596                                    ),
4597                                    "THREE".to_string(),
4598                                )],
4599                            ),
4600                            (
4601                                lsp::Url::from_file_path("/dir/two.rs").unwrap(),
4602                                vec![
4603                                    lsp::TextEdit::new(
4604                                        lsp::Range::new(
4605                                            lsp::Position::new(0, 24),
4606                                            lsp::Position::new(0, 27),
4607                                        ),
4608                                        "THREE".to_string(),
4609                                    ),
4610                                    lsp::TextEdit::new(
4611                                        lsp::Range::new(
4612                                            lsp::Position::new(0, 35),
4613                                            lsp::Position::new(0, 38),
4614                                        ),
4615                                        "THREE".to_string(),
4616                                    ),
4617                                ],
4618                            ),
4619                        ]
4620                        .into_iter()
4621                        .collect(),
4622                    ),
4623                    ..Default::default()
4624                }))
4625            })
4626            .next()
4627            .await
4628            .unwrap();
4629        confirm_rename.await.unwrap();
4630
4631        let rename_editor = workspace_b.read_with(cx_b, |workspace, cx| {
4632            workspace
4633                .active_item(cx)
4634                .unwrap()
4635                .downcast::<Editor>()
4636                .unwrap()
4637        });
4638        rename_editor.update(cx_b, |editor, cx| {
4639            assert_eq!(
4640                editor.text(cx),
4641                "const THREE: usize = 1;\nconst TWO: usize = one::THREE + one::THREE;"
4642            );
4643            editor.undo(&Undo, cx);
4644            assert_eq!(
4645                editor.text(cx),
4646                "const ONE: usize = 1;\nconst TWO: usize = one::ONE + one::ONE;"
4647            );
4648            editor.redo(&Redo, cx);
4649            assert_eq!(
4650                editor.text(cx),
4651                "const THREE: usize = 1;\nconst TWO: usize = one::THREE + one::THREE;"
4652            );
4653        });
4654
4655        // Ensure temporary rename edits cannot be undone/redone.
4656        editor_b.update(cx_b, |editor, cx| {
4657            editor.undo(&Undo, cx);
4658            assert_eq!(editor.text(cx), "const ONE: usize = 1;");
4659            editor.undo(&Undo, cx);
4660            assert_eq!(editor.text(cx), "const ONE: usize = 1;");
4661            editor.redo(&Redo, cx);
4662            assert_eq!(editor.text(cx), "const THREE: usize = 1;");
4663        })
4664    }
4665
4666    #[gpui::test(iterations = 10)]
4667    async fn test_basic_chat(cx_a: &mut TestAppContext, cx_b: &mut TestAppContext) {
4668        cx_a.foreground().forbid_parking();
4669
4670        // Connect to a server as 2 clients.
4671        let mut server = TestServer::start(cx_a.foreground(), cx_a.background()).await;
4672        let client_a = server.create_client(cx_a, "user_a").await;
4673        let client_b = server.create_client(cx_b, "user_b").await;
4674
4675        // Create an org that includes these 2 users.
4676        let db = &server.app_state.db;
4677        let org_id = db.create_org("Test Org", "test-org").await.unwrap();
4678        db.add_org_member(org_id, client_a.current_user_id(&cx_a), false)
4679            .await
4680            .unwrap();
4681        db.add_org_member(org_id, client_b.current_user_id(&cx_b), false)
4682            .await
4683            .unwrap();
4684
4685        // Create a channel that includes all the users.
4686        let channel_id = db.create_org_channel(org_id, "test-channel").await.unwrap();
4687        db.add_channel_member(channel_id, client_a.current_user_id(&cx_a), false)
4688            .await
4689            .unwrap();
4690        db.add_channel_member(channel_id, client_b.current_user_id(&cx_b), false)
4691            .await
4692            .unwrap();
4693        db.create_channel_message(
4694            channel_id,
4695            client_b.current_user_id(&cx_b),
4696            "hello A, it's B.",
4697            OffsetDateTime::now_utc(),
4698            1,
4699        )
4700        .await
4701        .unwrap();
4702
4703        let channels_a = cx_a
4704            .add_model(|cx| ChannelList::new(client_a.user_store.clone(), client_a.clone(), cx));
4705        channels_a
4706            .condition(cx_a, |list, _| list.available_channels().is_some())
4707            .await;
4708        channels_a.read_with(cx_a, |list, _| {
4709            assert_eq!(
4710                list.available_channels().unwrap(),
4711                &[ChannelDetails {
4712                    id: channel_id.to_proto(),
4713                    name: "test-channel".to_string()
4714                }]
4715            )
4716        });
4717        let channel_a = channels_a.update(cx_a, |this, cx| {
4718            this.get_channel(channel_id.to_proto(), cx).unwrap()
4719        });
4720        channel_a.read_with(cx_a, |channel, _| assert!(channel.messages().is_empty()));
4721        channel_a
4722            .condition(&cx_a, |channel, _| {
4723                channel_messages(channel)
4724                    == [("user_b".to_string(), "hello A, it's B.".to_string(), false)]
4725            })
4726            .await;
4727
4728        let channels_b = cx_b
4729            .add_model(|cx| ChannelList::new(client_b.user_store.clone(), client_b.clone(), cx));
4730        channels_b
4731            .condition(cx_b, |list, _| list.available_channels().is_some())
4732            .await;
4733        channels_b.read_with(cx_b, |list, _| {
4734            assert_eq!(
4735                list.available_channels().unwrap(),
4736                &[ChannelDetails {
4737                    id: channel_id.to_proto(),
4738                    name: "test-channel".to_string()
4739                }]
4740            )
4741        });
4742
4743        let channel_b = channels_b.update(cx_b, |this, cx| {
4744            this.get_channel(channel_id.to_proto(), cx).unwrap()
4745        });
4746        channel_b.read_with(cx_b, |channel, _| assert!(channel.messages().is_empty()));
4747        channel_b
4748            .condition(&cx_b, |channel, _| {
4749                channel_messages(channel)
4750                    == [("user_b".to_string(), "hello A, it's B.".to_string(), false)]
4751            })
4752            .await;
4753
4754        channel_a
4755            .update(cx_a, |channel, cx| {
4756                channel
4757                    .send_message("oh, hi B.".to_string(), cx)
4758                    .unwrap()
4759                    .detach();
4760                let task = channel.send_message("sup".to_string(), cx).unwrap();
4761                assert_eq!(
4762                    channel_messages(channel),
4763                    &[
4764                        ("user_b".to_string(), "hello A, it's B.".to_string(), false),
4765                        ("user_a".to_string(), "oh, hi B.".to_string(), true),
4766                        ("user_a".to_string(), "sup".to_string(), true)
4767                    ]
4768                );
4769                task
4770            })
4771            .await
4772            .unwrap();
4773
4774        channel_b
4775            .condition(&cx_b, |channel, _| {
4776                channel_messages(channel)
4777                    == [
4778                        ("user_b".to_string(), "hello A, it's B.".to_string(), false),
4779                        ("user_a".to_string(), "oh, hi B.".to_string(), false),
4780                        ("user_a".to_string(), "sup".to_string(), false),
4781                    ]
4782            })
4783            .await;
4784
4785        assert_eq!(
4786            server
4787                .state()
4788                .await
4789                .channel(channel_id)
4790                .unwrap()
4791                .connection_ids
4792                .len(),
4793            2
4794        );
4795        cx_b.update(|_| drop(channel_b));
4796        server
4797            .condition(|state| state.channel(channel_id).unwrap().connection_ids.len() == 1)
4798            .await;
4799
4800        cx_a.update(|_| drop(channel_a));
4801        server
4802            .condition(|state| state.channel(channel_id).is_none())
4803            .await;
4804    }
4805
4806    #[gpui::test(iterations = 10)]
4807    async fn test_chat_message_validation(cx_a: &mut TestAppContext) {
4808        cx_a.foreground().forbid_parking();
4809
4810        let mut server = TestServer::start(cx_a.foreground(), cx_a.background()).await;
4811        let client_a = server.create_client(cx_a, "user_a").await;
4812
4813        let db = &server.app_state.db;
4814        let org_id = db.create_org("Test Org", "test-org").await.unwrap();
4815        let channel_id = db.create_org_channel(org_id, "test-channel").await.unwrap();
4816        db.add_org_member(org_id, client_a.current_user_id(&cx_a), false)
4817            .await
4818            .unwrap();
4819        db.add_channel_member(channel_id, client_a.current_user_id(&cx_a), false)
4820            .await
4821            .unwrap();
4822
4823        let channels_a = cx_a
4824            .add_model(|cx| ChannelList::new(client_a.user_store.clone(), client_a.clone(), cx));
4825        channels_a
4826            .condition(cx_a, |list, _| list.available_channels().is_some())
4827            .await;
4828        let channel_a = channels_a.update(cx_a, |this, cx| {
4829            this.get_channel(channel_id.to_proto(), cx).unwrap()
4830        });
4831
4832        // Messages aren't allowed to be too long.
4833        channel_a
4834            .update(cx_a, |channel, cx| {
4835                let long_body = "this is long.\n".repeat(1024);
4836                channel.send_message(long_body, cx).unwrap()
4837            })
4838            .await
4839            .unwrap_err();
4840
4841        // Messages aren't allowed to be blank.
4842        channel_a.update(cx_a, |channel, cx| {
4843            channel.send_message(String::new(), cx).unwrap_err()
4844        });
4845
4846        // Leading and trailing whitespace are trimmed.
4847        channel_a
4848            .update(cx_a, |channel, cx| {
4849                channel
4850                    .send_message("\n surrounded by whitespace  \n".to_string(), cx)
4851                    .unwrap()
4852            })
4853            .await
4854            .unwrap();
4855        assert_eq!(
4856            db.get_channel_messages(channel_id, 10, None)
4857                .await
4858                .unwrap()
4859                .iter()
4860                .map(|m| &m.body)
4861                .collect::<Vec<_>>(),
4862            &["surrounded by whitespace"]
4863        );
4864    }
4865
4866    #[gpui::test(iterations = 10)]
4867    async fn test_chat_reconnection(cx_a: &mut TestAppContext, cx_b: &mut TestAppContext) {
4868        cx_a.foreground().forbid_parking();
4869
4870        // Connect to a server as 2 clients.
4871        let mut server = TestServer::start(cx_a.foreground(), cx_a.background()).await;
4872        let client_a = server.create_client(cx_a, "user_a").await;
4873        let client_b = server.create_client(cx_b, "user_b").await;
4874        let mut status_b = client_b.status();
4875
4876        // Create an org that includes these 2 users.
4877        let db = &server.app_state.db;
4878        let org_id = db.create_org("Test Org", "test-org").await.unwrap();
4879        db.add_org_member(org_id, client_a.current_user_id(&cx_a), false)
4880            .await
4881            .unwrap();
4882        db.add_org_member(org_id, client_b.current_user_id(&cx_b), false)
4883            .await
4884            .unwrap();
4885
4886        // Create a channel that includes all the users.
4887        let channel_id = db.create_org_channel(org_id, "test-channel").await.unwrap();
4888        db.add_channel_member(channel_id, client_a.current_user_id(&cx_a), false)
4889            .await
4890            .unwrap();
4891        db.add_channel_member(channel_id, client_b.current_user_id(&cx_b), false)
4892            .await
4893            .unwrap();
4894        db.create_channel_message(
4895            channel_id,
4896            client_b.current_user_id(&cx_b),
4897            "hello A, it's B.",
4898            OffsetDateTime::now_utc(),
4899            2,
4900        )
4901        .await
4902        .unwrap();
4903
4904        let channels_a = cx_a
4905            .add_model(|cx| ChannelList::new(client_a.user_store.clone(), client_a.clone(), cx));
4906        channels_a
4907            .condition(cx_a, |list, _| list.available_channels().is_some())
4908            .await;
4909
4910        channels_a.read_with(cx_a, |list, _| {
4911            assert_eq!(
4912                list.available_channels().unwrap(),
4913                &[ChannelDetails {
4914                    id: channel_id.to_proto(),
4915                    name: "test-channel".to_string()
4916                }]
4917            )
4918        });
4919        let channel_a = channels_a.update(cx_a, |this, cx| {
4920            this.get_channel(channel_id.to_proto(), cx).unwrap()
4921        });
4922        channel_a.read_with(cx_a, |channel, _| assert!(channel.messages().is_empty()));
4923        channel_a
4924            .condition(&cx_a, |channel, _| {
4925                channel_messages(channel)
4926                    == [("user_b".to_string(), "hello A, it's B.".to_string(), false)]
4927            })
4928            .await;
4929
4930        let channels_b = cx_b
4931            .add_model(|cx| ChannelList::new(client_b.user_store.clone(), client_b.clone(), cx));
4932        channels_b
4933            .condition(cx_b, |list, _| list.available_channels().is_some())
4934            .await;
4935        channels_b.read_with(cx_b, |list, _| {
4936            assert_eq!(
4937                list.available_channels().unwrap(),
4938                &[ChannelDetails {
4939                    id: channel_id.to_proto(),
4940                    name: "test-channel".to_string()
4941                }]
4942            )
4943        });
4944
4945        let channel_b = channels_b.update(cx_b, |this, cx| {
4946            this.get_channel(channel_id.to_proto(), cx).unwrap()
4947        });
4948        channel_b.read_with(cx_b, |channel, _| assert!(channel.messages().is_empty()));
4949        channel_b
4950            .condition(&cx_b, |channel, _| {
4951                channel_messages(channel)
4952                    == [("user_b".to_string(), "hello A, it's B.".to_string(), false)]
4953            })
4954            .await;
4955
4956        // Disconnect client B, ensuring we can still access its cached channel data.
4957        server.forbid_connections();
4958        server.disconnect_client(client_b.current_user_id(&cx_b));
4959        cx_b.foreground().advance_clock(rpc::RECEIVE_TIMEOUT);
4960        while !matches!(
4961            status_b.next().await,
4962            Some(client::Status::ReconnectionError { .. })
4963        ) {}
4964
4965        channels_b.read_with(cx_b, |channels, _| {
4966            assert_eq!(
4967                channels.available_channels().unwrap(),
4968                [ChannelDetails {
4969                    id: channel_id.to_proto(),
4970                    name: "test-channel".to_string()
4971                }]
4972            )
4973        });
4974        channel_b.read_with(cx_b, |channel, _| {
4975            assert_eq!(
4976                channel_messages(channel),
4977                [("user_b".to_string(), "hello A, it's B.".to_string(), false)]
4978            )
4979        });
4980
4981        // Send a message from client B while it is disconnected.
4982        channel_b
4983            .update(cx_b, |channel, cx| {
4984                let task = channel
4985                    .send_message("can you see this?".to_string(), cx)
4986                    .unwrap();
4987                assert_eq!(
4988                    channel_messages(channel),
4989                    &[
4990                        ("user_b".to_string(), "hello A, it's B.".to_string(), false),
4991                        ("user_b".to_string(), "can you see this?".to_string(), true)
4992                    ]
4993                );
4994                task
4995            })
4996            .await
4997            .unwrap_err();
4998
4999        // Send a message from client A while B is disconnected.
5000        channel_a
5001            .update(cx_a, |channel, cx| {
5002                channel
5003                    .send_message("oh, hi B.".to_string(), cx)
5004                    .unwrap()
5005                    .detach();
5006                let task = channel.send_message("sup".to_string(), cx).unwrap();
5007                assert_eq!(
5008                    channel_messages(channel),
5009                    &[
5010                        ("user_b".to_string(), "hello A, it's B.".to_string(), false),
5011                        ("user_a".to_string(), "oh, hi B.".to_string(), true),
5012                        ("user_a".to_string(), "sup".to_string(), true)
5013                    ]
5014                );
5015                task
5016            })
5017            .await
5018            .unwrap();
5019
5020        // Give client B a chance to reconnect.
5021        server.allow_connections();
5022        cx_b.foreground().advance_clock(Duration::from_secs(10));
5023
5024        // Verify that B sees the new messages upon reconnection, as well as the message client B
5025        // sent while offline.
5026        channel_b
5027            .condition(&cx_b, |channel, _| {
5028                channel_messages(channel)
5029                    == [
5030                        ("user_b".to_string(), "hello A, it's B.".to_string(), false),
5031                        ("user_a".to_string(), "oh, hi B.".to_string(), false),
5032                        ("user_a".to_string(), "sup".to_string(), false),
5033                        ("user_b".to_string(), "can you see this?".to_string(), false),
5034                    ]
5035            })
5036            .await;
5037
5038        // Ensure client A and B can communicate normally after reconnection.
5039        channel_a
5040            .update(cx_a, |channel, cx| {
5041                channel.send_message("you online?".to_string(), cx).unwrap()
5042            })
5043            .await
5044            .unwrap();
5045        channel_b
5046            .condition(&cx_b, |channel, _| {
5047                channel_messages(channel)
5048                    == [
5049                        ("user_b".to_string(), "hello A, it's B.".to_string(), false),
5050                        ("user_a".to_string(), "oh, hi B.".to_string(), false),
5051                        ("user_a".to_string(), "sup".to_string(), false),
5052                        ("user_b".to_string(), "can you see this?".to_string(), false),
5053                        ("user_a".to_string(), "you online?".to_string(), false),
5054                    ]
5055            })
5056            .await;
5057
5058        channel_b
5059            .update(cx_b, |channel, cx| {
5060                channel.send_message("yep".to_string(), cx).unwrap()
5061            })
5062            .await
5063            .unwrap();
5064        channel_a
5065            .condition(&cx_a, |channel, _| {
5066                channel_messages(channel)
5067                    == [
5068                        ("user_b".to_string(), "hello A, it's B.".to_string(), false),
5069                        ("user_a".to_string(), "oh, hi B.".to_string(), false),
5070                        ("user_a".to_string(), "sup".to_string(), false),
5071                        ("user_b".to_string(), "can you see this?".to_string(), false),
5072                        ("user_a".to_string(), "you online?".to_string(), false),
5073                        ("user_b".to_string(), "yep".to_string(), false),
5074                    ]
5075            })
5076            .await;
5077    }
5078
5079    #[gpui::test(iterations = 10)]
5080    async fn test_contacts(
5081        deterministic: Arc<Deterministic>,
5082        cx_a: &mut TestAppContext,
5083        cx_b: &mut TestAppContext,
5084        cx_c: &mut TestAppContext,
5085    ) {
5086        cx_a.foreground().forbid_parking();
5087
5088        // Connect to a server as 3 clients.
5089        let mut server = TestServer::start(cx_a.foreground(), cx_a.background()).await;
5090        let mut client_a = server.create_client(cx_a, "user_a").await;
5091        let mut client_b = server.create_client(cx_b, "user_b").await;
5092        let client_c = server.create_client(cx_c, "user_c").await;
5093        server
5094            .make_contacts(vec![
5095                (&client_a, cx_a),
5096                (&client_b, cx_b),
5097                (&client_c, cx_c),
5098            ])
5099            .await;
5100
5101        deterministic.run_until_parked();
5102        for (client, cx) in [(&client_a, &cx_a), (&client_b, &cx_b), (&client_c, &cx_c)] {
5103            client.user_store.read_with(*cx, |store, _| {
5104                assert_eq!(
5105                    contacts(store),
5106                    [
5107                        ("user_a", true, vec![]),
5108                        ("user_b", true, vec![]),
5109                        ("user_c", true, vec![])
5110                    ],
5111                    "{} has the wrong contacts",
5112                    client.username
5113                )
5114            });
5115        }
5116
5117        // Share a project as client A.
5118        let fs = FakeFs::new(cx_a.background());
5119        fs.create_dir(Path::new("/a")).await.unwrap();
5120        let (project_a, _) = client_a.build_local_project(fs, "/a", cx_a).await;
5121
5122        deterministic.run_until_parked();
5123        for (client, cx) in [(&client_a, &cx_a), (&client_b, &cx_b), (&client_c, &cx_c)] {
5124            client.user_store.read_with(*cx, |store, _| {
5125                assert_eq!(
5126                    contacts(store),
5127                    [
5128                        ("user_a", true, vec![("a", vec![])]),
5129                        ("user_b", true, vec![]),
5130                        ("user_c", true, vec![])
5131                    ],
5132                    "{} has the wrong contacts",
5133                    client.username
5134                )
5135            });
5136        }
5137
5138        let _project_b = client_b.build_remote_project(&project_a, cx_a, cx_b).await;
5139
5140        deterministic.run_until_parked();
5141        for (client, cx) in [(&client_a, &cx_a), (&client_b, &cx_b), (&client_c, &cx_c)] {
5142            client.user_store.read_with(*cx, |store, _| {
5143                assert_eq!(
5144                    contacts(store),
5145                    [
5146                        ("user_a", true, vec![("a", vec!["user_b"])]),
5147                        ("user_b", true, vec![]),
5148                        ("user_c", true, vec![])
5149                    ],
5150                    "{} has the wrong contacts",
5151                    client.username
5152                )
5153            });
5154        }
5155
5156        // Add a local project as client B
5157        let fs = FakeFs::new(cx_b.background());
5158        fs.create_dir(Path::new("/b")).await.unwrap();
5159        let (_project_b, _) = client_b.build_local_project(fs, "/b", cx_a).await;
5160
5161        deterministic.run_until_parked();
5162        for (client, cx) in [(&client_a, &cx_a), (&client_b, &cx_b), (&client_c, &cx_c)] {
5163            client.user_store.read_with(*cx, |store, _| {
5164                assert_eq!(
5165                    contacts(store),
5166                    [
5167                        ("user_a", true, vec![("a", vec!["user_b"])]),
5168                        ("user_b", true, vec![("b", vec![])]),
5169                        ("user_c", true, vec![])
5170                    ],
5171                    "{} has the wrong contacts",
5172                    client.username
5173                )
5174            });
5175        }
5176
5177        project_a
5178            .condition(&cx_a, |project, _| {
5179                project.collaborators().contains_key(&client_b.peer_id)
5180            })
5181            .await;
5182
5183        client_a.project.take();
5184        cx_a.update(move |_| drop(project_a));
5185        deterministic.run_until_parked();
5186        for (client, cx) in [(&client_a, &cx_a), (&client_b, &cx_b), (&client_c, &cx_c)] {
5187            client.user_store.read_with(*cx, |store, _| {
5188                assert_eq!(
5189                    contacts(store),
5190                    [
5191                        ("user_a", true, vec![]),
5192                        ("user_b", true, vec![("b", vec![])]),
5193                        ("user_c", true, vec![])
5194                    ],
5195                    "{} has the wrong contacts",
5196                    client.username
5197                )
5198            });
5199        }
5200
5201        server.disconnect_client(client_c.current_user_id(cx_c));
5202        server.forbid_connections();
5203        deterministic.advance_clock(rpc::RECEIVE_TIMEOUT);
5204        for (client, cx) in [(&client_a, &cx_a), (&client_b, &cx_b)] {
5205            client.user_store.read_with(*cx, |store, _| {
5206                assert_eq!(
5207                    contacts(store),
5208                    [
5209                        ("user_a", true, vec![]),
5210                        ("user_b", true, vec![("b", vec![])]),
5211                        ("user_c", false, vec![])
5212                    ],
5213                    "{} has the wrong contacts",
5214                    client.username
5215                )
5216            });
5217        }
5218        client_c
5219            .user_store
5220            .read_with(cx_c, |store, _| assert_eq!(contacts(store), []));
5221
5222        server.allow_connections();
5223        client_c
5224            .authenticate_and_connect(false, &cx_c.to_async())
5225            .await
5226            .unwrap();
5227
5228        deterministic.run_until_parked();
5229        for (client, cx) in [(&client_a, &cx_a), (&client_b, &cx_b), (&client_c, &cx_c)] {
5230            client.user_store.read_with(*cx, |store, _| {
5231                assert_eq!(
5232                    contacts(store),
5233                    [
5234                        ("user_a", true, vec![]),
5235                        ("user_b", true, vec![("b", vec![])]),
5236                        ("user_c", true, vec![])
5237                    ],
5238                    "{} has the wrong contacts",
5239                    client.username
5240                )
5241            });
5242        }
5243
5244        fn contacts(user_store: &UserStore) -> Vec<(&str, bool, Vec<(&str, Vec<&str>)>)> {
5245            user_store
5246                .contacts()
5247                .iter()
5248                .map(|contact| {
5249                    let projects = contact
5250                        .projects
5251                        .iter()
5252                        .map(|p| {
5253                            (
5254                                p.worktree_root_names[0].as_str(),
5255                                p.guests.iter().map(|p| p.github_login.as_str()).collect(),
5256                            )
5257                        })
5258                        .collect();
5259                    (contact.user.github_login.as_str(), contact.online, projects)
5260                })
5261                .collect()
5262        }
5263    }
5264
5265    #[gpui::test(iterations = 10)]
5266    async fn test_contact_requests(
5267        executor: Arc<Deterministic>,
5268        cx_a: &mut TestAppContext,
5269        cx_a2: &mut TestAppContext,
5270        cx_b: &mut TestAppContext,
5271        cx_b2: &mut TestAppContext,
5272        cx_c: &mut TestAppContext,
5273        cx_c2: &mut TestAppContext,
5274    ) {
5275        cx_a.foreground().forbid_parking();
5276
5277        // Connect to a server as 3 clients.
5278        let mut server = TestServer::start(cx_a.foreground(), cx_a.background()).await;
5279        let client_a = server.create_client(cx_a, "user_a").await;
5280        let client_a2 = server.create_client(cx_a2, "user_a").await;
5281        let client_b = server.create_client(cx_b, "user_b").await;
5282        let client_b2 = server.create_client(cx_b2, "user_b").await;
5283        let client_c = server.create_client(cx_c, "user_c").await;
5284        let client_c2 = server.create_client(cx_c2, "user_c").await;
5285
5286        assert_eq!(client_a.user_id().unwrap(), client_a2.user_id().unwrap());
5287        assert_eq!(client_b.user_id().unwrap(), client_b2.user_id().unwrap());
5288        assert_eq!(client_c.user_id().unwrap(), client_c2.user_id().unwrap());
5289
5290        // User A and User C request that user B become their contact.
5291        client_a
5292            .user_store
5293            .update(cx_a, |store, cx| {
5294                store.request_contact(client_b.user_id().unwrap(), cx)
5295            })
5296            .await
5297            .unwrap();
5298        client_c
5299            .user_store
5300            .update(cx_c, |store, cx| {
5301                store.request_contact(client_b.user_id().unwrap(), cx)
5302            })
5303            .await
5304            .unwrap();
5305        executor.run_until_parked();
5306
5307        // All users see the pending request appear in all their clients.
5308        assert_eq!(
5309            client_a.summarize_contacts(&cx_a).outgoing_requests,
5310            &["user_b"]
5311        );
5312        assert_eq!(
5313            client_a2.summarize_contacts(&cx_a2).outgoing_requests,
5314            &["user_b"]
5315        );
5316        assert_eq!(
5317            client_b.summarize_contacts(&cx_b).incoming_requests,
5318            &["user_a", "user_c"]
5319        );
5320        assert_eq!(
5321            client_b2.summarize_contacts(&cx_b2).incoming_requests,
5322            &["user_a", "user_c"]
5323        );
5324        assert_eq!(
5325            client_c.summarize_contacts(&cx_c).outgoing_requests,
5326            &["user_b"]
5327        );
5328        assert_eq!(
5329            client_c2.summarize_contacts(&cx_c2).outgoing_requests,
5330            &["user_b"]
5331        );
5332
5333        // Contact requests are present upon connecting (tested here via disconnect/reconnect)
5334        disconnect_and_reconnect(&client_a, cx_a).await;
5335        disconnect_and_reconnect(&client_b, cx_b).await;
5336        disconnect_and_reconnect(&client_c, cx_c).await;
5337        executor.run_until_parked();
5338        assert_eq!(
5339            client_a.summarize_contacts(&cx_a).outgoing_requests,
5340            &["user_b"]
5341        );
5342        assert_eq!(
5343            client_b.summarize_contacts(&cx_b).incoming_requests,
5344            &["user_a", "user_c"]
5345        );
5346        assert_eq!(
5347            client_c.summarize_contacts(&cx_c).outgoing_requests,
5348            &["user_b"]
5349        );
5350
5351        // User B accepts the request from user A.
5352        client_b
5353            .user_store
5354            .update(cx_b, |store, cx| {
5355                store.respond_to_contact_request(client_a.user_id().unwrap(), true, cx)
5356            })
5357            .await
5358            .unwrap();
5359
5360        executor.run_until_parked();
5361
5362        // User B sees user A as their contact now in all client, and the incoming request from them is removed.
5363        let contacts_b = client_b.summarize_contacts(&cx_b);
5364        assert_eq!(contacts_b.current, &["user_a", "user_b"]);
5365        assert_eq!(contacts_b.incoming_requests, &["user_c"]);
5366        let contacts_b2 = client_b2.summarize_contacts(&cx_b2);
5367        assert_eq!(contacts_b2.current, &["user_a", "user_b"]);
5368        assert_eq!(contacts_b2.incoming_requests, &["user_c"]);
5369
5370        // User A sees user B as their contact now in all clients, and the outgoing request to them is removed.
5371        let contacts_a = client_a.summarize_contacts(&cx_a);
5372        assert_eq!(contacts_a.current, &["user_a", "user_b"]);
5373        assert!(contacts_a.outgoing_requests.is_empty());
5374        let contacts_a2 = client_a2.summarize_contacts(&cx_a2);
5375        assert_eq!(contacts_a2.current, &["user_a", "user_b"]);
5376        assert!(contacts_a2.outgoing_requests.is_empty());
5377
5378        // Contacts are present upon connecting (tested here via disconnect/reconnect)
5379        disconnect_and_reconnect(&client_a, cx_a).await;
5380        disconnect_and_reconnect(&client_b, cx_b).await;
5381        disconnect_and_reconnect(&client_c, cx_c).await;
5382        executor.run_until_parked();
5383        assert_eq!(
5384            client_a.summarize_contacts(&cx_a).current,
5385            &["user_a", "user_b"]
5386        );
5387        assert_eq!(
5388            client_b.summarize_contacts(&cx_b).current,
5389            &["user_a", "user_b"]
5390        );
5391        assert_eq!(
5392            client_b.summarize_contacts(&cx_b).incoming_requests,
5393            &["user_c"]
5394        );
5395        assert_eq!(client_c.summarize_contacts(&cx_c).current, &["user_c"]);
5396        assert_eq!(
5397            client_c.summarize_contacts(&cx_c).outgoing_requests,
5398            &["user_b"]
5399        );
5400
5401        // User B rejects the request from user C.
5402        client_b
5403            .user_store
5404            .update(cx_b, |store, cx| {
5405                store.respond_to_contact_request(client_c.user_id().unwrap(), false, cx)
5406            })
5407            .await
5408            .unwrap();
5409
5410        executor.run_until_parked();
5411
5412        // User B doesn't see user C as their contact, and the incoming request from them is removed.
5413        let contacts_b = client_b.summarize_contacts(&cx_b);
5414        assert_eq!(contacts_b.current, &["user_a", "user_b"]);
5415        assert!(contacts_b.incoming_requests.is_empty());
5416        let contacts_b2 = client_b2.summarize_contacts(&cx_b2);
5417        assert_eq!(contacts_b2.current, &["user_a", "user_b"]);
5418        assert!(contacts_b2.incoming_requests.is_empty());
5419
5420        // User C doesn't see user B as their contact, and the outgoing request to them is removed.
5421        let contacts_c = client_c.summarize_contacts(&cx_c);
5422        assert_eq!(contacts_c.current, &["user_c"]);
5423        assert!(contacts_c.outgoing_requests.is_empty());
5424        let contacts_c2 = client_c2.summarize_contacts(&cx_c2);
5425        assert_eq!(contacts_c2.current, &["user_c"]);
5426        assert!(contacts_c2.outgoing_requests.is_empty());
5427
5428        // Incoming/outgoing requests are not present upon connecting (tested here via disconnect/reconnect)
5429        disconnect_and_reconnect(&client_a, cx_a).await;
5430        disconnect_and_reconnect(&client_b, cx_b).await;
5431        disconnect_and_reconnect(&client_c, cx_c).await;
5432        executor.run_until_parked();
5433        assert_eq!(
5434            client_a.summarize_contacts(&cx_a).current,
5435            &["user_a", "user_b"]
5436        );
5437        assert_eq!(
5438            client_b.summarize_contacts(&cx_b).current,
5439            &["user_a", "user_b"]
5440        );
5441        assert!(client_b
5442            .summarize_contacts(&cx_b)
5443            .incoming_requests
5444            .is_empty());
5445        assert_eq!(client_c.summarize_contacts(&cx_c).current, &["user_c"]);
5446        assert!(client_c
5447            .summarize_contacts(&cx_c)
5448            .outgoing_requests
5449            .is_empty());
5450
5451        async fn disconnect_and_reconnect(client: &TestClient, cx: &mut TestAppContext) {
5452            client.disconnect(&cx.to_async()).unwrap();
5453            client.clear_contacts(cx).await;
5454            client
5455                .authenticate_and_connect(false, &cx.to_async())
5456                .await
5457                .unwrap();
5458        }
5459    }
5460
5461    #[gpui::test(iterations = 10)]
5462    async fn test_following(cx_a: &mut TestAppContext, cx_b: &mut TestAppContext) {
5463        cx_a.foreground().forbid_parking();
5464        let fs = FakeFs::new(cx_a.background());
5465
5466        // 2 clients connect to a server.
5467        let mut server = TestServer::start(cx_a.foreground(), cx_a.background()).await;
5468        let mut client_a = server.create_client(cx_a, "user_a").await;
5469        let mut client_b = server.create_client(cx_b, "user_b").await;
5470        server
5471            .make_contacts(vec![(&client_a, cx_a), (&client_b, cx_b)])
5472            .await;
5473        cx_a.update(editor::init);
5474        cx_b.update(editor::init);
5475
5476        // Client A shares a project.
5477        fs.insert_tree(
5478            "/a",
5479            json!({
5480                "1.txt": "one",
5481                "2.txt": "two",
5482                "3.txt": "three",
5483            }),
5484        )
5485        .await;
5486        let (project_a, worktree_id) = client_a.build_local_project(fs.clone(), "/a", cx_a).await;
5487
5488        // Client B joins the project.
5489        let project_b = client_b.build_remote_project(&project_a, cx_a, cx_b).await;
5490
5491        // Client A opens some editors.
5492        let workspace_a = client_a.build_workspace(&project_a, cx_a);
5493        let pane_a = workspace_a.read_with(cx_a, |workspace, _| workspace.active_pane().clone());
5494        let editor_a1 = workspace_a
5495            .update(cx_a, |workspace, cx| {
5496                workspace.open_path((worktree_id, "1.txt"), true, cx)
5497            })
5498            .await
5499            .unwrap()
5500            .downcast::<Editor>()
5501            .unwrap();
5502        let editor_a2 = workspace_a
5503            .update(cx_a, |workspace, cx| {
5504                workspace.open_path((worktree_id, "2.txt"), true, cx)
5505            })
5506            .await
5507            .unwrap()
5508            .downcast::<Editor>()
5509            .unwrap();
5510
5511        // Client B opens an editor.
5512        let workspace_b = client_b.build_workspace(&project_b, cx_b);
5513        let editor_b1 = workspace_b
5514            .update(cx_b, |workspace, cx| {
5515                workspace.open_path((worktree_id, "1.txt"), true, cx)
5516            })
5517            .await
5518            .unwrap()
5519            .downcast::<Editor>()
5520            .unwrap();
5521
5522        let client_a_id = project_b.read_with(cx_b, |project, _| {
5523            project.collaborators().values().next().unwrap().peer_id
5524        });
5525        let client_b_id = project_a.read_with(cx_a, |project, _| {
5526            project.collaborators().values().next().unwrap().peer_id
5527        });
5528
5529        // When client B starts following client A, all visible view states are replicated to client B.
5530        editor_a1.update(cx_a, |editor, cx| editor.select_ranges([0..1], None, cx));
5531        editor_a2.update(cx_a, |editor, cx| editor.select_ranges([2..3], None, cx));
5532        workspace_b
5533            .update(cx_b, |workspace, cx| {
5534                workspace
5535                    .toggle_follow(&ToggleFollow(client_a_id), cx)
5536                    .unwrap()
5537            })
5538            .await
5539            .unwrap();
5540
5541        let editor_b2 = workspace_b.read_with(cx_b, |workspace, cx| {
5542            workspace
5543                .active_item(cx)
5544                .unwrap()
5545                .downcast::<Editor>()
5546                .unwrap()
5547        });
5548        assert!(cx_b.read(|cx| editor_b2.is_focused(cx)));
5549        assert_eq!(
5550            editor_b2.read_with(cx_b, |editor, cx| editor.project_path(cx)),
5551            Some((worktree_id, "2.txt").into())
5552        );
5553        assert_eq!(
5554            editor_b2.read_with(cx_b, |editor, cx| editor.selected_ranges(cx)),
5555            vec![2..3]
5556        );
5557        assert_eq!(
5558            editor_b1.read_with(cx_b, |editor, cx| editor.selected_ranges(cx)),
5559            vec![0..1]
5560        );
5561
5562        // When client A activates a different editor, client B does so as well.
5563        workspace_a.update(cx_a, |workspace, cx| {
5564            workspace.activate_item(&editor_a1, cx)
5565        });
5566        workspace_b
5567            .condition(cx_b, |workspace, cx| {
5568                workspace.active_item(cx).unwrap().id() == editor_b1.id()
5569            })
5570            .await;
5571
5572        // When client A navigates back and forth, client B does so as well.
5573        workspace_a
5574            .update(cx_a, |workspace, cx| {
5575                workspace::Pane::go_back(workspace, None, cx)
5576            })
5577            .await;
5578        workspace_b
5579            .condition(cx_b, |workspace, cx| {
5580                workspace.active_item(cx).unwrap().id() == editor_b2.id()
5581            })
5582            .await;
5583
5584        workspace_a
5585            .update(cx_a, |workspace, cx| {
5586                workspace::Pane::go_forward(workspace, None, cx)
5587            })
5588            .await;
5589        workspace_b
5590            .condition(cx_b, |workspace, cx| {
5591                workspace.active_item(cx).unwrap().id() == editor_b1.id()
5592            })
5593            .await;
5594
5595        // Changes to client A's editor are reflected on client B.
5596        editor_a1.update(cx_a, |editor, cx| {
5597            editor.select_ranges([1..1, 2..2], None, cx);
5598        });
5599        editor_b1
5600            .condition(cx_b, |editor, cx| {
5601                editor.selected_ranges(cx) == vec![1..1, 2..2]
5602            })
5603            .await;
5604
5605        editor_a1.update(cx_a, |editor, cx| editor.set_text("TWO", cx));
5606        editor_b1
5607            .condition(cx_b, |editor, cx| editor.text(cx) == "TWO")
5608            .await;
5609
5610        editor_a1.update(cx_a, |editor, cx| {
5611            editor.select_ranges([3..3], None, cx);
5612            editor.set_scroll_position(vec2f(0., 100.), cx);
5613        });
5614        editor_b1
5615            .condition(cx_b, |editor, cx| editor.selected_ranges(cx) == vec![3..3])
5616            .await;
5617
5618        // After unfollowing, client B stops receiving updates from client A.
5619        workspace_b.update(cx_b, |workspace, cx| {
5620            workspace.unfollow(&workspace.active_pane().clone(), cx)
5621        });
5622        workspace_a.update(cx_a, |workspace, cx| {
5623            workspace.activate_item(&editor_a2, cx)
5624        });
5625        cx_a.foreground().run_until_parked();
5626        assert_eq!(
5627            workspace_b.read_with(cx_b, |workspace, cx| workspace
5628                .active_item(cx)
5629                .unwrap()
5630                .id()),
5631            editor_b1.id()
5632        );
5633
5634        // Client A starts following client B.
5635        workspace_a
5636            .update(cx_a, |workspace, cx| {
5637                workspace
5638                    .toggle_follow(&ToggleFollow(client_b_id), cx)
5639                    .unwrap()
5640            })
5641            .await
5642            .unwrap();
5643        assert_eq!(
5644            workspace_a.read_with(cx_a, |workspace, _| workspace.leader_for_pane(&pane_a)),
5645            Some(client_b_id)
5646        );
5647        assert_eq!(
5648            workspace_a.read_with(cx_a, |workspace, cx| workspace
5649                .active_item(cx)
5650                .unwrap()
5651                .id()),
5652            editor_a1.id()
5653        );
5654
5655        // Following interrupts when client B disconnects.
5656        client_b.disconnect(&cx_b.to_async()).unwrap();
5657        cx_a.foreground().run_until_parked();
5658        assert_eq!(
5659            workspace_a.read_with(cx_a, |workspace, _| workspace.leader_for_pane(&pane_a)),
5660            None
5661        );
5662    }
5663
5664    #[gpui::test(iterations = 10)]
5665    async fn test_peers_following_each_other(cx_a: &mut TestAppContext, cx_b: &mut TestAppContext) {
5666        cx_a.foreground().forbid_parking();
5667        let fs = FakeFs::new(cx_a.background());
5668
5669        // 2 clients connect to a server.
5670        let mut server = TestServer::start(cx_a.foreground(), cx_a.background()).await;
5671        let mut client_a = server.create_client(cx_a, "user_a").await;
5672        let mut client_b = server.create_client(cx_b, "user_b").await;
5673        server
5674            .make_contacts(vec![(&client_a, cx_a), (&client_b, cx_b)])
5675            .await;
5676        cx_a.update(editor::init);
5677        cx_b.update(editor::init);
5678
5679        // Client A shares a project.
5680        fs.insert_tree(
5681            "/a",
5682            json!({
5683                "1.txt": "one",
5684                "2.txt": "two",
5685                "3.txt": "three",
5686                "4.txt": "four",
5687            }),
5688        )
5689        .await;
5690        let (project_a, worktree_id) = client_a.build_local_project(fs.clone(), "/a", cx_a).await;
5691
5692        // Client B joins the project.
5693        let project_b = client_b.build_remote_project(&project_a, cx_a, cx_b).await;
5694
5695        // Client A opens some editors.
5696        let workspace_a = client_a.build_workspace(&project_a, cx_a);
5697        let pane_a1 = workspace_a.read_with(cx_a, |workspace, _| workspace.active_pane().clone());
5698        let _editor_a1 = workspace_a
5699            .update(cx_a, |workspace, cx| {
5700                workspace.open_path((worktree_id, "1.txt"), true, cx)
5701            })
5702            .await
5703            .unwrap()
5704            .downcast::<Editor>()
5705            .unwrap();
5706
5707        // Client B opens an editor.
5708        let workspace_b = client_b.build_workspace(&project_b, cx_b);
5709        let pane_b1 = workspace_b.read_with(cx_b, |workspace, _| workspace.active_pane().clone());
5710        let _editor_b1 = workspace_b
5711            .update(cx_b, |workspace, cx| {
5712                workspace.open_path((worktree_id, "2.txt"), true, cx)
5713            })
5714            .await
5715            .unwrap()
5716            .downcast::<Editor>()
5717            .unwrap();
5718
5719        // Clients A and B follow each other in split panes
5720        workspace_a
5721            .update(cx_a, |workspace, cx| {
5722                workspace.split_pane(workspace.active_pane().clone(), SplitDirection::Right, cx);
5723                assert_ne!(*workspace.active_pane(), pane_a1);
5724                let leader_id = *project_a.read(cx).collaborators().keys().next().unwrap();
5725                workspace
5726                    .toggle_follow(&workspace::ToggleFollow(leader_id), cx)
5727                    .unwrap()
5728            })
5729            .await
5730            .unwrap();
5731        workspace_b
5732            .update(cx_b, |workspace, cx| {
5733                workspace.split_pane(workspace.active_pane().clone(), SplitDirection::Right, cx);
5734                assert_ne!(*workspace.active_pane(), pane_b1);
5735                let leader_id = *project_b.read(cx).collaborators().keys().next().unwrap();
5736                workspace
5737                    .toggle_follow(&workspace::ToggleFollow(leader_id), cx)
5738                    .unwrap()
5739            })
5740            .await
5741            .unwrap();
5742
5743        workspace_a
5744            .update(cx_a, |workspace, cx| {
5745                workspace.activate_next_pane(cx);
5746                assert_eq!(*workspace.active_pane(), pane_a1);
5747                workspace.open_path((worktree_id, "3.txt"), true, cx)
5748            })
5749            .await
5750            .unwrap();
5751        workspace_b
5752            .update(cx_b, |workspace, cx| {
5753                workspace.activate_next_pane(cx);
5754                assert_eq!(*workspace.active_pane(), pane_b1);
5755                workspace.open_path((worktree_id, "4.txt"), true, cx)
5756            })
5757            .await
5758            .unwrap();
5759        cx_a.foreground().run_until_parked();
5760
5761        // Ensure leader updates don't change the active pane of followers
5762        workspace_a.read_with(cx_a, |workspace, _| {
5763            assert_eq!(*workspace.active_pane(), pane_a1);
5764        });
5765        workspace_b.read_with(cx_b, |workspace, _| {
5766            assert_eq!(*workspace.active_pane(), pane_b1);
5767        });
5768
5769        // Ensure peers following each other doesn't cause an infinite loop.
5770        assert_eq!(
5771            workspace_a.read_with(cx_a, |workspace, cx| workspace
5772                .active_item(cx)
5773                .unwrap()
5774                .project_path(cx)),
5775            Some((worktree_id, "3.txt").into())
5776        );
5777        workspace_a.update(cx_a, |workspace, cx| {
5778            assert_eq!(
5779                workspace.active_item(cx).unwrap().project_path(cx),
5780                Some((worktree_id, "3.txt").into())
5781            );
5782            workspace.activate_next_pane(cx);
5783            assert_eq!(
5784                workspace.active_item(cx).unwrap().project_path(cx),
5785                Some((worktree_id, "4.txt").into())
5786            );
5787        });
5788        workspace_b.update(cx_b, |workspace, cx| {
5789            assert_eq!(
5790                workspace.active_item(cx).unwrap().project_path(cx),
5791                Some((worktree_id, "4.txt").into())
5792            );
5793            workspace.activate_next_pane(cx);
5794            assert_eq!(
5795                workspace.active_item(cx).unwrap().project_path(cx),
5796                Some((worktree_id, "3.txt").into())
5797            );
5798        });
5799    }
5800
5801    #[gpui::test(iterations = 10)]
5802    async fn test_auto_unfollowing(cx_a: &mut TestAppContext, cx_b: &mut TestAppContext) {
5803        cx_a.foreground().forbid_parking();
5804        let fs = FakeFs::new(cx_a.background());
5805
5806        // 2 clients connect to a server.
5807        let mut server = TestServer::start(cx_a.foreground(), cx_a.background()).await;
5808        let mut client_a = server.create_client(cx_a, "user_a").await;
5809        let mut client_b = server.create_client(cx_b, "user_b").await;
5810        server
5811            .make_contacts(vec![(&client_a, cx_a), (&client_b, cx_b)])
5812            .await;
5813        cx_a.update(editor::init);
5814        cx_b.update(editor::init);
5815
5816        // Client A shares a project.
5817        fs.insert_tree(
5818            "/a",
5819            json!({
5820                "1.txt": "one",
5821                "2.txt": "two",
5822                "3.txt": "three",
5823            }),
5824        )
5825        .await;
5826        let (project_a, worktree_id) = client_a.build_local_project(fs.clone(), "/a", cx_a).await;
5827
5828        // Client B joins the project.
5829        let project_b = client_b.build_remote_project(&project_a, cx_a, cx_b).await;
5830
5831        // Client A opens some editors.
5832        let workspace_a = client_a.build_workspace(&project_a, cx_a);
5833        let _editor_a1 = workspace_a
5834            .update(cx_a, |workspace, cx| {
5835                workspace.open_path((worktree_id, "1.txt"), true, cx)
5836            })
5837            .await
5838            .unwrap()
5839            .downcast::<Editor>()
5840            .unwrap();
5841
5842        // Client B starts following client A.
5843        let workspace_b = client_b.build_workspace(&project_b, cx_b);
5844        let pane_b = workspace_b.read_with(cx_b, |workspace, _| workspace.active_pane().clone());
5845        let leader_id = project_b.read_with(cx_b, |project, _| {
5846            project.collaborators().values().next().unwrap().peer_id
5847        });
5848        workspace_b
5849            .update(cx_b, |workspace, cx| {
5850                workspace
5851                    .toggle_follow(&ToggleFollow(leader_id), cx)
5852                    .unwrap()
5853            })
5854            .await
5855            .unwrap();
5856        assert_eq!(
5857            workspace_b.read_with(cx_b, |workspace, _| workspace.leader_for_pane(&pane_b)),
5858            Some(leader_id)
5859        );
5860        let editor_b2 = workspace_b.read_with(cx_b, |workspace, cx| {
5861            workspace
5862                .active_item(cx)
5863                .unwrap()
5864                .downcast::<Editor>()
5865                .unwrap()
5866        });
5867
5868        // When client B moves, it automatically stops following client A.
5869        editor_b2.update(cx_b, |editor, cx| editor.move_right(&editor::MoveRight, cx));
5870        assert_eq!(
5871            workspace_b.read_with(cx_b, |workspace, _| workspace.leader_for_pane(&pane_b)),
5872            None
5873        );
5874
5875        workspace_b
5876            .update(cx_b, |workspace, cx| {
5877                workspace
5878                    .toggle_follow(&ToggleFollow(leader_id), cx)
5879                    .unwrap()
5880            })
5881            .await
5882            .unwrap();
5883        assert_eq!(
5884            workspace_b.read_with(cx_b, |workspace, _| workspace.leader_for_pane(&pane_b)),
5885            Some(leader_id)
5886        );
5887
5888        // When client B edits, it automatically stops following client A.
5889        editor_b2.update(cx_b, |editor, cx| editor.insert("X", cx));
5890        assert_eq!(
5891            workspace_b.read_with(cx_b, |workspace, _| workspace.leader_for_pane(&pane_b)),
5892            None
5893        );
5894
5895        workspace_b
5896            .update(cx_b, |workspace, cx| {
5897                workspace
5898                    .toggle_follow(&ToggleFollow(leader_id), cx)
5899                    .unwrap()
5900            })
5901            .await
5902            .unwrap();
5903        assert_eq!(
5904            workspace_b.read_with(cx_b, |workspace, _| workspace.leader_for_pane(&pane_b)),
5905            Some(leader_id)
5906        );
5907
5908        // When client B scrolls, it automatically stops following client A.
5909        editor_b2.update(cx_b, |editor, cx| {
5910            editor.set_scroll_position(vec2f(0., 3.), cx)
5911        });
5912        assert_eq!(
5913            workspace_b.read_with(cx_b, |workspace, _| workspace.leader_for_pane(&pane_b)),
5914            None
5915        );
5916
5917        workspace_b
5918            .update(cx_b, |workspace, cx| {
5919                workspace
5920                    .toggle_follow(&ToggleFollow(leader_id), cx)
5921                    .unwrap()
5922            })
5923            .await
5924            .unwrap();
5925        assert_eq!(
5926            workspace_b.read_with(cx_b, |workspace, _| workspace.leader_for_pane(&pane_b)),
5927            Some(leader_id)
5928        );
5929
5930        // When client B activates a different pane, it continues following client A in the original pane.
5931        workspace_b.update(cx_b, |workspace, cx| {
5932            workspace.split_pane(pane_b.clone(), SplitDirection::Right, cx)
5933        });
5934        assert_eq!(
5935            workspace_b.read_with(cx_b, |workspace, _| workspace.leader_for_pane(&pane_b)),
5936            Some(leader_id)
5937        );
5938
5939        workspace_b.update(cx_b, |workspace, cx| workspace.activate_next_pane(cx));
5940        assert_eq!(
5941            workspace_b.read_with(cx_b, |workspace, _| workspace.leader_for_pane(&pane_b)),
5942            Some(leader_id)
5943        );
5944
5945        // When client B activates a different item in the original pane, it automatically stops following client A.
5946        workspace_b
5947            .update(cx_b, |workspace, cx| {
5948                workspace.open_path((worktree_id, "2.txt"), true, cx)
5949            })
5950            .await
5951            .unwrap();
5952        assert_eq!(
5953            workspace_b.read_with(cx_b, |workspace, _| workspace.leader_for_pane(&pane_b)),
5954            None
5955        );
5956    }
5957
5958    #[gpui::test(iterations = 100)]
5959    async fn test_random_collaboration(
5960        cx: &mut TestAppContext,
5961        deterministic: Arc<Deterministic>,
5962        rng: StdRng,
5963    ) {
5964        cx.foreground().forbid_parking();
5965        let max_peers = env::var("MAX_PEERS")
5966            .map(|i| i.parse().expect("invalid `MAX_PEERS` variable"))
5967            .unwrap_or(5);
5968        assert!(max_peers <= 5);
5969
5970        let max_operations = env::var("OPERATIONS")
5971            .map(|i| i.parse().expect("invalid `OPERATIONS` variable"))
5972            .unwrap_or(10);
5973
5974        let rng = Arc::new(Mutex::new(rng));
5975
5976        let guest_lang_registry = Arc::new(LanguageRegistry::test());
5977        let host_language_registry = Arc::new(LanguageRegistry::test());
5978
5979        let fs = FakeFs::new(cx.background());
5980        fs.insert_tree("/_collab", json!({"init": ""})).await;
5981
5982        let mut server = TestServer::start(cx.foreground(), cx.background()).await;
5983        let db = server.app_state.db.clone();
5984        let host_user_id = db.create_user("host", false).await.unwrap();
5985        for username in ["guest-1", "guest-2", "guest-3", "guest-4"] {
5986            let guest_user_id = db.create_user(username, false).await.unwrap();
5987            server
5988                .app_state
5989                .db
5990                .send_contact_request(guest_user_id, host_user_id)
5991                .await
5992                .unwrap();
5993            server
5994                .app_state
5995                .db
5996                .respond_to_contact_request(host_user_id, guest_user_id, true)
5997                .await
5998                .unwrap();
5999        }
6000
6001        let mut clients = Vec::new();
6002        let mut user_ids = Vec::new();
6003        let mut op_start_signals = Vec::new();
6004
6005        let mut next_entity_id = 100000;
6006        let mut host_cx = TestAppContext::new(
6007            cx.foreground_platform(),
6008            cx.platform(),
6009            deterministic.build_foreground(next_entity_id),
6010            deterministic.build_background(),
6011            cx.font_cache(),
6012            cx.leak_detector(),
6013            next_entity_id,
6014        );
6015        let host = server.create_client(&mut host_cx, "host").await;
6016        let host_project = host_cx.update(|cx| {
6017            Project::local(
6018                host.client.clone(),
6019                host.user_store.clone(),
6020                host_language_registry.clone(),
6021                fs.clone(),
6022                cx,
6023            )
6024        });
6025        let host_project_id = host_project
6026            .update(&mut host_cx, |p, _| p.next_remote_id())
6027            .await;
6028
6029        let (collab_worktree, _) = host_project
6030            .update(&mut host_cx, |project, cx| {
6031                project.find_or_create_local_worktree("/_collab", true, cx)
6032            })
6033            .await
6034            .unwrap();
6035        collab_worktree
6036            .read_with(&host_cx, |tree, _| tree.as_local().unwrap().scan_complete())
6037            .await;
6038
6039        // Set up fake language servers.
6040        let mut language = Language::new(
6041            LanguageConfig {
6042                name: "Rust".into(),
6043                path_suffixes: vec!["rs".to_string()],
6044                ..Default::default()
6045            },
6046            None,
6047        );
6048        let _fake_servers = language.set_fake_lsp_adapter(FakeLspAdapter {
6049            name: "the-fake-language-server",
6050            capabilities: lsp::LanguageServer::full_capabilities(),
6051            initializer: Some(Box::new({
6052                let rng = rng.clone();
6053                let fs = fs.clone();
6054                let project = host_project.downgrade();
6055                move |fake_server: &mut FakeLanguageServer| {
6056                    fake_server.handle_request::<lsp::request::Completion, _, _>(
6057                        |_, _| async move {
6058                            Ok(Some(lsp::CompletionResponse::Array(vec![
6059                                lsp::CompletionItem {
6060                                    text_edit: Some(lsp::CompletionTextEdit::Edit(lsp::TextEdit {
6061                                        range: lsp::Range::new(
6062                                            lsp::Position::new(0, 0),
6063                                            lsp::Position::new(0, 0),
6064                                        ),
6065                                        new_text: "the-new-text".to_string(),
6066                                    })),
6067                                    ..Default::default()
6068                                },
6069                            ])))
6070                        },
6071                    );
6072
6073                    fake_server.handle_request::<lsp::request::CodeActionRequest, _, _>(
6074                        |_, _| async move {
6075                            Ok(Some(vec![lsp::CodeActionOrCommand::CodeAction(
6076                                lsp::CodeAction {
6077                                    title: "the-code-action".to_string(),
6078                                    ..Default::default()
6079                                },
6080                            )]))
6081                        },
6082                    );
6083
6084                    fake_server.handle_request::<lsp::request::PrepareRenameRequest, _, _>(
6085                        |params, _| async move {
6086                            Ok(Some(lsp::PrepareRenameResponse::Range(lsp::Range::new(
6087                                params.position,
6088                                params.position,
6089                            ))))
6090                        },
6091                    );
6092
6093                    fake_server.handle_request::<lsp::request::GotoDefinition, _, _>({
6094                        let fs = fs.clone();
6095                        let rng = rng.clone();
6096                        move |_, _| {
6097                            let fs = fs.clone();
6098                            let rng = rng.clone();
6099                            async move {
6100                                let files = fs.files().await;
6101                                let mut rng = rng.lock();
6102                                let count = rng.gen_range::<usize, _>(1..3);
6103                                let files = (0..count)
6104                                    .map(|_| files.choose(&mut *rng).unwrap())
6105                                    .collect::<Vec<_>>();
6106                                log::info!("LSP: Returning definitions in files {:?}", &files);
6107                                Ok(Some(lsp::GotoDefinitionResponse::Array(
6108                                    files
6109                                        .into_iter()
6110                                        .map(|file| lsp::Location {
6111                                            uri: lsp::Url::from_file_path(file).unwrap(),
6112                                            range: Default::default(),
6113                                        })
6114                                        .collect(),
6115                                )))
6116                            }
6117                        }
6118                    });
6119
6120                    fake_server.handle_request::<lsp::request::DocumentHighlightRequest, _, _>({
6121                        let rng = rng.clone();
6122                        let project = project.clone();
6123                        move |params, mut cx| {
6124                            let highlights = if let Some(project) = project.upgrade(&cx) {
6125                                project.update(&mut cx, |project, cx| {
6126                                    let path = params
6127                                        .text_document_position_params
6128                                        .text_document
6129                                        .uri
6130                                        .to_file_path()
6131                                        .unwrap();
6132                                    let (worktree, relative_path) =
6133                                        project.find_local_worktree(&path, cx)?;
6134                                    let project_path =
6135                                        ProjectPath::from((worktree.read(cx).id(), relative_path));
6136                                    let buffer =
6137                                        project.get_open_buffer(&project_path, cx)?.read(cx);
6138
6139                                    let mut highlights = Vec::new();
6140                                    let highlight_count = rng.lock().gen_range(1..=5);
6141                                    let mut prev_end = 0;
6142                                    for _ in 0..highlight_count {
6143                                        let range =
6144                                            buffer.random_byte_range(prev_end, &mut *rng.lock());
6145
6146                                        highlights.push(lsp::DocumentHighlight {
6147                                            range: range_to_lsp(range.to_point_utf16(buffer)),
6148                                            kind: Some(lsp::DocumentHighlightKind::READ),
6149                                        });
6150                                        prev_end = range.end;
6151                                    }
6152                                    Some(highlights)
6153                                })
6154                            } else {
6155                                None
6156                            };
6157                            async move { Ok(highlights) }
6158                        }
6159                    });
6160                }
6161            })),
6162            ..Default::default()
6163        });
6164        host_language_registry.add(Arc::new(language));
6165
6166        let op_start_signal = futures::channel::mpsc::unbounded();
6167        user_ids.push(host.current_user_id(&host_cx));
6168        op_start_signals.push(op_start_signal.0);
6169        clients.push(host_cx.foreground().spawn(host.simulate_host(
6170            host_project,
6171            op_start_signal.1,
6172            rng.clone(),
6173            host_cx,
6174        )));
6175
6176        let disconnect_host_at = if rng.lock().gen_bool(0.2) {
6177            rng.lock().gen_range(0..max_operations)
6178        } else {
6179            max_operations
6180        };
6181        let mut available_guests = vec![
6182            "guest-1".to_string(),
6183            "guest-2".to_string(),
6184            "guest-3".to_string(),
6185            "guest-4".to_string(),
6186        ];
6187        let mut operations = 0;
6188        while operations < max_operations {
6189            if operations == disconnect_host_at {
6190                server.disconnect_client(user_ids[0]);
6191                cx.foreground().advance_clock(RECEIVE_TIMEOUT);
6192                drop(op_start_signals);
6193                let mut clients = futures::future::join_all(clients).await;
6194                cx.foreground().run_until_parked();
6195
6196                let (host, mut host_cx, host_err) = clients.remove(0);
6197                if let Some(host_err) = host_err {
6198                    log::error!("host error - {:?}", host_err);
6199                }
6200                host.project
6201                    .as_ref()
6202                    .unwrap()
6203                    .read_with(&host_cx, |project, _| assert!(!project.is_shared()));
6204                for (guest, mut guest_cx, guest_err) in clients {
6205                    if let Some(guest_err) = guest_err {
6206                        log::error!("{} error - {:?}", guest.username, guest_err);
6207                    }
6208
6209                    let contacts = server
6210                        .app_state
6211                        .db
6212                        .get_contacts(guest.current_user_id(&guest_cx))
6213                        .await
6214                        .unwrap();
6215                    let contacts = server
6216                        .store
6217                        .read()
6218                        .await
6219                        .build_initial_contacts_update(contacts)
6220                        .contacts;
6221                    assert!(!contacts
6222                        .iter()
6223                        .flat_map(|contact| &contact.projects)
6224                        .any(|project| project.id == host_project_id));
6225                    guest
6226                        .project
6227                        .as_ref()
6228                        .unwrap()
6229                        .read_with(&guest_cx, |project, _| assert!(project.is_read_only()));
6230                    guest_cx.update(|_| drop(guest));
6231                }
6232                host_cx.update(|_| drop(host));
6233
6234                return;
6235            }
6236
6237            let distribution = rng.lock().gen_range(0..100);
6238            match distribution {
6239                0..=19 if !available_guests.is_empty() => {
6240                    let guest_ix = rng.lock().gen_range(0..available_guests.len());
6241                    let guest_username = available_guests.remove(guest_ix);
6242                    log::info!("Adding new connection for {}", guest_username);
6243                    next_entity_id += 100000;
6244                    let mut guest_cx = TestAppContext::new(
6245                        cx.foreground_platform(),
6246                        cx.platform(),
6247                        deterministic.build_foreground(next_entity_id),
6248                        deterministic.build_background(),
6249                        cx.font_cache(),
6250                        cx.leak_detector(),
6251                        next_entity_id,
6252                    );
6253                    let guest = server.create_client(&mut guest_cx, &guest_username).await;
6254                    let guest_project = Project::remote(
6255                        host_project_id,
6256                        guest.client.clone(),
6257                        guest.user_store.clone(),
6258                        guest_lang_registry.clone(),
6259                        FakeFs::new(cx.background()),
6260                        &mut guest_cx.to_async(),
6261                    )
6262                    .await
6263                    .unwrap();
6264                    let op_start_signal = futures::channel::mpsc::unbounded();
6265                    user_ids.push(guest.current_user_id(&guest_cx));
6266                    op_start_signals.push(op_start_signal.0);
6267                    clients.push(guest_cx.foreground().spawn(guest.simulate_guest(
6268                        guest_username.clone(),
6269                        guest_project,
6270                        op_start_signal.1,
6271                        rng.clone(),
6272                        guest_cx,
6273                    )));
6274
6275                    log::info!("Added connection for {}", guest_username);
6276                    operations += 1;
6277                }
6278                20..=29 if clients.len() > 1 => {
6279                    let guest_ix = rng.lock().gen_range(1..clients.len());
6280                    log::info!("Removing guest {}", user_ids[guest_ix]);
6281                    let removed_guest_id = user_ids.remove(guest_ix);
6282                    let guest = clients.remove(guest_ix);
6283                    op_start_signals.remove(guest_ix);
6284                    server.forbid_connections();
6285                    server.disconnect_client(removed_guest_id);
6286                    cx.foreground().advance_clock(RECEIVE_TIMEOUT);
6287                    let (guest, mut guest_cx, guest_err) = guest.await;
6288                    server.allow_connections();
6289
6290                    if let Some(guest_err) = guest_err {
6291                        log::error!("{} error - {:?}", guest.username, guest_err);
6292                    }
6293                    guest
6294                        .project
6295                        .as_ref()
6296                        .unwrap()
6297                        .read_with(&guest_cx, |project, _| assert!(project.is_read_only()));
6298                    for user_id in &user_ids {
6299                        let contacts = server.app_state.db.get_contacts(*user_id).await.unwrap();
6300                        let contacts = server
6301                            .store
6302                            .read()
6303                            .await
6304                            .build_initial_contacts_update(contacts)
6305                            .contacts;
6306                        for contact in contacts {
6307                            if contact.online {
6308                                assert_ne!(
6309                                    contact.user_id, removed_guest_id.0 as u64,
6310                                    "removed guest is still a contact of another peer"
6311                                );
6312                            }
6313                            for project in contact.projects {
6314                                for project_guest_id in project.guests {
6315                                    assert_ne!(
6316                                        project_guest_id, removed_guest_id.0 as u64,
6317                                        "removed guest appears as still participating on a project"
6318                                    );
6319                                }
6320                            }
6321                        }
6322                    }
6323
6324                    log::info!("{} removed", guest.username);
6325                    available_guests.push(guest.username.clone());
6326                    guest_cx.update(|_| drop(guest));
6327
6328                    operations += 1;
6329                }
6330                _ => {
6331                    while operations < max_operations && rng.lock().gen_bool(0.7) {
6332                        op_start_signals
6333                            .choose(&mut *rng.lock())
6334                            .unwrap()
6335                            .unbounded_send(())
6336                            .unwrap();
6337                        operations += 1;
6338                    }
6339
6340                    if rng.lock().gen_bool(0.8) {
6341                        cx.foreground().run_until_parked();
6342                    }
6343                }
6344            }
6345        }
6346
6347        drop(op_start_signals);
6348        let mut clients = futures::future::join_all(clients).await;
6349        cx.foreground().run_until_parked();
6350
6351        let (host_client, mut host_cx, host_err) = clients.remove(0);
6352        if let Some(host_err) = host_err {
6353            panic!("host error - {:?}", host_err);
6354        }
6355        let host_project = host_client.project.as_ref().unwrap();
6356        let host_worktree_snapshots = host_project.read_with(&host_cx, |project, cx| {
6357            project
6358                .worktrees(cx)
6359                .map(|worktree| {
6360                    let snapshot = worktree.read(cx).snapshot();
6361                    (snapshot.id(), snapshot)
6362                })
6363                .collect::<BTreeMap<_, _>>()
6364        });
6365
6366        host_client
6367            .project
6368            .as_ref()
6369            .unwrap()
6370            .read_with(&host_cx, |project, cx| project.check_invariants(cx));
6371
6372        for (guest_client, mut guest_cx, guest_err) in clients.into_iter() {
6373            if let Some(guest_err) = guest_err {
6374                panic!("{} error - {:?}", guest_client.username, guest_err);
6375            }
6376            let worktree_snapshots =
6377                guest_client
6378                    .project
6379                    .as_ref()
6380                    .unwrap()
6381                    .read_with(&guest_cx, |project, cx| {
6382                        project
6383                            .worktrees(cx)
6384                            .map(|worktree| {
6385                                let worktree = worktree.read(cx);
6386                                (worktree.id(), worktree.snapshot())
6387                            })
6388                            .collect::<BTreeMap<_, _>>()
6389                    });
6390
6391            assert_eq!(
6392                worktree_snapshots.keys().collect::<Vec<_>>(),
6393                host_worktree_snapshots.keys().collect::<Vec<_>>(),
6394                "{} has different worktrees than the host",
6395                guest_client.username
6396            );
6397            for (id, host_snapshot) in &host_worktree_snapshots {
6398                let guest_snapshot = &worktree_snapshots[id];
6399                assert_eq!(
6400                    guest_snapshot.root_name(),
6401                    host_snapshot.root_name(),
6402                    "{} has different root name than the host for worktree {}",
6403                    guest_client.username,
6404                    id
6405                );
6406                assert_eq!(
6407                    guest_snapshot.entries(false).collect::<Vec<_>>(),
6408                    host_snapshot.entries(false).collect::<Vec<_>>(),
6409                    "{} has different snapshot than the host for worktree {}",
6410                    guest_client.username,
6411                    id
6412                );
6413                assert_eq!(guest_snapshot.scan_id(), host_snapshot.scan_id());
6414            }
6415
6416            guest_client
6417                .project
6418                .as_ref()
6419                .unwrap()
6420                .read_with(&guest_cx, |project, cx| project.check_invariants(cx));
6421
6422            for guest_buffer in &guest_client.buffers {
6423                let buffer_id = guest_buffer.read_with(&guest_cx, |buffer, _| buffer.remote_id());
6424                let host_buffer = host_project.read_with(&host_cx, |project, cx| {
6425                    project.buffer_for_id(buffer_id, cx).expect(&format!(
6426                        "host does not have buffer for guest:{}, peer:{}, id:{}",
6427                        guest_client.username, guest_client.peer_id, buffer_id
6428                    ))
6429                });
6430                let path = host_buffer
6431                    .read_with(&host_cx, |buffer, cx| buffer.file().unwrap().full_path(cx));
6432
6433                assert_eq!(
6434                    guest_buffer.read_with(&guest_cx, |buffer, _| buffer.deferred_ops_len()),
6435                    0,
6436                    "{}, buffer {}, path {:?} has deferred operations",
6437                    guest_client.username,
6438                    buffer_id,
6439                    path,
6440                );
6441                assert_eq!(
6442                    guest_buffer.read_with(&guest_cx, |buffer, _| buffer.text()),
6443                    host_buffer.read_with(&host_cx, |buffer, _| buffer.text()),
6444                    "{}, buffer {}, path {:?}, differs from the host's buffer",
6445                    guest_client.username,
6446                    buffer_id,
6447                    path
6448                );
6449            }
6450
6451            guest_cx.update(|_| drop(guest_client));
6452        }
6453
6454        host_cx.update(|_| drop(host_client));
6455    }
6456
6457    struct TestServer {
6458        peer: Arc<Peer>,
6459        app_state: Arc<AppState>,
6460        server: Arc<Server>,
6461        foreground: Rc<executor::Foreground>,
6462        notifications: mpsc::UnboundedReceiver<()>,
6463        connection_killers: Arc<Mutex<HashMap<UserId, Arc<AtomicBool>>>>,
6464        forbid_connections: Arc<AtomicBool>,
6465        _test_db: TestDb,
6466    }
6467
6468    impl TestServer {
6469        async fn start(
6470            foreground: Rc<executor::Foreground>,
6471            background: Arc<executor::Background>,
6472        ) -> Self {
6473            let test_db = TestDb::fake(background);
6474            let app_state = Self::build_app_state(&test_db).await;
6475            let peer = Peer::new();
6476            let notifications = mpsc::unbounded();
6477            let server = Server::new(app_state.clone(), Some(notifications.0));
6478            Self {
6479                peer,
6480                app_state,
6481                server,
6482                foreground,
6483                notifications: notifications.1,
6484                connection_killers: Default::default(),
6485                forbid_connections: Default::default(),
6486                _test_db: test_db,
6487            }
6488        }
6489
6490        async fn create_client(&mut self, cx: &mut TestAppContext, name: &str) -> TestClient {
6491            cx.update(|cx| {
6492                let settings = Settings::test(cx);
6493                cx.set_global(settings);
6494            });
6495
6496            let http = FakeHttpClient::with_404_response();
6497            let user_id =
6498                if let Ok(Some(user)) = self.app_state.db.get_user_by_github_login(name).await {
6499                    user.id
6500                } else {
6501                    self.app_state.db.create_user(name, false).await.unwrap()
6502                };
6503            let client_name = name.to_string();
6504            let mut client = Client::new(http.clone());
6505            let server = self.server.clone();
6506            let connection_killers = self.connection_killers.clone();
6507            let forbid_connections = self.forbid_connections.clone();
6508            let (connection_id_tx, mut connection_id_rx) = mpsc::channel(16);
6509
6510            Arc::get_mut(&mut client)
6511                .unwrap()
6512                .override_authenticate(move |cx| {
6513                    cx.spawn(|_| async move {
6514                        let access_token = "the-token".to_string();
6515                        Ok(Credentials {
6516                            user_id: user_id.0 as u64,
6517                            access_token,
6518                        })
6519                    })
6520                })
6521                .override_establish_connection(move |credentials, cx| {
6522                    assert_eq!(credentials.user_id, user_id.0 as u64);
6523                    assert_eq!(credentials.access_token, "the-token");
6524
6525                    let server = server.clone();
6526                    let connection_killers = connection_killers.clone();
6527                    let forbid_connections = forbid_connections.clone();
6528                    let client_name = client_name.clone();
6529                    let connection_id_tx = connection_id_tx.clone();
6530                    cx.spawn(move |cx| async move {
6531                        if forbid_connections.load(SeqCst) {
6532                            Err(EstablishConnectionError::other(anyhow!(
6533                                "server is forbidding connections"
6534                            )))
6535                        } else {
6536                            let (client_conn, server_conn, killed) =
6537                                Connection::in_memory(cx.background());
6538                            connection_killers.lock().insert(user_id, killed);
6539                            cx.background()
6540                                .spawn(server.handle_connection(
6541                                    server_conn,
6542                                    client_name,
6543                                    user_id,
6544                                    Some(connection_id_tx),
6545                                    cx.background(),
6546                                ))
6547                                .detach();
6548                            Ok(client_conn)
6549                        }
6550                    })
6551                });
6552
6553            Channel::init(&client);
6554            Project::init(&client);
6555            cx.update(|cx| {
6556                workspace::init(&client, cx);
6557            });
6558
6559            let user_store = cx.add_model(|cx| UserStore::new(client.clone(), http, cx));
6560            client
6561                .authenticate_and_connect(false, &cx.to_async())
6562                .await
6563                .unwrap();
6564            let peer_id = PeerId(connection_id_rx.next().await.unwrap().0);
6565
6566            let client = TestClient {
6567                client,
6568                peer_id,
6569                username: name.to_string(),
6570                user_store,
6571                language_registry: Arc::new(LanguageRegistry::test()),
6572                project: Default::default(),
6573                buffers: Default::default(),
6574            };
6575            client.wait_for_current_user(cx).await;
6576            client
6577        }
6578
6579        fn disconnect_client(&self, user_id: UserId) {
6580            self.connection_killers
6581                .lock()
6582                .remove(&user_id)
6583                .unwrap()
6584                .store(true, SeqCst);
6585        }
6586
6587        fn forbid_connections(&self) {
6588            self.forbid_connections.store(true, SeqCst);
6589        }
6590
6591        fn allow_connections(&self) {
6592            self.forbid_connections.store(false, SeqCst);
6593        }
6594
6595        async fn make_contacts(&self, mut clients: Vec<(&TestClient, &mut TestAppContext)>) {
6596            while let Some((client_a, cx_a)) = clients.pop() {
6597                for (client_b, cx_b) in &mut clients {
6598                    client_a
6599                        .user_store
6600                        .update(cx_a, |store, cx| {
6601                            store.request_contact(client_b.user_id().unwrap(), cx)
6602                        })
6603                        .await
6604                        .unwrap();
6605                    cx_a.foreground().run_until_parked();
6606                    client_b
6607                        .user_store
6608                        .update(*cx_b, |store, cx| {
6609                            store.respond_to_contact_request(client_a.user_id().unwrap(), true, cx)
6610                        })
6611                        .await
6612                        .unwrap();
6613                }
6614            }
6615        }
6616
6617        async fn build_app_state(test_db: &TestDb) -> Arc<AppState> {
6618            Arc::new(AppState {
6619                db: test_db.db().clone(),
6620                api_token: Default::default(),
6621            })
6622        }
6623
6624        async fn state<'a>(&'a self) -> RwLockReadGuard<'a, Store> {
6625            self.server.store.read().await
6626        }
6627
6628        async fn condition<F>(&mut self, mut predicate: F)
6629        where
6630            F: FnMut(&Store) -> bool,
6631        {
6632            assert!(
6633                self.foreground.parking_forbidden(),
6634                "you must call forbid_parking to use server conditions so we don't block indefinitely"
6635            );
6636            while !(predicate)(&*self.server.store.read().await) {
6637                self.foreground.start_waiting();
6638                self.notifications.next().await;
6639                self.foreground.finish_waiting();
6640            }
6641        }
6642    }
6643
6644    impl Deref for TestServer {
6645        type Target = Server;
6646
6647        fn deref(&self) -> &Self::Target {
6648            &self.server
6649        }
6650    }
6651
6652    impl Drop for TestServer {
6653        fn drop(&mut self) {
6654            self.peer.reset();
6655        }
6656    }
6657
6658    struct TestClient {
6659        client: Arc<Client>,
6660        username: String,
6661        pub peer_id: PeerId,
6662        pub user_store: ModelHandle<UserStore>,
6663        language_registry: Arc<LanguageRegistry>,
6664        project: Option<ModelHandle<Project>>,
6665        buffers: HashSet<ModelHandle<language::Buffer>>,
6666    }
6667
6668    impl Deref for TestClient {
6669        type Target = Arc<Client>;
6670
6671        fn deref(&self) -> &Self::Target {
6672            &self.client
6673        }
6674    }
6675
6676    struct ContactsSummary {
6677        pub current: Vec<String>,
6678        pub outgoing_requests: Vec<String>,
6679        pub incoming_requests: Vec<String>,
6680    }
6681
6682    impl TestClient {
6683        pub fn current_user_id(&self, cx: &TestAppContext) -> UserId {
6684            UserId::from_proto(
6685                self.user_store
6686                    .read_with(cx, |user_store, _| user_store.current_user().unwrap().id),
6687            )
6688        }
6689
6690        async fn wait_for_current_user(&self, cx: &TestAppContext) {
6691            let mut authed_user = self
6692                .user_store
6693                .read_with(cx, |user_store, _| user_store.watch_current_user());
6694            while authed_user.next().await.unwrap().is_none() {}
6695        }
6696
6697        async fn clear_contacts(&self, cx: &mut TestAppContext) {
6698            self.user_store
6699                .update(cx, |store, _| store.clear_contacts())
6700                .await;
6701        }
6702
6703        fn summarize_contacts(&self, cx: &TestAppContext) -> ContactsSummary {
6704            self.user_store.read_with(cx, |store, _| ContactsSummary {
6705                current: store
6706                    .contacts()
6707                    .iter()
6708                    .map(|contact| contact.user.github_login.clone())
6709                    .collect(),
6710                outgoing_requests: store
6711                    .outgoing_contact_requests()
6712                    .iter()
6713                    .map(|user| user.github_login.clone())
6714                    .collect(),
6715                incoming_requests: store
6716                    .incoming_contact_requests()
6717                    .iter()
6718                    .map(|user| user.github_login.clone())
6719                    .collect(),
6720            })
6721        }
6722
6723        async fn build_local_project(
6724            &mut self,
6725            fs: Arc<FakeFs>,
6726            root_path: impl AsRef<Path>,
6727            cx: &mut TestAppContext,
6728        ) -> (ModelHandle<Project>, WorktreeId) {
6729            let project = cx.update(|cx| {
6730                Project::local(
6731                    self.client.clone(),
6732                    self.user_store.clone(),
6733                    self.language_registry.clone(),
6734                    fs,
6735                    cx,
6736                )
6737            });
6738            self.project = Some(project.clone());
6739            let (worktree, _) = project
6740                .update(cx, |p, cx| {
6741                    p.find_or_create_local_worktree(root_path, true, cx)
6742                })
6743                .await
6744                .unwrap();
6745            worktree
6746                .read_with(cx, |tree, _| tree.as_local().unwrap().scan_complete())
6747                .await;
6748            project
6749                .update(cx, |project, _| project.next_remote_id())
6750                .await;
6751            (project, worktree.read_with(cx, |tree, _| tree.id()))
6752        }
6753
6754        async fn build_remote_project(
6755            &mut self,
6756            host_project: &ModelHandle<Project>,
6757            host_cx: &mut TestAppContext,
6758            guest_cx: &mut TestAppContext,
6759        ) -> ModelHandle<Project> {
6760            let host_project_id = host_project
6761                .read_with(host_cx, |project, _| project.next_remote_id())
6762                .await;
6763            let guest_user_id = self.user_id().unwrap();
6764            let languages =
6765                host_project.read_with(host_cx, |project, _| project.languages().clone());
6766            let project_b = guest_cx.spawn(|mut cx| {
6767                let user_store = self.user_store.clone();
6768                let guest_client = self.client.clone();
6769                async move {
6770                    Project::remote(
6771                        host_project_id,
6772                        guest_client,
6773                        user_store.clone(),
6774                        languages,
6775                        FakeFs::new(cx.background()),
6776                        &mut cx,
6777                    )
6778                    .await
6779                    .unwrap()
6780                }
6781            });
6782            host_cx.foreground().run_until_parked();
6783            host_project.update(host_cx, |project, cx| {
6784                project.respond_to_join_request(guest_user_id, true, cx)
6785            });
6786            let project = project_b.await;
6787            self.project = Some(project.clone());
6788            project
6789        }
6790
6791        fn build_workspace(
6792            &self,
6793            project: &ModelHandle<Project>,
6794            cx: &mut TestAppContext,
6795        ) -> ViewHandle<Workspace> {
6796            let (window_id, _) = cx.add_window(|_| EmptyView);
6797            cx.add_view(window_id, |cx| {
6798                let fs = project.read(cx).fs().clone();
6799                Workspace::new(
6800                    &WorkspaceParams {
6801                        fs,
6802                        project: project.clone(),
6803                        user_store: self.user_store.clone(),
6804                        languages: self.language_registry.clone(),
6805                        themes: ThemeRegistry::new((), cx.font_cache().clone()),
6806                        channel_list: cx.add_model(|cx| {
6807                            ChannelList::new(self.user_store.clone(), self.client.clone(), cx)
6808                        }),
6809                        client: self.client.clone(),
6810                    },
6811                    cx,
6812                )
6813            })
6814        }
6815
6816        async fn simulate_host(
6817            mut self,
6818            project: ModelHandle<Project>,
6819            op_start_signal: futures::channel::mpsc::UnboundedReceiver<()>,
6820            rng: Arc<Mutex<StdRng>>,
6821            mut cx: TestAppContext,
6822        ) -> (Self, TestAppContext, Option<anyhow::Error>) {
6823            async fn simulate_host_internal(
6824                client: &mut TestClient,
6825                project: ModelHandle<Project>,
6826                mut op_start_signal: futures::channel::mpsc::UnboundedReceiver<()>,
6827                rng: Arc<Mutex<StdRng>>,
6828                cx: &mut TestAppContext,
6829            ) -> anyhow::Result<()> {
6830                let fs = project.read_with(cx, |project, _| project.fs().clone());
6831
6832                cx.update(|cx| {
6833                    cx.subscribe(&project, move |project, event, cx| {
6834                        if let project::Event::ContactRequestedJoin(user) = event {
6835                            log::info!("Host: accepting join request from {}", user.github_login);
6836                            project.update(cx, |project, cx| {
6837                                project.respond_to_join_request(user.id, true, cx)
6838                            });
6839                        }
6840                    })
6841                    .detach();
6842                });
6843
6844                while op_start_signal.next().await.is_some() {
6845                    let distribution = rng.lock().gen_range::<usize, _>(0..100);
6846                    let files = fs.as_fake().files().await;
6847                    match distribution {
6848                        0..=19 if !files.is_empty() => {
6849                            let path = files.choose(&mut *rng.lock()).unwrap();
6850                            let mut path = path.as_path();
6851                            while let Some(parent_path) = path.parent() {
6852                                path = parent_path;
6853                                if rng.lock().gen() {
6854                                    break;
6855                                }
6856                            }
6857
6858                            log::info!("Host: find/create local worktree {:?}", path);
6859                            let find_or_create_worktree = project.update(cx, |project, cx| {
6860                                project.find_or_create_local_worktree(path, true, cx)
6861                            });
6862                            if rng.lock().gen() {
6863                                cx.background().spawn(find_or_create_worktree).detach();
6864                            } else {
6865                                find_or_create_worktree.await?;
6866                            }
6867                        }
6868                        20..=79 if !files.is_empty() => {
6869                            let buffer = if client.buffers.is_empty() || rng.lock().gen() {
6870                                let file = files.choose(&mut *rng.lock()).unwrap();
6871                                let (worktree, path) = project
6872                                    .update(cx, |project, cx| {
6873                                        project.find_or_create_local_worktree(
6874                                            file.clone(),
6875                                            true,
6876                                            cx,
6877                                        )
6878                                    })
6879                                    .await?;
6880                                let project_path =
6881                                    worktree.read_with(cx, |worktree, _| (worktree.id(), path));
6882                                log::info!(
6883                                    "Host: opening path {:?}, worktree {}, relative_path {:?}",
6884                                    file,
6885                                    project_path.0,
6886                                    project_path.1
6887                                );
6888                                let buffer = project
6889                                    .update(cx, |project, cx| project.open_buffer(project_path, cx))
6890                                    .await
6891                                    .unwrap();
6892                                client.buffers.insert(buffer.clone());
6893                                buffer
6894                            } else {
6895                                client
6896                                    .buffers
6897                                    .iter()
6898                                    .choose(&mut *rng.lock())
6899                                    .unwrap()
6900                                    .clone()
6901                            };
6902
6903                            if rng.lock().gen_bool(0.1) {
6904                                cx.update(|cx| {
6905                                    log::info!(
6906                                        "Host: dropping buffer {:?}",
6907                                        buffer.read(cx).file().unwrap().full_path(cx)
6908                                    );
6909                                    client.buffers.remove(&buffer);
6910                                    drop(buffer);
6911                                });
6912                            } else {
6913                                buffer.update(cx, |buffer, cx| {
6914                                    log::info!(
6915                                        "Host: updating buffer {:?} ({})",
6916                                        buffer.file().unwrap().full_path(cx),
6917                                        buffer.remote_id()
6918                                    );
6919
6920                                    if rng.lock().gen_bool(0.7) {
6921                                        buffer.randomly_edit(&mut *rng.lock(), 5, cx);
6922                                    } else {
6923                                        buffer.randomly_undo_redo(&mut *rng.lock(), cx);
6924                                    }
6925                                });
6926                            }
6927                        }
6928                        _ => loop {
6929                            let path_component_count = rng.lock().gen_range::<usize, _>(1..=5);
6930                            let mut path = PathBuf::new();
6931                            path.push("/");
6932                            for _ in 0..path_component_count {
6933                                let letter = rng.lock().gen_range(b'a'..=b'z');
6934                                path.push(std::str::from_utf8(&[letter]).unwrap());
6935                            }
6936                            path.set_extension("rs");
6937                            let parent_path = path.parent().unwrap();
6938
6939                            log::info!("Host: creating file {:?}", path,);
6940
6941                            if fs.create_dir(&parent_path).await.is_ok()
6942                                && fs.create_file(&path, Default::default()).await.is_ok()
6943                            {
6944                                break;
6945                            } else {
6946                                log::info!("Host: cannot create file");
6947                            }
6948                        },
6949                    }
6950
6951                    cx.background().simulate_random_delay().await;
6952                }
6953
6954                Ok(())
6955            }
6956
6957            let result =
6958                simulate_host_internal(&mut self, project.clone(), op_start_signal, rng, &mut cx)
6959                    .await;
6960            log::info!("Host done");
6961            self.project = Some(project);
6962            (self, cx, result.err())
6963        }
6964
6965        pub async fn simulate_guest(
6966            mut self,
6967            guest_username: String,
6968            project: ModelHandle<Project>,
6969            op_start_signal: futures::channel::mpsc::UnboundedReceiver<()>,
6970            rng: Arc<Mutex<StdRng>>,
6971            mut cx: TestAppContext,
6972        ) -> (Self, TestAppContext, Option<anyhow::Error>) {
6973            async fn simulate_guest_internal(
6974                client: &mut TestClient,
6975                guest_username: &str,
6976                project: ModelHandle<Project>,
6977                mut op_start_signal: futures::channel::mpsc::UnboundedReceiver<()>,
6978                rng: Arc<Mutex<StdRng>>,
6979                cx: &mut TestAppContext,
6980            ) -> anyhow::Result<()> {
6981                while op_start_signal.next().await.is_some() {
6982                    let buffer = if client.buffers.is_empty() || rng.lock().gen() {
6983                        let worktree = if let Some(worktree) =
6984                            project.read_with(cx, |project, cx| {
6985                                project
6986                                    .worktrees(&cx)
6987                                    .filter(|worktree| {
6988                                        let worktree = worktree.read(cx);
6989                                        worktree.is_visible()
6990                                            && worktree.entries(false).any(|e| e.is_file())
6991                                    })
6992                                    .choose(&mut *rng.lock())
6993                            }) {
6994                            worktree
6995                        } else {
6996                            cx.background().simulate_random_delay().await;
6997                            continue;
6998                        };
6999
7000                        let (worktree_root_name, project_path) =
7001                            worktree.read_with(cx, |worktree, _| {
7002                                let entry = worktree
7003                                    .entries(false)
7004                                    .filter(|e| e.is_file())
7005                                    .choose(&mut *rng.lock())
7006                                    .unwrap();
7007                                (
7008                                    worktree.root_name().to_string(),
7009                                    (worktree.id(), entry.path.clone()),
7010                                )
7011                            });
7012                        log::info!(
7013                            "{}: opening path {:?} in worktree {} ({})",
7014                            guest_username,
7015                            project_path.1,
7016                            project_path.0,
7017                            worktree_root_name,
7018                        );
7019                        let buffer = project
7020                            .update(cx, |project, cx| {
7021                                project.open_buffer(project_path.clone(), cx)
7022                            })
7023                            .await?;
7024                        log::info!(
7025                            "{}: opened path {:?} in worktree {} ({}) with buffer id {}",
7026                            guest_username,
7027                            project_path.1,
7028                            project_path.0,
7029                            worktree_root_name,
7030                            buffer.read_with(cx, |buffer, _| buffer.remote_id())
7031                        );
7032                        client.buffers.insert(buffer.clone());
7033                        buffer
7034                    } else {
7035                        client
7036                            .buffers
7037                            .iter()
7038                            .choose(&mut *rng.lock())
7039                            .unwrap()
7040                            .clone()
7041                    };
7042
7043                    let choice = rng.lock().gen_range(0..100);
7044                    match choice {
7045                        0..=9 => {
7046                            cx.update(|cx| {
7047                                log::info!(
7048                                    "{}: dropping buffer {:?}",
7049                                    guest_username,
7050                                    buffer.read(cx).file().unwrap().full_path(cx)
7051                                );
7052                                client.buffers.remove(&buffer);
7053                                drop(buffer);
7054                            });
7055                        }
7056                        10..=19 => {
7057                            let completions = project.update(cx, |project, cx| {
7058                                log::info!(
7059                                    "{}: requesting completions for buffer {} ({:?})",
7060                                    guest_username,
7061                                    buffer.read(cx).remote_id(),
7062                                    buffer.read(cx).file().unwrap().full_path(cx)
7063                                );
7064                                let offset = rng.lock().gen_range(0..=buffer.read(cx).len());
7065                                project.completions(&buffer, offset, cx)
7066                            });
7067                            let completions = cx.background().spawn(async move {
7068                                completions
7069                                    .await
7070                                    .map_err(|err| anyhow!("completions request failed: {:?}", err))
7071                            });
7072                            if rng.lock().gen_bool(0.3) {
7073                                log::info!("{}: detaching completions request", guest_username);
7074                                cx.update(|cx| completions.detach_and_log_err(cx));
7075                            } else {
7076                                completions.await?;
7077                            }
7078                        }
7079                        20..=29 => {
7080                            let code_actions = project.update(cx, |project, cx| {
7081                                log::info!(
7082                                    "{}: requesting code actions for buffer {} ({:?})",
7083                                    guest_username,
7084                                    buffer.read(cx).remote_id(),
7085                                    buffer.read(cx).file().unwrap().full_path(cx)
7086                                );
7087                                let range = buffer.read(cx).random_byte_range(0, &mut *rng.lock());
7088                                project.code_actions(&buffer, range, cx)
7089                            });
7090                            let code_actions = cx.background().spawn(async move {
7091                                code_actions.await.map_err(|err| {
7092                                    anyhow!("code actions request failed: {:?}", err)
7093                                })
7094                            });
7095                            if rng.lock().gen_bool(0.3) {
7096                                log::info!("{}: detaching code actions request", guest_username);
7097                                cx.update(|cx| code_actions.detach_and_log_err(cx));
7098                            } else {
7099                                code_actions.await?;
7100                            }
7101                        }
7102                        30..=39 if buffer.read_with(cx, |buffer, _| buffer.is_dirty()) => {
7103                            let (requested_version, save) = buffer.update(cx, |buffer, cx| {
7104                                log::info!(
7105                                    "{}: saving buffer {} ({:?})",
7106                                    guest_username,
7107                                    buffer.remote_id(),
7108                                    buffer.file().unwrap().full_path(cx)
7109                                );
7110                                (buffer.version(), buffer.save(cx))
7111                            });
7112                            let save = cx.background().spawn(async move {
7113                                let (saved_version, _) = save
7114                                    .await
7115                                    .map_err(|err| anyhow!("save request failed: {:?}", err))?;
7116                                assert!(saved_version.observed_all(&requested_version));
7117                                Ok::<_, anyhow::Error>(())
7118                            });
7119                            if rng.lock().gen_bool(0.3) {
7120                                log::info!("{}: detaching save request", guest_username);
7121                                cx.update(|cx| save.detach_and_log_err(cx));
7122                            } else {
7123                                save.await?;
7124                            }
7125                        }
7126                        40..=44 => {
7127                            let prepare_rename = project.update(cx, |project, cx| {
7128                                log::info!(
7129                                    "{}: preparing rename for buffer {} ({:?})",
7130                                    guest_username,
7131                                    buffer.read(cx).remote_id(),
7132                                    buffer.read(cx).file().unwrap().full_path(cx)
7133                                );
7134                                let offset = rng.lock().gen_range(0..=buffer.read(cx).len());
7135                                project.prepare_rename(buffer, offset, cx)
7136                            });
7137                            let prepare_rename = cx.background().spawn(async move {
7138                                prepare_rename.await.map_err(|err| {
7139                                    anyhow!("prepare rename request failed: {:?}", err)
7140                                })
7141                            });
7142                            if rng.lock().gen_bool(0.3) {
7143                                log::info!("{}: detaching prepare rename request", guest_username);
7144                                cx.update(|cx| prepare_rename.detach_and_log_err(cx));
7145                            } else {
7146                                prepare_rename.await?;
7147                            }
7148                        }
7149                        45..=49 => {
7150                            let definitions = project.update(cx, |project, cx| {
7151                                log::info!(
7152                                    "{}: requesting definitions for buffer {} ({:?})",
7153                                    guest_username,
7154                                    buffer.read(cx).remote_id(),
7155                                    buffer.read(cx).file().unwrap().full_path(cx)
7156                                );
7157                                let offset = rng.lock().gen_range(0..=buffer.read(cx).len());
7158                                project.definition(&buffer, offset, cx)
7159                            });
7160                            let definitions = cx.background().spawn(async move {
7161                                definitions
7162                                    .await
7163                                    .map_err(|err| anyhow!("definitions request failed: {:?}", err))
7164                            });
7165                            if rng.lock().gen_bool(0.3) {
7166                                log::info!("{}: detaching definitions request", guest_username);
7167                                cx.update(|cx| definitions.detach_and_log_err(cx));
7168                            } else {
7169                                client
7170                                    .buffers
7171                                    .extend(definitions.await?.into_iter().map(|loc| loc.buffer));
7172                            }
7173                        }
7174                        50..=54 => {
7175                            let highlights = project.update(cx, |project, cx| {
7176                                log::info!(
7177                                    "{}: requesting highlights for buffer {} ({:?})",
7178                                    guest_username,
7179                                    buffer.read(cx).remote_id(),
7180                                    buffer.read(cx).file().unwrap().full_path(cx)
7181                                );
7182                                let offset = rng.lock().gen_range(0..=buffer.read(cx).len());
7183                                project.document_highlights(&buffer, offset, cx)
7184                            });
7185                            let highlights = cx.background().spawn(async move {
7186                                highlights
7187                                    .await
7188                                    .map_err(|err| anyhow!("highlights request failed: {:?}", err))
7189                            });
7190                            if rng.lock().gen_bool(0.3) {
7191                                log::info!("{}: detaching highlights request", guest_username);
7192                                cx.update(|cx| highlights.detach_and_log_err(cx));
7193                            } else {
7194                                highlights.await?;
7195                            }
7196                        }
7197                        55..=59 => {
7198                            let search = project.update(cx, |project, cx| {
7199                                let query = rng.lock().gen_range('a'..='z');
7200                                log::info!("{}: project-wide search {:?}", guest_username, query);
7201                                project.search(SearchQuery::text(query, false, false), cx)
7202                            });
7203                            let search = cx.background().spawn(async move {
7204                                search
7205                                    .await
7206                                    .map_err(|err| anyhow!("search request failed: {:?}", err))
7207                            });
7208                            if rng.lock().gen_bool(0.3) {
7209                                log::info!("{}: detaching search request", guest_username);
7210                                cx.update(|cx| search.detach_and_log_err(cx));
7211                            } else {
7212                                client.buffers.extend(search.await?.into_keys());
7213                            }
7214                        }
7215                        60..=69 => {
7216                            let worktree = project
7217                                .read_with(cx, |project, cx| {
7218                                    project
7219                                        .worktrees(&cx)
7220                                        .filter(|worktree| {
7221                                            let worktree = worktree.read(cx);
7222                                            worktree.is_visible()
7223                                                && worktree.entries(false).any(|e| e.is_file())
7224                                                && worktree
7225                                                    .root_entry()
7226                                                    .map_or(false, |e| e.is_dir())
7227                                        })
7228                                        .choose(&mut *rng.lock())
7229                                })
7230                                .unwrap();
7231                            let (worktree_id, worktree_root_name) = worktree
7232                                .read_with(cx, |worktree, _| {
7233                                    (worktree.id(), worktree.root_name().to_string())
7234                                });
7235
7236                            let mut new_name = String::new();
7237                            for _ in 0..10 {
7238                                let letter = rng.lock().gen_range('a'..='z');
7239                                new_name.push(letter);
7240                            }
7241                            let mut new_path = PathBuf::new();
7242                            new_path.push(new_name);
7243                            new_path.set_extension("rs");
7244                            log::info!(
7245                                "{}: creating {:?} in worktree {} ({})",
7246                                guest_username,
7247                                new_path,
7248                                worktree_id,
7249                                worktree_root_name,
7250                            );
7251                            project
7252                                .update(cx, |project, cx| {
7253                                    project.create_entry((worktree_id, new_path), false, cx)
7254                                })
7255                                .unwrap()
7256                                .await?;
7257                        }
7258                        _ => {
7259                            buffer.update(cx, |buffer, cx| {
7260                                log::info!(
7261                                    "{}: updating buffer {} ({:?})",
7262                                    guest_username,
7263                                    buffer.remote_id(),
7264                                    buffer.file().unwrap().full_path(cx)
7265                                );
7266                                if rng.lock().gen_bool(0.7) {
7267                                    buffer.randomly_edit(&mut *rng.lock(), 5, cx);
7268                                } else {
7269                                    buffer.randomly_undo_redo(&mut *rng.lock(), cx);
7270                                }
7271                            });
7272                        }
7273                    }
7274                    cx.background().simulate_random_delay().await;
7275                }
7276                Ok(())
7277            }
7278
7279            let result = simulate_guest_internal(
7280                &mut self,
7281                &guest_username,
7282                project.clone(),
7283                op_start_signal,
7284                rng,
7285                &mut cx,
7286            )
7287            .await;
7288            log::info!("{}: done", guest_username);
7289
7290            self.project = Some(project);
7291            (self, cx, result.err())
7292        }
7293    }
7294
7295    impl Drop for TestClient {
7296        fn drop(&mut self) {
7297            self.client.tear_down();
7298        }
7299    }
7300
7301    impl Executor for Arc<gpui::executor::Background> {
7302        type Sleep = gpui::executor::Timer;
7303
7304        fn spawn_detached<F: 'static + Send + Future<Output = ()>>(&self, future: F) {
7305            self.spawn(future).detach();
7306        }
7307
7308        fn sleep(&self, duration: Duration) -> Self::Sleep {
7309            self.as_ref().timer(duration)
7310        }
7311    }
7312
7313    fn channel_messages(channel: &Channel) -> Vec<(String, String, bool)> {
7314        channel
7315            .messages()
7316            .cursor::<()>()
7317            .map(|m| {
7318                (
7319                    m.sender.github_login.clone(),
7320                    m.body.clone(),
7321                    m.is_pending(),
7322                )
7323            })
7324            .collect()
7325    }
7326
7327    struct EmptyView;
7328
7329    impl gpui::Entity for EmptyView {
7330        type Event = ();
7331    }
7332
7333    impl gpui::View for EmptyView {
7334        fn ui_name() -> &'static str {
7335            "empty view"
7336        }
7337
7338        fn render(&mut self, _: &mut gpui::RenderContext<Self>) -> gpui::ElementBox {
7339            gpui::Element::boxed(gpui::elements::Empty::new())
7340        }
7341    }
7342}