rpc.rs

   1mod store;
   2
   3use crate::{
   4    auth,
   5    db::{self, ChannelId, MessageId, UserId},
   6    AppState, Result,
   7};
   8use anyhow::anyhow;
   9use async_tungstenite::tungstenite::{
  10    protocol::CloseFrame as TungsteniteCloseFrame, Message as TungsteniteMessage,
  11};
  12use axum::{
  13    body::Body,
  14    extract::{
  15        ws::{CloseFrame as AxumCloseFrame, Message as AxumMessage},
  16        ConnectInfo, WebSocketUpgrade,
  17    },
  18    headers::{Header, HeaderName},
  19    http::StatusCode,
  20    middleware,
  21    response::IntoResponse,
  22    routing::get,
  23    Extension, Router, TypedHeader,
  24};
  25use collections::HashMap;
  26use futures::{channel::mpsc, future::BoxFuture, FutureExt, SinkExt, StreamExt, TryStreamExt};
  27use lazy_static::lazy_static;
  28use rpc::{
  29    proto::{self, AnyTypedEnvelope, EntityMessage, EnvelopedMessage, RequestMessage},
  30    Connection, ConnectionId, Peer, Receipt, TypedEnvelope,
  31};
  32use std::{
  33    any::TypeId,
  34    future::Future,
  35    marker::PhantomData,
  36    net::SocketAddr,
  37    ops::{Deref, DerefMut},
  38    rc::Rc,
  39    sync::{
  40        atomic::{AtomicBool, Ordering::SeqCst},
  41        Arc,
  42    },
  43    time::Duration,
  44};
  45use store::{Store, Worktree};
  46use time::OffsetDateTime;
  47use tokio::{
  48    sync::{RwLock, RwLockReadGuard, RwLockWriteGuard},
  49    time::Sleep,
  50};
  51use tower::ServiceBuilder;
  52use tracing::{info_span, Instrument};
  53
  54type MessageHandler =
  55    Box<dyn Send + Sync + Fn(Arc<Server>, Box<dyn AnyTypedEnvelope>) -> BoxFuture<'static, ()>>;
  56
  57struct Response<R> {
  58    server: Arc<Server>,
  59    receipt: Receipt<R>,
  60    responded: Arc<AtomicBool>,
  61}
  62
  63impl<R: RequestMessage> Response<R> {
  64    fn send(self, payload: R::Response) -> Result<()> {
  65        self.responded.store(true, SeqCst);
  66        self.server.peer.respond(self.receipt, payload)?;
  67        Ok(())
  68    }
  69
  70    fn into_receipt(self) -> Receipt<R> {
  71        self.responded.store(true, SeqCst);
  72        self.receipt
  73    }
  74}
  75
  76pub struct Server {
  77    peer: Arc<Peer>,
  78    store: RwLock<Store>,
  79    app_state: Arc<AppState>,
  80    handlers: HashMap<TypeId, MessageHandler>,
  81    notifications: Option<mpsc::UnboundedSender<()>>,
  82}
  83
  84pub trait Executor: Send + Clone {
  85    type Sleep: Send + Future;
  86    fn spawn_detached<F: 'static + Send + Future<Output = ()>>(&self, future: F);
  87    fn sleep(&self, duration: Duration) -> Self::Sleep;
  88}
  89
  90#[derive(Clone)]
  91pub struct RealExecutor;
  92
  93const MESSAGE_COUNT_PER_PAGE: usize = 100;
  94const MAX_MESSAGE_LEN: usize = 1024;
  95
  96struct StoreReadGuard<'a> {
  97    guard: RwLockReadGuard<'a, Store>,
  98    _not_send: PhantomData<Rc<()>>,
  99}
 100
 101struct StoreWriteGuard<'a> {
 102    guard: RwLockWriteGuard<'a, Store>,
 103    _not_send: PhantomData<Rc<()>>,
 104}
 105
 106impl Server {
 107    pub fn new(
 108        app_state: Arc<AppState>,
 109        notifications: Option<mpsc::UnboundedSender<()>>,
 110    ) -> Arc<Self> {
 111        let mut server = Self {
 112            peer: Peer::new(),
 113            app_state,
 114            store: Default::default(),
 115            handlers: Default::default(),
 116            notifications,
 117        };
 118
 119        server
 120            .add_request_handler(Server::ping)
 121            .add_request_handler(Server::register_project)
 122            .add_message_handler(Server::unregister_project)
 123            .add_request_handler(Server::join_project)
 124            .add_message_handler(Server::leave_project)
 125            .add_message_handler(Server::respond_to_join_project_request)
 126            .add_request_handler(Server::register_worktree)
 127            .add_message_handler(Server::unregister_worktree)
 128            .add_request_handler(Server::update_worktree)
 129            .add_message_handler(Server::start_language_server)
 130            .add_message_handler(Server::update_language_server)
 131            .add_message_handler(Server::update_diagnostic_summary)
 132            .add_request_handler(Server::forward_project_request::<proto::GetDefinition>)
 133            .add_request_handler(Server::forward_project_request::<proto::GetReferences>)
 134            .add_request_handler(Server::forward_project_request::<proto::SearchProject>)
 135            .add_request_handler(Server::forward_project_request::<proto::GetDocumentHighlights>)
 136            .add_request_handler(Server::forward_project_request::<proto::GetProjectSymbols>)
 137            .add_request_handler(Server::forward_project_request::<proto::OpenBufferForSymbol>)
 138            .add_request_handler(Server::forward_project_request::<proto::OpenBufferById>)
 139            .add_request_handler(Server::forward_project_request::<proto::OpenBufferByPath>)
 140            .add_request_handler(Server::forward_project_request::<proto::GetCompletions>)
 141            .add_request_handler(
 142                Server::forward_project_request::<proto::ApplyCompletionAdditionalEdits>,
 143            )
 144            .add_request_handler(Server::forward_project_request::<proto::GetCodeActions>)
 145            .add_request_handler(Server::forward_project_request::<proto::ApplyCodeAction>)
 146            .add_request_handler(Server::forward_project_request::<proto::PrepareRename>)
 147            .add_request_handler(Server::forward_project_request::<proto::PerformRename>)
 148            .add_request_handler(Server::forward_project_request::<proto::ReloadBuffers>)
 149            .add_request_handler(Server::forward_project_request::<proto::FormatBuffers>)
 150            .add_request_handler(Server::forward_project_request::<proto::CreateProjectEntry>)
 151            .add_request_handler(Server::forward_project_request::<proto::RenameProjectEntry>)
 152            .add_request_handler(Server::forward_project_request::<proto::DeleteProjectEntry>)
 153            .add_request_handler(Server::update_buffer)
 154            .add_message_handler(Server::update_buffer_file)
 155            .add_message_handler(Server::buffer_reloaded)
 156            .add_message_handler(Server::buffer_saved)
 157            .add_request_handler(Server::save_buffer)
 158            .add_request_handler(Server::get_channels)
 159            .add_request_handler(Server::get_users)
 160            .add_request_handler(Server::fuzzy_search_users)
 161            .add_request_handler(Server::request_contact)
 162            .add_request_handler(Server::remove_contact)
 163            .add_request_handler(Server::respond_to_contact_request)
 164            .add_request_handler(Server::join_channel)
 165            .add_message_handler(Server::leave_channel)
 166            .add_request_handler(Server::send_channel_message)
 167            .add_request_handler(Server::follow)
 168            .add_message_handler(Server::unfollow)
 169            .add_message_handler(Server::update_followers)
 170            .add_request_handler(Server::get_channel_messages);
 171
 172        Arc::new(server)
 173    }
 174
 175    fn add_message_handler<F, Fut, M>(&mut self, handler: F) -> &mut Self
 176    where
 177        F: 'static + Send + Sync + Fn(Arc<Self>, TypedEnvelope<M>) -> Fut,
 178        Fut: 'static + Send + Future<Output = Result<()>>,
 179        M: EnvelopedMessage,
 180    {
 181        let prev_handler = self.handlers.insert(
 182            TypeId::of::<M>(),
 183            Box::new(move |server, envelope| {
 184                let envelope = envelope.into_any().downcast::<TypedEnvelope<M>>().unwrap();
 185                let span = info_span!(
 186                    "handle message",
 187                    payload_type = envelope.payload_type_name(),
 188                    payload = format!("{:?}", envelope.payload).as_str(),
 189                );
 190                let future = (handler)(server, *envelope);
 191                async move {
 192                    if let Err(error) = future.await {
 193                        tracing::error!(%error, "error handling message");
 194                    }
 195                }
 196                .instrument(span)
 197                .boxed()
 198            }),
 199        );
 200        if prev_handler.is_some() {
 201            panic!("registered a handler for the same message twice");
 202        }
 203        self
 204    }
 205
 206    /// Handle a request while holding a lock to the store. This is useful when we're registering
 207    /// a connection but we want to respond on the connection before anybody else can send on it.
 208    fn add_request_handler<F, Fut, M>(&mut self, handler: F) -> &mut Self
 209    where
 210        F: 'static + Send + Sync + Fn(Arc<Self>, TypedEnvelope<M>, Response<M>) -> Fut,
 211        Fut: Send + Future<Output = Result<()>>,
 212        M: RequestMessage,
 213    {
 214        let handler = Arc::new(handler);
 215        self.add_message_handler(move |server, envelope| {
 216            let receipt = envelope.receipt();
 217            let handler = handler.clone();
 218            async move {
 219                let responded = Arc::new(AtomicBool::default());
 220                let response = Response {
 221                    server: server.clone(),
 222                    responded: responded.clone(),
 223                    receipt: envelope.receipt(),
 224                };
 225                match (handler)(server.clone(), envelope, response).await {
 226                    Ok(()) => {
 227                        if responded.load(std::sync::atomic::Ordering::SeqCst) {
 228                            Ok(())
 229                        } else {
 230                            Err(anyhow!("handler did not send a response"))?
 231                        }
 232                    }
 233                    Err(error) => {
 234                        server.peer.respond_with_error(
 235                            receipt,
 236                            proto::Error {
 237                                message: error.to_string(),
 238                            },
 239                        )?;
 240                        Err(error)
 241                    }
 242                }
 243            }
 244        })
 245    }
 246
 247    pub fn handle_connection<E: Executor>(
 248        self: &Arc<Self>,
 249        connection: Connection,
 250        address: String,
 251        user_id: UserId,
 252        mut send_connection_id: Option<mpsc::Sender<ConnectionId>>,
 253        executor: E,
 254    ) -> impl Future<Output = Result<()>> {
 255        let mut this = self.clone();
 256        let span = info_span!("handle connection", %user_id, %address);
 257        async move {
 258            let (connection_id, handle_io, mut incoming_rx) = this
 259                .peer
 260                .add_connection(connection, {
 261                    let executor = executor.clone();
 262                    move |duration| {
 263                        let timer = executor.sleep(duration);
 264                        async move {
 265                            timer.await;
 266                        }
 267                    }
 268                })
 269                .await;
 270
 271            tracing::info!(%user_id, %connection_id, %address, "connection opened");
 272
 273            if let Some(send_connection_id) = send_connection_id.as_mut() {
 274                let _ = send_connection_id.send(connection_id).await;
 275            }
 276
 277            let contacts = this.app_state.db.get_contacts(user_id).await?;
 278
 279            {
 280                let mut store = this.store_mut().await;
 281                store.add_connection(connection_id, user_id);
 282                this.peer.send(connection_id, store.build_initial_contacts_update(contacts))?;
 283            }
 284            this.update_user_contacts(user_id).await?;
 285
 286            let handle_io = handle_io.fuse();
 287            futures::pin_mut!(handle_io);
 288            loop {
 289                let next_message = incoming_rx.next().fuse();
 290                futures::pin_mut!(next_message);
 291                futures::select_biased! {
 292                    result = handle_io => {
 293                        if let Err(error) = result {
 294                            tracing::error!(%error, "error handling I/O");
 295                        }
 296                        break;
 297                    }
 298                    message = next_message => {
 299                        if let Some(message) = message {
 300                            let type_name = message.payload_type_name();
 301                            let span = tracing::info_span!("receive message", %user_id, %connection_id, %address, type_name);
 302                            async {
 303                                if let Some(handler) = this.handlers.get(&message.payload_type_id()) {
 304                                    let notifications = this.notifications.clone();
 305                                    let is_background = message.is_background();
 306                                    let handle_message = (handler)(this.clone(), message);
 307                                    let handle_message = async move {
 308                                        handle_message.await;
 309                                        if let Some(mut notifications) = notifications {
 310                                            let _ = notifications.send(()).await;
 311                                        }
 312                                    };
 313                                    if is_background {
 314                                        executor.spawn_detached(handle_message);
 315                                    } else {
 316                                        handle_message.await;
 317                                    }
 318                                } else {
 319                                    tracing::error!("no message handler");
 320                                }
 321                            }.instrument(span).await;
 322                        } else {
 323                            tracing::info!(%user_id, %connection_id, %address, "connection closed");
 324                            break;
 325                        }
 326                    }
 327                }
 328            }
 329
 330            if let Err(error) = this.sign_out(connection_id).await {
 331                tracing::error!(%error, "error signing out");
 332            }
 333
 334            Ok(())
 335        }.instrument(span)
 336    }
 337
 338    async fn sign_out(self: &mut Arc<Self>, connection_id: ConnectionId) -> Result<()> {
 339        self.peer.disconnect(connection_id);
 340
 341        let removed_user_id = {
 342            let mut store = self.store_mut().await;
 343            let removed_connection = store.remove_connection(connection_id)?;
 344
 345            for (project_id, project) in removed_connection.hosted_projects {
 346                broadcast(connection_id, project.guests.keys().copied(), |conn_id| {
 347                    self.peer
 348                        .send(conn_id, proto::UnregisterProject { project_id })
 349                });
 350
 351                for (_, receipts) in project.join_requests {
 352                    for receipt in receipts {
 353                        self.peer.respond(
 354                            receipt,
 355                            proto::JoinProjectResponse {
 356                                variant: Some(proto::join_project_response::Variant::Decline(
 357                                    proto::join_project_response::Decline {
 358                                        reason: proto::join_project_response::decline::Reason::WentOffline as i32
 359                                    },
 360                                )),
 361                            },
 362                        )?;
 363                    }
 364                }
 365            }
 366
 367            for project_id in removed_connection.guest_project_ids {
 368                if let Some(project) = store.project(project_id).trace_err() {
 369                    broadcast(connection_id, project.connection_ids(), |conn_id| {
 370                        self.peer.send(
 371                            conn_id,
 372                            proto::RemoveProjectCollaborator {
 373                                project_id,
 374                                peer_id: connection_id.0,
 375                            },
 376                        )
 377                    });
 378                    if project.guests.is_empty() {
 379                        self.peer
 380                            .send(
 381                                project.host_connection_id,
 382                                proto::ProjectUnshared { project_id },
 383                            )
 384                            .trace_err();
 385                    }
 386                }
 387            }
 388
 389            removed_connection.user_id
 390        };
 391
 392        self.update_user_contacts(removed_user_id).await?;
 393
 394        Ok(())
 395    }
 396
 397    async fn ping(
 398        self: Arc<Server>,
 399        _: TypedEnvelope<proto::Ping>,
 400        response: Response<proto::Ping>,
 401    ) -> Result<()> {
 402        response.send(proto::Ack {})?;
 403        Ok(())
 404    }
 405
 406    async fn register_project(
 407        self: Arc<Server>,
 408        request: TypedEnvelope<proto::RegisterProject>,
 409        response: Response<proto::RegisterProject>,
 410    ) -> Result<()> {
 411        let user_id;
 412        let project_id;
 413        {
 414            let mut state = self.store_mut().await;
 415            user_id = state.user_id_for_connection(request.sender_id)?;
 416            project_id = state.register_project(request.sender_id, user_id);
 417        };
 418        self.update_user_contacts(user_id).await?;
 419        response.send(proto::RegisterProjectResponse { project_id })?;
 420        Ok(())
 421    }
 422
 423    async fn unregister_project(
 424        self: Arc<Server>,
 425        request: TypedEnvelope<proto::UnregisterProject>,
 426    ) -> Result<()> {
 427        let (user_id, project) = {
 428            let mut state = self.store_mut().await;
 429            let project =
 430                state.unregister_project(request.payload.project_id, request.sender_id)?;
 431            (state.user_id_for_connection(request.sender_id)?, project)
 432        };
 433        for (_, receipts) in project.join_requests {
 434            for receipt in receipts {
 435                self.peer.respond(
 436                    receipt,
 437                    proto::JoinProjectResponse {
 438                        variant: Some(proto::join_project_response::Variant::Decline(
 439                            proto::join_project_response::Decline {
 440                                reason: proto::join_project_response::decline::Reason::Closed
 441                                    as i32,
 442                            },
 443                        )),
 444                    },
 445                )?;
 446            }
 447        }
 448
 449        self.update_user_contacts(user_id).await?;
 450        Ok(())
 451    }
 452
 453    async fn update_user_contacts(self: &Arc<Server>, user_id: UserId) -> Result<()> {
 454        let contacts = self.app_state.db.get_contacts(user_id).await?;
 455        let store = self.store().await;
 456        let updated_contact = store.contact_for_user(user_id, false);
 457        for contact in contacts {
 458            if let db::Contact::Accepted {
 459                user_id: contact_user_id,
 460                ..
 461            } = contact
 462            {
 463                for contact_conn_id in store.connection_ids_for_user(contact_user_id) {
 464                    self.peer
 465                        .send(
 466                            contact_conn_id,
 467                            proto::UpdateContacts {
 468                                contacts: vec![updated_contact.clone()],
 469                                remove_contacts: Default::default(),
 470                                incoming_requests: Default::default(),
 471                                remove_incoming_requests: Default::default(),
 472                                outgoing_requests: Default::default(),
 473                                remove_outgoing_requests: Default::default(),
 474                            },
 475                        )
 476                        .trace_err();
 477                }
 478            }
 479        }
 480        Ok(())
 481    }
 482
 483    async fn join_project(
 484        self: Arc<Server>,
 485        request: TypedEnvelope<proto::JoinProject>,
 486        response: Response<proto::JoinProject>,
 487    ) -> Result<()> {
 488        let project_id = request.payload.project_id;
 489        let host_user_id;
 490        let guest_user_id;
 491        let host_connection_id;
 492        {
 493            let state = self.store().await;
 494            let project = state.project(project_id)?;
 495            host_user_id = project.host_user_id;
 496            host_connection_id = project.host_connection_id;
 497            guest_user_id = state.user_id_for_connection(request.sender_id)?;
 498        };
 499
 500        let has_contact = self
 501            .app_state
 502            .db
 503            .has_contact(guest_user_id, host_user_id)
 504            .await?;
 505        if !has_contact {
 506            return Err(anyhow!("no such project"))?;
 507        }
 508
 509        self.store_mut().await.request_join_project(
 510            guest_user_id,
 511            project_id,
 512            response.into_receipt(),
 513        )?;
 514        self.peer.send(
 515            host_connection_id,
 516            proto::RequestJoinProject {
 517                project_id,
 518                requester_id: guest_user_id.to_proto(),
 519            },
 520        )?;
 521        Ok(())
 522    }
 523
 524    async fn respond_to_join_project_request(
 525        self: Arc<Server>,
 526        request: TypedEnvelope<proto::RespondToJoinProjectRequest>,
 527    ) -> Result<()> {
 528        let host_user_id;
 529
 530        {
 531            let mut state = self.store_mut().await;
 532            let project_id = request.payload.project_id;
 533            let project = state.project(project_id)?;
 534            if project.host_connection_id != request.sender_id {
 535                Err(anyhow!("no such connection"))?;
 536            }
 537
 538            host_user_id = project.host_user_id;
 539            let guest_user_id = UserId::from_proto(request.payload.requester_id);
 540
 541            if !request.payload.allow {
 542                let receipts = state
 543                    .deny_join_project_request(request.sender_id, guest_user_id, project_id)
 544                    .ok_or_else(|| anyhow!("no such request"))?;
 545                for receipt in receipts {
 546                    self.peer.respond(
 547                        receipt,
 548                        proto::JoinProjectResponse {
 549                            variant: Some(proto::join_project_response::Variant::Decline(
 550                                proto::join_project_response::Decline {
 551                                    reason: proto::join_project_response::decline::Reason::Declined
 552                                        as i32,
 553                                },
 554                            )),
 555                        },
 556                    )?;
 557                }
 558                return Ok(());
 559            }
 560
 561            let (receipts_with_replica_ids, project) = state
 562                .accept_join_project_request(request.sender_id, guest_user_id, project_id)
 563                .ok_or_else(|| anyhow!("no such request"))?;
 564
 565            let peer_count = project.guests.len();
 566            let mut collaborators = Vec::with_capacity(peer_count);
 567            collaborators.push(proto::Collaborator {
 568                peer_id: project.host_connection_id.0,
 569                replica_id: 0,
 570                user_id: project.host_user_id.to_proto(),
 571            });
 572            let worktrees = project
 573                .worktrees
 574                .iter()
 575                .filter_map(|(id, shared_worktree)| {
 576                    let worktree = project.worktrees.get(&id)?;
 577                    Some(proto::Worktree {
 578                        id: *id,
 579                        root_name: worktree.root_name.clone(),
 580                        entries: shared_worktree.entries.values().cloned().collect(),
 581                        diagnostic_summaries: shared_worktree
 582                            .diagnostic_summaries
 583                            .values()
 584                            .cloned()
 585                            .collect(),
 586                        visible: worktree.visible,
 587                        scan_id: shared_worktree.scan_id,
 588                    })
 589                })
 590                .collect::<Vec<_>>();
 591
 592            // Add all guests other than the requesting user's own connections as collaborators
 593            for (peer_conn_id, (peer_replica_id, peer_user_id)) in &project.guests {
 594                if receipts_with_replica_ids
 595                    .iter()
 596                    .all(|(receipt, _)| receipt.sender_id != *peer_conn_id)
 597                {
 598                    collaborators.push(proto::Collaborator {
 599                        peer_id: peer_conn_id.0,
 600                        replica_id: *peer_replica_id as u32,
 601                        user_id: peer_user_id.to_proto(),
 602                    });
 603                }
 604            }
 605
 606            for conn_id in project.connection_ids() {
 607                for (receipt, replica_id) in &receipts_with_replica_ids {
 608                    if conn_id != receipt.sender_id {
 609                        self.peer.send(
 610                            conn_id,
 611                            proto::AddProjectCollaborator {
 612                                project_id,
 613                                collaborator: Some(proto::Collaborator {
 614                                    peer_id: receipt.sender_id.0,
 615                                    replica_id: *replica_id as u32,
 616                                    user_id: guest_user_id.to_proto(),
 617                                }),
 618                            },
 619                        )?;
 620                    }
 621                }
 622            }
 623
 624            for (receipt, replica_id) in receipts_with_replica_ids {
 625                self.peer.respond(
 626                    receipt,
 627                    proto::JoinProjectResponse {
 628                        variant: Some(proto::join_project_response::Variant::Accept(
 629                            proto::join_project_response::Accept {
 630                                worktrees: worktrees.clone(),
 631                                replica_id: replica_id as u32,
 632                                collaborators: collaborators.clone(),
 633                                language_servers: project.language_servers.clone(),
 634                            },
 635                        )),
 636                    },
 637                )?;
 638            }
 639        }
 640
 641        self.update_user_contacts(host_user_id).await?;
 642        Ok(())
 643    }
 644
 645    async fn leave_project(
 646        self: Arc<Server>,
 647        request: TypedEnvelope<proto::LeaveProject>,
 648    ) -> Result<()> {
 649        let sender_id = request.sender_id;
 650        let project_id = request.payload.project_id;
 651        let project;
 652        {
 653            let mut state = self.store_mut().await;
 654            project = state.leave_project(sender_id, project_id)?;
 655            let unshare = project.connection_ids.len() <= 1;
 656            broadcast(sender_id, project.connection_ids, |conn_id| {
 657                self.peer.send(
 658                    conn_id,
 659                    proto::RemoveProjectCollaborator {
 660                        project_id,
 661                        peer_id: sender_id.0,
 662                    },
 663                )
 664            });
 665            if unshare {
 666                self.peer.send(
 667                    project.host_connection_id,
 668                    proto::ProjectUnshared { project_id },
 669                )?;
 670            }
 671        }
 672        self.update_user_contacts(project.host_user_id).await?;
 673        Ok(())
 674    }
 675
 676    async fn register_worktree(
 677        self: Arc<Server>,
 678        request: TypedEnvelope<proto::RegisterWorktree>,
 679        response: Response<proto::RegisterWorktree>,
 680    ) -> Result<()> {
 681        let host_user_id;
 682        {
 683            let mut state = self.store_mut().await;
 684            host_user_id = state.user_id_for_connection(request.sender_id)?;
 685
 686            let guest_connection_ids = state
 687                .read_project(request.payload.project_id, request.sender_id)?
 688                .guest_connection_ids();
 689            state.register_worktree(
 690                request.payload.project_id,
 691                request.payload.worktree_id,
 692                request.sender_id,
 693                Worktree {
 694                    root_name: request.payload.root_name.clone(),
 695                    visible: request.payload.visible,
 696                    ..Default::default()
 697                },
 698            )?;
 699
 700            broadcast(request.sender_id, guest_connection_ids, |connection_id| {
 701                self.peer
 702                    .forward_send(request.sender_id, connection_id, request.payload.clone())
 703            });
 704        }
 705        self.update_user_contacts(host_user_id).await?;
 706        response.send(proto::Ack {})?;
 707        Ok(())
 708    }
 709
 710    async fn unregister_worktree(
 711        self: Arc<Server>,
 712        request: TypedEnvelope<proto::UnregisterWorktree>,
 713    ) -> Result<()> {
 714        let host_user_id;
 715        let project_id = request.payload.project_id;
 716        let worktree_id = request.payload.worktree_id;
 717        {
 718            let mut state = self.store_mut().await;
 719            let (_, guest_connection_ids) =
 720                state.unregister_worktree(project_id, worktree_id, request.sender_id)?;
 721            host_user_id = state.user_id_for_connection(request.sender_id)?;
 722            broadcast(request.sender_id, guest_connection_ids, |conn_id| {
 723                self.peer.send(
 724                    conn_id,
 725                    proto::UnregisterWorktree {
 726                        project_id,
 727                        worktree_id,
 728                    },
 729                )
 730            });
 731        }
 732        self.update_user_contacts(host_user_id).await?;
 733        Ok(())
 734    }
 735
 736    async fn update_worktree(
 737        self: Arc<Server>,
 738        request: TypedEnvelope<proto::UpdateWorktree>,
 739        response: Response<proto::UpdateWorktree>,
 740    ) -> Result<()> {
 741        let connection_ids = self.store_mut().await.update_worktree(
 742            request.sender_id,
 743            request.payload.project_id,
 744            request.payload.worktree_id,
 745            &request.payload.removed_entries,
 746            &request.payload.updated_entries,
 747            request.payload.scan_id,
 748        )?;
 749
 750        broadcast(request.sender_id, connection_ids, |connection_id| {
 751            self.peer
 752                .forward_send(request.sender_id, connection_id, request.payload.clone())
 753        });
 754        response.send(proto::Ack {})?;
 755        Ok(())
 756    }
 757
 758    async fn update_diagnostic_summary(
 759        self: Arc<Server>,
 760        request: TypedEnvelope<proto::UpdateDiagnosticSummary>,
 761    ) -> Result<()> {
 762        let summary = request
 763            .payload
 764            .summary
 765            .clone()
 766            .ok_or_else(|| anyhow!("invalid summary"))?;
 767        let receiver_ids = self.store_mut().await.update_diagnostic_summary(
 768            request.payload.project_id,
 769            request.payload.worktree_id,
 770            request.sender_id,
 771            summary,
 772        )?;
 773
 774        broadcast(request.sender_id, receiver_ids, |connection_id| {
 775            self.peer
 776                .forward_send(request.sender_id, connection_id, request.payload.clone())
 777        });
 778        Ok(())
 779    }
 780
 781    async fn start_language_server(
 782        self: Arc<Server>,
 783        request: TypedEnvelope<proto::StartLanguageServer>,
 784    ) -> Result<()> {
 785        let receiver_ids = self.store_mut().await.start_language_server(
 786            request.payload.project_id,
 787            request.sender_id,
 788            request
 789                .payload
 790                .server
 791                .clone()
 792                .ok_or_else(|| anyhow!("invalid language server"))?,
 793        )?;
 794        broadcast(request.sender_id, receiver_ids, |connection_id| {
 795            self.peer
 796                .forward_send(request.sender_id, connection_id, request.payload.clone())
 797        });
 798        Ok(())
 799    }
 800
 801    async fn update_language_server(
 802        self: Arc<Server>,
 803        request: TypedEnvelope<proto::UpdateLanguageServer>,
 804    ) -> Result<()> {
 805        let receiver_ids = self
 806            .store()
 807            .await
 808            .project_connection_ids(request.payload.project_id, request.sender_id)?;
 809        broadcast(request.sender_id, receiver_ids, |connection_id| {
 810            self.peer
 811                .forward_send(request.sender_id, connection_id, request.payload.clone())
 812        });
 813        Ok(())
 814    }
 815
 816    async fn forward_project_request<T>(
 817        self: Arc<Server>,
 818        request: TypedEnvelope<T>,
 819        response: Response<T>,
 820    ) -> Result<()>
 821    where
 822        T: EntityMessage + RequestMessage,
 823    {
 824        let host_connection_id = self
 825            .store()
 826            .await
 827            .read_project(request.payload.remote_entity_id(), request.sender_id)?
 828            .host_connection_id;
 829
 830        response.send(
 831            self.peer
 832                .forward_request(request.sender_id, host_connection_id, request.payload)
 833                .await?,
 834        )?;
 835        Ok(())
 836    }
 837
 838    async fn save_buffer(
 839        self: Arc<Server>,
 840        request: TypedEnvelope<proto::SaveBuffer>,
 841        response: Response<proto::SaveBuffer>,
 842    ) -> Result<()> {
 843        let host = self
 844            .store()
 845            .await
 846            .read_project(request.payload.project_id, request.sender_id)?
 847            .host_connection_id;
 848        let response_payload = self
 849            .peer
 850            .forward_request(request.sender_id, host, request.payload.clone())
 851            .await?;
 852
 853        let mut guests = self
 854            .store()
 855            .await
 856            .read_project(request.payload.project_id, request.sender_id)?
 857            .connection_ids();
 858        guests.retain(|guest_connection_id| *guest_connection_id != request.sender_id);
 859        broadcast(host, guests, |conn_id| {
 860            self.peer
 861                .forward_send(host, conn_id, response_payload.clone())
 862        });
 863        response.send(response_payload)?;
 864        Ok(())
 865    }
 866
 867    async fn update_buffer(
 868        self: Arc<Server>,
 869        request: TypedEnvelope<proto::UpdateBuffer>,
 870        response: Response<proto::UpdateBuffer>,
 871    ) -> Result<()> {
 872        let receiver_ids = self
 873            .store()
 874            .await
 875            .project_connection_ids(request.payload.project_id, request.sender_id)?;
 876        broadcast(request.sender_id, receiver_ids, |connection_id| {
 877            self.peer
 878                .forward_send(request.sender_id, connection_id, request.payload.clone())
 879        });
 880        response.send(proto::Ack {})?;
 881        Ok(())
 882    }
 883
 884    async fn update_buffer_file(
 885        self: Arc<Server>,
 886        request: TypedEnvelope<proto::UpdateBufferFile>,
 887    ) -> Result<()> {
 888        let receiver_ids = self
 889            .store()
 890            .await
 891            .project_connection_ids(request.payload.project_id, request.sender_id)?;
 892        broadcast(request.sender_id, receiver_ids, |connection_id| {
 893            self.peer
 894                .forward_send(request.sender_id, connection_id, request.payload.clone())
 895        });
 896        Ok(())
 897    }
 898
 899    async fn buffer_reloaded(
 900        self: Arc<Server>,
 901        request: TypedEnvelope<proto::BufferReloaded>,
 902    ) -> Result<()> {
 903        let receiver_ids = self
 904            .store()
 905            .await
 906            .project_connection_ids(request.payload.project_id, request.sender_id)?;
 907        broadcast(request.sender_id, receiver_ids, |connection_id| {
 908            self.peer
 909                .forward_send(request.sender_id, connection_id, request.payload.clone())
 910        });
 911        Ok(())
 912    }
 913
 914    async fn buffer_saved(
 915        self: Arc<Server>,
 916        request: TypedEnvelope<proto::BufferSaved>,
 917    ) -> Result<()> {
 918        let receiver_ids = self
 919            .store()
 920            .await
 921            .project_connection_ids(request.payload.project_id, request.sender_id)?;
 922        broadcast(request.sender_id, receiver_ids, |connection_id| {
 923            self.peer
 924                .forward_send(request.sender_id, connection_id, request.payload.clone())
 925        });
 926        Ok(())
 927    }
 928
 929    async fn follow(
 930        self: Arc<Self>,
 931        request: TypedEnvelope<proto::Follow>,
 932        response: Response<proto::Follow>,
 933    ) -> Result<()> {
 934        let leader_id = ConnectionId(request.payload.leader_id);
 935        let follower_id = request.sender_id;
 936        if !self
 937            .store()
 938            .await
 939            .project_connection_ids(request.payload.project_id, follower_id)?
 940            .contains(&leader_id)
 941        {
 942            Err(anyhow!("no such peer"))?;
 943        }
 944        let mut response_payload = self
 945            .peer
 946            .forward_request(request.sender_id, leader_id, request.payload)
 947            .await?;
 948        response_payload
 949            .views
 950            .retain(|view| view.leader_id != Some(follower_id.0));
 951        response.send(response_payload)?;
 952        Ok(())
 953    }
 954
 955    async fn unfollow(self: Arc<Self>, request: TypedEnvelope<proto::Unfollow>) -> Result<()> {
 956        let leader_id = ConnectionId(request.payload.leader_id);
 957        if !self
 958            .store()
 959            .await
 960            .project_connection_ids(request.payload.project_id, request.sender_id)?
 961            .contains(&leader_id)
 962        {
 963            Err(anyhow!("no such peer"))?;
 964        }
 965        self.peer
 966            .forward_send(request.sender_id, leader_id, request.payload)?;
 967        Ok(())
 968    }
 969
 970    async fn update_followers(
 971        self: Arc<Self>,
 972        request: TypedEnvelope<proto::UpdateFollowers>,
 973    ) -> Result<()> {
 974        let connection_ids = self
 975            .store()
 976            .await
 977            .project_connection_ids(request.payload.project_id, request.sender_id)?;
 978        let leader_id = request
 979            .payload
 980            .variant
 981            .as_ref()
 982            .and_then(|variant| match variant {
 983                proto::update_followers::Variant::CreateView(payload) => payload.leader_id,
 984                proto::update_followers::Variant::UpdateView(payload) => payload.leader_id,
 985                proto::update_followers::Variant::UpdateActiveView(payload) => payload.leader_id,
 986            });
 987        for follower_id in &request.payload.follower_ids {
 988            let follower_id = ConnectionId(*follower_id);
 989            if connection_ids.contains(&follower_id) && Some(follower_id.0) != leader_id {
 990                self.peer
 991                    .forward_send(request.sender_id, follower_id, request.payload.clone())?;
 992            }
 993        }
 994        Ok(())
 995    }
 996
 997    async fn get_channels(
 998        self: Arc<Server>,
 999        request: TypedEnvelope<proto::GetChannels>,
1000        response: Response<proto::GetChannels>,
1001    ) -> Result<()> {
1002        let user_id = self
1003            .store()
1004            .await
1005            .user_id_for_connection(request.sender_id)?;
1006        let channels = self.app_state.db.get_accessible_channels(user_id).await?;
1007        response.send(proto::GetChannelsResponse {
1008            channels: channels
1009                .into_iter()
1010                .map(|chan| proto::Channel {
1011                    id: chan.id.to_proto(),
1012                    name: chan.name,
1013                })
1014                .collect(),
1015        })?;
1016        Ok(())
1017    }
1018
1019    async fn get_users(
1020        self: Arc<Server>,
1021        request: TypedEnvelope<proto::GetUsers>,
1022        response: Response<proto::GetUsers>,
1023    ) -> Result<()> {
1024        let user_ids = request
1025            .payload
1026            .user_ids
1027            .into_iter()
1028            .map(UserId::from_proto)
1029            .collect();
1030        let users = self
1031            .app_state
1032            .db
1033            .get_users_by_ids(user_ids)
1034            .await?
1035            .into_iter()
1036            .map(|user| proto::User {
1037                id: user.id.to_proto(),
1038                avatar_url: format!("https://github.com/{}.png?size=128", user.github_login),
1039                github_login: user.github_login,
1040            })
1041            .collect();
1042        response.send(proto::UsersResponse { users })?;
1043        Ok(())
1044    }
1045
1046    async fn fuzzy_search_users(
1047        self: Arc<Server>,
1048        request: TypedEnvelope<proto::FuzzySearchUsers>,
1049        response: Response<proto::FuzzySearchUsers>,
1050    ) -> Result<()> {
1051        let user_id = self
1052            .store()
1053            .await
1054            .user_id_for_connection(request.sender_id)?;
1055        let query = request.payload.query;
1056        let db = &self.app_state.db;
1057        let users = match query.len() {
1058            0 => vec![],
1059            1 | 2 => db
1060                .get_user_by_github_login(&query)
1061                .await?
1062                .into_iter()
1063                .collect(),
1064            _ => db.fuzzy_search_users(&query, 10).await?,
1065        };
1066        let users = users
1067            .into_iter()
1068            .filter(|user| user.id != user_id)
1069            .map(|user| proto::User {
1070                id: user.id.to_proto(),
1071                avatar_url: format!("https://github.com/{}.png?size=128", user.github_login),
1072                github_login: user.github_login,
1073            })
1074            .collect();
1075        response.send(proto::UsersResponse { users })?;
1076        Ok(())
1077    }
1078
1079    async fn request_contact(
1080        self: Arc<Server>,
1081        request: TypedEnvelope<proto::RequestContact>,
1082        response: Response<proto::RequestContact>,
1083    ) -> Result<()> {
1084        let requester_id = self
1085            .store()
1086            .await
1087            .user_id_for_connection(request.sender_id)?;
1088        let responder_id = UserId::from_proto(request.payload.responder_id);
1089        if requester_id == responder_id {
1090            return Err(anyhow!("cannot add yourself as a contact"))?;
1091        }
1092
1093        self.app_state
1094            .db
1095            .send_contact_request(requester_id, responder_id)
1096            .await?;
1097
1098        // Update outgoing contact requests of requester
1099        let mut update = proto::UpdateContacts::default();
1100        update.outgoing_requests.push(responder_id.to_proto());
1101        for connection_id in self.store().await.connection_ids_for_user(requester_id) {
1102            self.peer.send(connection_id, update.clone())?;
1103        }
1104
1105        // Update incoming contact requests of responder
1106        let mut update = proto::UpdateContacts::default();
1107        update
1108            .incoming_requests
1109            .push(proto::IncomingContactRequest {
1110                requester_id: requester_id.to_proto(),
1111                should_notify: true,
1112            });
1113        for connection_id in self.store().await.connection_ids_for_user(responder_id) {
1114            self.peer.send(connection_id, update.clone())?;
1115        }
1116
1117        response.send(proto::Ack {})?;
1118        Ok(())
1119    }
1120
1121    async fn respond_to_contact_request(
1122        self: Arc<Server>,
1123        request: TypedEnvelope<proto::RespondToContactRequest>,
1124        response: Response<proto::RespondToContactRequest>,
1125    ) -> Result<()> {
1126        let responder_id = self
1127            .store()
1128            .await
1129            .user_id_for_connection(request.sender_id)?;
1130        let requester_id = UserId::from_proto(request.payload.requester_id);
1131        if request.payload.response == proto::ContactRequestResponse::Dismiss as i32 {
1132            self.app_state
1133                .db
1134                .dismiss_contact_notification(responder_id, requester_id)
1135                .await?;
1136        } else {
1137            let accept = request.payload.response == proto::ContactRequestResponse::Accept as i32;
1138            self.app_state
1139                .db
1140                .respond_to_contact_request(responder_id, requester_id, accept)
1141                .await?;
1142
1143            let store = self.store().await;
1144            // Update responder with new contact
1145            let mut update = proto::UpdateContacts::default();
1146            if accept {
1147                update
1148                    .contacts
1149                    .push(store.contact_for_user(requester_id, false));
1150            }
1151            update
1152                .remove_incoming_requests
1153                .push(requester_id.to_proto());
1154            for connection_id in store.connection_ids_for_user(responder_id) {
1155                self.peer.send(connection_id, update.clone())?;
1156            }
1157
1158            // Update requester with new contact
1159            let mut update = proto::UpdateContacts::default();
1160            if accept {
1161                update
1162                    .contacts
1163                    .push(store.contact_for_user(responder_id, true));
1164            }
1165            update
1166                .remove_outgoing_requests
1167                .push(responder_id.to_proto());
1168            for connection_id in store.connection_ids_for_user(requester_id) {
1169                self.peer.send(connection_id, update.clone())?;
1170            }
1171        }
1172
1173        response.send(proto::Ack {})?;
1174        Ok(())
1175    }
1176
1177    async fn remove_contact(
1178        self: Arc<Server>,
1179        request: TypedEnvelope<proto::RemoveContact>,
1180        response: Response<proto::RemoveContact>,
1181    ) -> Result<()> {
1182        let requester_id = self
1183            .store()
1184            .await
1185            .user_id_for_connection(request.sender_id)?;
1186        let responder_id = UserId::from_proto(request.payload.user_id);
1187        self.app_state
1188            .db
1189            .remove_contact(requester_id, responder_id)
1190            .await?;
1191
1192        // Update outgoing contact requests of requester
1193        let mut update = proto::UpdateContacts::default();
1194        update
1195            .remove_outgoing_requests
1196            .push(responder_id.to_proto());
1197        for connection_id in self.store().await.connection_ids_for_user(requester_id) {
1198            self.peer.send(connection_id, update.clone())?;
1199        }
1200
1201        // Update incoming contact requests of responder
1202        let mut update = proto::UpdateContacts::default();
1203        update
1204            .remove_incoming_requests
1205            .push(requester_id.to_proto());
1206        for connection_id in self.store().await.connection_ids_for_user(responder_id) {
1207            self.peer.send(connection_id, update.clone())?;
1208        }
1209
1210        response.send(proto::Ack {})?;
1211        Ok(())
1212    }
1213
1214    async fn join_channel(
1215        self: Arc<Self>,
1216        request: TypedEnvelope<proto::JoinChannel>,
1217        response: Response<proto::JoinChannel>,
1218    ) -> Result<()> {
1219        let user_id = self
1220            .store()
1221            .await
1222            .user_id_for_connection(request.sender_id)?;
1223        let channel_id = ChannelId::from_proto(request.payload.channel_id);
1224        if !self
1225            .app_state
1226            .db
1227            .can_user_access_channel(user_id, channel_id)
1228            .await?
1229        {
1230            Err(anyhow!("access denied"))?;
1231        }
1232
1233        self.store_mut()
1234            .await
1235            .join_channel(request.sender_id, channel_id);
1236        let messages = self
1237            .app_state
1238            .db
1239            .get_channel_messages(channel_id, MESSAGE_COUNT_PER_PAGE, None)
1240            .await?
1241            .into_iter()
1242            .map(|msg| proto::ChannelMessage {
1243                id: msg.id.to_proto(),
1244                body: msg.body,
1245                timestamp: msg.sent_at.unix_timestamp() as u64,
1246                sender_id: msg.sender_id.to_proto(),
1247                nonce: Some(msg.nonce.as_u128().into()),
1248            })
1249            .collect::<Vec<_>>();
1250        response.send(proto::JoinChannelResponse {
1251            done: messages.len() < MESSAGE_COUNT_PER_PAGE,
1252            messages,
1253        })?;
1254        Ok(())
1255    }
1256
1257    async fn leave_channel(
1258        self: Arc<Self>,
1259        request: TypedEnvelope<proto::LeaveChannel>,
1260    ) -> Result<()> {
1261        let user_id = self
1262            .store()
1263            .await
1264            .user_id_for_connection(request.sender_id)?;
1265        let channel_id = ChannelId::from_proto(request.payload.channel_id);
1266        if !self
1267            .app_state
1268            .db
1269            .can_user_access_channel(user_id, channel_id)
1270            .await?
1271        {
1272            Err(anyhow!("access denied"))?;
1273        }
1274
1275        self.store_mut()
1276            .await
1277            .leave_channel(request.sender_id, channel_id);
1278
1279        Ok(())
1280    }
1281
1282    async fn send_channel_message(
1283        self: Arc<Self>,
1284        request: TypedEnvelope<proto::SendChannelMessage>,
1285        response: Response<proto::SendChannelMessage>,
1286    ) -> Result<()> {
1287        let channel_id = ChannelId::from_proto(request.payload.channel_id);
1288        let user_id;
1289        let connection_ids;
1290        {
1291            let state = self.store().await;
1292            user_id = state.user_id_for_connection(request.sender_id)?;
1293            connection_ids = state.channel_connection_ids(channel_id)?;
1294        }
1295
1296        // Validate the message body.
1297        let body = request.payload.body.trim().to_string();
1298        if body.len() > MAX_MESSAGE_LEN {
1299            return Err(anyhow!("message is too long"))?;
1300        }
1301        if body.is_empty() {
1302            return Err(anyhow!("message can't be blank"))?;
1303        }
1304
1305        let timestamp = OffsetDateTime::now_utc();
1306        let nonce = request
1307            .payload
1308            .nonce
1309            .ok_or_else(|| anyhow!("nonce can't be blank"))?;
1310
1311        let message_id = self
1312            .app_state
1313            .db
1314            .create_channel_message(channel_id, user_id, &body, timestamp, nonce.clone().into())
1315            .await?
1316            .to_proto();
1317        let message = proto::ChannelMessage {
1318            sender_id: user_id.to_proto(),
1319            id: message_id,
1320            body,
1321            timestamp: timestamp.unix_timestamp() as u64,
1322            nonce: Some(nonce),
1323        };
1324        broadcast(request.sender_id, connection_ids, |conn_id| {
1325            self.peer.send(
1326                conn_id,
1327                proto::ChannelMessageSent {
1328                    channel_id: channel_id.to_proto(),
1329                    message: Some(message.clone()),
1330                },
1331            )
1332        });
1333        response.send(proto::SendChannelMessageResponse {
1334            message: Some(message),
1335        })?;
1336        Ok(())
1337    }
1338
1339    async fn get_channel_messages(
1340        self: Arc<Self>,
1341        request: TypedEnvelope<proto::GetChannelMessages>,
1342        response: Response<proto::GetChannelMessages>,
1343    ) -> Result<()> {
1344        let user_id = self
1345            .store()
1346            .await
1347            .user_id_for_connection(request.sender_id)?;
1348        let channel_id = ChannelId::from_proto(request.payload.channel_id);
1349        if !self
1350            .app_state
1351            .db
1352            .can_user_access_channel(user_id, channel_id)
1353            .await?
1354        {
1355            Err(anyhow!("access denied"))?;
1356        }
1357
1358        let messages = self
1359            .app_state
1360            .db
1361            .get_channel_messages(
1362                channel_id,
1363                MESSAGE_COUNT_PER_PAGE,
1364                Some(MessageId::from_proto(request.payload.before_message_id)),
1365            )
1366            .await?
1367            .into_iter()
1368            .map(|msg| proto::ChannelMessage {
1369                id: msg.id.to_proto(),
1370                body: msg.body,
1371                timestamp: msg.sent_at.unix_timestamp() as u64,
1372                sender_id: msg.sender_id.to_proto(),
1373                nonce: Some(msg.nonce.as_u128().into()),
1374            })
1375            .collect::<Vec<_>>();
1376        response.send(proto::GetChannelMessagesResponse {
1377            done: messages.len() < MESSAGE_COUNT_PER_PAGE,
1378            messages,
1379        })?;
1380        Ok(())
1381    }
1382
1383    async fn store<'a>(self: &'a Arc<Self>) -> StoreReadGuard<'a> {
1384        #[cfg(test)]
1385        tokio::task::yield_now().await;
1386        let guard = self.store.read().await;
1387        #[cfg(test)]
1388        tokio::task::yield_now().await;
1389        StoreReadGuard {
1390            guard,
1391            _not_send: PhantomData,
1392        }
1393    }
1394
1395    async fn store_mut<'a>(self: &'a Arc<Self>) -> StoreWriteGuard<'a> {
1396        #[cfg(test)]
1397        tokio::task::yield_now().await;
1398        let guard = self.store.write().await;
1399        #[cfg(test)]
1400        tokio::task::yield_now().await;
1401        StoreWriteGuard {
1402            guard,
1403            _not_send: PhantomData,
1404        }
1405    }
1406}
1407
1408impl<'a> Deref for StoreReadGuard<'a> {
1409    type Target = Store;
1410
1411    fn deref(&self) -> &Self::Target {
1412        &*self.guard
1413    }
1414}
1415
1416impl<'a> Deref for StoreWriteGuard<'a> {
1417    type Target = Store;
1418
1419    fn deref(&self) -> &Self::Target {
1420        &*self.guard
1421    }
1422}
1423
1424impl<'a> DerefMut for StoreWriteGuard<'a> {
1425    fn deref_mut(&mut self) -> &mut Self::Target {
1426        &mut *self.guard
1427    }
1428}
1429
1430impl<'a> Drop for StoreWriteGuard<'a> {
1431    fn drop(&mut self) {
1432        #[cfg(test)]
1433        self.check_invariants();
1434
1435        let metrics = self.metrics();
1436        tracing::info!(
1437            connections = metrics.connections,
1438            registered_projects = metrics.registered_projects,
1439            shared_projects = metrics.shared_projects,
1440            "metrics"
1441        );
1442    }
1443}
1444
1445impl Executor for RealExecutor {
1446    type Sleep = Sleep;
1447
1448    fn spawn_detached<F: 'static + Send + Future<Output = ()>>(&self, future: F) {
1449        tokio::task::spawn(future);
1450    }
1451
1452    fn sleep(&self, duration: Duration) -> Self::Sleep {
1453        tokio::time::sleep(duration)
1454    }
1455}
1456
1457fn broadcast<F>(
1458    sender_id: ConnectionId,
1459    receiver_ids: impl IntoIterator<Item = ConnectionId>,
1460    mut f: F,
1461) where
1462    F: FnMut(ConnectionId) -> anyhow::Result<()>,
1463{
1464    for receiver_id in receiver_ids {
1465        if receiver_id != sender_id {
1466            f(receiver_id).trace_err();
1467        }
1468    }
1469}
1470
1471lazy_static! {
1472    static ref ZED_PROTOCOL_VERSION: HeaderName = HeaderName::from_static("x-zed-protocol-version");
1473}
1474
1475pub struct ProtocolVersion(u32);
1476
1477impl Header for ProtocolVersion {
1478    fn name() -> &'static HeaderName {
1479        &ZED_PROTOCOL_VERSION
1480    }
1481
1482    fn decode<'i, I>(values: &mut I) -> Result<Self, axum::headers::Error>
1483    where
1484        Self: Sized,
1485        I: Iterator<Item = &'i axum::http::HeaderValue>,
1486    {
1487        let version = values
1488            .next()
1489            .ok_or_else(|| axum::headers::Error::invalid())?
1490            .to_str()
1491            .map_err(|_| axum::headers::Error::invalid())?
1492            .parse()
1493            .map_err(|_| axum::headers::Error::invalid())?;
1494        Ok(Self(version))
1495    }
1496
1497    fn encode<E: Extend<axum::http::HeaderValue>>(&self, values: &mut E) {
1498        values.extend([self.0.to_string().parse().unwrap()]);
1499    }
1500}
1501
1502pub fn routes(app_state: Arc<AppState>) -> Router<Body> {
1503    let server = Server::new(app_state.clone(), None);
1504    Router::new()
1505        .route("/rpc", get(handle_websocket_request))
1506        .layer(
1507            ServiceBuilder::new()
1508                .layer(Extension(app_state))
1509                .layer(middleware::from_fn(auth::validate_header))
1510                .layer(Extension(server)),
1511        )
1512}
1513
1514pub async fn handle_websocket_request(
1515    TypedHeader(ProtocolVersion(protocol_version)): TypedHeader<ProtocolVersion>,
1516    ConnectInfo(socket_address): ConnectInfo<SocketAddr>,
1517    Extension(server): Extension<Arc<Server>>,
1518    Extension(user_id): Extension<UserId>,
1519    ws: WebSocketUpgrade,
1520) -> axum::response::Response {
1521    if protocol_version != rpc::PROTOCOL_VERSION {
1522        return (
1523            StatusCode::UPGRADE_REQUIRED,
1524            "client must be upgraded".to_string(),
1525        )
1526            .into_response();
1527    }
1528    let socket_address = socket_address.to_string();
1529    ws.on_upgrade(move |socket| {
1530        use util::ResultExt;
1531        let socket = socket
1532            .map_ok(to_tungstenite_message)
1533            .err_into()
1534            .with(|message| async move { Ok(to_axum_message(message)) });
1535        let connection = Connection::new(Box::pin(socket));
1536        async move {
1537            server
1538                .handle_connection(connection, socket_address, user_id, None, RealExecutor)
1539                .await
1540                .log_err();
1541        }
1542    })
1543}
1544
1545fn to_axum_message(message: TungsteniteMessage) -> AxumMessage {
1546    match message {
1547        TungsteniteMessage::Text(payload) => AxumMessage::Text(payload),
1548        TungsteniteMessage::Binary(payload) => AxumMessage::Binary(payload),
1549        TungsteniteMessage::Ping(payload) => AxumMessage::Ping(payload),
1550        TungsteniteMessage::Pong(payload) => AxumMessage::Pong(payload),
1551        TungsteniteMessage::Close(frame) => AxumMessage::Close(frame.map(|frame| AxumCloseFrame {
1552            code: frame.code.into(),
1553            reason: frame.reason,
1554        })),
1555    }
1556}
1557
1558fn to_tungstenite_message(message: AxumMessage) -> TungsteniteMessage {
1559    match message {
1560        AxumMessage::Text(payload) => TungsteniteMessage::Text(payload),
1561        AxumMessage::Binary(payload) => TungsteniteMessage::Binary(payload),
1562        AxumMessage::Ping(payload) => TungsteniteMessage::Ping(payload),
1563        AxumMessage::Pong(payload) => TungsteniteMessage::Pong(payload),
1564        AxumMessage::Close(frame) => {
1565            TungsteniteMessage::Close(frame.map(|frame| TungsteniteCloseFrame {
1566                code: frame.code.into(),
1567                reason: frame.reason,
1568            }))
1569        }
1570    }
1571}
1572
1573pub trait ResultExt {
1574    type Ok;
1575
1576    fn trace_err(self) -> Option<Self::Ok>;
1577}
1578
1579impl<T, E> ResultExt for Result<T, E>
1580where
1581    E: std::fmt::Debug,
1582{
1583    type Ok = T;
1584
1585    fn trace_err(self) -> Option<T> {
1586        match self {
1587            Ok(value) => Some(value),
1588            Err(error) => {
1589                tracing::error!("{:?}", error);
1590                None
1591            }
1592        }
1593    }
1594}
1595
1596#[cfg(test)]
1597mod tests {
1598    use super::*;
1599    use crate::{
1600        db::{tests::TestDb, UserId},
1601        AppState,
1602    };
1603    use ::rpc::Peer;
1604    use client::{
1605        self, test::FakeHttpClient, Channel, ChannelDetails, ChannelList, Client, Credentials,
1606        EstablishConnectionError, UserStore, RECEIVE_TIMEOUT,
1607    };
1608    use collections::{BTreeMap, HashSet};
1609    use editor::{
1610        self, ConfirmCodeAction, ConfirmCompletion, ConfirmRename, Editor, Input, Redo, Rename,
1611        ToOffset, ToggleCodeActions, Undo,
1612    };
1613    use gpui::{
1614        executor::{self, Deterministic},
1615        geometry::vector::vec2f,
1616        ModelHandle, TestAppContext, ViewHandle,
1617    };
1618    use language::{
1619        range_to_lsp, tree_sitter_rust, Diagnostic, DiagnosticEntry, FakeLspAdapter, Language,
1620        LanguageConfig, LanguageRegistry, OffsetRangeExt, Point, Rope,
1621    };
1622    use lsp::{self, FakeLanguageServer};
1623    use parking_lot::Mutex;
1624    use project::{
1625        fs::{FakeFs, Fs as _},
1626        search::SearchQuery,
1627        worktree::WorktreeHandle,
1628        DiagnosticSummary, Project, ProjectPath, WorktreeId,
1629    };
1630    use rand::prelude::*;
1631    use rpc::PeerId;
1632    use serde_json::json;
1633    use settings::Settings;
1634    use sqlx::types::time::OffsetDateTime;
1635    use std::{
1636        env,
1637        ops::Deref,
1638        path::{Path, PathBuf},
1639        rc::Rc,
1640        sync::{
1641            atomic::{AtomicBool, Ordering::SeqCst},
1642            Arc,
1643        },
1644        time::Duration,
1645    };
1646    use theme::ThemeRegistry;
1647    use workspace::{Item, SplitDirection, ToggleFollow, Workspace, WorkspaceParams};
1648
1649    #[cfg(test)]
1650    #[ctor::ctor]
1651    fn init_logger() {
1652        if std::env::var("RUST_LOG").is_ok() {
1653            env_logger::init();
1654        }
1655    }
1656
1657    #[gpui::test(iterations = 10)]
1658    async fn test_share_project(cx_a: &mut TestAppContext, cx_b: &mut TestAppContext) {
1659        let (window_b, _) = cx_b.add_window(|_| EmptyView);
1660        let lang_registry = Arc::new(LanguageRegistry::test());
1661        let fs = FakeFs::new(cx_a.background());
1662        cx_a.foreground().forbid_parking();
1663
1664        // Connect to a server as 2 clients.
1665        let mut server = TestServer::start(cx_a.foreground(), cx_a.background()).await;
1666        let client_a = server.create_client(cx_a, "user_a").await;
1667        let mut client_b = server.create_client(cx_b, "user_b").await;
1668        server
1669            .make_contacts(vec![(&client_a, cx_a), (&client_b, cx_b)])
1670            .await;
1671
1672        // Share a project as client A
1673        fs.insert_tree(
1674            "/a",
1675            json!({
1676                "a.txt": "a-contents",
1677                "b.txt": "b-contents",
1678            }),
1679        )
1680        .await;
1681        let project_a = cx_a.update(|cx| {
1682            Project::local(
1683                client_a.clone(),
1684                client_a.user_store.clone(),
1685                lang_registry.clone(),
1686                fs.clone(),
1687                cx,
1688            )
1689        });
1690        let (worktree_a, _) = project_a
1691            .update(cx_a, |p, cx| {
1692                p.find_or_create_local_worktree("/a", true, cx)
1693            })
1694            .await
1695            .unwrap();
1696        let worktree_id = worktree_a.read_with(cx_a, |tree, _| tree.id());
1697        worktree_a
1698            .read_with(cx_a, |tree, _| tree.as_local().unwrap().scan_complete())
1699            .await;
1700
1701        // Join that project as client B
1702        let client_b_peer_id = client_b.peer_id;
1703        let project_b = client_b.build_remote_project(&project_a, cx_a, cx_b).await;
1704
1705        let replica_id_b = project_b.read_with(cx_b, |project, _| {
1706            assert_eq!(
1707                project
1708                    .collaborators()
1709                    .get(&client_a.peer_id)
1710                    .unwrap()
1711                    .user
1712                    .github_login,
1713                "user_a"
1714            );
1715            project.replica_id()
1716        });
1717        project_a
1718            .condition(&cx_a, |tree, _| {
1719                tree.collaborators()
1720                    .get(&client_b_peer_id)
1721                    .map_or(false, |collaborator| {
1722                        collaborator.replica_id == replica_id_b
1723                            && collaborator.user.github_login == "user_b"
1724                    })
1725            })
1726            .await;
1727
1728        // Open the same file as client B and client A.
1729        let buffer_b = project_b
1730            .update(cx_b, |p, cx| p.open_buffer((worktree_id, "b.txt"), cx))
1731            .await
1732            .unwrap();
1733        buffer_b.read_with(cx_b, |buf, _| assert_eq!(buf.text(), "b-contents"));
1734        project_a.read_with(cx_a, |project, cx| {
1735            assert!(project.has_open_buffer((worktree_id, "b.txt"), cx))
1736        });
1737        let buffer_a = project_a
1738            .update(cx_a, |p, cx| p.open_buffer((worktree_id, "b.txt"), cx))
1739            .await
1740            .unwrap();
1741
1742        let editor_b = cx_b.add_view(window_b, |cx| Editor::for_buffer(buffer_b, None, cx));
1743
1744        // TODO
1745        // // Create a selection set as client B and see that selection set as client A.
1746        // buffer_a
1747        //     .condition(&cx_a, |buffer, _| buffer.selection_sets().count() == 1)
1748        //     .await;
1749
1750        // Edit the buffer as client B and see that edit as client A.
1751        editor_b.update(cx_b, |editor, cx| {
1752            editor.handle_input(&Input("ok, ".into()), cx)
1753        });
1754        buffer_a
1755            .condition(&cx_a, |buffer, _| buffer.text() == "ok, b-contents")
1756            .await;
1757
1758        // TODO
1759        // // Remove the selection set as client B, see those selections disappear as client A.
1760        cx_b.update(move |_| drop(editor_b));
1761        // buffer_a
1762        //     .condition(&cx_a, |buffer, _| buffer.selection_sets().count() == 0)
1763        //     .await;
1764
1765        // Dropping the client B's project removes client B from client A's collaborators.
1766        cx_b.update(move |_| {
1767            drop(client_b.project.take());
1768            drop(project_b);
1769        });
1770        project_a
1771            .condition(&cx_a, |project, _| project.collaborators().is_empty())
1772            .await;
1773    }
1774
1775    #[gpui::test(iterations = 10)]
1776    async fn test_unshare_project(
1777        deterministic: Arc<Deterministic>,
1778        cx_a: &mut TestAppContext,
1779        cx_b: &mut TestAppContext,
1780    ) {
1781        let lang_registry = Arc::new(LanguageRegistry::test());
1782        let fs = FakeFs::new(cx_a.background());
1783        cx_a.foreground().forbid_parking();
1784
1785        // Connect to a server as 2 clients.
1786        let mut server = TestServer::start(cx_a.foreground(), cx_a.background()).await;
1787        let client_a = server.create_client(cx_a, "user_a").await;
1788        let mut client_b = server.create_client(cx_b, "user_b").await;
1789        server
1790            .make_contacts(vec![(&client_a, cx_a), (&client_b, cx_b)])
1791            .await;
1792
1793        // Share a project as client A
1794        fs.insert_tree(
1795            "/a",
1796            json!({
1797                "a.txt": "a-contents",
1798                "b.txt": "b-contents",
1799            }),
1800        )
1801        .await;
1802        let project_a = cx_a.update(|cx| {
1803            Project::local(
1804                client_a.clone(),
1805                client_a.user_store.clone(),
1806                lang_registry.clone(),
1807                fs.clone(),
1808                cx,
1809            )
1810        });
1811        let (worktree_a, _) = project_a
1812            .update(cx_a, |p, cx| {
1813                p.find_or_create_local_worktree("/a", true, cx)
1814            })
1815            .await
1816            .unwrap();
1817        worktree_a
1818            .read_with(cx_a, |tree, _| tree.as_local().unwrap().scan_complete())
1819            .await;
1820        let worktree_id = worktree_a.read_with(cx_a, |tree, _| tree.id());
1821
1822        // Join that project as client B
1823        let project_b = client_b.build_remote_project(&project_a, cx_a, cx_b).await;
1824        assert!(worktree_a.read_with(cx_a, |tree, _| tree.as_local().unwrap().is_shared()));
1825        project_b
1826            .update(cx_b, |p, cx| p.open_buffer((worktree_id, "a.txt"), cx))
1827            .await
1828            .unwrap();
1829
1830        // When client B leaves the project, it gets automatically unshared.
1831        cx_b.update(|_| {
1832            drop(client_b.project.take());
1833            drop(project_b);
1834        });
1835        deterministic.run_until_parked();
1836        assert!(worktree_a.read_with(cx_a, |tree, _| !tree.as_local().unwrap().is_shared()));
1837
1838        // When client B joins again, the project gets re-shared.
1839        let project_b2 = client_b.build_remote_project(&project_a, cx_a, cx_b).await;
1840        assert!(worktree_a.read_with(cx_a, |tree, _| tree.as_local().unwrap().is_shared()));
1841        project_b2
1842            .update(cx_b, |p, cx| p.open_buffer((worktree_id, "a.txt"), cx))
1843            .await
1844            .unwrap();
1845    }
1846
1847    #[gpui::test(iterations = 10)]
1848    async fn test_host_disconnect(
1849        cx_a: &mut TestAppContext,
1850        cx_b: &mut TestAppContext,
1851        cx_c: &mut TestAppContext,
1852    ) {
1853        let lang_registry = Arc::new(LanguageRegistry::test());
1854        let fs = FakeFs::new(cx_a.background());
1855        cx_a.foreground().forbid_parking();
1856
1857        // Connect to a server as 3 clients.
1858        let mut server = TestServer::start(cx_a.foreground(), cx_a.background()).await;
1859        let client_a = server.create_client(cx_a, "user_a").await;
1860        let mut client_b = server.create_client(cx_b, "user_b").await;
1861        let client_c = server.create_client(cx_c, "user_c").await;
1862        server
1863            .make_contacts(vec![
1864                (&client_a, cx_a),
1865                (&client_b, cx_b),
1866                (&client_c, cx_c),
1867            ])
1868            .await;
1869
1870        // Share a project as client A
1871        fs.insert_tree(
1872            "/a",
1873            json!({
1874                "a.txt": "a-contents",
1875                "b.txt": "b-contents",
1876            }),
1877        )
1878        .await;
1879        let project_a = cx_a.update(|cx| {
1880            Project::local(
1881                client_a.clone(),
1882                client_a.user_store.clone(),
1883                lang_registry.clone(),
1884                fs.clone(),
1885                cx,
1886            )
1887        });
1888        let project_id = project_a
1889            .read_with(cx_a, |project, _| project.next_remote_id())
1890            .await;
1891        let (worktree_a, _) = project_a
1892            .update(cx_a, |p, cx| {
1893                p.find_or_create_local_worktree("/a", true, cx)
1894            })
1895            .await
1896            .unwrap();
1897        worktree_a
1898            .read_with(cx_a, |tree, _| tree.as_local().unwrap().scan_complete())
1899            .await;
1900        let worktree_id = worktree_a.read_with(cx_a, |tree, _| tree.id());
1901
1902        // Join that project as client B
1903        let project_b = client_b.build_remote_project(&project_a, cx_a, cx_b).await;
1904        assert!(worktree_a.read_with(cx_a, |tree, _| tree.as_local().unwrap().is_shared()));
1905        project_b
1906            .update(cx_b, |p, cx| p.open_buffer((worktree_id, "a.txt"), cx))
1907            .await
1908            .unwrap();
1909
1910        // Request to join that project as client C
1911        let project_c = cx_c.spawn(|mut cx| {
1912            let client = client_c.client.clone();
1913            let user_store = client_c.user_store.clone();
1914            let lang_registry = lang_registry.clone();
1915            async move {
1916                Project::remote(
1917                    project_id,
1918                    client,
1919                    user_store,
1920                    lang_registry.clone(),
1921                    FakeFs::new(cx.background()),
1922                    &mut cx,
1923                )
1924                .await
1925            }
1926        });
1927
1928        // Drop client A's connection. Collaborators should disappear and the project should not be shown as shared.
1929        server.disconnect_client(client_a.current_user_id(cx_a));
1930        cx_a.foreground().advance_clock(rpc::RECEIVE_TIMEOUT);
1931        project_a
1932            .condition(cx_a, |project, _| project.collaborators().is_empty())
1933            .await;
1934        project_a.read_with(cx_a, |project, _| assert!(!project.is_shared()));
1935        project_b
1936            .condition(cx_b, |project, _| project.is_read_only())
1937            .await;
1938        assert!(worktree_a.read_with(cx_a, |tree, _| !tree.as_local().unwrap().is_shared()));
1939        cx_b.update(|_| {
1940            drop(project_b);
1941        });
1942        assert!(matches!(
1943            project_c.await.unwrap_err(),
1944            project::JoinProjectError::HostWentOffline
1945        ));
1946
1947        // Ensure guests can still join.
1948        let project_b2 = client_b.build_remote_project(&project_a, cx_a, cx_b).await;
1949        assert!(worktree_a.read_with(cx_a, |tree, _| tree.as_local().unwrap().is_shared()));
1950        project_b2
1951            .update(cx_b, |p, cx| p.open_buffer((worktree_id, "a.txt"), cx))
1952            .await
1953            .unwrap();
1954    }
1955
1956    #[gpui::test(iterations = 10)]
1957    async fn test_decline_join_request(
1958        deterministic: Arc<Deterministic>,
1959        cx_a: &mut TestAppContext,
1960        cx_b: &mut TestAppContext,
1961    ) {
1962        let lang_registry = Arc::new(LanguageRegistry::test());
1963        let fs = FakeFs::new(cx_a.background());
1964        cx_a.foreground().forbid_parking();
1965
1966        // Connect to a server as 2 clients.
1967        let mut server = TestServer::start(cx_a.foreground(), cx_a.background()).await;
1968        let client_a = server.create_client(cx_a, "user_a").await;
1969        let client_b = server.create_client(cx_b, "user_b").await;
1970        server
1971            .make_contacts(vec![(&client_a, cx_a), (&client_b, cx_b)])
1972            .await;
1973
1974        // Share a project as client A
1975        fs.insert_tree("/a", json!({})).await;
1976        let project_a = cx_a.update(|cx| {
1977            Project::local(
1978                client_a.clone(),
1979                client_a.user_store.clone(),
1980                lang_registry.clone(),
1981                fs.clone(),
1982                cx,
1983            )
1984        });
1985        let project_id = project_a
1986            .read_with(cx_a, |project, _| project.next_remote_id())
1987            .await;
1988        let (worktree_a, _) = project_a
1989            .update(cx_a, |p, cx| {
1990                p.find_or_create_local_worktree("/a", true, cx)
1991            })
1992            .await
1993            .unwrap();
1994        worktree_a
1995            .read_with(cx_a, |tree, _| tree.as_local().unwrap().scan_complete())
1996            .await;
1997
1998        // Request to join that project as client B
1999        let project_b = cx_b.spawn(|mut cx| {
2000            let client = client_b.client.clone();
2001            let user_store = client_b.user_store.clone();
2002            let lang_registry = lang_registry.clone();
2003            async move {
2004                Project::remote(
2005                    project_id,
2006                    client,
2007                    user_store,
2008                    lang_registry.clone(),
2009                    FakeFs::new(cx.background()),
2010                    &mut cx,
2011                )
2012                .await
2013            }
2014        });
2015        deterministic.run_until_parked();
2016        project_a.update(cx_a, |project, cx| {
2017            project.respond_to_join_request(client_b.user_id().unwrap(), false, cx)
2018        });
2019        assert!(matches!(
2020            project_b.await.unwrap_err(),
2021            project::JoinProjectError::HostDeclined
2022        ));
2023
2024        // Request to join the project again as client B
2025        let project_b = cx_b.spawn(|mut cx| {
2026            let client = client_b.client.clone();
2027            let user_store = client_b.user_store.clone();
2028            let lang_registry = lang_registry.clone();
2029            async move {
2030                Project::remote(
2031                    project_id,
2032                    client,
2033                    user_store,
2034                    lang_registry.clone(),
2035                    FakeFs::new(cx.background()),
2036                    &mut cx,
2037                )
2038                .await
2039            }
2040        });
2041
2042        // Close the project on the host
2043        deterministic.run_until_parked();
2044        cx_a.update(|_| drop(project_a));
2045        deterministic.run_until_parked();
2046        assert!(matches!(
2047            project_b.await.unwrap_err(),
2048            project::JoinProjectError::HostClosedProject
2049        ));
2050    }
2051
2052    #[gpui::test(iterations = 10)]
2053    async fn test_propagate_saves_and_fs_changes(
2054        cx_a: &mut TestAppContext,
2055        cx_b: &mut TestAppContext,
2056        cx_c: &mut TestAppContext,
2057    ) {
2058        let lang_registry = Arc::new(LanguageRegistry::test());
2059        let fs = FakeFs::new(cx_a.background());
2060        cx_a.foreground().forbid_parking();
2061
2062        // Connect to a server as 3 clients.
2063        let mut server = TestServer::start(cx_a.foreground(), cx_a.background()).await;
2064        let client_a = server.create_client(cx_a, "user_a").await;
2065        let mut client_b = server.create_client(cx_b, "user_b").await;
2066        let mut client_c = server.create_client(cx_c, "user_c").await;
2067        server
2068            .make_contacts(vec![
2069                (&client_a, cx_a),
2070                (&client_b, cx_b),
2071                (&client_c, cx_c),
2072            ])
2073            .await;
2074
2075        // Share a worktree as client A.
2076        fs.insert_tree(
2077            "/a",
2078            json!({
2079                "file1": "",
2080                "file2": ""
2081            }),
2082        )
2083        .await;
2084        let project_a = cx_a.update(|cx| {
2085            Project::local(
2086                client_a.clone(),
2087                client_a.user_store.clone(),
2088                lang_registry.clone(),
2089                fs.clone(),
2090                cx,
2091            )
2092        });
2093        let (worktree_a, _) = project_a
2094            .update(cx_a, |p, cx| {
2095                p.find_or_create_local_worktree("/a", true, cx)
2096            })
2097            .await
2098            .unwrap();
2099        worktree_a
2100            .read_with(cx_a, |tree, _| tree.as_local().unwrap().scan_complete())
2101            .await;
2102        let worktree_id = worktree_a.read_with(cx_a, |tree, _| tree.id());
2103
2104        // Join that worktree as clients B and C.
2105        let project_b = client_b.build_remote_project(&project_a, cx_a, cx_b).await;
2106        let project_c = client_c.build_remote_project(&project_a, cx_a, cx_c).await;
2107        let worktree_b = project_b.read_with(cx_b, |p, cx| p.worktrees(cx).next().unwrap());
2108        let worktree_c = project_c.read_with(cx_c, |p, cx| p.worktrees(cx).next().unwrap());
2109
2110        // Open and edit a buffer as both guests B and C.
2111        let buffer_b = project_b
2112            .update(cx_b, |p, cx| p.open_buffer((worktree_id, "file1"), cx))
2113            .await
2114            .unwrap();
2115        let buffer_c = project_c
2116            .update(cx_c, |p, cx| p.open_buffer((worktree_id, "file1"), cx))
2117            .await
2118            .unwrap();
2119        buffer_b.update(cx_b, |buf, cx| buf.edit([(0..0, "i-am-b, ")], cx));
2120        buffer_c.update(cx_c, |buf, cx| buf.edit([(0..0, "i-am-c, ")], cx));
2121
2122        // Open and edit that buffer as the host.
2123        let buffer_a = project_a
2124            .update(cx_a, |p, cx| p.open_buffer((worktree_id, "file1"), cx))
2125            .await
2126            .unwrap();
2127
2128        buffer_a
2129            .condition(cx_a, |buf, _| buf.text() == "i-am-c, i-am-b, ")
2130            .await;
2131        buffer_a.update(cx_a, |buf, cx| {
2132            buf.edit([(buf.len()..buf.len(), "i-am-a")], cx)
2133        });
2134
2135        // Wait for edits to propagate
2136        buffer_a
2137            .condition(cx_a, |buf, _| buf.text() == "i-am-c, i-am-b, i-am-a")
2138            .await;
2139        buffer_b
2140            .condition(cx_b, |buf, _| buf.text() == "i-am-c, i-am-b, i-am-a")
2141            .await;
2142        buffer_c
2143            .condition(cx_c, |buf, _| buf.text() == "i-am-c, i-am-b, i-am-a")
2144            .await;
2145
2146        // Edit the buffer as the host and concurrently save as guest B.
2147        let save_b = buffer_b.update(cx_b, |buf, cx| buf.save(cx));
2148        buffer_a.update(cx_a, |buf, cx| buf.edit([(0..0, "hi-a, ")], cx));
2149        save_b.await.unwrap();
2150        assert_eq!(
2151            fs.load("/a/file1".as_ref()).await.unwrap(),
2152            "hi-a, i-am-c, i-am-b, i-am-a"
2153        );
2154        buffer_a.read_with(cx_a, |buf, _| assert!(!buf.is_dirty()));
2155        buffer_b.read_with(cx_b, |buf, _| assert!(!buf.is_dirty()));
2156        buffer_c.condition(cx_c, |buf, _| !buf.is_dirty()).await;
2157
2158        worktree_a.flush_fs_events(cx_a).await;
2159
2160        // Make changes on host's file system, see those changes on guest worktrees.
2161        fs.rename(
2162            "/a/file1".as_ref(),
2163            "/a/file1-renamed".as_ref(),
2164            Default::default(),
2165        )
2166        .await
2167        .unwrap();
2168
2169        fs.rename("/a/file2".as_ref(), "/a/file3".as_ref(), Default::default())
2170            .await
2171            .unwrap();
2172        fs.insert_file(Path::new("/a/file4"), "4".into()).await;
2173
2174        worktree_a
2175            .condition(&cx_a, |tree, _| {
2176                tree.paths()
2177                    .map(|p| p.to_string_lossy())
2178                    .collect::<Vec<_>>()
2179                    == ["file1-renamed", "file3", "file4"]
2180            })
2181            .await;
2182        worktree_b
2183            .condition(&cx_b, |tree, _| {
2184                tree.paths()
2185                    .map(|p| p.to_string_lossy())
2186                    .collect::<Vec<_>>()
2187                    == ["file1-renamed", "file3", "file4"]
2188            })
2189            .await;
2190        worktree_c
2191            .condition(&cx_c, |tree, _| {
2192                tree.paths()
2193                    .map(|p| p.to_string_lossy())
2194                    .collect::<Vec<_>>()
2195                    == ["file1-renamed", "file3", "file4"]
2196            })
2197            .await;
2198
2199        // Ensure buffer files are updated as well.
2200        buffer_a
2201            .condition(&cx_a, |buf, _| {
2202                buf.file().unwrap().path().to_str() == Some("file1-renamed")
2203            })
2204            .await;
2205        buffer_b
2206            .condition(&cx_b, |buf, _| {
2207                buf.file().unwrap().path().to_str() == Some("file1-renamed")
2208            })
2209            .await;
2210        buffer_c
2211            .condition(&cx_c, |buf, _| {
2212                buf.file().unwrap().path().to_str() == Some("file1-renamed")
2213            })
2214            .await;
2215    }
2216
2217    #[gpui::test(iterations = 10)]
2218    async fn test_fs_operations(
2219        executor: Arc<Deterministic>,
2220        cx_a: &mut TestAppContext,
2221        cx_b: &mut TestAppContext,
2222    ) {
2223        executor.forbid_parking();
2224        let fs = FakeFs::new(cx_a.background());
2225
2226        // Connect to a server as 2 clients.
2227        let mut server = TestServer::start(cx_a.foreground(), cx_a.background()).await;
2228        let mut client_a = server.create_client(cx_a, "user_a").await;
2229        let mut client_b = server.create_client(cx_b, "user_b").await;
2230        server
2231            .make_contacts(vec![(&client_a, cx_a), (&client_b, cx_b)])
2232            .await;
2233
2234        // Share a project as client A
2235        fs.insert_tree(
2236            "/dir",
2237            json!({
2238                "a.txt": "a-contents",
2239                "b.txt": "b-contents",
2240            }),
2241        )
2242        .await;
2243
2244        let (project_a, worktree_id) = client_a.build_local_project(fs, "/dir", cx_a).await;
2245        let project_b = client_b.build_remote_project(&project_a, cx_a, cx_b).await;
2246
2247        let worktree_a =
2248            project_a.read_with(cx_a, |project, cx| project.worktrees(cx).next().unwrap());
2249        let worktree_b =
2250            project_b.read_with(cx_b, |project, cx| project.worktrees(cx).next().unwrap());
2251
2252        let entry = project_b
2253            .update(cx_b, |project, cx| {
2254                project
2255                    .create_entry((worktree_id, "c.txt"), false, cx)
2256                    .unwrap()
2257            })
2258            .await
2259            .unwrap();
2260        worktree_a.read_with(cx_a, |worktree, _| {
2261            assert_eq!(
2262                worktree
2263                    .paths()
2264                    .map(|p| p.to_string_lossy())
2265                    .collect::<Vec<_>>(),
2266                ["a.txt", "b.txt", "c.txt"]
2267            );
2268        });
2269        worktree_b.read_with(cx_b, |worktree, _| {
2270            assert_eq!(
2271                worktree
2272                    .paths()
2273                    .map(|p| p.to_string_lossy())
2274                    .collect::<Vec<_>>(),
2275                ["a.txt", "b.txt", "c.txt"]
2276            );
2277        });
2278
2279        project_b
2280            .update(cx_b, |project, cx| {
2281                project.rename_entry(entry.id, Path::new("d.txt"), cx)
2282            })
2283            .unwrap()
2284            .await
2285            .unwrap();
2286        worktree_a.read_with(cx_a, |worktree, _| {
2287            assert_eq!(
2288                worktree
2289                    .paths()
2290                    .map(|p| p.to_string_lossy())
2291                    .collect::<Vec<_>>(),
2292                ["a.txt", "b.txt", "d.txt"]
2293            );
2294        });
2295        worktree_b.read_with(cx_b, |worktree, _| {
2296            assert_eq!(
2297                worktree
2298                    .paths()
2299                    .map(|p| p.to_string_lossy())
2300                    .collect::<Vec<_>>(),
2301                ["a.txt", "b.txt", "d.txt"]
2302            );
2303        });
2304
2305        let dir_entry = project_b
2306            .update(cx_b, |project, cx| {
2307                project
2308                    .create_entry((worktree_id, "DIR"), true, cx)
2309                    .unwrap()
2310            })
2311            .await
2312            .unwrap();
2313        worktree_a.read_with(cx_a, |worktree, _| {
2314            assert_eq!(
2315                worktree
2316                    .paths()
2317                    .map(|p| p.to_string_lossy())
2318                    .collect::<Vec<_>>(),
2319                ["DIR", "a.txt", "b.txt", "d.txt"]
2320            );
2321        });
2322        worktree_b.read_with(cx_b, |worktree, _| {
2323            assert_eq!(
2324                worktree
2325                    .paths()
2326                    .map(|p| p.to_string_lossy())
2327                    .collect::<Vec<_>>(),
2328                ["DIR", "a.txt", "b.txt", "d.txt"]
2329            );
2330        });
2331
2332        project_b
2333            .update(cx_b, |project, cx| {
2334                project.delete_entry(dir_entry.id, cx).unwrap()
2335            })
2336            .await
2337            .unwrap();
2338        worktree_a.read_with(cx_a, |worktree, _| {
2339            assert_eq!(
2340                worktree
2341                    .paths()
2342                    .map(|p| p.to_string_lossy())
2343                    .collect::<Vec<_>>(),
2344                ["a.txt", "b.txt", "d.txt"]
2345            );
2346        });
2347        worktree_b.read_with(cx_b, |worktree, _| {
2348            assert_eq!(
2349                worktree
2350                    .paths()
2351                    .map(|p| p.to_string_lossy())
2352                    .collect::<Vec<_>>(),
2353                ["a.txt", "b.txt", "d.txt"]
2354            );
2355        });
2356
2357        project_b
2358            .update(cx_b, |project, cx| {
2359                project.delete_entry(entry.id, cx).unwrap()
2360            })
2361            .await
2362            .unwrap();
2363        worktree_a.read_with(cx_a, |worktree, _| {
2364            assert_eq!(
2365                worktree
2366                    .paths()
2367                    .map(|p| p.to_string_lossy())
2368                    .collect::<Vec<_>>(),
2369                ["a.txt", "b.txt"]
2370            );
2371        });
2372        worktree_b.read_with(cx_b, |worktree, _| {
2373            assert_eq!(
2374                worktree
2375                    .paths()
2376                    .map(|p| p.to_string_lossy())
2377                    .collect::<Vec<_>>(),
2378                ["a.txt", "b.txt"]
2379            );
2380        });
2381    }
2382
2383    #[gpui::test(iterations = 10)]
2384    async fn test_buffer_conflict_after_save(cx_a: &mut TestAppContext, cx_b: &mut TestAppContext) {
2385        cx_a.foreground().forbid_parking();
2386        let lang_registry = Arc::new(LanguageRegistry::test());
2387        let fs = FakeFs::new(cx_a.background());
2388
2389        // Connect to a server as 2 clients.
2390        let mut server = TestServer::start(cx_a.foreground(), cx_a.background()).await;
2391        let client_a = server.create_client(cx_a, "user_a").await;
2392        let mut client_b = server.create_client(cx_b, "user_b").await;
2393        server
2394            .make_contacts(vec![(&client_a, cx_a), (&client_b, cx_b)])
2395            .await;
2396
2397        // Share a project as client A
2398        fs.insert_tree(
2399            "/dir",
2400            json!({
2401                "a.txt": "a-contents",
2402            }),
2403        )
2404        .await;
2405
2406        let project_a = cx_a.update(|cx| {
2407            Project::local(
2408                client_a.clone(),
2409                client_a.user_store.clone(),
2410                lang_registry.clone(),
2411                fs.clone(),
2412                cx,
2413            )
2414        });
2415        let (worktree_a, _) = project_a
2416            .update(cx_a, |p, cx| {
2417                p.find_or_create_local_worktree("/dir", true, cx)
2418            })
2419            .await
2420            .unwrap();
2421        worktree_a
2422            .read_with(cx_a, |tree, _| tree.as_local().unwrap().scan_complete())
2423            .await;
2424        let worktree_id = worktree_a.read_with(cx_a, |tree, _| tree.id());
2425
2426        // Join that project as client B
2427        let project_b = client_b.build_remote_project(&project_a, cx_a, cx_b).await;
2428
2429        // Open a buffer as client B
2430        let buffer_b = project_b
2431            .update(cx_b, |p, cx| p.open_buffer((worktree_id, "a.txt"), cx))
2432            .await
2433            .unwrap();
2434
2435        buffer_b.update(cx_b, |buf, cx| buf.edit([(0..0, "world ")], cx));
2436        buffer_b.read_with(cx_b, |buf, _| {
2437            assert!(buf.is_dirty());
2438            assert!(!buf.has_conflict());
2439        });
2440
2441        buffer_b.update(cx_b, |buf, cx| buf.save(cx)).await.unwrap();
2442        buffer_b
2443            .condition(&cx_b, |buffer_b, _| !buffer_b.is_dirty())
2444            .await;
2445        buffer_b.read_with(cx_b, |buf, _| {
2446            assert!(!buf.has_conflict());
2447        });
2448
2449        buffer_b.update(cx_b, |buf, cx| buf.edit([(0..0, "hello ")], cx));
2450        buffer_b.read_with(cx_b, |buf, _| {
2451            assert!(buf.is_dirty());
2452            assert!(!buf.has_conflict());
2453        });
2454    }
2455
2456    #[gpui::test(iterations = 10)]
2457    async fn test_buffer_reloading(cx_a: &mut TestAppContext, cx_b: &mut TestAppContext) {
2458        cx_a.foreground().forbid_parking();
2459        let lang_registry = Arc::new(LanguageRegistry::test());
2460        let fs = FakeFs::new(cx_a.background());
2461
2462        // Connect to a server as 2 clients.
2463        let mut server = TestServer::start(cx_a.foreground(), cx_a.background()).await;
2464        let client_a = server.create_client(cx_a, "user_a").await;
2465        let mut client_b = server.create_client(cx_b, "user_b").await;
2466        server
2467            .make_contacts(vec![(&client_a, cx_a), (&client_b, cx_b)])
2468            .await;
2469
2470        // Share a project as client A
2471        fs.insert_tree(
2472            "/dir",
2473            json!({
2474                "a.txt": "a-contents",
2475            }),
2476        )
2477        .await;
2478
2479        let project_a = cx_a.update(|cx| {
2480            Project::local(
2481                client_a.clone(),
2482                client_a.user_store.clone(),
2483                lang_registry.clone(),
2484                fs.clone(),
2485                cx,
2486            )
2487        });
2488        let (worktree_a, _) = project_a
2489            .update(cx_a, |p, cx| {
2490                p.find_or_create_local_worktree("/dir", true, cx)
2491            })
2492            .await
2493            .unwrap();
2494        worktree_a
2495            .read_with(cx_a, |tree, _| tree.as_local().unwrap().scan_complete())
2496            .await;
2497        let worktree_id = worktree_a.read_with(cx_a, |tree, _| tree.id());
2498
2499        // Join that project as client B
2500        let project_b = client_b.build_remote_project(&project_a, cx_a, cx_b).await;
2501        let _worktree_b = project_b.update(cx_b, |p, cx| p.worktrees(cx).next().unwrap());
2502
2503        // Open a buffer as client B
2504        let buffer_b = project_b
2505            .update(cx_b, |p, cx| p.open_buffer((worktree_id, "a.txt"), cx))
2506            .await
2507            .unwrap();
2508        buffer_b.read_with(cx_b, |buf, _| {
2509            assert!(!buf.is_dirty());
2510            assert!(!buf.has_conflict());
2511        });
2512
2513        fs.save(Path::new("/dir/a.txt"), &"new contents".into())
2514            .await
2515            .unwrap();
2516        buffer_b
2517            .condition(&cx_b, |buf, _| {
2518                buf.text() == "new contents" && !buf.is_dirty()
2519            })
2520            .await;
2521        buffer_b.read_with(cx_b, |buf, _| {
2522            assert!(!buf.has_conflict());
2523        });
2524    }
2525
2526    #[gpui::test(iterations = 10)]
2527    async fn test_editing_while_guest_opens_buffer(
2528        cx_a: &mut TestAppContext,
2529        cx_b: &mut TestAppContext,
2530    ) {
2531        cx_a.foreground().forbid_parking();
2532        let lang_registry = Arc::new(LanguageRegistry::test());
2533        let fs = FakeFs::new(cx_a.background());
2534
2535        // Connect to a server as 2 clients.
2536        let mut server = TestServer::start(cx_a.foreground(), cx_a.background()).await;
2537        let client_a = server.create_client(cx_a, "user_a").await;
2538        let mut client_b = server.create_client(cx_b, "user_b").await;
2539        server
2540            .make_contacts(vec![(&client_a, cx_a), (&client_b, cx_b)])
2541            .await;
2542
2543        // Share a project as client A
2544        fs.insert_tree(
2545            "/dir",
2546            json!({
2547                "a.txt": "a-contents",
2548            }),
2549        )
2550        .await;
2551        let project_a = cx_a.update(|cx| {
2552            Project::local(
2553                client_a.clone(),
2554                client_a.user_store.clone(),
2555                lang_registry.clone(),
2556                fs.clone(),
2557                cx,
2558            )
2559        });
2560        let (worktree_a, _) = project_a
2561            .update(cx_a, |p, cx| {
2562                p.find_or_create_local_worktree("/dir", true, cx)
2563            })
2564            .await
2565            .unwrap();
2566        worktree_a
2567            .read_with(cx_a, |tree, _| tree.as_local().unwrap().scan_complete())
2568            .await;
2569        let worktree_id = worktree_a.read_with(cx_a, |tree, _| tree.id());
2570
2571        // Join that project as client B
2572        let project_b = client_b.build_remote_project(&project_a, cx_a, cx_b).await;
2573
2574        // Open a buffer as client A
2575        let buffer_a = project_a
2576            .update(cx_a, |p, cx| p.open_buffer((worktree_id, "a.txt"), cx))
2577            .await
2578            .unwrap();
2579
2580        // Start opening the same buffer as client B
2581        let buffer_b = cx_b
2582            .background()
2583            .spawn(project_b.update(cx_b, |p, cx| p.open_buffer((worktree_id, "a.txt"), cx)));
2584
2585        // Edit the buffer as client A while client B is still opening it.
2586        cx_b.background().simulate_random_delay().await;
2587        buffer_a.update(cx_a, |buf, cx| buf.edit([(0..0, "X")], cx));
2588        cx_b.background().simulate_random_delay().await;
2589        buffer_a.update(cx_a, |buf, cx| buf.edit([(1..1, "Y")], cx));
2590
2591        let text = buffer_a.read_with(cx_a, |buf, _| buf.text());
2592        let buffer_b = buffer_b.await.unwrap();
2593        buffer_b.condition(&cx_b, |buf, _| buf.text() == text).await;
2594    }
2595
2596    #[gpui::test(iterations = 10)]
2597    async fn test_leaving_worktree_while_opening_buffer(
2598        cx_a: &mut TestAppContext,
2599        cx_b: &mut TestAppContext,
2600    ) {
2601        cx_a.foreground().forbid_parking();
2602        let lang_registry = Arc::new(LanguageRegistry::test());
2603        let fs = FakeFs::new(cx_a.background());
2604
2605        // Connect to a server as 2 clients.
2606        let mut server = TestServer::start(cx_a.foreground(), cx_a.background()).await;
2607        let client_a = server.create_client(cx_a, "user_a").await;
2608        let mut client_b = server.create_client(cx_b, "user_b").await;
2609        server
2610            .make_contacts(vec![(&client_a, cx_a), (&client_b, cx_b)])
2611            .await;
2612
2613        // Share a project as client A
2614        fs.insert_tree(
2615            "/dir",
2616            json!({
2617                "a.txt": "a-contents",
2618            }),
2619        )
2620        .await;
2621        let project_a = cx_a.update(|cx| {
2622            Project::local(
2623                client_a.clone(),
2624                client_a.user_store.clone(),
2625                lang_registry.clone(),
2626                fs.clone(),
2627                cx,
2628            )
2629        });
2630        let (worktree_a, _) = project_a
2631            .update(cx_a, |p, cx| {
2632                p.find_or_create_local_worktree("/dir", true, cx)
2633            })
2634            .await
2635            .unwrap();
2636        worktree_a
2637            .read_with(cx_a, |tree, _| tree.as_local().unwrap().scan_complete())
2638            .await;
2639        let worktree_id = worktree_a.read_with(cx_a, |tree, _| tree.id());
2640
2641        // Join that project as client B
2642        let project_b = client_b.build_remote_project(&project_a, cx_a, cx_b).await;
2643
2644        // See that a guest has joined as client A.
2645        project_a
2646            .condition(&cx_a, |p, _| p.collaborators().len() == 1)
2647            .await;
2648
2649        // Begin opening a buffer as client B, but leave the project before the open completes.
2650        let buffer_b = cx_b
2651            .background()
2652            .spawn(project_b.update(cx_b, |p, cx| p.open_buffer((worktree_id, "a.txt"), cx)));
2653        cx_b.update(|_| {
2654            drop(client_b.project.take());
2655            drop(project_b);
2656        });
2657        drop(buffer_b);
2658
2659        // See that the guest has left.
2660        project_a
2661            .condition(&cx_a, |p, _| p.collaborators().len() == 0)
2662            .await;
2663    }
2664
2665    #[gpui::test(iterations = 10)]
2666    async fn test_leaving_project(cx_a: &mut TestAppContext, cx_b: &mut TestAppContext) {
2667        cx_a.foreground().forbid_parking();
2668        let lang_registry = Arc::new(LanguageRegistry::test());
2669        let fs = FakeFs::new(cx_a.background());
2670
2671        // Connect to a server as 2 clients.
2672        let mut server = TestServer::start(cx_a.foreground(), cx_a.background()).await;
2673        let client_a = server.create_client(cx_a, "user_a").await;
2674        let mut client_b = server.create_client(cx_b, "user_b").await;
2675        server
2676            .make_contacts(vec![(&client_a, cx_a), (&client_b, cx_b)])
2677            .await;
2678
2679        // Share a project as client A
2680        fs.insert_tree(
2681            "/a",
2682            json!({
2683                "a.txt": "a-contents",
2684                "b.txt": "b-contents",
2685            }),
2686        )
2687        .await;
2688        let project_a = cx_a.update(|cx| {
2689            Project::local(
2690                client_a.clone(),
2691                client_a.user_store.clone(),
2692                lang_registry.clone(),
2693                fs.clone(),
2694                cx,
2695            )
2696        });
2697        let (worktree_a, _) = project_a
2698            .update(cx_a, |p, cx| {
2699                p.find_or_create_local_worktree("/a", true, cx)
2700            })
2701            .await
2702            .unwrap();
2703        worktree_a
2704            .read_with(cx_a, |tree, _| tree.as_local().unwrap().scan_complete())
2705            .await;
2706
2707        // Join that project as client B
2708        let _project_b = client_b.build_remote_project(&project_a, cx_a, cx_b).await;
2709
2710        // Client A sees that a guest has joined.
2711        project_a
2712            .condition(cx_a, |p, _| p.collaborators().len() == 1)
2713            .await;
2714
2715        // Drop client B's connection and ensure client A observes client B leaving the project.
2716        client_b.disconnect(&cx_b.to_async()).unwrap();
2717        project_a
2718            .condition(cx_a, |p, _| p.collaborators().len() == 0)
2719            .await;
2720
2721        // Rejoin the project as client B
2722        let _project_b = client_b.build_remote_project(&project_a, cx_a, cx_b).await;
2723
2724        // Client A sees that a guest has re-joined.
2725        project_a
2726            .condition(cx_a, |p, _| p.collaborators().len() == 1)
2727            .await;
2728
2729        // Simulate connection loss for client B and ensure client A observes client B leaving the project.
2730        client_b.wait_for_current_user(cx_b).await;
2731        server.disconnect_client(client_b.current_user_id(cx_b));
2732        cx_a.foreground().advance_clock(rpc::RECEIVE_TIMEOUT);
2733        project_a
2734            .condition(cx_a, |p, _| p.collaborators().len() == 0)
2735            .await;
2736    }
2737
2738    #[gpui::test(iterations = 10)]
2739    async fn test_collaborating_with_diagnostics(
2740        deterministic: Arc<Deterministic>,
2741        cx_a: &mut TestAppContext,
2742        cx_b: &mut TestAppContext,
2743        cx_c: &mut TestAppContext,
2744    ) {
2745        deterministic.forbid_parking();
2746        let lang_registry = Arc::new(LanguageRegistry::test());
2747        let fs = FakeFs::new(cx_a.background());
2748
2749        // Set up a fake language server.
2750        let mut language = Language::new(
2751            LanguageConfig {
2752                name: "Rust".into(),
2753                path_suffixes: vec!["rs".to_string()],
2754                ..Default::default()
2755            },
2756            Some(tree_sitter_rust::language()),
2757        );
2758        let mut fake_language_servers = language.set_fake_lsp_adapter(Default::default());
2759        lang_registry.add(Arc::new(language));
2760
2761        // Connect to a server as 2 clients.
2762        let mut server = TestServer::start(cx_a.foreground(), cx_a.background()).await;
2763        let client_a = server.create_client(cx_a, "user_a").await;
2764        let mut client_b = server.create_client(cx_b, "user_b").await;
2765        let mut client_c = server.create_client(cx_c, "user_c").await;
2766        server
2767            .make_contacts(vec![
2768                (&client_a, cx_a),
2769                (&client_b, cx_b),
2770                (&client_c, cx_c),
2771            ])
2772            .await;
2773
2774        // Share a project as client A
2775        fs.insert_tree(
2776            "/a",
2777            json!({
2778                "a.rs": "let one = two",
2779                "other.rs": "",
2780            }),
2781        )
2782        .await;
2783        let project_a = cx_a.update(|cx| {
2784            Project::local(
2785                client_a.clone(),
2786                client_a.user_store.clone(),
2787                lang_registry.clone(),
2788                fs.clone(),
2789                cx,
2790            )
2791        });
2792        let (worktree_a, _) = project_a
2793            .update(cx_a, |p, cx| {
2794                p.find_or_create_local_worktree("/a", true, cx)
2795            })
2796            .await
2797            .unwrap();
2798        worktree_a
2799            .read_with(cx_a, |tree, _| tree.as_local().unwrap().scan_complete())
2800            .await;
2801        let project_id = project_a.update(cx_a, |p, _| p.next_remote_id()).await;
2802        let worktree_id = worktree_a.read_with(cx_a, |tree, _| tree.id());
2803
2804        // Cause the language server to start.
2805        let _buffer = cx_a
2806            .background()
2807            .spawn(project_a.update(cx_a, |project, cx| {
2808                project.open_buffer(
2809                    ProjectPath {
2810                        worktree_id,
2811                        path: Path::new("other.rs").into(),
2812                    },
2813                    cx,
2814                )
2815            }))
2816            .await
2817            .unwrap();
2818
2819        // Join the worktree as client B.
2820        let project_b = client_b.build_remote_project(&project_a, cx_a, cx_b).await;
2821
2822        // Simulate a language server reporting errors for a file.
2823        let mut fake_language_server = fake_language_servers.next().await.unwrap();
2824        fake_language_server
2825            .receive_notification::<lsp::notification::DidOpenTextDocument>()
2826            .await;
2827        fake_language_server.notify::<lsp::notification::PublishDiagnostics>(
2828            lsp::PublishDiagnosticsParams {
2829                uri: lsp::Url::from_file_path("/a/a.rs").unwrap(),
2830                version: None,
2831                diagnostics: vec![lsp::Diagnostic {
2832                    severity: Some(lsp::DiagnosticSeverity::ERROR),
2833                    range: lsp::Range::new(lsp::Position::new(0, 4), lsp::Position::new(0, 7)),
2834                    message: "message 1".to_string(),
2835                    ..Default::default()
2836                }],
2837            },
2838        );
2839
2840        // Wait for server to see the diagnostics update.
2841        deterministic.run_until_parked();
2842        {
2843            let store = server.store.read().await;
2844            let project = store.project(project_id).unwrap();
2845            let worktree = project.worktrees.get(&worktree_id.to_proto()).unwrap();
2846            assert!(!worktree.diagnostic_summaries.is_empty());
2847        }
2848
2849        // Ensure client B observes the new diagnostics.
2850        project_b.read_with(cx_b, |project, cx| {
2851            assert_eq!(
2852                project.diagnostic_summaries(cx).collect::<Vec<_>>(),
2853                &[(
2854                    ProjectPath {
2855                        worktree_id,
2856                        path: Arc::from(Path::new("a.rs")),
2857                    },
2858                    DiagnosticSummary {
2859                        error_count: 1,
2860                        warning_count: 0,
2861                        ..Default::default()
2862                    },
2863                )]
2864            )
2865        });
2866
2867        // Join project as client C and observe the diagnostics.
2868        let project_c = client_c.build_remote_project(&project_a, cx_a, cx_c).await;
2869        project_c.read_with(cx_c, |project, cx| {
2870            assert_eq!(
2871                project.diagnostic_summaries(cx).collect::<Vec<_>>(),
2872                &[(
2873                    ProjectPath {
2874                        worktree_id,
2875                        path: Arc::from(Path::new("a.rs")),
2876                    },
2877                    DiagnosticSummary {
2878                        error_count: 1,
2879                        warning_count: 0,
2880                        ..Default::default()
2881                    },
2882                )]
2883            )
2884        });
2885
2886        // Simulate a language server reporting more errors for a file.
2887        fake_language_server.notify::<lsp::notification::PublishDiagnostics>(
2888            lsp::PublishDiagnosticsParams {
2889                uri: lsp::Url::from_file_path("/a/a.rs").unwrap(),
2890                version: None,
2891                diagnostics: vec![
2892                    lsp::Diagnostic {
2893                        severity: Some(lsp::DiagnosticSeverity::ERROR),
2894                        range: lsp::Range::new(lsp::Position::new(0, 4), lsp::Position::new(0, 7)),
2895                        message: "message 1".to_string(),
2896                        ..Default::default()
2897                    },
2898                    lsp::Diagnostic {
2899                        severity: Some(lsp::DiagnosticSeverity::WARNING),
2900                        range: lsp::Range::new(
2901                            lsp::Position::new(0, 10),
2902                            lsp::Position::new(0, 13),
2903                        ),
2904                        message: "message 2".to_string(),
2905                        ..Default::default()
2906                    },
2907                ],
2908            },
2909        );
2910
2911        // Clients B and C get the updated summaries
2912        deterministic.run_until_parked();
2913        project_b.read_with(cx_b, |project, cx| {
2914            assert_eq!(
2915                project.diagnostic_summaries(cx).collect::<Vec<_>>(),
2916                [(
2917                    ProjectPath {
2918                        worktree_id,
2919                        path: Arc::from(Path::new("a.rs")),
2920                    },
2921                    DiagnosticSummary {
2922                        error_count: 1,
2923                        warning_count: 1,
2924                        ..Default::default()
2925                    },
2926                )]
2927            );
2928        });
2929        project_c.read_with(cx_c, |project, cx| {
2930            assert_eq!(
2931                project.diagnostic_summaries(cx).collect::<Vec<_>>(),
2932                [(
2933                    ProjectPath {
2934                        worktree_id,
2935                        path: Arc::from(Path::new("a.rs")),
2936                    },
2937                    DiagnosticSummary {
2938                        error_count: 1,
2939                        warning_count: 1,
2940                        ..Default::default()
2941                    },
2942                )]
2943            );
2944        });
2945
2946        // Open the file with the errors on client B. They should be present.
2947        let buffer_b = cx_b
2948            .background()
2949            .spawn(project_b.update(cx_b, |p, cx| p.open_buffer((worktree_id, "a.rs"), cx)))
2950            .await
2951            .unwrap();
2952
2953        buffer_b.read_with(cx_b, |buffer, _| {
2954            assert_eq!(
2955                buffer
2956                    .snapshot()
2957                    .diagnostics_in_range::<_, Point>(0..buffer.len(), false)
2958                    .map(|entry| entry)
2959                    .collect::<Vec<_>>(),
2960                &[
2961                    DiagnosticEntry {
2962                        range: Point::new(0, 4)..Point::new(0, 7),
2963                        diagnostic: Diagnostic {
2964                            group_id: 0,
2965                            message: "message 1".to_string(),
2966                            severity: lsp::DiagnosticSeverity::ERROR,
2967                            is_primary: true,
2968                            ..Default::default()
2969                        }
2970                    },
2971                    DiagnosticEntry {
2972                        range: Point::new(0, 10)..Point::new(0, 13),
2973                        diagnostic: Diagnostic {
2974                            group_id: 1,
2975                            severity: lsp::DiagnosticSeverity::WARNING,
2976                            message: "message 2".to_string(),
2977                            is_primary: true,
2978                            ..Default::default()
2979                        }
2980                    }
2981                ]
2982            );
2983        });
2984
2985        // Simulate a language server reporting no errors for a file.
2986        fake_language_server.notify::<lsp::notification::PublishDiagnostics>(
2987            lsp::PublishDiagnosticsParams {
2988                uri: lsp::Url::from_file_path("/a/a.rs").unwrap(),
2989                version: None,
2990                diagnostics: vec![],
2991            },
2992        );
2993        deterministic.run_until_parked();
2994        project_a.read_with(cx_a, |project, cx| {
2995            assert_eq!(project.diagnostic_summaries(cx).collect::<Vec<_>>(), [])
2996        });
2997        project_b.read_with(cx_b, |project, cx| {
2998            assert_eq!(project.diagnostic_summaries(cx).collect::<Vec<_>>(), [])
2999        });
3000        project_c.read_with(cx_c, |project, cx| {
3001            assert_eq!(project.diagnostic_summaries(cx).collect::<Vec<_>>(), [])
3002        });
3003    }
3004
3005    #[gpui::test(iterations = 10)]
3006    async fn test_collaborating_with_completion(
3007        cx_a: &mut TestAppContext,
3008        cx_b: &mut TestAppContext,
3009    ) {
3010        cx_a.foreground().forbid_parking();
3011        let lang_registry = Arc::new(LanguageRegistry::test());
3012        let fs = FakeFs::new(cx_a.background());
3013
3014        // Set up a fake language server.
3015        let mut language = Language::new(
3016            LanguageConfig {
3017                name: "Rust".into(),
3018                path_suffixes: vec!["rs".to_string()],
3019                ..Default::default()
3020            },
3021            Some(tree_sitter_rust::language()),
3022        );
3023        let mut fake_language_servers = language.set_fake_lsp_adapter(FakeLspAdapter {
3024            capabilities: lsp::ServerCapabilities {
3025                completion_provider: Some(lsp::CompletionOptions {
3026                    trigger_characters: Some(vec![".".to_string()]),
3027                    ..Default::default()
3028                }),
3029                ..Default::default()
3030            },
3031            ..Default::default()
3032        });
3033        lang_registry.add(Arc::new(language));
3034
3035        // Connect to a server as 2 clients.
3036        let mut server = TestServer::start(cx_a.foreground(), cx_a.background()).await;
3037        let client_a = server.create_client(cx_a, "user_a").await;
3038        let mut client_b = server.create_client(cx_b, "user_b").await;
3039        server
3040            .make_contacts(vec![(&client_a, cx_a), (&client_b, cx_b)])
3041            .await;
3042
3043        // Share a project as client A
3044        fs.insert_tree(
3045            "/a",
3046            json!({
3047                "main.rs": "fn main() { a }",
3048                "other.rs": "",
3049            }),
3050        )
3051        .await;
3052        let project_a = cx_a.update(|cx| {
3053            Project::local(
3054                client_a.clone(),
3055                client_a.user_store.clone(),
3056                lang_registry.clone(),
3057                fs.clone(),
3058                cx,
3059            )
3060        });
3061        let (worktree_a, _) = project_a
3062            .update(cx_a, |p, cx| {
3063                p.find_or_create_local_worktree("/a", true, cx)
3064            })
3065            .await
3066            .unwrap();
3067        worktree_a
3068            .read_with(cx_a, |tree, _| tree.as_local().unwrap().scan_complete())
3069            .await;
3070        let worktree_id = worktree_a.read_with(cx_a, |tree, _| tree.id());
3071
3072        // Join the worktree as client B.
3073        let project_b = client_b.build_remote_project(&project_a, cx_a, cx_b).await;
3074
3075        // Open a file in an editor as the guest.
3076        let buffer_b = project_b
3077            .update(cx_b, |p, cx| p.open_buffer((worktree_id, "main.rs"), cx))
3078            .await
3079            .unwrap();
3080        let (window_b, _) = cx_b.add_window(|_| EmptyView);
3081        let editor_b = cx_b.add_view(window_b, |cx| {
3082            Editor::for_buffer(buffer_b.clone(), Some(project_b.clone()), cx)
3083        });
3084
3085        let fake_language_server = fake_language_servers.next().await.unwrap();
3086        buffer_b
3087            .condition(&cx_b, |buffer, _| !buffer.completion_triggers().is_empty())
3088            .await;
3089
3090        // Type a completion trigger character as the guest.
3091        editor_b.update(cx_b, |editor, cx| {
3092            editor.select_ranges([13..13], None, cx);
3093            editor.handle_input(&Input(".".into()), cx);
3094            cx.focus(&editor_b);
3095        });
3096
3097        // Receive a completion request as the host's language server.
3098        // Return some completions from the host's language server.
3099        cx_a.foreground().start_waiting();
3100        fake_language_server
3101            .handle_request::<lsp::request::Completion, _, _>(|params, _| async move {
3102                assert_eq!(
3103                    params.text_document_position.text_document.uri,
3104                    lsp::Url::from_file_path("/a/main.rs").unwrap(),
3105                );
3106                assert_eq!(
3107                    params.text_document_position.position,
3108                    lsp::Position::new(0, 14),
3109                );
3110
3111                Ok(Some(lsp::CompletionResponse::Array(vec![
3112                    lsp::CompletionItem {
3113                        label: "first_method(…)".into(),
3114                        detail: Some("fn(&mut self, B) -> C".into()),
3115                        text_edit: Some(lsp::CompletionTextEdit::Edit(lsp::TextEdit {
3116                            new_text: "first_method($1)".to_string(),
3117                            range: lsp::Range::new(
3118                                lsp::Position::new(0, 14),
3119                                lsp::Position::new(0, 14),
3120                            ),
3121                        })),
3122                        insert_text_format: Some(lsp::InsertTextFormat::SNIPPET),
3123                        ..Default::default()
3124                    },
3125                    lsp::CompletionItem {
3126                        label: "second_method(…)".into(),
3127                        detail: Some("fn(&mut self, C) -> D<E>".into()),
3128                        text_edit: Some(lsp::CompletionTextEdit::Edit(lsp::TextEdit {
3129                            new_text: "second_method()".to_string(),
3130                            range: lsp::Range::new(
3131                                lsp::Position::new(0, 14),
3132                                lsp::Position::new(0, 14),
3133                            ),
3134                        })),
3135                        insert_text_format: Some(lsp::InsertTextFormat::SNIPPET),
3136                        ..Default::default()
3137                    },
3138                ])))
3139            })
3140            .next()
3141            .await
3142            .unwrap();
3143        cx_a.foreground().finish_waiting();
3144
3145        // Open the buffer on the host.
3146        let buffer_a = project_a
3147            .update(cx_a, |p, cx| p.open_buffer((worktree_id, "main.rs"), cx))
3148            .await
3149            .unwrap();
3150        buffer_a
3151            .condition(&cx_a, |buffer, _| buffer.text() == "fn main() { a. }")
3152            .await;
3153
3154        // Confirm a completion on the guest.
3155        editor_b
3156            .condition(&cx_b, |editor, _| editor.context_menu_visible())
3157            .await;
3158        editor_b.update(cx_b, |editor, cx| {
3159            editor.confirm_completion(&ConfirmCompletion { item_ix: Some(0) }, cx);
3160            assert_eq!(editor.text(cx), "fn main() { a.first_method() }");
3161        });
3162
3163        // Return a resolved completion from the host's language server.
3164        // The resolved completion has an additional text edit.
3165        fake_language_server.handle_request::<lsp::request::ResolveCompletionItem, _, _>(
3166            |params, _| async move {
3167                assert_eq!(params.label, "first_method(…)");
3168                Ok(lsp::CompletionItem {
3169                    label: "first_method(…)".into(),
3170                    detail: Some("fn(&mut self, B) -> C".into()),
3171                    text_edit: Some(lsp::CompletionTextEdit::Edit(lsp::TextEdit {
3172                        new_text: "first_method($1)".to_string(),
3173                        range: lsp::Range::new(
3174                            lsp::Position::new(0, 14),
3175                            lsp::Position::new(0, 14),
3176                        ),
3177                    })),
3178                    additional_text_edits: Some(vec![lsp::TextEdit {
3179                        new_text: "use d::SomeTrait;\n".to_string(),
3180                        range: lsp::Range::new(lsp::Position::new(0, 0), lsp::Position::new(0, 0)),
3181                    }]),
3182                    insert_text_format: Some(lsp::InsertTextFormat::SNIPPET),
3183                    ..Default::default()
3184                })
3185            },
3186        );
3187
3188        // The additional edit is applied.
3189        buffer_a
3190            .condition(&cx_a, |buffer, _| {
3191                buffer.text() == "use d::SomeTrait;\nfn main() { a.first_method() }"
3192            })
3193            .await;
3194        buffer_b
3195            .condition(&cx_b, |buffer, _| {
3196                buffer.text() == "use d::SomeTrait;\nfn main() { a.first_method() }"
3197            })
3198            .await;
3199    }
3200
3201    #[gpui::test(iterations = 10)]
3202    async fn test_reloading_buffer_manually(cx_a: &mut TestAppContext, cx_b: &mut TestAppContext) {
3203        cx_a.foreground().forbid_parking();
3204        let lang_registry = Arc::new(LanguageRegistry::test());
3205        let fs = FakeFs::new(cx_a.background());
3206
3207        // Connect to a server as 2 clients.
3208        let mut server = TestServer::start(cx_a.foreground(), cx_a.background()).await;
3209        let client_a = server.create_client(cx_a, "user_a").await;
3210        let mut client_b = server.create_client(cx_b, "user_b").await;
3211        server
3212            .make_contacts(vec![(&client_a, cx_a), (&client_b, cx_b)])
3213            .await;
3214
3215        // Share a project as client A
3216        fs.insert_tree(
3217            "/a",
3218            json!({
3219                "a.rs": "let one = 1;",
3220            }),
3221        )
3222        .await;
3223        let project_a = cx_a.update(|cx| {
3224            Project::local(
3225                client_a.clone(),
3226                client_a.user_store.clone(),
3227                lang_registry.clone(),
3228                fs.clone(),
3229                cx,
3230            )
3231        });
3232        let (worktree_a, _) = project_a
3233            .update(cx_a, |p, cx| {
3234                p.find_or_create_local_worktree("/a", true, cx)
3235            })
3236            .await
3237            .unwrap();
3238        worktree_a
3239            .read_with(cx_a, |tree, _| tree.as_local().unwrap().scan_complete())
3240            .await;
3241        let worktree_id = worktree_a.read_with(cx_a, |tree, _| tree.id());
3242        let buffer_a = project_a
3243            .update(cx_a, |p, cx| p.open_buffer((worktree_id, "a.rs"), cx))
3244            .await
3245            .unwrap();
3246
3247        // Join the worktree as client B.
3248        let project_b = client_b.build_remote_project(&project_a, cx_a, cx_b).await;
3249
3250        let buffer_b = cx_b
3251            .background()
3252            .spawn(project_b.update(cx_b, |p, cx| p.open_buffer((worktree_id, "a.rs"), cx)))
3253            .await
3254            .unwrap();
3255        buffer_b.update(cx_b, |buffer, cx| {
3256            buffer.edit([(4..7, "six")], cx);
3257            buffer.edit([(10..11, "6")], cx);
3258            assert_eq!(buffer.text(), "let six = 6;");
3259            assert!(buffer.is_dirty());
3260            assert!(!buffer.has_conflict());
3261        });
3262        buffer_a
3263            .condition(cx_a, |buffer, _| buffer.text() == "let six = 6;")
3264            .await;
3265
3266        fs.save(Path::new("/a/a.rs"), &Rope::from("let seven = 7;"))
3267            .await
3268            .unwrap();
3269        buffer_a
3270            .condition(cx_a, |buffer, _| buffer.has_conflict())
3271            .await;
3272        buffer_b
3273            .condition(cx_b, |buffer, _| buffer.has_conflict())
3274            .await;
3275
3276        project_b
3277            .update(cx_b, |project, cx| {
3278                project.reload_buffers(HashSet::from_iter([buffer_b.clone()]), true, cx)
3279            })
3280            .await
3281            .unwrap();
3282        buffer_a.read_with(cx_a, |buffer, _| {
3283            assert_eq!(buffer.text(), "let seven = 7;");
3284            assert!(!buffer.is_dirty());
3285            assert!(!buffer.has_conflict());
3286        });
3287        buffer_b.read_with(cx_b, |buffer, _| {
3288            assert_eq!(buffer.text(), "let seven = 7;");
3289            assert!(!buffer.is_dirty());
3290            assert!(!buffer.has_conflict());
3291        });
3292
3293        buffer_a.update(cx_a, |buffer, cx| {
3294            // Undoing on the host is a no-op when the reload was initiated by the guest.
3295            buffer.undo(cx);
3296            assert_eq!(buffer.text(), "let seven = 7;");
3297            assert!(!buffer.is_dirty());
3298            assert!(!buffer.has_conflict());
3299        });
3300        buffer_b.update(cx_b, |buffer, cx| {
3301            // Undoing on the guest rolls back the buffer to before it was reloaded but the conflict gets cleared.
3302            buffer.undo(cx);
3303            assert_eq!(buffer.text(), "let six = 6;");
3304            assert!(buffer.is_dirty());
3305            assert!(!buffer.has_conflict());
3306        });
3307    }
3308
3309    #[gpui::test(iterations = 10)]
3310    async fn test_formatting_buffer(cx_a: &mut TestAppContext, cx_b: &mut TestAppContext) {
3311        cx_a.foreground().forbid_parking();
3312        let lang_registry = Arc::new(LanguageRegistry::test());
3313        let fs = FakeFs::new(cx_a.background());
3314
3315        // Set up a fake language server.
3316        let mut language = Language::new(
3317            LanguageConfig {
3318                name: "Rust".into(),
3319                path_suffixes: vec!["rs".to_string()],
3320                ..Default::default()
3321            },
3322            Some(tree_sitter_rust::language()),
3323        );
3324        let mut fake_language_servers = language.set_fake_lsp_adapter(Default::default());
3325        lang_registry.add(Arc::new(language));
3326
3327        // Connect to a server as 2 clients.
3328        let mut server = TestServer::start(cx_a.foreground(), cx_a.background()).await;
3329        let client_a = server.create_client(cx_a, "user_a").await;
3330        let mut client_b = server.create_client(cx_b, "user_b").await;
3331        server
3332            .make_contacts(vec![(&client_a, cx_a), (&client_b, cx_b)])
3333            .await;
3334
3335        // Share a project as client A
3336        fs.insert_tree(
3337            "/a",
3338            json!({
3339                "a.rs": "let one = two",
3340            }),
3341        )
3342        .await;
3343        let project_a = cx_a.update(|cx| {
3344            Project::local(
3345                client_a.clone(),
3346                client_a.user_store.clone(),
3347                lang_registry.clone(),
3348                fs.clone(),
3349                cx,
3350            )
3351        });
3352        let (worktree_a, _) = project_a
3353            .update(cx_a, |p, cx| {
3354                p.find_or_create_local_worktree("/a", true, cx)
3355            })
3356            .await
3357            .unwrap();
3358        worktree_a
3359            .read_with(cx_a, |tree, _| tree.as_local().unwrap().scan_complete())
3360            .await;
3361        let worktree_id = worktree_a.read_with(cx_a, |tree, _| tree.id());
3362
3363        // Join the project as client B.
3364        let project_b = client_b.build_remote_project(&project_a, cx_a, cx_b).await;
3365
3366        let buffer_b = cx_b
3367            .background()
3368            .spawn(project_b.update(cx_b, |p, cx| p.open_buffer((worktree_id, "a.rs"), cx)))
3369            .await
3370            .unwrap();
3371
3372        let fake_language_server = fake_language_servers.next().await.unwrap();
3373        fake_language_server.handle_request::<lsp::request::Formatting, _, _>(|_, _| async move {
3374            Ok(Some(vec![
3375                lsp::TextEdit {
3376                    range: lsp::Range::new(lsp::Position::new(0, 4), lsp::Position::new(0, 4)),
3377                    new_text: "h".to_string(),
3378                },
3379                lsp::TextEdit {
3380                    range: lsp::Range::new(lsp::Position::new(0, 7), lsp::Position::new(0, 7)),
3381                    new_text: "y".to_string(),
3382                },
3383            ]))
3384        });
3385
3386        project_b
3387            .update(cx_b, |project, cx| {
3388                project.format(HashSet::from_iter([buffer_b.clone()]), true, cx)
3389            })
3390            .await
3391            .unwrap();
3392        assert_eq!(
3393            buffer_b.read_with(cx_b, |buffer, _| buffer.text()),
3394            "let honey = two"
3395        );
3396    }
3397
3398    #[gpui::test(iterations = 10)]
3399    async fn test_definition(cx_a: &mut TestAppContext, cx_b: &mut TestAppContext) {
3400        cx_a.foreground().forbid_parking();
3401        let lang_registry = Arc::new(LanguageRegistry::test());
3402        let fs = FakeFs::new(cx_a.background());
3403        fs.insert_tree(
3404            "/root-1",
3405            json!({
3406                "a.rs": "const ONE: usize = b::TWO + b::THREE;",
3407            }),
3408        )
3409        .await;
3410        fs.insert_tree(
3411            "/root-2",
3412            json!({
3413                "b.rs": "const TWO: usize = 2;\nconst THREE: usize = 3;",
3414            }),
3415        )
3416        .await;
3417
3418        // Set up a fake language server.
3419        let mut language = Language::new(
3420            LanguageConfig {
3421                name: "Rust".into(),
3422                path_suffixes: vec!["rs".to_string()],
3423                ..Default::default()
3424            },
3425            Some(tree_sitter_rust::language()),
3426        );
3427        let mut fake_language_servers = language.set_fake_lsp_adapter(Default::default());
3428        lang_registry.add(Arc::new(language));
3429
3430        // Connect to a server as 2 clients.
3431        let mut server = TestServer::start(cx_a.foreground(), cx_a.background()).await;
3432        let client_a = server.create_client(cx_a, "user_a").await;
3433        let mut client_b = server.create_client(cx_b, "user_b").await;
3434        server
3435            .make_contacts(vec![(&client_a, cx_a), (&client_b, cx_b)])
3436            .await;
3437
3438        // Share a project as client A
3439        let project_a = cx_a.update(|cx| {
3440            Project::local(
3441                client_a.clone(),
3442                client_a.user_store.clone(),
3443                lang_registry.clone(),
3444                fs.clone(),
3445                cx,
3446            )
3447        });
3448        let (worktree_a, _) = project_a
3449            .update(cx_a, |p, cx| {
3450                p.find_or_create_local_worktree("/root-1", true, cx)
3451            })
3452            .await
3453            .unwrap();
3454        worktree_a
3455            .read_with(cx_a, |tree, _| tree.as_local().unwrap().scan_complete())
3456            .await;
3457        let worktree_id = worktree_a.read_with(cx_a, |tree, _| tree.id());
3458
3459        // Join the project as client B.
3460        let project_b = client_b.build_remote_project(&project_a, cx_a, cx_b).await;
3461
3462        // Open the file on client B.
3463        let buffer_b = cx_b
3464            .background()
3465            .spawn(project_b.update(cx_b, |p, cx| p.open_buffer((worktree_id, "a.rs"), cx)))
3466            .await
3467            .unwrap();
3468
3469        // Request the definition of a symbol as the guest.
3470        let fake_language_server = fake_language_servers.next().await.unwrap();
3471        fake_language_server.handle_request::<lsp::request::GotoDefinition, _, _>(
3472            |_, _| async move {
3473                Ok(Some(lsp::GotoDefinitionResponse::Scalar(
3474                    lsp::Location::new(
3475                        lsp::Url::from_file_path("/root-2/b.rs").unwrap(),
3476                        lsp::Range::new(lsp::Position::new(0, 6), lsp::Position::new(0, 9)),
3477                    ),
3478                )))
3479            },
3480        );
3481
3482        let definitions_1 = project_b
3483            .update(cx_b, |p, cx| p.definition(&buffer_b, 23, cx))
3484            .await
3485            .unwrap();
3486        cx_b.read(|cx| {
3487            assert_eq!(definitions_1.len(), 1);
3488            assert_eq!(project_b.read(cx).worktrees(cx).count(), 2);
3489            let target_buffer = definitions_1[0].buffer.read(cx);
3490            assert_eq!(
3491                target_buffer.text(),
3492                "const TWO: usize = 2;\nconst THREE: usize = 3;"
3493            );
3494            assert_eq!(
3495                definitions_1[0].range.to_point(target_buffer),
3496                Point::new(0, 6)..Point::new(0, 9)
3497            );
3498        });
3499
3500        // Try getting more definitions for the same buffer, ensuring the buffer gets reused from
3501        // the previous call to `definition`.
3502        fake_language_server.handle_request::<lsp::request::GotoDefinition, _, _>(
3503            |_, _| async move {
3504                Ok(Some(lsp::GotoDefinitionResponse::Scalar(
3505                    lsp::Location::new(
3506                        lsp::Url::from_file_path("/root-2/b.rs").unwrap(),
3507                        lsp::Range::new(lsp::Position::new(1, 6), lsp::Position::new(1, 11)),
3508                    ),
3509                )))
3510            },
3511        );
3512
3513        let definitions_2 = project_b
3514            .update(cx_b, |p, cx| p.definition(&buffer_b, 33, cx))
3515            .await
3516            .unwrap();
3517        cx_b.read(|cx| {
3518            assert_eq!(definitions_2.len(), 1);
3519            assert_eq!(project_b.read(cx).worktrees(cx).count(), 2);
3520            let target_buffer = definitions_2[0].buffer.read(cx);
3521            assert_eq!(
3522                target_buffer.text(),
3523                "const TWO: usize = 2;\nconst THREE: usize = 3;"
3524            );
3525            assert_eq!(
3526                definitions_2[0].range.to_point(target_buffer),
3527                Point::new(1, 6)..Point::new(1, 11)
3528            );
3529        });
3530        assert_eq!(definitions_1[0].buffer, definitions_2[0].buffer);
3531    }
3532
3533    #[gpui::test(iterations = 10)]
3534    async fn test_references(cx_a: &mut TestAppContext, cx_b: &mut TestAppContext) {
3535        cx_a.foreground().forbid_parking();
3536        let lang_registry = Arc::new(LanguageRegistry::test());
3537        let fs = FakeFs::new(cx_a.background());
3538        fs.insert_tree(
3539            "/root-1",
3540            json!({
3541                "one.rs": "const ONE: usize = 1;",
3542                "two.rs": "const TWO: usize = one::ONE + one::ONE;",
3543            }),
3544        )
3545        .await;
3546        fs.insert_tree(
3547            "/root-2",
3548            json!({
3549                "three.rs": "const THREE: usize = two::TWO + one::ONE;",
3550            }),
3551        )
3552        .await;
3553
3554        // Set up a fake language server.
3555        let mut language = Language::new(
3556            LanguageConfig {
3557                name: "Rust".into(),
3558                path_suffixes: vec!["rs".to_string()],
3559                ..Default::default()
3560            },
3561            Some(tree_sitter_rust::language()),
3562        );
3563        let mut fake_language_servers = language.set_fake_lsp_adapter(Default::default());
3564        lang_registry.add(Arc::new(language));
3565
3566        // Connect to a server as 2 clients.
3567        let mut server = TestServer::start(cx_a.foreground(), cx_a.background()).await;
3568        let client_a = server.create_client(cx_a, "user_a").await;
3569        let mut client_b = server.create_client(cx_b, "user_b").await;
3570        server
3571            .make_contacts(vec![(&client_a, cx_a), (&client_b, cx_b)])
3572            .await;
3573
3574        // Share a project as client A
3575        let project_a = cx_a.update(|cx| {
3576            Project::local(
3577                client_a.clone(),
3578                client_a.user_store.clone(),
3579                lang_registry.clone(),
3580                fs.clone(),
3581                cx,
3582            )
3583        });
3584        let (worktree_a, _) = project_a
3585            .update(cx_a, |p, cx| {
3586                p.find_or_create_local_worktree("/root-1", true, cx)
3587            })
3588            .await
3589            .unwrap();
3590        worktree_a
3591            .read_with(cx_a, |tree, _| tree.as_local().unwrap().scan_complete())
3592            .await;
3593        let worktree_id = worktree_a.read_with(cx_a, |tree, _| tree.id());
3594
3595        // Join the worktree as client B.
3596        let project_b = client_b.build_remote_project(&project_a, cx_a, cx_b).await;
3597
3598        // Open the file on client B.
3599        let buffer_b = cx_b
3600            .background()
3601            .spawn(project_b.update(cx_b, |p, cx| p.open_buffer((worktree_id, "one.rs"), cx)))
3602            .await
3603            .unwrap();
3604
3605        // Request references to a symbol as the guest.
3606        let fake_language_server = fake_language_servers.next().await.unwrap();
3607        fake_language_server.handle_request::<lsp::request::References, _, _>(
3608            |params, _| async move {
3609                assert_eq!(
3610                    params.text_document_position.text_document.uri.as_str(),
3611                    "file:///root-1/one.rs"
3612                );
3613                Ok(Some(vec![
3614                    lsp::Location {
3615                        uri: lsp::Url::from_file_path("/root-1/two.rs").unwrap(),
3616                        range: lsp::Range::new(
3617                            lsp::Position::new(0, 24),
3618                            lsp::Position::new(0, 27),
3619                        ),
3620                    },
3621                    lsp::Location {
3622                        uri: lsp::Url::from_file_path("/root-1/two.rs").unwrap(),
3623                        range: lsp::Range::new(
3624                            lsp::Position::new(0, 35),
3625                            lsp::Position::new(0, 38),
3626                        ),
3627                    },
3628                    lsp::Location {
3629                        uri: lsp::Url::from_file_path("/root-2/three.rs").unwrap(),
3630                        range: lsp::Range::new(
3631                            lsp::Position::new(0, 37),
3632                            lsp::Position::new(0, 40),
3633                        ),
3634                    },
3635                ]))
3636            },
3637        );
3638
3639        let references = project_b
3640            .update(cx_b, |p, cx| p.references(&buffer_b, 7, cx))
3641            .await
3642            .unwrap();
3643        cx_b.read(|cx| {
3644            assert_eq!(references.len(), 3);
3645            assert_eq!(project_b.read(cx).worktrees(cx).count(), 2);
3646
3647            let two_buffer = references[0].buffer.read(cx);
3648            let three_buffer = references[2].buffer.read(cx);
3649            assert_eq!(
3650                two_buffer.file().unwrap().path().as_ref(),
3651                Path::new("two.rs")
3652            );
3653            assert_eq!(references[1].buffer, references[0].buffer);
3654            assert_eq!(
3655                three_buffer.file().unwrap().full_path(cx),
3656                Path::new("three.rs")
3657            );
3658
3659            assert_eq!(references[0].range.to_offset(&two_buffer), 24..27);
3660            assert_eq!(references[1].range.to_offset(&two_buffer), 35..38);
3661            assert_eq!(references[2].range.to_offset(&three_buffer), 37..40);
3662        });
3663    }
3664
3665    #[gpui::test(iterations = 10)]
3666    async fn test_project_search(cx_a: &mut TestAppContext, cx_b: &mut TestAppContext) {
3667        cx_a.foreground().forbid_parking();
3668        let lang_registry = Arc::new(LanguageRegistry::test());
3669        let fs = FakeFs::new(cx_a.background());
3670        fs.insert_tree(
3671            "/root-1",
3672            json!({
3673                "a": "hello world",
3674                "b": "goodnight moon",
3675                "c": "a world of goo",
3676                "d": "world champion of clown world",
3677            }),
3678        )
3679        .await;
3680        fs.insert_tree(
3681            "/root-2",
3682            json!({
3683                "e": "disney world is fun",
3684            }),
3685        )
3686        .await;
3687
3688        // Connect to a server as 2 clients.
3689        let mut server = TestServer::start(cx_a.foreground(), cx_a.background()).await;
3690        let client_a = server.create_client(cx_a, "user_a").await;
3691        let mut client_b = server.create_client(cx_b, "user_b").await;
3692        server
3693            .make_contacts(vec![(&client_a, cx_a), (&client_b, cx_b)])
3694            .await;
3695
3696        // Share a project as client A
3697        let project_a = cx_a.update(|cx| {
3698            Project::local(
3699                client_a.clone(),
3700                client_a.user_store.clone(),
3701                lang_registry.clone(),
3702                fs.clone(),
3703                cx,
3704            )
3705        });
3706
3707        let (worktree_1, _) = project_a
3708            .update(cx_a, |p, cx| {
3709                p.find_or_create_local_worktree("/root-1", true, cx)
3710            })
3711            .await
3712            .unwrap();
3713        worktree_1
3714            .read_with(cx_a, |tree, _| tree.as_local().unwrap().scan_complete())
3715            .await;
3716        let (worktree_2, _) = project_a
3717            .update(cx_a, |p, cx| {
3718                p.find_or_create_local_worktree("/root-2", true, cx)
3719            })
3720            .await
3721            .unwrap();
3722        worktree_2
3723            .read_with(cx_a, |tree, _| tree.as_local().unwrap().scan_complete())
3724            .await;
3725
3726        // Join the worktree as client B.
3727        let project_b = client_b.build_remote_project(&project_a, cx_a, cx_b).await;
3728        let results = project_b
3729            .update(cx_b, |project, cx| {
3730                project.search(SearchQuery::text("world", false, false), cx)
3731            })
3732            .await
3733            .unwrap();
3734
3735        let mut ranges_by_path = results
3736            .into_iter()
3737            .map(|(buffer, ranges)| {
3738                buffer.read_with(cx_b, |buffer, cx| {
3739                    let path = buffer.file().unwrap().full_path(cx);
3740                    let offset_ranges = ranges
3741                        .into_iter()
3742                        .map(|range| range.to_offset(buffer))
3743                        .collect::<Vec<_>>();
3744                    (path, offset_ranges)
3745                })
3746            })
3747            .collect::<Vec<_>>();
3748        ranges_by_path.sort_by_key(|(path, _)| path.clone());
3749
3750        assert_eq!(
3751            ranges_by_path,
3752            &[
3753                (PathBuf::from("root-1/a"), vec![6..11]),
3754                (PathBuf::from("root-1/c"), vec![2..7]),
3755                (PathBuf::from("root-1/d"), vec![0..5, 24..29]),
3756                (PathBuf::from("root-2/e"), vec![7..12]),
3757            ]
3758        );
3759    }
3760
3761    #[gpui::test(iterations = 10)]
3762    async fn test_document_highlights(cx_a: &mut TestAppContext, cx_b: &mut TestAppContext) {
3763        cx_a.foreground().forbid_parking();
3764        let lang_registry = Arc::new(LanguageRegistry::test());
3765        let fs = FakeFs::new(cx_a.background());
3766        fs.insert_tree(
3767            "/root-1",
3768            json!({
3769                "main.rs": "fn double(number: i32) -> i32 { number + number }",
3770            }),
3771        )
3772        .await;
3773
3774        // Set up a fake language server.
3775        let mut language = Language::new(
3776            LanguageConfig {
3777                name: "Rust".into(),
3778                path_suffixes: vec!["rs".to_string()],
3779                ..Default::default()
3780            },
3781            Some(tree_sitter_rust::language()),
3782        );
3783        let mut fake_language_servers = language.set_fake_lsp_adapter(Default::default());
3784        lang_registry.add(Arc::new(language));
3785
3786        // Connect to a server as 2 clients.
3787        let mut server = TestServer::start(cx_a.foreground(), cx_a.background()).await;
3788        let client_a = server.create_client(cx_a, "user_a").await;
3789        let mut client_b = server.create_client(cx_b, "user_b").await;
3790        server
3791            .make_contacts(vec![(&client_a, cx_a), (&client_b, cx_b)])
3792            .await;
3793
3794        // Share a project as client A
3795        let project_a = cx_a.update(|cx| {
3796            Project::local(
3797                client_a.clone(),
3798                client_a.user_store.clone(),
3799                lang_registry.clone(),
3800                fs.clone(),
3801                cx,
3802            )
3803        });
3804        let (worktree_a, _) = project_a
3805            .update(cx_a, |p, cx| {
3806                p.find_or_create_local_worktree("/root-1", true, cx)
3807            })
3808            .await
3809            .unwrap();
3810        worktree_a
3811            .read_with(cx_a, |tree, _| tree.as_local().unwrap().scan_complete())
3812            .await;
3813        let worktree_id = worktree_a.read_with(cx_a, |tree, _| tree.id());
3814
3815        // Join the worktree as client B.
3816        let project_b = client_b.build_remote_project(&project_a, cx_a, cx_b).await;
3817
3818        // Open the file on client B.
3819        let buffer_b = cx_b
3820            .background()
3821            .spawn(project_b.update(cx_b, |p, cx| p.open_buffer((worktree_id, "main.rs"), cx)))
3822            .await
3823            .unwrap();
3824
3825        // Request document highlights as the guest.
3826        let fake_language_server = fake_language_servers.next().await.unwrap();
3827        fake_language_server.handle_request::<lsp::request::DocumentHighlightRequest, _, _>(
3828            |params, _| async move {
3829                assert_eq!(
3830                    params
3831                        .text_document_position_params
3832                        .text_document
3833                        .uri
3834                        .as_str(),
3835                    "file:///root-1/main.rs"
3836                );
3837                assert_eq!(
3838                    params.text_document_position_params.position,
3839                    lsp::Position::new(0, 34)
3840                );
3841                Ok(Some(vec![
3842                    lsp::DocumentHighlight {
3843                        kind: Some(lsp::DocumentHighlightKind::WRITE),
3844                        range: lsp::Range::new(
3845                            lsp::Position::new(0, 10),
3846                            lsp::Position::new(0, 16),
3847                        ),
3848                    },
3849                    lsp::DocumentHighlight {
3850                        kind: Some(lsp::DocumentHighlightKind::READ),
3851                        range: lsp::Range::new(
3852                            lsp::Position::new(0, 32),
3853                            lsp::Position::new(0, 38),
3854                        ),
3855                    },
3856                    lsp::DocumentHighlight {
3857                        kind: Some(lsp::DocumentHighlightKind::READ),
3858                        range: lsp::Range::new(
3859                            lsp::Position::new(0, 41),
3860                            lsp::Position::new(0, 47),
3861                        ),
3862                    },
3863                ]))
3864            },
3865        );
3866
3867        let highlights = project_b
3868            .update(cx_b, |p, cx| p.document_highlights(&buffer_b, 34, cx))
3869            .await
3870            .unwrap();
3871        buffer_b.read_with(cx_b, |buffer, _| {
3872            let snapshot = buffer.snapshot();
3873
3874            let highlights = highlights
3875                .into_iter()
3876                .map(|highlight| (highlight.kind, highlight.range.to_offset(&snapshot)))
3877                .collect::<Vec<_>>();
3878            assert_eq!(
3879                highlights,
3880                &[
3881                    (lsp::DocumentHighlightKind::WRITE, 10..16),
3882                    (lsp::DocumentHighlightKind::READ, 32..38),
3883                    (lsp::DocumentHighlightKind::READ, 41..47)
3884                ]
3885            )
3886        });
3887    }
3888
3889    #[gpui::test(iterations = 10)]
3890    async fn test_project_symbols(cx_a: &mut TestAppContext, cx_b: &mut TestAppContext) {
3891        cx_a.foreground().forbid_parking();
3892        let lang_registry = Arc::new(LanguageRegistry::test());
3893        let fs = FakeFs::new(cx_a.background());
3894        fs.insert_tree(
3895            "/code",
3896            json!({
3897                "crate-1": {
3898                    "one.rs": "const ONE: usize = 1;",
3899                },
3900                "crate-2": {
3901                    "two.rs": "const TWO: usize = 2; const THREE: usize = 3;",
3902                },
3903                "private": {
3904                    "passwords.txt": "the-password",
3905                }
3906            }),
3907        )
3908        .await;
3909
3910        // Set up a fake language server.
3911        let mut language = Language::new(
3912            LanguageConfig {
3913                name: "Rust".into(),
3914                path_suffixes: vec!["rs".to_string()],
3915                ..Default::default()
3916            },
3917            Some(tree_sitter_rust::language()),
3918        );
3919        let mut fake_language_servers = language.set_fake_lsp_adapter(Default::default());
3920        lang_registry.add(Arc::new(language));
3921
3922        // Connect to a server as 2 clients.
3923        let mut server = TestServer::start(cx_a.foreground(), cx_a.background()).await;
3924        let client_a = server.create_client(cx_a, "user_a").await;
3925        let mut client_b = server.create_client(cx_b, "user_b").await;
3926        server
3927            .make_contacts(vec![(&client_a, cx_a), (&client_b, cx_b)])
3928            .await;
3929
3930        // Share a project as client A
3931        let project_a = cx_a.update(|cx| {
3932            Project::local(
3933                client_a.clone(),
3934                client_a.user_store.clone(),
3935                lang_registry.clone(),
3936                fs.clone(),
3937                cx,
3938            )
3939        });
3940        let (worktree_a, _) = project_a
3941            .update(cx_a, |p, cx| {
3942                p.find_or_create_local_worktree("/code/crate-1", true, cx)
3943            })
3944            .await
3945            .unwrap();
3946        worktree_a
3947            .read_with(cx_a, |tree, _| tree.as_local().unwrap().scan_complete())
3948            .await;
3949        let worktree_id = worktree_a.read_with(cx_a, |tree, _| tree.id());
3950
3951        // Join the worktree as client B.
3952        let project_b = client_b.build_remote_project(&project_a, cx_a, cx_b).await;
3953
3954        // Cause the language server to start.
3955        let _buffer = cx_b
3956            .background()
3957            .spawn(project_b.update(cx_b, |p, cx| p.open_buffer((worktree_id, "one.rs"), cx)))
3958            .await
3959            .unwrap();
3960
3961        let fake_language_server = fake_language_servers.next().await.unwrap();
3962        fake_language_server.handle_request::<lsp::request::WorkspaceSymbol, _, _>(
3963            |_, _| async move {
3964                #[allow(deprecated)]
3965                Ok(Some(vec![lsp::SymbolInformation {
3966                    name: "TWO".into(),
3967                    location: lsp::Location {
3968                        uri: lsp::Url::from_file_path("/code/crate-2/two.rs").unwrap(),
3969                        range: lsp::Range::new(lsp::Position::new(0, 6), lsp::Position::new(0, 9)),
3970                    },
3971                    kind: lsp::SymbolKind::CONSTANT,
3972                    tags: None,
3973                    container_name: None,
3974                    deprecated: None,
3975                }]))
3976            },
3977        );
3978
3979        // Request the definition of a symbol as the guest.
3980        let symbols = project_b
3981            .update(cx_b, |p, cx| p.symbols("two", cx))
3982            .await
3983            .unwrap();
3984        assert_eq!(symbols.len(), 1);
3985        assert_eq!(symbols[0].name, "TWO");
3986
3987        // Open one of the returned symbols.
3988        let buffer_b_2 = project_b
3989            .update(cx_b, |project, cx| {
3990                project.open_buffer_for_symbol(&symbols[0], cx)
3991            })
3992            .await
3993            .unwrap();
3994        buffer_b_2.read_with(cx_b, |buffer, _| {
3995            assert_eq!(
3996                buffer.file().unwrap().path().as_ref(),
3997                Path::new("../crate-2/two.rs")
3998            );
3999        });
4000
4001        // Attempt to craft a symbol and violate host's privacy by opening an arbitrary file.
4002        let mut fake_symbol = symbols[0].clone();
4003        fake_symbol.path = Path::new("/code/secrets").into();
4004        let error = project_b
4005            .update(cx_b, |project, cx| {
4006                project.open_buffer_for_symbol(&fake_symbol, cx)
4007            })
4008            .await
4009            .unwrap_err();
4010        assert!(error.to_string().contains("invalid symbol signature"));
4011    }
4012
4013    #[gpui::test(iterations = 10)]
4014    async fn test_open_buffer_while_getting_definition_pointing_to_it(
4015        cx_a: &mut TestAppContext,
4016        cx_b: &mut TestAppContext,
4017        mut rng: StdRng,
4018    ) {
4019        cx_a.foreground().forbid_parking();
4020        let lang_registry = Arc::new(LanguageRegistry::test());
4021        let fs = FakeFs::new(cx_a.background());
4022        fs.insert_tree(
4023            "/root",
4024            json!({
4025                "a.rs": "const ONE: usize = b::TWO;",
4026                "b.rs": "const TWO: usize = 2",
4027            }),
4028        )
4029        .await;
4030
4031        // Set up a fake language server.
4032        let mut language = Language::new(
4033            LanguageConfig {
4034                name: "Rust".into(),
4035                path_suffixes: vec!["rs".to_string()],
4036                ..Default::default()
4037            },
4038            Some(tree_sitter_rust::language()),
4039        );
4040        let mut fake_language_servers = language.set_fake_lsp_adapter(Default::default());
4041        lang_registry.add(Arc::new(language));
4042
4043        // Connect to a server as 2 clients.
4044        let mut server = TestServer::start(cx_a.foreground(), cx_a.background()).await;
4045        let client_a = server.create_client(cx_a, "user_a").await;
4046        let mut client_b = server.create_client(cx_b, "user_b").await;
4047        server
4048            .make_contacts(vec![(&client_a, cx_a), (&client_b, cx_b)])
4049            .await;
4050
4051        // Share a project as client A
4052        let project_a = cx_a.update(|cx| {
4053            Project::local(
4054                client_a.clone(),
4055                client_a.user_store.clone(),
4056                lang_registry.clone(),
4057                fs.clone(),
4058                cx,
4059            )
4060        });
4061
4062        let (worktree_a, _) = project_a
4063            .update(cx_a, |p, cx| {
4064                p.find_or_create_local_worktree("/root", true, cx)
4065            })
4066            .await
4067            .unwrap();
4068        worktree_a
4069            .read_with(cx_a, |tree, _| tree.as_local().unwrap().scan_complete())
4070            .await;
4071        let worktree_id = worktree_a.read_with(cx_a, |tree, _| tree.id());
4072
4073        // Join the project as client B.
4074        let project_b = client_b.build_remote_project(&project_a, cx_a, cx_b).await;
4075
4076        let buffer_b1 = cx_b
4077            .background()
4078            .spawn(project_b.update(cx_b, |p, cx| p.open_buffer((worktree_id, "a.rs"), cx)))
4079            .await
4080            .unwrap();
4081
4082        let fake_language_server = fake_language_servers.next().await.unwrap();
4083        fake_language_server.handle_request::<lsp::request::GotoDefinition, _, _>(
4084            |_, _| async move {
4085                Ok(Some(lsp::GotoDefinitionResponse::Scalar(
4086                    lsp::Location::new(
4087                        lsp::Url::from_file_path("/root/b.rs").unwrap(),
4088                        lsp::Range::new(lsp::Position::new(0, 6), lsp::Position::new(0, 9)),
4089                    ),
4090                )))
4091            },
4092        );
4093
4094        let definitions;
4095        let buffer_b2;
4096        if rng.gen() {
4097            definitions = project_b.update(cx_b, |p, cx| p.definition(&buffer_b1, 23, cx));
4098            buffer_b2 = project_b.update(cx_b, |p, cx| p.open_buffer((worktree_id, "b.rs"), cx));
4099        } else {
4100            buffer_b2 = project_b.update(cx_b, |p, cx| p.open_buffer((worktree_id, "b.rs"), cx));
4101            definitions = project_b.update(cx_b, |p, cx| p.definition(&buffer_b1, 23, cx));
4102        }
4103
4104        let buffer_b2 = buffer_b2.await.unwrap();
4105        let definitions = definitions.await.unwrap();
4106        assert_eq!(definitions.len(), 1);
4107        assert_eq!(definitions[0].buffer, buffer_b2);
4108    }
4109
4110    #[gpui::test(iterations = 10)]
4111    async fn test_collaborating_with_code_actions(
4112        cx_a: &mut TestAppContext,
4113        cx_b: &mut TestAppContext,
4114    ) {
4115        cx_a.foreground().forbid_parking();
4116        let lang_registry = Arc::new(LanguageRegistry::test());
4117        let fs = FakeFs::new(cx_a.background());
4118        cx_b.update(|cx| editor::init(cx));
4119
4120        // Set up a fake language server.
4121        let mut language = Language::new(
4122            LanguageConfig {
4123                name: "Rust".into(),
4124                path_suffixes: vec!["rs".to_string()],
4125                ..Default::default()
4126            },
4127            Some(tree_sitter_rust::language()),
4128        );
4129        let mut fake_language_servers = language.set_fake_lsp_adapter(Default::default());
4130        lang_registry.add(Arc::new(language));
4131
4132        // Connect to a server as 2 clients.
4133        let mut server = TestServer::start(cx_a.foreground(), cx_a.background()).await;
4134        let client_a = server.create_client(cx_a, "user_a").await;
4135        let mut client_b = server.create_client(cx_b, "user_b").await;
4136        server
4137            .make_contacts(vec![(&client_a, cx_a), (&client_b, cx_b)])
4138            .await;
4139
4140        // Share a project as client A
4141        fs.insert_tree(
4142            "/a",
4143            json!({
4144                "main.rs": "mod other;\nfn main() { let foo = other::foo(); }",
4145                "other.rs": "pub fn foo() -> usize { 4 }",
4146            }),
4147        )
4148        .await;
4149        let project_a = cx_a.update(|cx| {
4150            Project::local(
4151                client_a.clone(),
4152                client_a.user_store.clone(),
4153                lang_registry.clone(),
4154                fs.clone(),
4155                cx,
4156            )
4157        });
4158        let (worktree_a, _) = project_a
4159            .update(cx_a, |p, cx| {
4160                p.find_or_create_local_worktree("/a", true, cx)
4161            })
4162            .await
4163            .unwrap();
4164        worktree_a
4165            .read_with(cx_a, |tree, _| tree.as_local().unwrap().scan_complete())
4166            .await;
4167        let worktree_id = worktree_a.read_with(cx_a, |tree, _| tree.id());
4168
4169        // Join the project as client B.
4170        let project_b = client_b.build_remote_project(&project_a, cx_a, cx_b).await;
4171        let mut params = cx_b.update(WorkspaceParams::test);
4172        params.languages = lang_registry.clone();
4173        params.project = project_b.clone();
4174        params.client = client_b.client.clone();
4175        params.user_store = client_b.user_store.clone();
4176
4177        let (_window_b, workspace_b) = cx_b.add_window(|cx| Workspace::new(&params, cx));
4178        let editor_b = workspace_b
4179            .update(cx_b, |workspace, cx| {
4180                workspace.open_path((worktree_id, "main.rs"), true, cx)
4181            })
4182            .await
4183            .unwrap()
4184            .downcast::<Editor>()
4185            .unwrap();
4186
4187        let mut fake_language_server = fake_language_servers.next().await.unwrap();
4188        fake_language_server
4189            .handle_request::<lsp::request::CodeActionRequest, _, _>(|params, _| async move {
4190                assert_eq!(
4191                    params.text_document.uri,
4192                    lsp::Url::from_file_path("/a/main.rs").unwrap(),
4193                );
4194                assert_eq!(params.range.start, lsp::Position::new(0, 0));
4195                assert_eq!(params.range.end, lsp::Position::new(0, 0));
4196                Ok(None)
4197            })
4198            .next()
4199            .await;
4200
4201        // Move cursor to a location that contains code actions.
4202        editor_b.update(cx_b, |editor, cx| {
4203            editor.select_ranges([Point::new(1, 31)..Point::new(1, 31)], None, cx);
4204            cx.focus(&editor_b);
4205        });
4206
4207        fake_language_server
4208            .handle_request::<lsp::request::CodeActionRequest, _, _>(|params, _| async move {
4209                assert_eq!(
4210                    params.text_document.uri,
4211                    lsp::Url::from_file_path("/a/main.rs").unwrap(),
4212                );
4213                assert_eq!(params.range.start, lsp::Position::new(1, 31));
4214                assert_eq!(params.range.end, lsp::Position::new(1, 31));
4215
4216                Ok(Some(vec![lsp::CodeActionOrCommand::CodeAction(
4217                    lsp::CodeAction {
4218                        title: "Inline into all callers".to_string(),
4219                        edit: Some(lsp::WorkspaceEdit {
4220                            changes: Some(
4221                                [
4222                                    (
4223                                        lsp::Url::from_file_path("/a/main.rs").unwrap(),
4224                                        vec![lsp::TextEdit::new(
4225                                            lsp::Range::new(
4226                                                lsp::Position::new(1, 22),
4227                                                lsp::Position::new(1, 34),
4228                                            ),
4229                                            "4".to_string(),
4230                                        )],
4231                                    ),
4232                                    (
4233                                        lsp::Url::from_file_path("/a/other.rs").unwrap(),
4234                                        vec![lsp::TextEdit::new(
4235                                            lsp::Range::new(
4236                                                lsp::Position::new(0, 0),
4237                                                lsp::Position::new(0, 27),
4238                                            ),
4239                                            "".to_string(),
4240                                        )],
4241                                    ),
4242                                ]
4243                                .into_iter()
4244                                .collect(),
4245                            ),
4246                            ..Default::default()
4247                        }),
4248                        data: Some(json!({
4249                            "codeActionParams": {
4250                                "range": {
4251                                    "start": {"line": 1, "column": 31},
4252                                    "end": {"line": 1, "column": 31},
4253                                }
4254                            }
4255                        })),
4256                        ..Default::default()
4257                    },
4258                )]))
4259            })
4260            .next()
4261            .await;
4262
4263        // Toggle code actions and wait for them to display.
4264        editor_b.update(cx_b, |editor, cx| {
4265            editor.toggle_code_actions(
4266                &ToggleCodeActions {
4267                    deployed_from_indicator: false,
4268                },
4269                cx,
4270            );
4271        });
4272        editor_b
4273            .condition(&cx_b, |editor, _| editor.context_menu_visible())
4274            .await;
4275
4276        fake_language_server.remove_request_handler::<lsp::request::CodeActionRequest>();
4277
4278        // Confirming the code action will trigger a resolve request.
4279        let confirm_action = workspace_b
4280            .update(cx_b, |workspace, cx| {
4281                Editor::confirm_code_action(workspace, &ConfirmCodeAction { item_ix: Some(0) }, cx)
4282            })
4283            .unwrap();
4284        fake_language_server.handle_request::<lsp::request::CodeActionResolveRequest, _, _>(
4285            |_, _| async move {
4286                Ok(lsp::CodeAction {
4287                    title: "Inline into all callers".to_string(),
4288                    edit: Some(lsp::WorkspaceEdit {
4289                        changes: Some(
4290                            [
4291                                (
4292                                    lsp::Url::from_file_path("/a/main.rs").unwrap(),
4293                                    vec![lsp::TextEdit::new(
4294                                        lsp::Range::new(
4295                                            lsp::Position::new(1, 22),
4296                                            lsp::Position::new(1, 34),
4297                                        ),
4298                                        "4".to_string(),
4299                                    )],
4300                                ),
4301                                (
4302                                    lsp::Url::from_file_path("/a/other.rs").unwrap(),
4303                                    vec![lsp::TextEdit::new(
4304                                        lsp::Range::new(
4305                                            lsp::Position::new(0, 0),
4306                                            lsp::Position::new(0, 27),
4307                                        ),
4308                                        "".to_string(),
4309                                    )],
4310                                ),
4311                            ]
4312                            .into_iter()
4313                            .collect(),
4314                        ),
4315                        ..Default::default()
4316                    }),
4317                    ..Default::default()
4318                })
4319            },
4320        );
4321
4322        // After the action is confirmed, an editor containing both modified files is opened.
4323        confirm_action.await.unwrap();
4324        let code_action_editor = workspace_b.read_with(cx_b, |workspace, cx| {
4325            workspace
4326                .active_item(cx)
4327                .unwrap()
4328                .downcast::<Editor>()
4329                .unwrap()
4330        });
4331        code_action_editor.update(cx_b, |editor, cx| {
4332            assert_eq!(editor.text(cx), "mod other;\nfn main() { let foo = 4; }\n");
4333            editor.undo(&Undo, cx);
4334            assert_eq!(
4335                editor.text(cx),
4336                "mod other;\nfn main() { let foo = other::foo(); }\npub fn foo() -> usize { 4 }"
4337            );
4338            editor.redo(&Redo, cx);
4339            assert_eq!(editor.text(cx), "mod other;\nfn main() { let foo = 4; }\n");
4340        });
4341    }
4342
4343    #[gpui::test(iterations = 10)]
4344    async fn test_collaborating_with_renames(cx_a: &mut TestAppContext, cx_b: &mut TestAppContext) {
4345        cx_a.foreground().forbid_parking();
4346        let lang_registry = Arc::new(LanguageRegistry::test());
4347        let fs = FakeFs::new(cx_a.background());
4348        cx_b.update(|cx| editor::init(cx));
4349
4350        // Set up a fake language server.
4351        let mut language = Language::new(
4352            LanguageConfig {
4353                name: "Rust".into(),
4354                path_suffixes: vec!["rs".to_string()],
4355                ..Default::default()
4356            },
4357            Some(tree_sitter_rust::language()),
4358        );
4359        let mut fake_language_servers = language.set_fake_lsp_adapter(FakeLspAdapter {
4360            capabilities: lsp::ServerCapabilities {
4361                rename_provider: Some(lsp::OneOf::Right(lsp::RenameOptions {
4362                    prepare_provider: Some(true),
4363                    work_done_progress_options: Default::default(),
4364                })),
4365                ..Default::default()
4366            },
4367            ..Default::default()
4368        });
4369        lang_registry.add(Arc::new(language));
4370
4371        // Connect to a server as 2 clients.
4372        let mut server = TestServer::start(cx_a.foreground(), cx_a.background()).await;
4373        let client_a = server.create_client(cx_a, "user_a").await;
4374        let mut client_b = server.create_client(cx_b, "user_b").await;
4375        server
4376            .make_contacts(vec![(&client_a, cx_a), (&client_b, cx_b)])
4377            .await;
4378
4379        // Share a project as client A
4380        fs.insert_tree(
4381            "/dir",
4382            json!({
4383                "one.rs": "const ONE: usize = 1;",
4384                "two.rs": "const TWO: usize = one::ONE + one::ONE;"
4385            }),
4386        )
4387        .await;
4388        let project_a = cx_a.update(|cx| {
4389            Project::local(
4390                client_a.clone(),
4391                client_a.user_store.clone(),
4392                lang_registry.clone(),
4393                fs.clone(),
4394                cx,
4395            )
4396        });
4397        let (worktree_a, _) = project_a
4398            .update(cx_a, |p, cx| {
4399                p.find_or_create_local_worktree("/dir", true, cx)
4400            })
4401            .await
4402            .unwrap();
4403        worktree_a
4404            .read_with(cx_a, |tree, _| tree.as_local().unwrap().scan_complete())
4405            .await;
4406        let worktree_id = worktree_a.read_with(cx_a, |tree, _| tree.id());
4407
4408        // Join the worktree as client B.
4409        let project_b = client_b.build_remote_project(&project_a, cx_a, cx_b).await;
4410        let mut params = cx_b.update(WorkspaceParams::test);
4411        params.languages = lang_registry.clone();
4412        params.project = project_b.clone();
4413        params.client = client_b.client.clone();
4414        params.user_store = client_b.user_store.clone();
4415
4416        let (_window_b, workspace_b) = cx_b.add_window(|cx| Workspace::new(&params, cx));
4417        let editor_b = workspace_b
4418            .update(cx_b, |workspace, cx| {
4419                workspace.open_path((worktree_id, "one.rs"), true, cx)
4420            })
4421            .await
4422            .unwrap()
4423            .downcast::<Editor>()
4424            .unwrap();
4425        let fake_language_server = fake_language_servers.next().await.unwrap();
4426
4427        // Move cursor to a location that can be renamed.
4428        let prepare_rename = editor_b.update(cx_b, |editor, cx| {
4429            editor.select_ranges([7..7], None, cx);
4430            editor.rename(&Rename, cx).unwrap()
4431        });
4432
4433        fake_language_server
4434            .handle_request::<lsp::request::PrepareRenameRequest, _, _>(|params, _| async move {
4435                assert_eq!(params.text_document.uri.as_str(), "file:///dir/one.rs");
4436                assert_eq!(params.position, lsp::Position::new(0, 7));
4437                Ok(Some(lsp::PrepareRenameResponse::Range(lsp::Range::new(
4438                    lsp::Position::new(0, 6),
4439                    lsp::Position::new(0, 9),
4440                ))))
4441            })
4442            .next()
4443            .await
4444            .unwrap();
4445        prepare_rename.await.unwrap();
4446        editor_b.update(cx_b, |editor, cx| {
4447            let rename = editor.pending_rename().unwrap();
4448            let buffer = editor.buffer().read(cx).snapshot(cx);
4449            assert_eq!(
4450                rename.range.start.to_offset(&buffer)..rename.range.end.to_offset(&buffer),
4451                6..9
4452            );
4453            rename.editor.update(cx, |rename_editor, cx| {
4454                rename_editor.buffer().update(cx, |rename_buffer, cx| {
4455                    rename_buffer.edit([(0..3, "THREE")], cx);
4456                });
4457            });
4458        });
4459
4460        let confirm_rename = workspace_b.update(cx_b, |workspace, cx| {
4461            Editor::confirm_rename(workspace, &ConfirmRename, cx).unwrap()
4462        });
4463        fake_language_server
4464            .handle_request::<lsp::request::Rename, _, _>(|params, _| async move {
4465                assert_eq!(
4466                    params.text_document_position.text_document.uri.as_str(),
4467                    "file:///dir/one.rs"
4468                );
4469                assert_eq!(
4470                    params.text_document_position.position,
4471                    lsp::Position::new(0, 6)
4472                );
4473                assert_eq!(params.new_name, "THREE");
4474                Ok(Some(lsp::WorkspaceEdit {
4475                    changes: Some(
4476                        [
4477                            (
4478                                lsp::Url::from_file_path("/dir/one.rs").unwrap(),
4479                                vec![lsp::TextEdit::new(
4480                                    lsp::Range::new(
4481                                        lsp::Position::new(0, 6),
4482                                        lsp::Position::new(0, 9),
4483                                    ),
4484                                    "THREE".to_string(),
4485                                )],
4486                            ),
4487                            (
4488                                lsp::Url::from_file_path("/dir/two.rs").unwrap(),
4489                                vec![
4490                                    lsp::TextEdit::new(
4491                                        lsp::Range::new(
4492                                            lsp::Position::new(0, 24),
4493                                            lsp::Position::new(0, 27),
4494                                        ),
4495                                        "THREE".to_string(),
4496                                    ),
4497                                    lsp::TextEdit::new(
4498                                        lsp::Range::new(
4499                                            lsp::Position::new(0, 35),
4500                                            lsp::Position::new(0, 38),
4501                                        ),
4502                                        "THREE".to_string(),
4503                                    ),
4504                                ],
4505                            ),
4506                        ]
4507                        .into_iter()
4508                        .collect(),
4509                    ),
4510                    ..Default::default()
4511                }))
4512            })
4513            .next()
4514            .await
4515            .unwrap();
4516        confirm_rename.await.unwrap();
4517
4518        let rename_editor = workspace_b.read_with(cx_b, |workspace, cx| {
4519            workspace
4520                .active_item(cx)
4521                .unwrap()
4522                .downcast::<Editor>()
4523                .unwrap()
4524        });
4525        rename_editor.update(cx_b, |editor, cx| {
4526            assert_eq!(
4527                editor.text(cx),
4528                "const THREE: usize = 1;\nconst TWO: usize = one::THREE + one::THREE;"
4529            );
4530            editor.undo(&Undo, cx);
4531            assert_eq!(
4532                editor.text(cx),
4533                "const ONE: usize = 1;\nconst TWO: usize = one::ONE + one::ONE;"
4534            );
4535            editor.redo(&Redo, cx);
4536            assert_eq!(
4537                editor.text(cx),
4538                "const THREE: usize = 1;\nconst TWO: usize = one::THREE + one::THREE;"
4539            );
4540        });
4541
4542        // Ensure temporary rename edits cannot be undone/redone.
4543        editor_b.update(cx_b, |editor, cx| {
4544            editor.undo(&Undo, cx);
4545            assert_eq!(editor.text(cx), "const ONE: usize = 1;");
4546            editor.undo(&Undo, cx);
4547            assert_eq!(editor.text(cx), "const ONE: usize = 1;");
4548            editor.redo(&Redo, cx);
4549            assert_eq!(editor.text(cx), "const THREE: usize = 1;");
4550        })
4551    }
4552
4553    #[gpui::test(iterations = 10)]
4554    async fn test_basic_chat(cx_a: &mut TestAppContext, cx_b: &mut TestAppContext) {
4555        cx_a.foreground().forbid_parking();
4556
4557        // Connect to a server as 2 clients.
4558        let mut server = TestServer::start(cx_a.foreground(), cx_a.background()).await;
4559        let client_a = server.create_client(cx_a, "user_a").await;
4560        let client_b = server.create_client(cx_b, "user_b").await;
4561
4562        // Create an org that includes these 2 users.
4563        let db = &server.app_state.db;
4564        let org_id = db.create_org("Test Org", "test-org").await.unwrap();
4565        db.add_org_member(org_id, client_a.current_user_id(&cx_a), false)
4566            .await
4567            .unwrap();
4568        db.add_org_member(org_id, client_b.current_user_id(&cx_b), false)
4569            .await
4570            .unwrap();
4571
4572        // Create a channel that includes all the users.
4573        let channel_id = db.create_org_channel(org_id, "test-channel").await.unwrap();
4574        db.add_channel_member(channel_id, client_a.current_user_id(&cx_a), false)
4575            .await
4576            .unwrap();
4577        db.add_channel_member(channel_id, client_b.current_user_id(&cx_b), false)
4578            .await
4579            .unwrap();
4580        db.create_channel_message(
4581            channel_id,
4582            client_b.current_user_id(&cx_b),
4583            "hello A, it's B.",
4584            OffsetDateTime::now_utc(),
4585            1,
4586        )
4587        .await
4588        .unwrap();
4589
4590        let channels_a = cx_a
4591            .add_model(|cx| ChannelList::new(client_a.user_store.clone(), client_a.clone(), cx));
4592        channels_a
4593            .condition(cx_a, |list, _| list.available_channels().is_some())
4594            .await;
4595        channels_a.read_with(cx_a, |list, _| {
4596            assert_eq!(
4597                list.available_channels().unwrap(),
4598                &[ChannelDetails {
4599                    id: channel_id.to_proto(),
4600                    name: "test-channel".to_string()
4601                }]
4602            )
4603        });
4604        let channel_a = channels_a.update(cx_a, |this, cx| {
4605            this.get_channel(channel_id.to_proto(), cx).unwrap()
4606        });
4607        channel_a.read_with(cx_a, |channel, _| assert!(channel.messages().is_empty()));
4608        channel_a
4609            .condition(&cx_a, |channel, _| {
4610                channel_messages(channel)
4611                    == [("user_b".to_string(), "hello A, it's B.".to_string(), false)]
4612            })
4613            .await;
4614
4615        let channels_b = cx_b
4616            .add_model(|cx| ChannelList::new(client_b.user_store.clone(), client_b.clone(), cx));
4617        channels_b
4618            .condition(cx_b, |list, _| list.available_channels().is_some())
4619            .await;
4620        channels_b.read_with(cx_b, |list, _| {
4621            assert_eq!(
4622                list.available_channels().unwrap(),
4623                &[ChannelDetails {
4624                    id: channel_id.to_proto(),
4625                    name: "test-channel".to_string()
4626                }]
4627            )
4628        });
4629
4630        let channel_b = channels_b.update(cx_b, |this, cx| {
4631            this.get_channel(channel_id.to_proto(), cx).unwrap()
4632        });
4633        channel_b.read_with(cx_b, |channel, _| assert!(channel.messages().is_empty()));
4634        channel_b
4635            .condition(&cx_b, |channel, _| {
4636                channel_messages(channel)
4637                    == [("user_b".to_string(), "hello A, it's B.".to_string(), false)]
4638            })
4639            .await;
4640
4641        channel_a
4642            .update(cx_a, |channel, cx| {
4643                channel
4644                    .send_message("oh, hi B.".to_string(), cx)
4645                    .unwrap()
4646                    .detach();
4647                let task = channel.send_message("sup".to_string(), cx).unwrap();
4648                assert_eq!(
4649                    channel_messages(channel),
4650                    &[
4651                        ("user_b".to_string(), "hello A, it's B.".to_string(), false),
4652                        ("user_a".to_string(), "oh, hi B.".to_string(), true),
4653                        ("user_a".to_string(), "sup".to_string(), true)
4654                    ]
4655                );
4656                task
4657            })
4658            .await
4659            .unwrap();
4660
4661        channel_b
4662            .condition(&cx_b, |channel, _| {
4663                channel_messages(channel)
4664                    == [
4665                        ("user_b".to_string(), "hello A, it's B.".to_string(), false),
4666                        ("user_a".to_string(), "oh, hi B.".to_string(), false),
4667                        ("user_a".to_string(), "sup".to_string(), false),
4668                    ]
4669            })
4670            .await;
4671
4672        assert_eq!(
4673            server
4674                .state()
4675                .await
4676                .channel(channel_id)
4677                .unwrap()
4678                .connection_ids
4679                .len(),
4680            2
4681        );
4682        cx_b.update(|_| drop(channel_b));
4683        server
4684            .condition(|state| state.channel(channel_id).unwrap().connection_ids.len() == 1)
4685            .await;
4686
4687        cx_a.update(|_| drop(channel_a));
4688        server
4689            .condition(|state| state.channel(channel_id).is_none())
4690            .await;
4691    }
4692
4693    #[gpui::test(iterations = 10)]
4694    async fn test_chat_message_validation(cx_a: &mut TestAppContext) {
4695        cx_a.foreground().forbid_parking();
4696
4697        let mut server = TestServer::start(cx_a.foreground(), cx_a.background()).await;
4698        let client_a = server.create_client(cx_a, "user_a").await;
4699
4700        let db = &server.app_state.db;
4701        let org_id = db.create_org("Test Org", "test-org").await.unwrap();
4702        let channel_id = db.create_org_channel(org_id, "test-channel").await.unwrap();
4703        db.add_org_member(org_id, client_a.current_user_id(&cx_a), false)
4704            .await
4705            .unwrap();
4706        db.add_channel_member(channel_id, client_a.current_user_id(&cx_a), false)
4707            .await
4708            .unwrap();
4709
4710        let channels_a = cx_a
4711            .add_model(|cx| ChannelList::new(client_a.user_store.clone(), client_a.clone(), cx));
4712        channels_a
4713            .condition(cx_a, |list, _| list.available_channels().is_some())
4714            .await;
4715        let channel_a = channels_a.update(cx_a, |this, cx| {
4716            this.get_channel(channel_id.to_proto(), cx).unwrap()
4717        });
4718
4719        // Messages aren't allowed to be too long.
4720        channel_a
4721            .update(cx_a, |channel, cx| {
4722                let long_body = "this is long.\n".repeat(1024);
4723                channel.send_message(long_body, cx).unwrap()
4724            })
4725            .await
4726            .unwrap_err();
4727
4728        // Messages aren't allowed to be blank.
4729        channel_a.update(cx_a, |channel, cx| {
4730            channel.send_message(String::new(), cx).unwrap_err()
4731        });
4732
4733        // Leading and trailing whitespace are trimmed.
4734        channel_a
4735            .update(cx_a, |channel, cx| {
4736                channel
4737                    .send_message("\n surrounded by whitespace  \n".to_string(), cx)
4738                    .unwrap()
4739            })
4740            .await
4741            .unwrap();
4742        assert_eq!(
4743            db.get_channel_messages(channel_id, 10, None)
4744                .await
4745                .unwrap()
4746                .iter()
4747                .map(|m| &m.body)
4748                .collect::<Vec<_>>(),
4749            &["surrounded by whitespace"]
4750        );
4751    }
4752
4753    #[gpui::test(iterations = 10)]
4754    async fn test_chat_reconnection(cx_a: &mut TestAppContext, cx_b: &mut TestAppContext) {
4755        cx_a.foreground().forbid_parking();
4756
4757        // Connect to a server as 2 clients.
4758        let mut server = TestServer::start(cx_a.foreground(), cx_a.background()).await;
4759        let client_a = server.create_client(cx_a, "user_a").await;
4760        let client_b = server.create_client(cx_b, "user_b").await;
4761        let mut status_b = client_b.status();
4762
4763        // Create an org that includes these 2 users.
4764        let db = &server.app_state.db;
4765        let org_id = db.create_org("Test Org", "test-org").await.unwrap();
4766        db.add_org_member(org_id, client_a.current_user_id(&cx_a), false)
4767            .await
4768            .unwrap();
4769        db.add_org_member(org_id, client_b.current_user_id(&cx_b), false)
4770            .await
4771            .unwrap();
4772
4773        // Create a channel that includes all the users.
4774        let channel_id = db.create_org_channel(org_id, "test-channel").await.unwrap();
4775        db.add_channel_member(channel_id, client_a.current_user_id(&cx_a), false)
4776            .await
4777            .unwrap();
4778        db.add_channel_member(channel_id, client_b.current_user_id(&cx_b), false)
4779            .await
4780            .unwrap();
4781        db.create_channel_message(
4782            channel_id,
4783            client_b.current_user_id(&cx_b),
4784            "hello A, it's B.",
4785            OffsetDateTime::now_utc(),
4786            2,
4787        )
4788        .await
4789        .unwrap();
4790
4791        let channels_a = cx_a
4792            .add_model(|cx| ChannelList::new(client_a.user_store.clone(), client_a.clone(), cx));
4793        channels_a
4794            .condition(cx_a, |list, _| list.available_channels().is_some())
4795            .await;
4796
4797        channels_a.read_with(cx_a, |list, _| {
4798            assert_eq!(
4799                list.available_channels().unwrap(),
4800                &[ChannelDetails {
4801                    id: channel_id.to_proto(),
4802                    name: "test-channel".to_string()
4803                }]
4804            )
4805        });
4806        let channel_a = channels_a.update(cx_a, |this, cx| {
4807            this.get_channel(channel_id.to_proto(), cx).unwrap()
4808        });
4809        channel_a.read_with(cx_a, |channel, _| assert!(channel.messages().is_empty()));
4810        channel_a
4811            .condition(&cx_a, |channel, _| {
4812                channel_messages(channel)
4813                    == [("user_b".to_string(), "hello A, it's B.".to_string(), false)]
4814            })
4815            .await;
4816
4817        let channels_b = cx_b
4818            .add_model(|cx| ChannelList::new(client_b.user_store.clone(), client_b.clone(), cx));
4819        channels_b
4820            .condition(cx_b, |list, _| list.available_channels().is_some())
4821            .await;
4822        channels_b.read_with(cx_b, |list, _| {
4823            assert_eq!(
4824                list.available_channels().unwrap(),
4825                &[ChannelDetails {
4826                    id: channel_id.to_proto(),
4827                    name: "test-channel".to_string()
4828                }]
4829            )
4830        });
4831
4832        let channel_b = channels_b.update(cx_b, |this, cx| {
4833            this.get_channel(channel_id.to_proto(), cx).unwrap()
4834        });
4835        channel_b.read_with(cx_b, |channel, _| assert!(channel.messages().is_empty()));
4836        channel_b
4837            .condition(&cx_b, |channel, _| {
4838                channel_messages(channel)
4839                    == [("user_b".to_string(), "hello A, it's B.".to_string(), false)]
4840            })
4841            .await;
4842
4843        // Disconnect client B, ensuring we can still access its cached channel data.
4844        server.forbid_connections();
4845        server.disconnect_client(client_b.current_user_id(&cx_b));
4846        cx_b.foreground().advance_clock(rpc::RECEIVE_TIMEOUT);
4847        while !matches!(
4848            status_b.next().await,
4849            Some(client::Status::ReconnectionError { .. })
4850        ) {}
4851
4852        channels_b.read_with(cx_b, |channels, _| {
4853            assert_eq!(
4854                channels.available_channels().unwrap(),
4855                [ChannelDetails {
4856                    id: channel_id.to_proto(),
4857                    name: "test-channel".to_string()
4858                }]
4859            )
4860        });
4861        channel_b.read_with(cx_b, |channel, _| {
4862            assert_eq!(
4863                channel_messages(channel),
4864                [("user_b".to_string(), "hello A, it's B.".to_string(), false)]
4865            )
4866        });
4867
4868        // Send a message from client B while it is disconnected.
4869        channel_b
4870            .update(cx_b, |channel, cx| {
4871                let task = channel
4872                    .send_message("can you see this?".to_string(), cx)
4873                    .unwrap();
4874                assert_eq!(
4875                    channel_messages(channel),
4876                    &[
4877                        ("user_b".to_string(), "hello A, it's B.".to_string(), false),
4878                        ("user_b".to_string(), "can you see this?".to_string(), true)
4879                    ]
4880                );
4881                task
4882            })
4883            .await
4884            .unwrap_err();
4885
4886        // Send a message from client A while B is disconnected.
4887        channel_a
4888            .update(cx_a, |channel, cx| {
4889                channel
4890                    .send_message("oh, hi B.".to_string(), cx)
4891                    .unwrap()
4892                    .detach();
4893                let task = channel.send_message("sup".to_string(), cx).unwrap();
4894                assert_eq!(
4895                    channel_messages(channel),
4896                    &[
4897                        ("user_b".to_string(), "hello A, it's B.".to_string(), false),
4898                        ("user_a".to_string(), "oh, hi B.".to_string(), true),
4899                        ("user_a".to_string(), "sup".to_string(), true)
4900                    ]
4901                );
4902                task
4903            })
4904            .await
4905            .unwrap();
4906
4907        // Give client B a chance to reconnect.
4908        server.allow_connections();
4909        cx_b.foreground().advance_clock(Duration::from_secs(10));
4910
4911        // Verify that B sees the new messages upon reconnection, as well as the message client B
4912        // sent while offline.
4913        channel_b
4914            .condition(&cx_b, |channel, _| {
4915                channel_messages(channel)
4916                    == [
4917                        ("user_b".to_string(), "hello A, it's B.".to_string(), false),
4918                        ("user_a".to_string(), "oh, hi B.".to_string(), false),
4919                        ("user_a".to_string(), "sup".to_string(), false),
4920                        ("user_b".to_string(), "can you see this?".to_string(), false),
4921                    ]
4922            })
4923            .await;
4924
4925        // Ensure client A and B can communicate normally after reconnection.
4926        channel_a
4927            .update(cx_a, |channel, cx| {
4928                channel.send_message("you online?".to_string(), cx).unwrap()
4929            })
4930            .await
4931            .unwrap();
4932        channel_b
4933            .condition(&cx_b, |channel, _| {
4934                channel_messages(channel)
4935                    == [
4936                        ("user_b".to_string(), "hello A, it's B.".to_string(), false),
4937                        ("user_a".to_string(), "oh, hi B.".to_string(), false),
4938                        ("user_a".to_string(), "sup".to_string(), false),
4939                        ("user_b".to_string(), "can you see this?".to_string(), false),
4940                        ("user_a".to_string(), "you online?".to_string(), false),
4941                    ]
4942            })
4943            .await;
4944
4945        channel_b
4946            .update(cx_b, |channel, cx| {
4947                channel.send_message("yep".to_string(), cx).unwrap()
4948            })
4949            .await
4950            .unwrap();
4951        channel_a
4952            .condition(&cx_a, |channel, _| {
4953                channel_messages(channel)
4954                    == [
4955                        ("user_b".to_string(), "hello A, it's B.".to_string(), false),
4956                        ("user_a".to_string(), "oh, hi B.".to_string(), false),
4957                        ("user_a".to_string(), "sup".to_string(), false),
4958                        ("user_b".to_string(), "can you see this?".to_string(), false),
4959                        ("user_a".to_string(), "you online?".to_string(), false),
4960                        ("user_b".to_string(), "yep".to_string(), false),
4961                    ]
4962            })
4963            .await;
4964    }
4965
4966    #[gpui::test(iterations = 10)]
4967    async fn test_contacts(
4968        deterministic: Arc<Deterministic>,
4969        cx_a: &mut TestAppContext,
4970        cx_b: &mut TestAppContext,
4971        cx_c: &mut TestAppContext,
4972    ) {
4973        cx_a.foreground().forbid_parking();
4974
4975        // Connect to a server as 3 clients.
4976        let mut server = TestServer::start(cx_a.foreground(), cx_a.background()).await;
4977        let mut client_a = server.create_client(cx_a, "user_a").await;
4978        let mut client_b = server.create_client(cx_b, "user_b").await;
4979        let client_c = server.create_client(cx_c, "user_c").await;
4980        server
4981            .make_contacts(vec![
4982                (&client_a, cx_a),
4983                (&client_b, cx_b),
4984                (&client_c, cx_c),
4985            ])
4986            .await;
4987
4988        deterministic.run_until_parked();
4989        for (client, cx) in [(&client_a, &cx_a), (&client_b, &cx_b), (&client_c, &cx_c)] {
4990            client.user_store.read_with(*cx, |store, _| {
4991                assert_eq!(
4992                    contacts(store),
4993                    [
4994                        ("user_a", true, vec![]),
4995                        ("user_b", true, vec![]),
4996                        ("user_c", true, vec![])
4997                    ],
4998                    "{} has the wrong contacts",
4999                    client.username
5000                )
5001            });
5002        }
5003
5004        // Share a project as client A.
5005        let fs = FakeFs::new(cx_a.background());
5006        fs.create_dir(Path::new("/a")).await.unwrap();
5007        let (project_a, _) = client_a.build_local_project(fs, "/a", cx_a).await;
5008
5009        deterministic.run_until_parked();
5010        for (client, cx) in [(&client_a, &cx_a), (&client_b, &cx_b), (&client_c, &cx_c)] {
5011            client.user_store.read_with(*cx, |store, _| {
5012                assert_eq!(
5013                    contacts(store),
5014                    [
5015                        ("user_a", true, vec![("a", vec![])]),
5016                        ("user_b", true, vec![]),
5017                        ("user_c", true, vec![])
5018                    ],
5019                    "{} has the wrong contacts",
5020                    client.username
5021                )
5022            });
5023        }
5024
5025        let _project_b = client_b.build_remote_project(&project_a, cx_a, cx_b).await;
5026
5027        deterministic.run_until_parked();
5028        for (client, cx) in [(&client_a, &cx_a), (&client_b, &cx_b), (&client_c, &cx_c)] {
5029            client.user_store.read_with(*cx, |store, _| {
5030                assert_eq!(
5031                    contacts(store),
5032                    [
5033                        ("user_a", true, vec![("a", vec!["user_b"])]),
5034                        ("user_b", true, vec![]),
5035                        ("user_c", true, vec![])
5036                    ],
5037                    "{} has the wrong contacts",
5038                    client.username
5039                )
5040            });
5041        }
5042
5043        // Add a local project as client B
5044        let fs = FakeFs::new(cx_b.background());
5045        fs.create_dir(Path::new("/b")).await.unwrap();
5046        let (_project_b, _) = client_b.build_local_project(fs, "/b", cx_a).await;
5047
5048        deterministic.run_until_parked();
5049        for (client, cx) in [(&client_a, &cx_a), (&client_b, &cx_b), (&client_c, &cx_c)] {
5050            client.user_store.read_with(*cx, |store, _| {
5051                assert_eq!(
5052                    contacts(store),
5053                    [
5054                        ("user_a", true, vec![("a", vec!["user_b"])]),
5055                        ("user_b", true, vec![("b", vec![])]),
5056                        ("user_c", true, vec![])
5057                    ],
5058                    "{} has the wrong contacts",
5059                    client.username
5060                )
5061            });
5062        }
5063
5064        project_a
5065            .condition(&cx_a, |project, _| {
5066                project.collaborators().contains_key(&client_b.peer_id)
5067            })
5068            .await;
5069
5070        client_a.project.take();
5071        cx_a.update(move |_| drop(project_a));
5072        deterministic.run_until_parked();
5073        for (client, cx) in [(&client_a, &cx_a), (&client_b, &cx_b), (&client_c, &cx_c)] {
5074            client.user_store.read_with(*cx, |store, _| {
5075                assert_eq!(
5076                    contacts(store),
5077                    [
5078                        ("user_a", true, vec![]),
5079                        ("user_b", true, vec![("b", vec![])]),
5080                        ("user_c", true, vec![])
5081                    ],
5082                    "{} has the wrong contacts",
5083                    client.username
5084                )
5085            });
5086        }
5087
5088        server.disconnect_client(client_c.current_user_id(cx_c));
5089        server.forbid_connections();
5090        deterministic.advance_clock(rpc::RECEIVE_TIMEOUT);
5091        for (client, cx) in [(&client_a, &cx_a), (&client_b, &cx_b)] {
5092            client.user_store.read_with(*cx, |store, _| {
5093                assert_eq!(
5094                    contacts(store),
5095                    [
5096                        ("user_a", true, vec![]),
5097                        ("user_b", true, vec![("b", vec![])]),
5098                        ("user_c", false, vec![])
5099                    ],
5100                    "{} has the wrong contacts",
5101                    client.username
5102                )
5103            });
5104        }
5105        client_c
5106            .user_store
5107            .read_with(cx_c, |store, _| assert_eq!(contacts(store), []));
5108
5109        server.allow_connections();
5110        client_c
5111            .authenticate_and_connect(false, &cx_c.to_async())
5112            .await
5113            .unwrap();
5114
5115        deterministic.run_until_parked();
5116        for (client, cx) in [(&client_a, &cx_a), (&client_b, &cx_b), (&client_c, &cx_c)] {
5117            client.user_store.read_with(*cx, |store, _| {
5118                assert_eq!(
5119                    contacts(store),
5120                    [
5121                        ("user_a", true, vec![]),
5122                        ("user_b", true, vec![("b", vec![])]),
5123                        ("user_c", true, vec![])
5124                    ],
5125                    "{} has the wrong contacts",
5126                    client.username
5127                )
5128            });
5129        }
5130
5131        fn contacts(user_store: &UserStore) -> Vec<(&str, bool, Vec<(&str, Vec<&str>)>)> {
5132            user_store
5133                .contacts()
5134                .iter()
5135                .map(|contact| {
5136                    let projects = contact
5137                        .projects
5138                        .iter()
5139                        .map(|p| {
5140                            (
5141                                p.worktree_root_names[0].as_str(),
5142                                p.guests.iter().map(|p| p.github_login.as_str()).collect(),
5143                            )
5144                        })
5145                        .collect();
5146                    (contact.user.github_login.as_str(), contact.online, projects)
5147                })
5148                .collect()
5149        }
5150    }
5151
5152    #[gpui::test(iterations = 10)]
5153    async fn test_contact_requests(
5154        executor: Arc<Deterministic>,
5155        cx_a: &mut TestAppContext,
5156        cx_a2: &mut TestAppContext,
5157        cx_b: &mut TestAppContext,
5158        cx_b2: &mut TestAppContext,
5159        cx_c: &mut TestAppContext,
5160        cx_c2: &mut TestAppContext,
5161    ) {
5162        cx_a.foreground().forbid_parking();
5163
5164        // Connect to a server as 3 clients.
5165        let mut server = TestServer::start(cx_a.foreground(), cx_a.background()).await;
5166        let client_a = server.create_client(cx_a, "user_a").await;
5167        let client_a2 = server.create_client(cx_a2, "user_a").await;
5168        let client_b = server.create_client(cx_b, "user_b").await;
5169        let client_b2 = server.create_client(cx_b2, "user_b").await;
5170        let client_c = server.create_client(cx_c, "user_c").await;
5171        let client_c2 = server.create_client(cx_c2, "user_c").await;
5172
5173        assert_eq!(client_a.user_id().unwrap(), client_a2.user_id().unwrap());
5174        assert_eq!(client_b.user_id().unwrap(), client_b2.user_id().unwrap());
5175        assert_eq!(client_c.user_id().unwrap(), client_c2.user_id().unwrap());
5176
5177        // User A and User C request that user B become their contact.
5178        client_a
5179            .user_store
5180            .update(cx_a, |store, cx| {
5181                store.request_contact(client_b.user_id().unwrap(), cx)
5182            })
5183            .await
5184            .unwrap();
5185        client_c
5186            .user_store
5187            .update(cx_c, |store, cx| {
5188                store.request_contact(client_b.user_id().unwrap(), cx)
5189            })
5190            .await
5191            .unwrap();
5192        executor.run_until_parked();
5193
5194        // All users see the pending request appear in all their clients.
5195        assert_eq!(
5196            client_a.summarize_contacts(&cx_a).outgoing_requests,
5197            &["user_b"]
5198        );
5199        assert_eq!(
5200            client_a2.summarize_contacts(&cx_a2).outgoing_requests,
5201            &["user_b"]
5202        );
5203        assert_eq!(
5204            client_b.summarize_contacts(&cx_b).incoming_requests,
5205            &["user_a", "user_c"]
5206        );
5207        assert_eq!(
5208            client_b2.summarize_contacts(&cx_b2).incoming_requests,
5209            &["user_a", "user_c"]
5210        );
5211        assert_eq!(
5212            client_c.summarize_contacts(&cx_c).outgoing_requests,
5213            &["user_b"]
5214        );
5215        assert_eq!(
5216            client_c2.summarize_contacts(&cx_c2).outgoing_requests,
5217            &["user_b"]
5218        );
5219
5220        // Contact requests are present upon connecting (tested here via disconnect/reconnect)
5221        disconnect_and_reconnect(&client_a, cx_a).await;
5222        disconnect_and_reconnect(&client_b, cx_b).await;
5223        disconnect_and_reconnect(&client_c, cx_c).await;
5224        executor.run_until_parked();
5225        assert_eq!(
5226            client_a.summarize_contacts(&cx_a).outgoing_requests,
5227            &["user_b"]
5228        );
5229        assert_eq!(
5230            client_b.summarize_contacts(&cx_b).incoming_requests,
5231            &["user_a", "user_c"]
5232        );
5233        assert_eq!(
5234            client_c.summarize_contacts(&cx_c).outgoing_requests,
5235            &["user_b"]
5236        );
5237
5238        // User B accepts the request from user A.
5239        client_b
5240            .user_store
5241            .update(cx_b, |store, cx| {
5242                store.respond_to_contact_request(client_a.user_id().unwrap(), true, cx)
5243            })
5244            .await
5245            .unwrap();
5246
5247        executor.run_until_parked();
5248
5249        // User B sees user A as their contact now in all client, and the incoming request from them is removed.
5250        let contacts_b = client_b.summarize_contacts(&cx_b);
5251        assert_eq!(contacts_b.current, &["user_a", "user_b"]);
5252        assert_eq!(contacts_b.incoming_requests, &["user_c"]);
5253        let contacts_b2 = client_b2.summarize_contacts(&cx_b2);
5254        assert_eq!(contacts_b2.current, &["user_a", "user_b"]);
5255        assert_eq!(contacts_b2.incoming_requests, &["user_c"]);
5256
5257        // User A sees user B as their contact now in all clients, and the outgoing request to them is removed.
5258        let contacts_a = client_a.summarize_contacts(&cx_a);
5259        assert_eq!(contacts_a.current, &["user_a", "user_b"]);
5260        assert!(contacts_a.outgoing_requests.is_empty());
5261        let contacts_a2 = client_a2.summarize_contacts(&cx_a2);
5262        assert_eq!(contacts_a2.current, &["user_a", "user_b"]);
5263        assert!(contacts_a2.outgoing_requests.is_empty());
5264
5265        // Contacts are present upon connecting (tested here via disconnect/reconnect)
5266        disconnect_and_reconnect(&client_a, cx_a).await;
5267        disconnect_and_reconnect(&client_b, cx_b).await;
5268        disconnect_and_reconnect(&client_c, cx_c).await;
5269        executor.run_until_parked();
5270        assert_eq!(
5271            client_a.summarize_contacts(&cx_a).current,
5272            &["user_a", "user_b"]
5273        );
5274        assert_eq!(
5275            client_b.summarize_contacts(&cx_b).current,
5276            &["user_a", "user_b"]
5277        );
5278        assert_eq!(
5279            client_b.summarize_contacts(&cx_b).incoming_requests,
5280            &["user_c"]
5281        );
5282        assert_eq!(client_c.summarize_contacts(&cx_c).current, &["user_c"]);
5283        assert_eq!(
5284            client_c.summarize_contacts(&cx_c).outgoing_requests,
5285            &["user_b"]
5286        );
5287
5288        // User B rejects the request from user C.
5289        client_b
5290            .user_store
5291            .update(cx_b, |store, cx| {
5292                store.respond_to_contact_request(client_c.user_id().unwrap(), false, cx)
5293            })
5294            .await
5295            .unwrap();
5296
5297        executor.run_until_parked();
5298
5299        // User B doesn't see user C as their contact, and the incoming request from them is removed.
5300        let contacts_b = client_b.summarize_contacts(&cx_b);
5301        assert_eq!(contacts_b.current, &["user_a", "user_b"]);
5302        assert!(contacts_b.incoming_requests.is_empty());
5303        let contacts_b2 = client_b2.summarize_contacts(&cx_b2);
5304        assert_eq!(contacts_b2.current, &["user_a", "user_b"]);
5305        assert!(contacts_b2.incoming_requests.is_empty());
5306
5307        // User C doesn't see user B as their contact, and the outgoing request to them is removed.
5308        let contacts_c = client_c.summarize_contacts(&cx_c);
5309        assert_eq!(contacts_c.current, &["user_c"]);
5310        assert!(contacts_c.outgoing_requests.is_empty());
5311        let contacts_c2 = client_c2.summarize_contacts(&cx_c2);
5312        assert_eq!(contacts_c2.current, &["user_c"]);
5313        assert!(contacts_c2.outgoing_requests.is_empty());
5314
5315        // Incoming/outgoing requests are not present upon connecting (tested here via disconnect/reconnect)
5316        disconnect_and_reconnect(&client_a, cx_a).await;
5317        disconnect_and_reconnect(&client_b, cx_b).await;
5318        disconnect_and_reconnect(&client_c, cx_c).await;
5319        executor.run_until_parked();
5320        assert_eq!(
5321            client_a.summarize_contacts(&cx_a).current,
5322            &["user_a", "user_b"]
5323        );
5324        assert_eq!(
5325            client_b.summarize_contacts(&cx_b).current,
5326            &["user_a", "user_b"]
5327        );
5328        assert!(client_b
5329            .summarize_contacts(&cx_b)
5330            .incoming_requests
5331            .is_empty());
5332        assert_eq!(client_c.summarize_contacts(&cx_c).current, &["user_c"]);
5333        assert!(client_c
5334            .summarize_contacts(&cx_c)
5335            .outgoing_requests
5336            .is_empty());
5337
5338        async fn disconnect_and_reconnect(client: &TestClient, cx: &mut TestAppContext) {
5339            client.disconnect(&cx.to_async()).unwrap();
5340            client.clear_contacts(cx).await;
5341            client
5342                .authenticate_and_connect(false, &cx.to_async())
5343                .await
5344                .unwrap();
5345        }
5346    }
5347
5348    #[gpui::test(iterations = 10)]
5349    async fn test_following(cx_a: &mut TestAppContext, cx_b: &mut TestAppContext) {
5350        cx_a.foreground().forbid_parking();
5351        let fs = FakeFs::new(cx_a.background());
5352
5353        // 2 clients connect to a server.
5354        let mut server = TestServer::start(cx_a.foreground(), cx_a.background()).await;
5355        let mut client_a = server.create_client(cx_a, "user_a").await;
5356        let mut client_b = server.create_client(cx_b, "user_b").await;
5357        server
5358            .make_contacts(vec![(&client_a, cx_a), (&client_b, cx_b)])
5359            .await;
5360        cx_a.update(editor::init);
5361        cx_b.update(editor::init);
5362
5363        // Client A shares a project.
5364        fs.insert_tree(
5365            "/a",
5366            json!({
5367                "1.txt": "one",
5368                "2.txt": "two",
5369                "3.txt": "three",
5370            }),
5371        )
5372        .await;
5373        let (project_a, worktree_id) = client_a.build_local_project(fs.clone(), "/a", cx_a).await;
5374
5375        // Client B joins the project.
5376        let project_b = client_b.build_remote_project(&project_a, cx_a, cx_b).await;
5377
5378        // Client A opens some editors.
5379        let workspace_a = client_a.build_workspace(&project_a, cx_a);
5380        let pane_a = workspace_a.read_with(cx_a, |workspace, _| workspace.active_pane().clone());
5381        let editor_a1 = workspace_a
5382            .update(cx_a, |workspace, cx| {
5383                workspace.open_path((worktree_id, "1.txt"), true, cx)
5384            })
5385            .await
5386            .unwrap()
5387            .downcast::<Editor>()
5388            .unwrap();
5389        let editor_a2 = workspace_a
5390            .update(cx_a, |workspace, cx| {
5391                workspace.open_path((worktree_id, "2.txt"), true, cx)
5392            })
5393            .await
5394            .unwrap()
5395            .downcast::<Editor>()
5396            .unwrap();
5397
5398        // Client B opens an editor.
5399        let workspace_b = client_b.build_workspace(&project_b, cx_b);
5400        let editor_b1 = workspace_b
5401            .update(cx_b, |workspace, cx| {
5402                workspace.open_path((worktree_id, "1.txt"), true, cx)
5403            })
5404            .await
5405            .unwrap()
5406            .downcast::<Editor>()
5407            .unwrap();
5408
5409        let client_a_id = project_b.read_with(cx_b, |project, _| {
5410            project.collaborators().values().next().unwrap().peer_id
5411        });
5412        let client_b_id = project_a.read_with(cx_a, |project, _| {
5413            project.collaborators().values().next().unwrap().peer_id
5414        });
5415
5416        // When client B starts following client A, all visible view states are replicated to client B.
5417        editor_a1.update(cx_a, |editor, cx| editor.select_ranges([0..1], None, cx));
5418        editor_a2.update(cx_a, |editor, cx| editor.select_ranges([2..3], None, cx));
5419        workspace_b
5420            .update(cx_b, |workspace, cx| {
5421                workspace
5422                    .toggle_follow(&ToggleFollow(client_a_id), cx)
5423                    .unwrap()
5424            })
5425            .await
5426            .unwrap();
5427
5428        let editor_b2 = workspace_b.read_with(cx_b, |workspace, cx| {
5429            workspace
5430                .active_item(cx)
5431                .unwrap()
5432                .downcast::<Editor>()
5433                .unwrap()
5434        });
5435        assert!(cx_b.read(|cx| editor_b2.is_focused(cx)));
5436        assert_eq!(
5437            editor_b2.read_with(cx_b, |editor, cx| editor.project_path(cx)),
5438            Some((worktree_id, "2.txt").into())
5439        );
5440        assert_eq!(
5441            editor_b2.read_with(cx_b, |editor, cx| editor.selected_ranges(cx)),
5442            vec![2..3]
5443        );
5444        assert_eq!(
5445            editor_b1.read_with(cx_b, |editor, cx| editor.selected_ranges(cx)),
5446            vec![0..1]
5447        );
5448
5449        // When client A activates a different editor, client B does so as well.
5450        workspace_a.update(cx_a, |workspace, cx| {
5451            workspace.activate_item(&editor_a1, cx)
5452        });
5453        workspace_b
5454            .condition(cx_b, |workspace, cx| {
5455                workspace.active_item(cx).unwrap().id() == editor_b1.id()
5456            })
5457            .await;
5458
5459        // When client A navigates back and forth, client B does so as well.
5460        workspace_a
5461            .update(cx_a, |workspace, cx| {
5462                workspace::Pane::go_back(workspace, None, cx)
5463            })
5464            .await;
5465        workspace_b
5466            .condition(cx_b, |workspace, cx| {
5467                workspace.active_item(cx).unwrap().id() == editor_b2.id()
5468            })
5469            .await;
5470
5471        workspace_a
5472            .update(cx_a, |workspace, cx| {
5473                workspace::Pane::go_forward(workspace, None, cx)
5474            })
5475            .await;
5476        workspace_b
5477            .condition(cx_b, |workspace, cx| {
5478                workspace.active_item(cx).unwrap().id() == editor_b1.id()
5479            })
5480            .await;
5481
5482        // Changes to client A's editor are reflected on client B.
5483        editor_a1.update(cx_a, |editor, cx| {
5484            editor.select_ranges([1..1, 2..2], None, cx);
5485        });
5486        editor_b1
5487            .condition(cx_b, |editor, cx| {
5488                editor.selected_ranges(cx) == vec![1..1, 2..2]
5489            })
5490            .await;
5491
5492        editor_a1.update(cx_a, |editor, cx| editor.set_text("TWO", cx));
5493        editor_b1
5494            .condition(cx_b, |editor, cx| editor.text(cx) == "TWO")
5495            .await;
5496
5497        editor_a1.update(cx_a, |editor, cx| {
5498            editor.select_ranges([3..3], None, cx);
5499            editor.set_scroll_position(vec2f(0., 100.), cx);
5500        });
5501        editor_b1
5502            .condition(cx_b, |editor, cx| editor.selected_ranges(cx) == vec![3..3])
5503            .await;
5504
5505        // After unfollowing, client B stops receiving updates from client A.
5506        workspace_b.update(cx_b, |workspace, cx| {
5507            workspace.unfollow(&workspace.active_pane().clone(), cx)
5508        });
5509        workspace_a.update(cx_a, |workspace, cx| {
5510            workspace.activate_item(&editor_a2, cx)
5511        });
5512        cx_a.foreground().run_until_parked();
5513        assert_eq!(
5514            workspace_b.read_with(cx_b, |workspace, cx| workspace
5515                .active_item(cx)
5516                .unwrap()
5517                .id()),
5518            editor_b1.id()
5519        );
5520
5521        // Client A starts following client B.
5522        workspace_a
5523            .update(cx_a, |workspace, cx| {
5524                workspace
5525                    .toggle_follow(&ToggleFollow(client_b_id), cx)
5526                    .unwrap()
5527            })
5528            .await
5529            .unwrap();
5530        assert_eq!(
5531            workspace_a.read_with(cx_a, |workspace, _| workspace.leader_for_pane(&pane_a)),
5532            Some(client_b_id)
5533        );
5534        assert_eq!(
5535            workspace_a.read_with(cx_a, |workspace, cx| workspace
5536                .active_item(cx)
5537                .unwrap()
5538                .id()),
5539            editor_a1.id()
5540        );
5541
5542        // Following interrupts when client B disconnects.
5543        client_b.disconnect(&cx_b.to_async()).unwrap();
5544        cx_a.foreground().run_until_parked();
5545        assert_eq!(
5546            workspace_a.read_with(cx_a, |workspace, _| workspace.leader_for_pane(&pane_a)),
5547            None
5548        );
5549    }
5550
5551    #[gpui::test(iterations = 10)]
5552    async fn test_peers_following_each_other(cx_a: &mut TestAppContext, cx_b: &mut TestAppContext) {
5553        cx_a.foreground().forbid_parking();
5554        let fs = FakeFs::new(cx_a.background());
5555
5556        // 2 clients connect to a server.
5557        let mut server = TestServer::start(cx_a.foreground(), cx_a.background()).await;
5558        let mut client_a = server.create_client(cx_a, "user_a").await;
5559        let mut client_b = server.create_client(cx_b, "user_b").await;
5560        server
5561            .make_contacts(vec![(&client_a, cx_a), (&client_b, cx_b)])
5562            .await;
5563        cx_a.update(editor::init);
5564        cx_b.update(editor::init);
5565
5566        // Client A shares a project.
5567        fs.insert_tree(
5568            "/a",
5569            json!({
5570                "1.txt": "one",
5571                "2.txt": "two",
5572                "3.txt": "three",
5573                "4.txt": "four",
5574            }),
5575        )
5576        .await;
5577        let (project_a, worktree_id) = client_a.build_local_project(fs.clone(), "/a", cx_a).await;
5578
5579        // Client B joins the project.
5580        let project_b = client_b.build_remote_project(&project_a, cx_a, cx_b).await;
5581
5582        // Client A opens some editors.
5583        let workspace_a = client_a.build_workspace(&project_a, cx_a);
5584        let pane_a1 = workspace_a.read_with(cx_a, |workspace, _| workspace.active_pane().clone());
5585        let _editor_a1 = workspace_a
5586            .update(cx_a, |workspace, cx| {
5587                workspace.open_path((worktree_id, "1.txt"), true, cx)
5588            })
5589            .await
5590            .unwrap()
5591            .downcast::<Editor>()
5592            .unwrap();
5593
5594        // Client B opens an editor.
5595        let workspace_b = client_b.build_workspace(&project_b, cx_b);
5596        let pane_b1 = workspace_b.read_with(cx_b, |workspace, _| workspace.active_pane().clone());
5597        let _editor_b1 = workspace_b
5598            .update(cx_b, |workspace, cx| {
5599                workspace.open_path((worktree_id, "2.txt"), true, cx)
5600            })
5601            .await
5602            .unwrap()
5603            .downcast::<Editor>()
5604            .unwrap();
5605
5606        // Clients A and B follow each other in split panes
5607        workspace_a
5608            .update(cx_a, |workspace, cx| {
5609                workspace.split_pane(workspace.active_pane().clone(), SplitDirection::Right, cx);
5610                assert_ne!(*workspace.active_pane(), pane_a1);
5611                let leader_id = *project_a.read(cx).collaborators().keys().next().unwrap();
5612                workspace
5613                    .toggle_follow(&workspace::ToggleFollow(leader_id), cx)
5614                    .unwrap()
5615            })
5616            .await
5617            .unwrap();
5618        workspace_b
5619            .update(cx_b, |workspace, cx| {
5620                workspace.split_pane(workspace.active_pane().clone(), SplitDirection::Right, cx);
5621                assert_ne!(*workspace.active_pane(), pane_b1);
5622                let leader_id = *project_b.read(cx).collaborators().keys().next().unwrap();
5623                workspace
5624                    .toggle_follow(&workspace::ToggleFollow(leader_id), cx)
5625                    .unwrap()
5626            })
5627            .await
5628            .unwrap();
5629
5630        workspace_a
5631            .update(cx_a, |workspace, cx| {
5632                workspace.activate_next_pane(cx);
5633                assert_eq!(*workspace.active_pane(), pane_a1);
5634                workspace.open_path((worktree_id, "3.txt"), true, cx)
5635            })
5636            .await
5637            .unwrap();
5638        workspace_b
5639            .update(cx_b, |workspace, cx| {
5640                workspace.activate_next_pane(cx);
5641                assert_eq!(*workspace.active_pane(), pane_b1);
5642                workspace.open_path((worktree_id, "4.txt"), true, cx)
5643            })
5644            .await
5645            .unwrap();
5646        cx_a.foreground().run_until_parked();
5647
5648        // Ensure leader updates don't change the active pane of followers
5649        workspace_a.read_with(cx_a, |workspace, _| {
5650            assert_eq!(*workspace.active_pane(), pane_a1);
5651        });
5652        workspace_b.read_with(cx_b, |workspace, _| {
5653            assert_eq!(*workspace.active_pane(), pane_b1);
5654        });
5655
5656        // Ensure peers following each other doesn't cause an infinite loop.
5657        assert_eq!(
5658            workspace_a.read_with(cx_a, |workspace, cx| workspace
5659                .active_item(cx)
5660                .unwrap()
5661                .project_path(cx)),
5662            Some((worktree_id, "3.txt").into())
5663        );
5664        workspace_a.update(cx_a, |workspace, cx| {
5665            assert_eq!(
5666                workspace.active_item(cx).unwrap().project_path(cx),
5667                Some((worktree_id, "3.txt").into())
5668            );
5669            workspace.activate_next_pane(cx);
5670            assert_eq!(
5671                workspace.active_item(cx).unwrap().project_path(cx),
5672                Some((worktree_id, "4.txt").into())
5673            );
5674        });
5675        workspace_b.update(cx_b, |workspace, cx| {
5676            assert_eq!(
5677                workspace.active_item(cx).unwrap().project_path(cx),
5678                Some((worktree_id, "4.txt").into())
5679            );
5680            workspace.activate_next_pane(cx);
5681            assert_eq!(
5682                workspace.active_item(cx).unwrap().project_path(cx),
5683                Some((worktree_id, "3.txt").into())
5684            );
5685        });
5686    }
5687
5688    #[gpui::test(iterations = 10)]
5689    async fn test_auto_unfollowing(cx_a: &mut TestAppContext, cx_b: &mut TestAppContext) {
5690        cx_a.foreground().forbid_parking();
5691        let fs = FakeFs::new(cx_a.background());
5692
5693        // 2 clients connect to a server.
5694        let mut server = TestServer::start(cx_a.foreground(), cx_a.background()).await;
5695        let mut client_a = server.create_client(cx_a, "user_a").await;
5696        let mut client_b = server.create_client(cx_b, "user_b").await;
5697        server
5698            .make_contacts(vec![(&client_a, cx_a), (&client_b, cx_b)])
5699            .await;
5700        cx_a.update(editor::init);
5701        cx_b.update(editor::init);
5702
5703        // Client A shares a project.
5704        fs.insert_tree(
5705            "/a",
5706            json!({
5707                "1.txt": "one",
5708                "2.txt": "two",
5709                "3.txt": "three",
5710            }),
5711        )
5712        .await;
5713        let (project_a, worktree_id) = client_a.build_local_project(fs.clone(), "/a", cx_a).await;
5714
5715        // Client B joins the project.
5716        let project_b = client_b.build_remote_project(&project_a, cx_a, cx_b).await;
5717
5718        // Client A opens some editors.
5719        let workspace_a = client_a.build_workspace(&project_a, cx_a);
5720        let _editor_a1 = workspace_a
5721            .update(cx_a, |workspace, cx| {
5722                workspace.open_path((worktree_id, "1.txt"), true, cx)
5723            })
5724            .await
5725            .unwrap()
5726            .downcast::<Editor>()
5727            .unwrap();
5728
5729        // Client B starts following client A.
5730        let workspace_b = client_b.build_workspace(&project_b, cx_b);
5731        let pane_b = workspace_b.read_with(cx_b, |workspace, _| workspace.active_pane().clone());
5732        let leader_id = project_b.read_with(cx_b, |project, _| {
5733            project.collaborators().values().next().unwrap().peer_id
5734        });
5735        workspace_b
5736            .update(cx_b, |workspace, cx| {
5737                workspace
5738                    .toggle_follow(&ToggleFollow(leader_id), cx)
5739                    .unwrap()
5740            })
5741            .await
5742            .unwrap();
5743        assert_eq!(
5744            workspace_b.read_with(cx_b, |workspace, _| workspace.leader_for_pane(&pane_b)),
5745            Some(leader_id)
5746        );
5747        let editor_b2 = workspace_b.read_with(cx_b, |workspace, cx| {
5748            workspace
5749                .active_item(cx)
5750                .unwrap()
5751                .downcast::<Editor>()
5752                .unwrap()
5753        });
5754
5755        // When client B moves, it automatically stops following client A.
5756        editor_b2.update(cx_b, |editor, cx| editor.move_right(&editor::MoveRight, cx));
5757        assert_eq!(
5758            workspace_b.read_with(cx_b, |workspace, _| workspace.leader_for_pane(&pane_b)),
5759            None
5760        );
5761
5762        workspace_b
5763            .update(cx_b, |workspace, cx| {
5764                workspace
5765                    .toggle_follow(&ToggleFollow(leader_id), cx)
5766                    .unwrap()
5767            })
5768            .await
5769            .unwrap();
5770        assert_eq!(
5771            workspace_b.read_with(cx_b, |workspace, _| workspace.leader_for_pane(&pane_b)),
5772            Some(leader_id)
5773        );
5774
5775        // When client B edits, it automatically stops following client A.
5776        editor_b2.update(cx_b, |editor, cx| editor.insert("X", cx));
5777        assert_eq!(
5778            workspace_b.read_with(cx_b, |workspace, _| workspace.leader_for_pane(&pane_b)),
5779            None
5780        );
5781
5782        workspace_b
5783            .update(cx_b, |workspace, cx| {
5784                workspace
5785                    .toggle_follow(&ToggleFollow(leader_id), cx)
5786                    .unwrap()
5787            })
5788            .await
5789            .unwrap();
5790        assert_eq!(
5791            workspace_b.read_with(cx_b, |workspace, _| workspace.leader_for_pane(&pane_b)),
5792            Some(leader_id)
5793        );
5794
5795        // When client B scrolls, it automatically stops following client A.
5796        editor_b2.update(cx_b, |editor, cx| {
5797            editor.set_scroll_position(vec2f(0., 3.), cx)
5798        });
5799        assert_eq!(
5800            workspace_b.read_with(cx_b, |workspace, _| workspace.leader_for_pane(&pane_b)),
5801            None
5802        );
5803
5804        workspace_b
5805            .update(cx_b, |workspace, cx| {
5806                workspace
5807                    .toggle_follow(&ToggleFollow(leader_id), cx)
5808                    .unwrap()
5809            })
5810            .await
5811            .unwrap();
5812        assert_eq!(
5813            workspace_b.read_with(cx_b, |workspace, _| workspace.leader_for_pane(&pane_b)),
5814            Some(leader_id)
5815        );
5816
5817        // When client B activates a different pane, it continues following client A in the original pane.
5818        workspace_b.update(cx_b, |workspace, cx| {
5819            workspace.split_pane(pane_b.clone(), SplitDirection::Right, cx)
5820        });
5821        assert_eq!(
5822            workspace_b.read_with(cx_b, |workspace, _| workspace.leader_for_pane(&pane_b)),
5823            Some(leader_id)
5824        );
5825
5826        workspace_b.update(cx_b, |workspace, cx| workspace.activate_next_pane(cx));
5827        assert_eq!(
5828            workspace_b.read_with(cx_b, |workspace, _| workspace.leader_for_pane(&pane_b)),
5829            Some(leader_id)
5830        );
5831
5832        // When client B activates a different item in the original pane, it automatically stops following client A.
5833        workspace_b
5834            .update(cx_b, |workspace, cx| {
5835                workspace.open_path((worktree_id, "2.txt"), true, cx)
5836            })
5837            .await
5838            .unwrap();
5839        assert_eq!(
5840            workspace_b.read_with(cx_b, |workspace, _| workspace.leader_for_pane(&pane_b)),
5841            None
5842        );
5843    }
5844
5845    #[gpui::test(iterations = 100)]
5846    async fn test_random_collaboration(
5847        cx: &mut TestAppContext,
5848        deterministic: Arc<Deterministic>,
5849        rng: StdRng,
5850    ) {
5851        cx.foreground().forbid_parking();
5852        let max_peers = env::var("MAX_PEERS")
5853            .map(|i| i.parse().expect("invalid `MAX_PEERS` variable"))
5854            .unwrap_or(5);
5855        assert!(max_peers <= 5);
5856
5857        let max_operations = env::var("OPERATIONS")
5858            .map(|i| i.parse().expect("invalid `OPERATIONS` variable"))
5859            .unwrap_or(10);
5860
5861        let rng = Arc::new(Mutex::new(rng));
5862
5863        let guest_lang_registry = Arc::new(LanguageRegistry::test());
5864        let host_language_registry = Arc::new(LanguageRegistry::test());
5865
5866        let fs = FakeFs::new(cx.background());
5867        fs.insert_tree("/_collab", json!({"init": ""})).await;
5868
5869        let mut server = TestServer::start(cx.foreground(), cx.background()).await;
5870        let db = server.app_state.db.clone();
5871        let host_user_id = db.create_user("host", false).await.unwrap();
5872        for username in ["guest-1", "guest-2", "guest-3", "guest-4"] {
5873            let guest_user_id = db.create_user(username, false).await.unwrap();
5874            server
5875                .app_state
5876                .db
5877                .send_contact_request(guest_user_id, host_user_id)
5878                .await
5879                .unwrap();
5880            server
5881                .app_state
5882                .db
5883                .respond_to_contact_request(host_user_id, guest_user_id, true)
5884                .await
5885                .unwrap();
5886        }
5887
5888        let mut clients = Vec::new();
5889        let mut user_ids = Vec::new();
5890        let mut op_start_signals = Vec::new();
5891
5892        let mut next_entity_id = 100000;
5893        let mut host_cx = TestAppContext::new(
5894            cx.foreground_platform(),
5895            cx.platform(),
5896            deterministic.build_foreground(next_entity_id),
5897            deterministic.build_background(),
5898            cx.font_cache(),
5899            cx.leak_detector(),
5900            next_entity_id,
5901        );
5902        let host = server.create_client(&mut host_cx, "host").await;
5903        let host_project = host_cx.update(|cx| {
5904            Project::local(
5905                host.client.clone(),
5906                host.user_store.clone(),
5907                host_language_registry.clone(),
5908                fs.clone(),
5909                cx,
5910            )
5911        });
5912        let host_project_id = host_project
5913            .update(&mut host_cx, |p, _| p.next_remote_id())
5914            .await;
5915
5916        let (collab_worktree, _) = host_project
5917            .update(&mut host_cx, |project, cx| {
5918                project.find_or_create_local_worktree("/_collab", true, cx)
5919            })
5920            .await
5921            .unwrap();
5922        collab_worktree
5923            .read_with(&host_cx, |tree, _| tree.as_local().unwrap().scan_complete())
5924            .await;
5925
5926        // Set up fake language servers.
5927        let mut language = Language::new(
5928            LanguageConfig {
5929                name: "Rust".into(),
5930                path_suffixes: vec!["rs".to_string()],
5931                ..Default::default()
5932            },
5933            None,
5934        );
5935        let _fake_servers = language.set_fake_lsp_adapter(FakeLspAdapter {
5936            name: "the-fake-language-server",
5937            capabilities: lsp::LanguageServer::full_capabilities(),
5938            initializer: Some(Box::new({
5939                let rng = rng.clone();
5940                let fs = fs.clone();
5941                let project = host_project.downgrade();
5942                move |fake_server: &mut FakeLanguageServer| {
5943                    fake_server.handle_request::<lsp::request::Completion, _, _>(
5944                        |_, _| async move {
5945                            Ok(Some(lsp::CompletionResponse::Array(vec![
5946                                lsp::CompletionItem {
5947                                    text_edit: Some(lsp::CompletionTextEdit::Edit(lsp::TextEdit {
5948                                        range: lsp::Range::new(
5949                                            lsp::Position::new(0, 0),
5950                                            lsp::Position::new(0, 0),
5951                                        ),
5952                                        new_text: "the-new-text".to_string(),
5953                                    })),
5954                                    ..Default::default()
5955                                },
5956                            ])))
5957                        },
5958                    );
5959
5960                    fake_server.handle_request::<lsp::request::CodeActionRequest, _, _>(
5961                        |_, _| async move {
5962                            Ok(Some(vec![lsp::CodeActionOrCommand::CodeAction(
5963                                lsp::CodeAction {
5964                                    title: "the-code-action".to_string(),
5965                                    ..Default::default()
5966                                },
5967                            )]))
5968                        },
5969                    );
5970
5971                    fake_server.handle_request::<lsp::request::PrepareRenameRequest, _, _>(
5972                        |params, _| async move {
5973                            Ok(Some(lsp::PrepareRenameResponse::Range(lsp::Range::new(
5974                                params.position,
5975                                params.position,
5976                            ))))
5977                        },
5978                    );
5979
5980                    fake_server.handle_request::<lsp::request::GotoDefinition, _, _>({
5981                        let fs = fs.clone();
5982                        let rng = rng.clone();
5983                        move |_, _| {
5984                            let fs = fs.clone();
5985                            let rng = rng.clone();
5986                            async move {
5987                                let files = fs.files().await;
5988                                let mut rng = rng.lock();
5989                                let count = rng.gen_range::<usize, _>(1..3);
5990                                let files = (0..count)
5991                                    .map(|_| files.choose(&mut *rng).unwrap())
5992                                    .collect::<Vec<_>>();
5993                                log::info!("LSP: Returning definitions in files {:?}", &files);
5994                                Ok(Some(lsp::GotoDefinitionResponse::Array(
5995                                    files
5996                                        .into_iter()
5997                                        .map(|file| lsp::Location {
5998                                            uri: lsp::Url::from_file_path(file).unwrap(),
5999                                            range: Default::default(),
6000                                        })
6001                                        .collect(),
6002                                )))
6003                            }
6004                        }
6005                    });
6006
6007                    fake_server.handle_request::<lsp::request::DocumentHighlightRequest, _, _>({
6008                        let rng = rng.clone();
6009                        let project = project.clone();
6010                        move |params, mut cx| {
6011                            let highlights = if let Some(project) = project.upgrade(&cx) {
6012                                project.update(&mut cx, |project, cx| {
6013                                    let path = params
6014                                        .text_document_position_params
6015                                        .text_document
6016                                        .uri
6017                                        .to_file_path()
6018                                        .unwrap();
6019                                    let (worktree, relative_path) =
6020                                        project.find_local_worktree(&path, cx)?;
6021                                    let project_path =
6022                                        ProjectPath::from((worktree.read(cx).id(), relative_path));
6023                                    let buffer =
6024                                        project.get_open_buffer(&project_path, cx)?.read(cx);
6025
6026                                    let mut highlights = Vec::new();
6027                                    let highlight_count = rng.lock().gen_range(1..=5);
6028                                    let mut prev_end = 0;
6029                                    for _ in 0..highlight_count {
6030                                        let range =
6031                                            buffer.random_byte_range(prev_end, &mut *rng.lock());
6032
6033                                        highlights.push(lsp::DocumentHighlight {
6034                                            range: range_to_lsp(range.to_point_utf16(buffer)),
6035                                            kind: Some(lsp::DocumentHighlightKind::READ),
6036                                        });
6037                                        prev_end = range.end;
6038                                    }
6039                                    Some(highlights)
6040                                })
6041                            } else {
6042                                None
6043                            };
6044                            async move { Ok(highlights) }
6045                        }
6046                    });
6047                }
6048            })),
6049            ..Default::default()
6050        });
6051        host_language_registry.add(Arc::new(language));
6052
6053        let op_start_signal = futures::channel::mpsc::unbounded();
6054        user_ids.push(host.current_user_id(&host_cx));
6055        op_start_signals.push(op_start_signal.0);
6056        clients.push(host_cx.foreground().spawn(host.simulate_host(
6057            host_project,
6058            op_start_signal.1,
6059            rng.clone(),
6060            host_cx,
6061        )));
6062
6063        let disconnect_host_at = if rng.lock().gen_bool(0.2) {
6064            rng.lock().gen_range(0..max_operations)
6065        } else {
6066            max_operations
6067        };
6068        let mut available_guests = vec![
6069            "guest-1".to_string(),
6070            "guest-2".to_string(),
6071            "guest-3".to_string(),
6072            "guest-4".to_string(),
6073        ];
6074        let mut operations = 0;
6075        while operations < max_operations {
6076            if operations == disconnect_host_at {
6077                server.disconnect_client(user_ids[0]);
6078                cx.foreground().advance_clock(RECEIVE_TIMEOUT);
6079                drop(op_start_signals);
6080                let mut clients = futures::future::join_all(clients).await;
6081                cx.foreground().run_until_parked();
6082
6083                let (host, mut host_cx, host_err) = clients.remove(0);
6084                if let Some(host_err) = host_err {
6085                    log::error!("host error - {:?}", host_err);
6086                }
6087                host.project
6088                    .as_ref()
6089                    .unwrap()
6090                    .read_with(&host_cx, |project, _| assert!(!project.is_shared()));
6091                for (guest, mut guest_cx, guest_err) in clients {
6092                    if let Some(guest_err) = guest_err {
6093                        log::error!("{} error - {:?}", guest.username, guest_err);
6094                    }
6095
6096                    let contacts = server
6097                        .app_state
6098                        .db
6099                        .get_contacts(guest.current_user_id(&guest_cx))
6100                        .await
6101                        .unwrap();
6102                    let contacts = server
6103                        .store
6104                        .read()
6105                        .await
6106                        .build_initial_contacts_update(contacts)
6107                        .contacts;
6108                    assert!(!contacts
6109                        .iter()
6110                        .flat_map(|contact| &contact.projects)
6111                        .any(|project| project.id == host_project_id));
6112                    guest
6113                        .project
6114                        .as_ref()
6115                        .unwrap()
6116                        .read_with(&guest_cx, |project, _| assert!(project.is_read_only()));
6117                    guest_cx.update(|_| drop(guest));
6118                }
6119                host_cx.update(|_| drop(host));
6120
6121                return;
6122            }
6123
6124            let distribution = rng.lock().gen_range(0..100);
6125            match distribution {
6126                0..=19 if !available_guests.is_empty() => {
6127                    let guest_ix = rng.lock().gen_range(0..available_guests.len());
6128                    let guest_username = available_guests.remove(guest_ix);
6129                    log::info!("Adding new connection for {}", guest_username);
6130                    next_entity_id += 100000;
6131                    let mut guest_cx = TestAppContext::new(
6132                        cx.foreground_platform(),
6133                        cx.platform(),
6134                        deterministic.build_foreground(next_entity_id),
6135                        deterministic.build_background(),
6136                        cx.font_cache(),
6137                        cx.leak_detector(),
6138                        next_entity_id,
6139                    );
6140                    let guest = server.create_client(&mut guest_cx, &guest_username).await;
6141                    let guest_project = Project::remote(
6142                        host_project_id,
6143                        guest.client.clone(),
6144                        guest.user_store.clone(),
6145                        guest_lang_registry.clone(),
6146                        FakeFs::new(cx.background()),
6147                        &mut guest_cx.to_async(),
6148                    )
6149                    .await
6150                    .unwrap();
6151                    let op_start_signal = futures::channel::mpsc::unbounded();
6152                    user_ids.push(guest.current_user_id(&guest_cx));
6153                    op_start_signals.push(op_start_signal.0);
6154                    clients.push(guest_cx.foreground().spawn(guest.simulate_guest(
6155                        guest_username.clone(),
6156                        guest_project,
6157                        op_start_signal.1,
6158                        rng.clone(),
6159                        guest_cx,
6160                    )));
6161
6162                    log::info!("Added connection for {}", guest_username);
6163                    operations += 1;
6164                }
6165                20..=29 if clients.len() > 1 => {
6166                    let guest_ix = rng.lock().gen_range(1..clients.len());
6167                    log::info!("Removing guest {}", user_ids[guest_ix]);
6168                    let removed_guest_id = user_ids.remove(guest_ix);
6169                    let guest = clients.remove(guest_ix);
6170                    op_start_signals.remove(guest_ix);
6171                    server.forbid_connections();
6172                    server.disconnect_client(removed_guest_id);
6173                    cx.foreground().advance_clock(RECEIVE_TIMEOUT);
6174                    let (guest, mut guest_cx, guest_err) = guest.await;
6175                    server.allow_connections();
6176
6177                    if let Some(guest_err) = guest_err {
6178                        log::error!("{} error - {:?}", guest.username, guest_err);
6179                    }
6180                    guest
6181                        .project
6182                        .as_ref()
6183                        .unwrap()
6184                        .read_with(&guest_cx, |project, _| assert!(project.is_read_only()));
6185                    for user_id in &user_ids {
6186                        let contacts = server.app_state.db.get_contacts(*user_id).await.unwrap();
6187                        let contacts = server
6188                            .store
6189                            .read()
6190                            .await
6191                            .build_initial_contacts_update(contacts)
6192                            .contacts;
6193                        for contact in contacts {
6194                            if contact.online {
6195                                assert_ne!(
6196                                    contact.user_id, removed_guest_id.0 as u64,
6197                                    "removed guest is still a contact of another peer"
6198                                );
6199                            }
6200                            for project in contact.projects {
6201                                for project_guest_id in project.guests {
6202                                    assert_ne!(
6203                                        project_guest_id, removed_guest_id.0 as u64,
6204                                        "removed guest appears as still participating on a project"
6205                                    );
6206                                }
6207                            }
6208                        }
6209                    }
6210
6211                    log::info!("{} removed", guest.username);
6212                    available_guests.push(guest.username.clone());
6213                    guest_cx.update(|_| drop(guest));
6214
6215                    operations += 1;
6216                }
6217                _ => {
6218                    while operations < max_operations && rng.lock().gen_bool(0.7) {
6219                        op_start_signals
6220                            .choose(&mut *rng.lock())
6221                            .unwrap()
6222                            .unbounded_send(())
6223                            .unwrap();
6224                        operations += 1;
6225                    }
6226
6227                    if rng.lock().gen_bool(0.8) {
6228                        cx.foreground().run_until_parked();
6229                    }
6230                }
6231            }
6232        }
6233
6234        drop(op_start_signals);
6235        let mut clients = futures::future::join_all(clients).await;
6236        cx.foreground().run_until_parked();
6237
6238        let (host_client, mut host_cx, host_err) = clients.remove(0);
6239        if let Some(host_err) = host_err {
6240            panic!("host error - {:?}", host_err);
6241        }
6242        let host_project = host_client.project.as_ref().unwrap();
6243        let host_worktree_snapshots = host_project.read_with(&host_cx, |project, cx| {
6244            project
6245                .worktrees(cx)
6246                .map(|worktree| {
6247                    let snapshot = worktree.read(cx).snapshot();
6248                    (snapshot.id(), snapshot)
6249                })
6250                .collect::<BTreeMap<_, _>>()
6251        });
6252
6253        host_client
6254            .project
6255            .as_ref()
6256            .unwrap()
6257            .read_with(&host_cx, |project, cx| project.check_invariants(cx));
6258
6259        for (guest_client, mut guest_cx, guest_err) in clients.into_iter() {
6260            if let Some(guest_err) = guest_err {
6261                panic!("{} error - {:?}", guest_client.username, guest_err);
6262            }
6263            let worktree_snapshots =
6264                guest_client
6265                    .project
6266                    .as_ref()
6267                    .unwrap()
6268                    .read_with(&guest_cx, |project, cx| {
6269                        project
6270                            .worktrees(cx)
6271                            .map(|worktree| {
6272                                let worktree = worktree.read(cx);
6273                                (worktree.id(), worktree.snapshot())
6274                            })
6275                            .collect::<BTreeMap<_, _>>()
6276                    });
6277
6278            assert_eq!(
6279                worktree_snapshots.keys().collect::<Vec<_>>(),
6280                host_worktree_snapshots.keys().collect::<Vec<_>>(),
6281                "{} has different worktrees than the host",
6282                guest_client.username
6283            );
6284            for (id, host_snapshot) in &host_worktree_snapshots {
6285                let guest_snapshot = &worktree_snapshots[id];
6286                assert_eq!(
6287                    guest_snapshot.root_name(),
6288                    host_snapshot.root_name(),
6289                    "{} has different root name than the host for worktree {}",
6290                    guest_client.username,
6291                    id
6292                );
6293                assert_eq!(
6294                    guest_snapshot.entries(false).collect::<Vec<_>>(),
6295                    host_snapshot.entries(false).collect::<Vec<_>>(),
6296                    "{} has different snapshot than the host for worktree {}",
6297                    guest_client.username,
6298                    id
6299                );
6300                assert_eq!(guest_snapshot.scan_id(), host_snapshot.scan_id());
6301            }
6302
6303            guest_client
6304                .project
6305                .as_ref()
6306                .unwrap()
6307                .read_with(&guest_cx, |project, cx| project.check_invariants(cx));
6308
6309            for guest_buffer in &guest_client.buffers {
6310                let buffer_id = guest_buffer.read_with(&guest_cx, |buffer, _| buffer.remote_id());
6311                let host_buffer = host_project.read_with(&host_cx, |project, cx| {
6312                    project.buffer_for_id(buffer_id, cx).expect(&format!(
6313                        "host does not have buffer for guest:{}, peer:{}, id:{}",
6314                        guest_client.username, guest_client.peer_id, buffer_id
6315                    ))
6316                });
6317                let path = host_buffer
6318                    .read_with(&host_cx, |buffer, cx| buffer.file().unwrap().full_path(cx));
6319
6320                assert_eq!(
6321                    guest_buffer.read_with(&guest_cx, |buffer, _| buffer.deferred_ops_len()),
6322                    0,
6323                    "{}, buffer {}, path {:?} has deferred operations",
6324                    guest_client.username,
6325                    buffer_id,
6326                    path,
6327                );
6328                assert_eq!(
6329                    guest_buffer.read_with(&guest_cx, |buffer, _| buffer.text()),
6330                    host_buffer.read_with(&host_cx, |buffer, _| buffer.text()),
6331                    "{}, buffer {}, path {:?}, differs from the host's buffer",
6332                    guest_client.username,
6333                    buffer_id,
6334                    path
6335                );
6336            }
6337
6338            guest_cx.update(|_| drop(guest_client));
6339        }
6340
6341        host_cx.update(|_| drop(host_client));
6342    }
6343
6344    struct TestServer {
6345        peer: Arc<Peer>,
6346        app_state: Arc<AppState>,
6347        server: Arc<Server>,
6348        foreground: Rc<executor::Foreground>,
6349        notifications: mpsc::UnboundedReceiver<()>,
6350        connection_killers: Arc<Mutex<HashMap<UserId, Arc<AtomicBool>>>>,
6351        forbid_connections: Arc<AtomicBool>,
6352        _test_db: TestDb,
6353    }
6354
6355    impl TestServer {
6356        async fn start(
6357            foreground: Rc<executor::Foreground>,
6358            background: Arc<executor::Background>,
6359        ) -> Self {
6360            let test_db = TestDb::fake(background);
6361            let app_state = Self::build_app_state(&test_db).await;
6362            let peer = Peer::new();
6363            let notifications = mpsc::unbounded();
6364            let server = Server::new(app_state.clone(), Some(notifications.0));
6365            Self {
6366                peer,
6367                app_state,
6368                server,
6369                foreground,
6370                notifications: notifications.1,
6371                connection_killers: Default::default(),
6372                forbid_connections: Default::default(),
6373                _test_db: test_db,
6374            }
6375        }
6376
6377        async fn create_client(&mut self, cx: &mut TestAppContext, name: &str) -> TestClient {
6378            cx.update(|cx| {
6379                let settings = Settings::test(cx);
6380                cx.set_global(settings);
6381            });
6382
6383            let http = FakeHttpClient::with_404_response();
6384            let user_id =
6385                if let Ok(Some(user)) = self.app_state.db.get_user_by_github_login(name).await {
6386                    user.id
6387                } else {
6388                    self.app_state.db.create_user(name, false).await.unwrap()
6389                };
6390            let client_name = name.to_string();
6391            let mut client = Client::new(http.clone());
6392            let server = self.server.clone();
6393            let connection_killers = self.connection_killers.clone();
6394            let forbid_connections = self.forbid_connections.clone();
6395            let (connection_id_tx, mut connection_id_rx) = mpsc::channel(16);
6396
6397            Arc::get_mut(&mut client)
6398                .unwrap()
6399                .override_authenticate(move |cx| {
6400                    cx.spawn(|_| async move {
6401                        let access_token = "the-token".to_string();
6402                        Ok(Credentials {
6403                            user_id: user_id.0 as u64,
6404                            access_token,
6405                        })
6406                    })
6407                })
6408                .override_establish_connection(move |credentials, cx| {
6409                    assert_eq!(credentials.user_id, user_id.0 as u64);
6410                    assert_eq!(credentials.access_token, "the-token");
6411
6412                    let server = server.clone();
6413                    let connection_killers = connection_killers.clone();
6414                    let forbid_connections = forbid_connections.clone();
6415                    let client_name = client_name.clone();
6416                    let connection_id_tx = connection_id_tx.clone();
6417                    cx.spawn(move |cx| async move {
6418                        if forbid_connections.load(SeqCst) {
6419                            Err(EstablishConnectionError::other(anyhow!(
6420                                "server is forbidding connections"
6421                            )))
6422                        } else {
6423                            let (client_conn, server_conn, killed) =
6424                                Connection::in_memory(cx.background());
6425                            connection_killers.lock().insert(user_id, killed);
6426                            cx.background()
6427                                .spawn(server.handle_connection(
6428                                    server_conn,
6429                                    client_name,
6430                                    user_id,
6431                                    Some(connection_id_tx),
6432                                    cx.background(),
6433                                ))
6434                                .detach();
6435                            Ok(client_conn)
6436                        }
6437                    })
6438                });
6439
6440            Channel::init(&client);
6441            Project::init(&client);
6442            cx.update(|cx| {
6443                workspace::init(&client, cx);
6444            });
6445
6446            let user_store = cx.add_model(|cx| UserStore::new(client.clone(), http, cx));
6447            client
6448                .authenticate_and_connect(false, &cx.to_async())
6449                .await
6450                .unwrap();
6451            let peer_id = PeerId(connection_id_rx.next().await.unwrap().0);
6452
6453            let client = TestClient {
6454                client,
6455                peer_id,
6456                username: name.to_string(),
6457                user_store,
6458                language_registry: Arc::new(LanguageRegistry::test()),
6459                project: Default::default(),
6460                buffers: Default::default(),
6461            };
6462            client.wait_for_current_user(cx).await;
6463            client
6464        }
6465
6466        fn disconnect_client(&self, user_id: UserId) {
6467            self.connection_killers
6468                .lock()
6469                .remove(&user_id)
6470                .unwrap()
6471                .store(true, SeqCst);
6472        }
6473
6474        fn forbid_connections(&self) {
6475            self.forbid_connections.store(true, SeqCst);
6476        }
6477
6478        fn allow_connections(&self) {
6479            self.forbid_connections.store(false, SeqCst);
6480        }
6481
6482        async fn make_contacts(&self, mut clients: Vec<(&TestClient, &mut TestAppContext)>) {
6483            while let Some((client_a, cx_a)) = clients.pop() {
6484                for (client_b, cx_b) in &mut clients {
6485                    client_a
6486                        .user_store
6487                        .update(cx_a, |store, cx| {
6488                            store.request_contact(client_b.user_id().unwrap(), cx)
6489                        })
6490                        .await
6491                        .unwrap();
6492                    cx_a.foreground().run_until_parked();
6493                    client_b
6494                        .user_store
6495                        .update(*cx_b, |store, cx| {
6496                            store.respond_to_contact_request(client_a.user_id().unwrap(), true, cx)
6497                        })
6498                        .await
6499                        .unwrap();
6500                }
6501            }
6502        }
6503
6504        async fn build_app_state(test_db: &TestDb) -> Arc<AppState> {
6505            Arc::new(AppState {
6506                db: test_db.db().clone(),
6507                api_token: Default::default(),
6508            })
6509        }
6510
6511        async fn state<'a>(&'a self) -> RwLockReadGuard<'a, Store> {
6512            self.server.store.read().await
6513        }
6514
6515        async fn condition<F>(&mut self, mut predicate: F)
6516        where
6517            F: FnMut(&Store) -> bool,
6518        {
6519            assert!(
6520                self.foreground.parking_forbidden(),
6521                "you must call forbid_parking to use server conditions so we don't block indefinitely"
6522            );
6523            while !(predicate)(&*self.server.store.read().await) {
6524                self.foreground.start_waiting();
6525                self.notifications.next().await;
6526                self.foreground.finish_waiting();
6527            }
6528        }
6529    }
6530
6531    impl Deref for TestServer {
6532        type Target = Server;
6533
6534        fn deref(&self) -> &Self::Target {
6535            &self.server
6536        }
6537    }
6538
6539    impl Drop for TestServer {
6540        fn drop(&mut self) {
6541            self.peer.reset();
6542        }
6543    }
6544
6545    struct TestClient {
6546        client: Arc<Client>,
6547        username: String,
6548        pub peer_id: PeerId,
6549        pub user_store: ModelHandle<UserStore>,
6550        language_registry: Arc<LanguageRegistry>,
6551        project: Option<ModelHandle<Project>>,
6552        buffers: HashSet<ModelHandle<language::Buffer>>,
6553    }
6554
6555    impl Deref for TestClient {
6556        type Target = Arc<Client>;
6557
6558        fn deref(&self) -> &Self::Target {
6559            &self.client
6560        }
6561    }
6562
6563    struct ContactsSummary {
6564        pub current: Vec<String>,
6565        pub outgoing_requests: Vec<String>,
6566        pub incoming_requests: Vec<String>,
6567    }
6568
6569    impl TestClient {
6570        pub fn current_user_id(&self, cx: &TestAppContext) -> UserId {
6571            UserId::from_proto(
6572                self.user_store
6573                    .read_with(cx, |user_store, _| user_store.current_user().unwrap().id),
6574            )
6575        }
6576
6577        async fn wait_for_current_user(&self, cx: &TestAppContext) {
6578            let mut authed_user = self
6579                .user_store
6580                .read_with(cx, |user_store, _| user_store.watch_current_user());
6581            while authed_user.next().await.unwrap().is_none() {}
6582        }
6583
6584        async fn clear_contacts(&self, cx: &mut TestAppContext) {
6585            self.user_store
6586                .update(cx, |store, _| store.clear_contacts())
6587                .await;
6588        }
6589
6590        fn summarize_contacts(&self, cx: &TestAppContext) -> ContactsSummary {
6591            self.user_store.read_with(cx, |store, _| ContactsSummary {
6592                current: store
6593                    .contacts()
6594                    .iter()
6595                    .map(|contact| contact.user.github_login.clone())
6596                    .collect(),
6597                outgoing_requests: store
6598                    .outgoing_contact_requests()
6599                    .iter()
6600                    .map(|user| user.github_login.clone())
6601                    .collect(),
6602                incoming_requests: store
6603                    .incoming_contact_requests()
6604                    .iter()
6605                    .map(|user| user.github_login.clone())
6606                    .collect(),
6607            })
6608        }
6609
6610        async fn build_local_project(
6611            &mut self,
6612            fs: Arc<FakeFs>,
6613            root_path: impl AsRef<Path>,
6614            cx: &mut TestAppContext,
6615        ) -> (ModelHandle<Project>, WorktreeId) {
6616            let project = cx.update(|cx| {
6617                Project::local(
6618                    self.client.clone(),
6619                    self.user_store.clone(),
6620                    self.language_registry.clone(),
6621                    fs,
6622                    cx,
6623                )
6624            });
6625            self.project = Some(project.clone());
6626            let (worktree, _) = project
6627                .update(cx, |p, cx| {
6628                    p.find_or_create_local_worktree(root_path, true, cx)
6629                })
6630                .await
6631                .unwrap();
6632            worktree
6633                .read_with(cx, |tree, _| tree.as_local().unwrap().scan_complete())
6634                .await;
6635            project
6636                .update(cx, |project, _| project.next_remote_id())
6637                .await;
6638            (project, worktree.read_with(cx, |tree, _| tree.id()))
6639        }
6640
6641        async fn build_remote_project(
6642            &mut self,
6643            host_project: &ModelHandle<Project>,
6644            host_cx: &mut TestAppContext,
6645            guest_cx: &mut TestAppContext,
6646        ) -> ModelHandle<Project> {
6647            let host_project_id = host_project
6648                .read_with(host_cx, |project, _| project.next_remote_id())
6649                .await;
6650            let guest_user_id = self.user_id().unwrap();
6651            let languages =
6652                host_project.read_with(host_cx, |project, _| project.languages().clone());
6653            let project_b = guest_cx.spawn(|mut cx| {
6654                let user_store = self.user_store.clone();
6655                let guest_client = self.client.clone();
6656                async move {
6657                    Project::remote(
6658                        host_project_id,
6659                        guest_client,
6660                        user_store.clone(),
6661                        languages,
6662                        FakeFs::new(cx.background()),
6663                        &mut cx,
6664                    )
6665                    .await
6666                    .unwrap()
6667                }
6668            });
6669            host_cx.foreground().run_until_parked();
6670            host_project.update(host_cx, |project, cx| {
6671                project.respond_to_join_request(guest_user_id, true, cx)
6672            });
6673            let project = project_b.await;
6674            self.project = Some(project.clone());
6675            project
6676        }
6677
6678        fn build_workspace(
6679            &self,
6680            project: &ModelHandle<Project>,
6681            cx: &mut TestAppContext,
6682        ) -> ViewHandle<Workspace> {
6683            let (window_id, _) = cx.add_window(|_| EmptyView);
6684            cx.add_view(window_id, |cx| {
6685                let fs = project.read(cx).fs().clone();
6686                Workspace::new(
6687                    &WorkspaceParams {
6688                        fs,
6689                        project: project.clone(),
6690                        user_store: self.user_store.clone(),
6691                        languages: self.language_registry.clone(),
6692                        themes: ThemeRegistry::new((), cx.font_cache().clone()),
6693                        channel_list: cx.add_model(|cx| {
6694                            ChannelList::new(self.user_store.clone(), self.client.clone(), cx)
6695                        }),
6696                        client: self.client.clone(),
6697                    },
6698                    cx,
6699                )
6700            })
6701        }
6702
6703        async fn simulate_host(
6704            mut self,
6705            project: ModelHandle<Project>,
6706            op_start_signal: futures::channel::mpsc::UnboundedReceiver<()>,
6707            rng: Arc<Mutex<StdRng>>,
6708            mut cx: TestAppContext,
6709        ) -> (Self, TestAppContext, Option<anyhow::Error>) {
6710            async fn simulate_host_internal(
6711                client: &mut TestClient,
6712                project: ModelHandle<Project>,
6713                mut op_start_signal: futures::channel::mpsc::UnboundedReceiver<()>,
6714                rng: Arc<Mutex<StdRng>>,
6715                cx: &mut TestAppContext,
6716            ) -> anyhow::Result<()> {
6717                let fs = project.read_with(cx, |project, _| project.fs().clone());
6718
6719                cx.update(|cx| {
6720                    cx.subscribe(&project, move |project, event, cx| {
6721                        if let project::Event::ContactRequestedJoin(user) = event {
6722                            log::info!("Host: accepting join request from {}", user.github_login);
6723                            project.update(cx, |project, cx| {
6724                                project.respond_to_join_request(user.id, true, cx)
6725                            });
6726                        }
6727                    })
6728                    .detach();
6729                });
6730
6731                while op_start_signal.next().await.is_some() {
6732                    let distribution = rng.lock().gen_range::<usize, _>(0..100);
6733                    let files = fs.as_fake().files().await;
6734                    match distribution {
6735                        0..=19 if !files.is_empty() => {
6736                            let path = files.choose(&mut *rng.lock()).unwrap();
6737                            let mut path = path.as_path();
6738                            while let Some(parent_path) = path.parent() {
6739                                path = parent_path;
6740                                if rng.lock().gen() {
6741                                    break;
6742                                }
6743                            }
6744
6745                            log::info!("Host: find/create local worktree {:?}", path);
6746                            let find_or_create_worktree = project.update(cx, |project, cx| {
6747                                project.find_or_create_local_worktree(path, true, cx)
6748                            });
6749                            if rng.lock().gen() {
6750                                cx.background().spawn(find_or_create_worktree).detach();
6751                            } else {
6752                                find_or_create_worktree.await?;
6753                            }
6754                        }
6755                        20..=79 if !files.is_empty() => {
6756                            let buffer = if client.buffers.is_empty() || rng.lock().gen() {
6757                                let file = files.choose(&mut *rng.lock()).unwrap();
6758                                let (worktree, path) = project
6759                                    .update(cx, |project, cx| {
6760                                        project.find_or_create_local_worktree(
6761                                            file.clone(),
6762                                            true,
6763                                            cx,
6764                                        )
6765                                    })
6766                                    .await?;
6767                                let project_path =
6768                                    worktree.read_with(cx, |worktree, _| (worktree.id(), path));
6769                                log::info!(
6770                                    "Host: opening path {:?}, worktree {}, relative_path {:?}",
6771                                    file,
6772                                    project_path.0,
6773                                    project_path.1
6774                                );
6775                                let buffer = project
6776                                    .update(cx, |project, cx| project.open_buffer(project_path, cx))
6777                                    .await
6778                                    .unwrap();
6779                                client.buffers.insert(buffer.clone());
6780                                buffer
6781                            } else {
6782                                client
6783                                    .buffers
6784                                    .iter()
6785                                    .choose(&mut *rng.lock())
6786                                    .unwrap()
6787                                    .clone()
6788                            };
6789
6790                            if rng.lock().gen_bool(0.1) {
6791                                cx.update(|cx| {
6792                                    log::info!(
6793                                        "Host: dropping buffer {:?}",
6794                                        buffer.read(cx).file().unwrap().full_path(cx)
6795                                    );
6796                                    client.buffers.remove(&buffer);
6797                                    drop(buffer);
6798                                });
6799                            } else {
6800                                buffer.update(cx, |buffer, cx| {
6801                                    log::info!(
6802                                        "Host: updating buffer {:?} ({})",
6803                                        buffer.file().unwrap().full_path(cx),
6804                                        buffer.remote_id()
6805                                    );
6806
6807                                    if rng.lock().gen_bool(0.7) {
6808                                        buffer.randomly_edit(&mut *rng.lock(), 5, cx);
6809                                    } else {
6810                                        buffer.randomly_undo_redo(&mut *rng.lock(), cx);
6811                                    }
6812                                });
6813                            }
6814                        }
6815                        _ => loop {
6816                            let path_component_count = rng.lock().gen_range::<usize, _>(1..=5);
6817                            let mut path = PathBuf::new();
6818                            path.push("/");
6819                            for _ in 0..path_component_count {
6820                                let letter = rng.lock().gen_range(b'a'..=b'z');
6821                                path.push(std::str::from_utf8(&[letter]).unwrap());
6822                            }
6823                            path.set_extension("rs");
6824                            let parent_path = path.parent().unwrap();
6825
6826                            log::info!("Host: creating file {:?}", path,);
6827
6828                            if fs.create_dir(&parent_path).await.is_ok()
6829                                && fs.create_file(&path, Default::default()).await.is_ok()
6830                            {
6831                                break;
6832                            } else {
6833                                log::info!("Host: cannot create file");
6834                            }
6835                        },
6836                    }
6837
6838                    cx.background().simulate_random_delay().await;
6839                }
6840
6841                Ok(())
6842            }
6843
6844            let result =
6845                simulate_host_internal(&mut self, project.clone(), op_start_signal, rng, &mut cx)
6846                    .await;
6847            log::info!("Host done");
6848            self.project = Some(project);
6849            (self, cx, result.err())
6850        }
6851
6852        pub async fn simulate_guest(
6853            mut self,
6854            guest_username: String,
6855            project: ModelHandle<Project>,
6856            op_start_signal: futures::channel::mpsc::UnboundedReceiver<()>,
6857            rng: Arc<Mutex<StdRng>>,
6858            mut cx: TestAppContext,
6859        ) -> (Self, TestAppContext, Option<anyhow::Error>) {
6860            async fn simulate_guest_internal(
6861                client: &mut TestClient,
6862                guest_username: &str,
6863                project: ModelHandle<Project>,
6864                mut op_start_signal: futures::channel::mpsc::UnboundedReceiver<()>,
6865                rng: Arc<Mutex<StdRng>>,
6866                cx: &mut TestAppContext,
6867            ) -> anyhow::Result<()> {
6868                while op_start_signal.next().await.is_some() {
6869                    let buffer = if client.buffers.is_empty() || rng.lock().gen() {
6870                        let worktree = if let Some(worktree) =
6871                            project.read_with(cx, |project, cx| {
6872                                project
6873                                    .worktrees(&cx)
6874                                    .filter(|worktree| {
6875                                        let worktree = worktree.read(cx);
6876                                        worktree.is_visible()
6877                                            && worktree.entries(false).any(|e| e.is_file())
6878                                    })
6879                                    .choose(&mut *rng.lock())
6880                            }) {
6881                            worktree
6882                        } else {
6883                            cx.background().simulate_random_delay().await;
6884                            continue;
6885                        };
6886
6887                        let (worktree_root_name, project_path) =
6888                            worktree.read_with(cx, |worktree, _| {
6889                                let entry = worktree
6890                                    .entries(false)
6891                                    .filter(|e| e.is_file())
6892                                    .choose(&mut *rng.lock())
6893                                    .unwrap();
6894                                (
6895                                    worktree.root_name().to_string(),
6896                                    (worktree.id(), entry.path.clone()),
6897                                )
6898                            });
6899                        log::info!(
6900                            "{}: opening path {:?} in worktree {} ({})",
6901                            guest_username,
6902                            project_path.1,
6903                            project_path.0,
6904                            worktree_root_name,
6905                        );
6906                        let buffer = project
6907                            .update(cx, |project, cx| {
6908                                project.open_buffer(project_path.clone(), cx)
6909                            })
6910                            .await?;
6911                        log::info!(
6912                            "{}: opened path {:?} in worktree {} ({}) with buffer id {}",
6913                            guest_username,
6914                            project_path.1,
6915                            project_path.0,
6916                            worktree_root_name,
6917                            buffer.read_with(cx, |buffer, _| buffer.remote_id())
6918                        );
6919                        client.buffers.insert(buffer.clone());
6920                        buffer
6921                    } else {
6922                        client
6923                            .buffers
6924                            .iter()
6925                            .choose(&mut *rng.lock())
6926                            .unwrap()
6927                            .clone()
6928                    };
6929
6930                    let choice = rng.lock().gen_range(0..100);
6931                    match choice {
6932                        0..=9 => {
6933                            cx.update(|cx| {
6934                                log::info!(
6935                                    "{}: dropping buffer {:?}",
6936                                    guest_username,
6937                                    buffer.read(cx).file().unwrap().full_path(cx)
6938                                );
6939                                client.buffers.remove(&buffer);
6940                                drop(buffer);
6941                            });
6942                        }
6943                        10..=19 => {
6944                            let completions = project.update(cx, |project, cx| {
6945                                log::info!(
6946                                    "{}: requesting completions for buffer {} ({:?})",
6947                                    guest_username,
6948                                    buffer.read(cx).remote_id(),
6949                                    buffer.read(cx).file().unwrap().full_path(cx)
6950                                );
6951                                let offset = rng.lock().gen_range(0..=buffer.read(cx).len());
6952                                project.completions(&buffer, offset, cx)
6953                            });
6954                            let completions = cx.background().spawn(async move {
6955                                completions
6956                                    .await
6957                                    .map_err(|err| anyhow!("completions request failed: {:?}", err))
6958                            });
6959                            if rng.lock().gen_bool(0.3) {
6960                                log::info!("{}: detaching completions request", guest_username);
6961                                cx.update(|cx| completions.detach_and_log_err(cx));
6962                            } else {
6963                                completions.await?;
6964                            }
6965                        }
6966                        20..=29 => {
6967                            let code_actions = project.update(cx, |project, cx| {
6968                                log::info!(
6969                                    "{}: requesting code actions for buffer {} ({:?})",
6970                                    guest_username,
6971                                    buffer.read(cx).remote_id(),
6972                                    buffer.read(cx).file().unwrap().full_path(cx)
6973                                );
6974                                let range = buffer.read(cx).random_byte_range(0, &mut *rng.lock());
6975                                project.code_actions(&buffer, range, cx)
6976                            });
6977                            let code_actions = cx.background().spawn(async move {
6978                                code_actions.await.map_err(|err| {
6979                                    anyhow!("code actions request failed: {:?}", err)
6980                                })
6981                            });
6982                            if rng.lock().gen_bool(0.3) {
6983                                log::info!("{}: detaching code actions request", guest_username);
6984                                cx.update(|cx| code_actions.detach_and_log_err(cx));
6985                            } else {
6986                                code_actions.await?;
6987                            }
6988                        }
6989                        30..=39 if buffer.read_with(cx, |buffer, _| buffer.is_dirty()) => {
6990                            let (requested_version, save) = buffer.update(cx, |buffer, cx| {
6991                                log::info!(
6992                                    "{}: saving buffer {} ({:?})",
6993                                    guest_username,
6994                                    buffer.remote_id(),
6995                                    buffer.file().unwrap().full_path(cx)
6996                                );
6997                                (buffer.version(), buffer.save(cx))
6998                            });
6999                            let save = cx.background().spawn(async move {
7000                                let (saved_version, _) = save
7001                                    .await
7002                                    .map_err(|err| anyhow!("save request failed: {:?}", err))?;
7003                                assert!(saved_version.observed_all(&requested_version));
7004                                Ok::<_, anyhow::Error>(())
7005                            });
7006                            if rng.lock().gen_bool(0.3) {
7007                                log::info!("{}: detaching save request", guest_username);
7008                                cx.update(|cx| save.detach_and_log_err(cx));
7009                            } else {
7010                                save.await?;
7011                            }
7012                        }
7013                        40..=44 => {
7014                            let prepare_rename = project.update(cx, |project, cx| {
7015                                log::info!(
7016                                    "{}: preparing rename for buffer {} ({:?})",
7017                                    guest_username,
7018                                    buffer.read(cx).remote_id(),
7019                                    buffer.read(cx).file().unwrap().full_path(cx)
7020                                );
7021                                let offset = rng.lock().gen_range(0..=buffer.read(cx).len());
7022                                project.prepare_rename(buffer, offset, cx)
7023                            });
7024                            let prepare_rename = cx.background().spawn(async move {
7025                                prepare_rename.await.map_err(|err| {
7026                                    anyhow!("prepare rename request failed: {:?}", err)
7027                                })
7028                            });
7029                            if rng.lock().gen_bool(0.3) {
7030                                log::info!("{}: detaching prepare rename request", guest_username);
7031                                cx.update(|cx| prepare_rename.detach_and_log_err(cx));
7032                            } else {
7033                                prepare_rename.await?;
7034                            }
7035                        }
7036                        45..=49 => {
7037                            let definitions = project.update(cx, |project, cx| {
7038                                log::info!(
7039                                    "{}: requesting definitions for buffer {} ({:?})",
7040                                    guest_username,
7041                                    buffer.read(cx).remote_id(),
7042                                    buffer.read(cx).file().unwrap().full_path(cx)
7043                                );
7044                                let offset = rng.lock().gen_range(0..=buffer.read(cx).len());
7045                                project.definition(&buffer, offset, cx)
7046                            });
7047                            let definitions = cx.background().spawn(async move {
7048                                definitions
7049                                    .await
7050                                    .map_err(|err| anyhow!("definitions request failed: {:?}", err))
7051                            });
7052                            if rng.lock().gen_bool(0.3) {
7053                                log::info!("{}: detaching definitions request", guest_username);
7054                                cx.update(|cx| definitions.detach_and_log_err(cx));
7055                            } else {
7056                                client
7057                                    .buffers
7058                                    .extend(definitions.await?.into_iter().map(|loc| loc.buffer));
7059                            }
7060                        }
7061                        50..=54 => {
7062                            let highlights = project.update(cx, |project, cx| {
7063                                log::info!(
7064                                    "{}: requesting highlights for buffer {} ({:?})",
7065                                    guest_username,
7066                                    buffer.read(cx).remote_id(),
7067                                    buffer.read(cx).file().unwrap().full_path(cx)
7068                                );
7069                                let offset = rng.lock().gen_range(0..=buffer.read(cx).len());
7070                                project.document_highlights(&buffer, offset, cx)
7071                            });
7072                            let highlights = cx.background().spawn(async move {
7073                                highlights
7074                                    .await
7075                                    .map_err(|err| anyhow!("highlights request failed: {:?}", err))
7076                            });
7077                            if rng.lock().gen_bool(0.3) {
7078                                log::info!("{}: detaching highlights request", guest_username);
7079                                cx.update(|cx| highlights.detach_and_log_err(cx));
7080                            } else {
7081                                highlights.await?;
7082                            }
7083                        }
7084                        55..=59 => {
7085                            let search = project.update(cx, |project, cx| {
7086                                let query = rng.lock().gen_range('a'..='z');
7087                                log::info!("{}: project-wide search {:?}", guest_username, query);
7088                                project.search(SearchQuery::text(query, false, false), cx)
7089                            });
7090                            let search = cx.background().spawn(async move {
7091                                search
7092                                    .await
7093                                    .map_err(|err| anyhow!("search request failed: {:?}", err))
7094                            });
7095                            if rng.lock().gen_bool(0.3) {
7096                                log::info!("{}: detaching search request", guest_username);
7097                                cx.update(|cx| search.detach_and_log_err(cx));
7098                            } else {
7099                                client.buffers.extend(search.await?.into_keys());
7100                            }
7101                        }
7102                        60..=69 => {
7103                            let worktree = project
7104                                .read_with(cx, |project, cx| {
7105                                    project
7106                                        .worktrees(&cx)
7107                                        .filter(|worktree| {
7108                                            let worktree = worktree.read(cx);
7109                                            worktree.is_visible()
7110                                                && worktree.entries(false).any(|e| e.is_file())
7111                                                && worktree
7112                                                    .root_entry()
7113                                                    .map_or(false, |e| e.is_dir())
7114                                        })
7115                                        .choose(&mut *rng.lock())
7116                                })
7117                                .unwrap();
7118                            let (worktree_id, worktree_root_name) = worktree
7119                                .read_with(cx, |worktree, _| {
7120                                    (worktree.id(), worktree.root_name().to_string())
7121                                });
7122
7123                            let mut new_name = String::new();
7124                            for _ in 0..10 {
7125                                let letter = rng.lock().gen_range('a'..='z');
7126                                new_name.push(letter);
7127                            }
7128                            let mut new_path = PathBuf::new();
7129                            new_path.push(new_name);
7130                            new_path.set_extension("rs");
7131                            log::info!(
7132                                "{}: creating {:?} in worktree {} ({})",
7133                                guest_username,
7134                                new_path,
7135                                worktree_id,
7136                                worktree_root_name,
7137                            );
7138                            project
7139                                .update(cx, |project, cx| {
7140                                    project.create_entry((worktree_id, new_path), false, cx)
7141                                })
7142                                .unwrap()
7143                                .await?;
7144                        }
7145                        _ => {
7146                            buffer.update(cx, |buffer, cx| {
7147                                log::info!(
7148                                    "{}: updating buffer {} ({:?})",
7149                                    guest_username,
7150                                    buffer.remote_id(),
7151                                    buffer.file().unwrap().full_path(cx)
7152                                );
7153                                if rng.lock().gen_bool(0.7) {
7154                                    buffer.randomly_edit(&mut *rng.lock(), 5, cx);
7155                                } else {
7156                                    buffer.randomly_undo_redo(&mut *rng.lock(), cx);
7157                                }
7158                            });
7159                        }
7160                    }
7161                    cx.background().simulate_random_delay().await;
7162                }
7163                Ok(())
7164            }
7165
7166            let result = simulate_guest_internal(
7167                &mut self,
7168                &guest_username,
7169                project.clone(),
7170                op_start_signal,
7171                rng,
7172                &mut cx,
7173            )
7174            .await;
7175            log::info!("{}: done", guest_username);
7176
7177            self.project = Some(project);
7178            (self, cx, result.err())
7179        }
7180    }
7181
7182    impl Drop for TestClient {
7183        fn drop(&mut self) {
7184            self.client.tear_down();
7185        }
7186    }
7187
7188    impl Executor for Arc<gpui::executor::Background> {
7189        type Sleep = gpui::executor::Timer;
7190
7191        fn spawn_detached<F: 'static + Send + Future<Output = ()>>(&self, future: F) {
7192            self.spawn(future).detach();
7193        }
7194
7195        fn sleep(&self, duration: Duration) -> Self::Sleep {
7196            self.as_ref().timer(duration)
7197        }
7198    }
7199
7200    fn channel_messages(channel: &Channel) -> Vec<(String, String, bool)> {
7201        channel
7202            .messages()
7203            .cursor::<()>()
7204            .map(|m| {
7205                (
7206                    m.sender.github_login.clone(),
7207                    m.body.clone(),
7208                    m.is_pending(),
7209                )
7210            })
7211            .collect()
7212    }
7213
7214    struct EmptyView;
7215
7216    impl gpui::Entity for EmptyView {
7217        type Event = ();
7218    }
7219
7220    impl gpui::View for EmptyView {
7221        fn ui_name() -> &'static str {
7222            "empty view"
7223        }
7224
7225        fn render(&mut self, _: &mut gpui::RenderContext<Self>) -> gpui::ElementBox {
7226            gpui::Element::boxed(gpui::elements::Empty::new())
7227        }
7228    }
7229}