rpc.rs

   1mod store;
   2
   3use crate::{
   4    auth,
   5    db::{self, ChannelId, MessageId, User, UserId},
   6    AppState, Result,
   7};
   8use anyhow::anyhow;
   9use async_tungstenite::tungstenite::{
  10    protocol::CloseFrame as TungsteniteCloseFrame, Message as TungsteniteMessage,
  11};
  12use axum::{
  13    body::Body,
  14    extract::{
  15        ws::{CloseFrame as AxumCloseFrame, Message as AxumMessage},
  16        ConnectInfo, WebSocketUpgrade,
  17    },
  18    headers::{Header, HeaderName},
  19    http::StatusCode,
  20    middleware,
  21    response::IntoResponse,
  22    routing::get,
  23    Extension, Router, TypedHeader,
  24};
  25use collections::HashMap;
  26use futures::{
  27    channel::mpsc,
  28    future::{self, BoxFuture},
  29    FutureExt, SinkExt, StreamExt, TryStreamExt,
  30};
  31use lazy_static::lazy_static;
  32use rpc::{
  33    proto::{self, AnyTypedEnvelope, EntityMessage, EnvelopedMessage, RequestMessage},
  34    Connection, ConnectionId, Peer, Receipt, TypedEnvelope,
  35};
  36use std::{
  37    any::TypeId,
  38    future::Future,
  39    marker::PhantomData,
  40    net::SocketAddr,
  41    ops::{Deref, DerefMut},
  42    rc::Rc,
  43    sync::{
  44        atomic::{AtomicBool, Ordering::SeqCst},
  45        Arc,
  46    },
  47    time::Duration,
  48};
  49use store::{Store, Worktree};
  50use time::OffsetDateTime;
  51use tokio::{
  52    sync::{RwLock, RwLockReadGuard, RwLockWriteGuard},
  53    time::Sleep,
  54};
  55use tower::ServiceBuilder;
  56use tracing::{info_span, instrument, Instrument};
  57
  58type MessageHandler =
  59    Box<dyn Send + Sync + Fn(Arc<Server>, Box<dyn AnyTypedEnvelope>) -> BoxFuture<'static, ()>>;
  60
  61struct Response<R> {
  62    server: Arc<Server>,
  63    receipt: Receipt<R>,
  64    responded: Arc<AtomicBool>,
  65}
  66
  67impl<R: RequestMessage> Response<R> {
  68    fn send(self, payload: R::Response) -> Result<()> {
  69        self.responded.store(true, SeqCst);
  70        self.server.peer.respond(self.receipt, payload)?;
  71        Ok(())
  72    }
  73
  74    fn into_receipt(self) -> Receipt<R> {
  75        self.responded.store(true, SeqCst);
  76        self.receipt
  77    }
  78}
  79
  80pub struct Server {
  81    peer: Arc<Peer>,
  82    store: RwLock<Store>,
  83    app_state: Arc<AppState>,
  84    handlers: HashMap<TypeId, MessageHandler>,
  85    notifications: Option<mpsc::UnboundedSender<()>>,
  86}
  87
  88pub trait Executor: Send + Clone {
  89    type Sleep: Send + Future;
  90    fn spawn_detached<F: 'static + Send + Future<Output = ()>>(&self, future: F);
  91    fn sleep(&self, duration: Duration) -> Self::Sleep;
  92}
  93
  94#[derive(Clone)]
  95pub struct RealExecutor;
  96
  97const MESSAGE_COUNT_PER_PAGE: usize = 100;
  98const MAX_MESSAGE_LEN: usize = 1024;
  99
 100struct StoreReadGuard<'a> {
 101    guard: RwLockReadGuard<'a, Store>,
 102    _not_send: PhantomData<Rc<()>>,
 103}
 104
 105struct StoreWriteGuard<'a> {
 106    guard: RwLockWriteGuard<'a, Store>,
 107    _not_send: PhantomData<Rc<()>>,
 108}
 109
 110impl Server {
 111    pub fn new(
 112        app_state: Arc<AppState>,
 113        notifications: Option<mpsc::UnboundedSender<()>>,
 114    ) -> Arc<Self> {
 115        let mut server = Self {
 116            peer: Peer::new(),
 117            app_state,
 118            store: Default::default(),
 119            handlers: Default::default(),
 120            notifications,
 121        };
 122
 123        server
 124            .add_request_handler(Server::ping)
 125            .add_request_handler(Server::register_project)
 126            .add_message_handler(Server::unregister_project)
 127            .add_request_handler(Server::join_project)
 128            .add_message_handler(Server::leave_project)
 129            .add_message_handler(Server::respond_to_join_project_request)
 130            .add_request_handler(Server::register_worktree)
 131            .add_message_handler(Server::unregister_worktree)
 132            .add_request_handler(Server::update_worktree)
 133            .add_message_handler(Server::start_language_server)
 134            .add_message_handler(Server::update_language_server)
 135            .add_message_handler(Server::update_diagnostic_summary)
 136            .add_request_handler(Server::forward_project_request::<proto::GetDefinition>)
 137            .add_request_handler(Server::forward_project_request::<proto::GetReferences>)
 138            .add_request_handler(Server::forward_project_request::<proto::SearchProject>)
 139            .add_request_handler(Server::forward_project_request::<proto::GetDocumentHighlights>)
 140            .add_request_handler(Server::forward_project_request::<proto::GetProjectSymbols>)
 141            .add_request_handler(Server::forward_project_request::<proto::OpenBufferForSymbol>)
 142            .add_request_handler(Server::forward_project_request::<proto::OpenBufferById>)
 143            .add_request_handler(Server::forward_project_request::<proto::OpenBufferByPath>)
 144            .add_request_handler(Server::forward_project_request::<proto::GetCompletions>)
 145            .add_request_handler(
 146                Server::forward_project_request::<proto::ApplyCompletionAdditionalEdits>,
 147            )
 148            .add_request_handler(Server::forward_project_request::<proto::GetCodeActions>)
 149            .add_request_handler(Server::forward_project_request::<proto::ApplyCodeAction>)
 150            .add_request_handler(Server::forward_project_request::<proto::PrepareRename>)
 151            .add_request_handler(Server::forward_project_request::<proto::PerformRename>)
 152            .add_request_handler(Server::forward_project_request::<proto::ReloadBuffers>)
 153            .add_request_handler(Server::forward_project_request::<proto::FormatBuffers>)
 154            .add_request_handler(Server::forward_project_request::<proto::CreateProjectEntry>)
 155            .add_request_handler(Server::forward_project_request::<proto::RenameProjectEntry>)
 156            .add_request_handler(Server::forward_project_request::<proto::DeleteProjectEntry>)
 157            .add_request_handler(Server::update_buffer)
 158            .add_message_handler(Server::update_buffer_file)
 159            .add_message_handler(Server::buffer_reloaded)
 160            .add_message_handler(Server::buffer_saved)
 161            .add_request_handler(Server::save_buffer)
 162            .add_request_handler(Server::get_channels)
 163            .add_request_handler(Server::get_users)
 164            .add_request_handler(Server::fuzzy_search_users)
 165            .add_request_handler(Server::request_contact)
 166            .add_request_handler(Server::remove_contact)
 167            .add_request_handler(Server::respond_to_contact_request)
 168            .add_request_handler(Server::join_channel)
 169            .add_message_handler(Server::leave_channel)
 170            .add_request_handler(Server::send_channel_message)
 171            .add_request_handler(Server::follow)
 172            .add_message_handler(Server::unfollow)
 173            .add_message_handler(Server::update_followers)
 174            .add_request_handler(Server::get_channel_messages);
 175
 176        Arc::new(server)
 177    }
 178
 179    fn add_message_handler<F, Fut, M>(&mut self, handler: F) -> &mut Self
 180    where
 181        F: 'static + Send + Sync + Fn(Arc<Self>, TypedEnvelope<M>) -> Fut,
 182        Fut: 'static + Send + Future<Output = Result<()>>,
 183        M: EnvelopedMessage,
 184    {
 185        let prev_handler = self.handlers.insert(
 186            TypeId::of::<M>(),
 187            Box::new(move |server, envelope| {
 188                let envelope = envelope.into_any().downcast::<TypedEnvelope<M>>().unwrap();
 189                let span = info_span!(
 190                    "handle message",
 191                    payload_type = envelope.payload_type_name(),
 192                    payload = format!("{:?}", envelope.payload).as_str(),
 193                );
 194                let future = (handler)(server, *envelope);
 195                async move {
 196                    if let Err(error) = future.await {
 197                        tracing::error!(%error, "error handling message");
 198                    }
 199                }
 200                .instrument(span)
 201                .boxed()
 202            }),
 203        );
 204        if prev_handler.is_some() {
 205            panic!("registered a handler for the same message twice");
 206        }
 207        self
 208    }
 209
 210    /// Handle a request while holding a lock to the store. This is useful when we're registering
 211    /// a connection but we want to respond on the connection before anybody else can send on it.
 212    fn add_request_handler<F, Fut, M>(&mut self, handler: F) -> &mut Self
 213    where
 214        F: 'static + Send + Sync + Fn(Arc<Self>, TypedEnvelope<M>, Response<M>) -> Fut,
 215        Fut: Send + Future<Output = Result<()>>,
 216        M: RequestMessage,
 217    {
 218        let handler = Arc::new(handler);
 219        self.add_message_handler(move |server, envelope| {
 220            let receipt = envelope.receipt();
 221            let handler = handler.clone();
 222            async move {
 223                let responded = Arc::new(AtomicBool::default());
 224                let response = Response {
 225                    server: server.clone(),
 226                    responded: responded.clone(),
 227                    receipt: envelope.receipt(),
 228                };
 229                match (handler)(server.clone(), envelope, response).await {
 230                    Ok(()) => {
 231                        if responded.load(std::sync::atomic::Ordering::SeqCst) {
 232                            Ok(())
 233                        } else {
 234                            Err(anyhow!("handler did not send a response"))?
 235                        }
 236                    }
 237                    Err(error) => {
 238                        server.peer.respond_with_error(
 239                            receipt,
 240                            proto::Error {
 241                                message: error.to_string(),
 242                            },
 243                        )?;
 244                        Err(error)
 245                    }
 246                }
 247            }
 248        })
 249    }
 250
 251    pub fn handle_connection<E: Executor>(
 252        self: &Arc<Self>,
 253        connection: Connection,
 254        address: String,
 255        user: User,
 256        mut send_connection_id: Option<mpsc::Sender<ConnectionId>>,
 257        executor: E,
 258    ) -> impl Future<Output = Result<()>> {
 259        let mut this = self.clone();
 260        let user_id = user.id;
 261        let login = user.github_login;
 262        let span = info_span!("handle connection", %user_id, %login, %address);
 263        async move {
 264            let (connection_id, handle_io, mut incoming_rx) = this
 265                .peer
 266                .add_connection(connection, {
 267                    let executor = executor.clone();
 268                    move |duration| {
 269                        let timer = executor.sleep(duration);
 270                        async move {
 271                            timer.await;
 272                        }
 273                    }
 274                })
 275                .await;
 276
 277            tracing::info!(%user_id, %login, %connection_id, %address, "connection opened");
 278
 279            if let Some(send_connection_id) = send_connection_id.as_mut() {
 280                let _ = send_connection_id.send(connection_id).await;
 281            }
 282
 283            let (contacts, invite_code) = future::try_join(
 284                this.app_state.db.get_contacts(user_id),
 285                this.app_state.db.get_invite_code_for_user(user_id)
 286            ).await?;
 287
 288            {
 289                let mut store = this.store_mut().await;
 290                store.add_connection(connection_id, user_id);
 291                this.peer.send(connection_id, store.build_initial_contacts_update(contacts))?;
 292                
 293                if let Some((code, count)) = invite_code {
 294                    this.peer.send(connection_id, proto::UpdateInviteInfo {
 295                        url: format!("{}{}", this.app_state.invite_link_prefix, code),
 296                        count,
 297                    })?;
 298                }
 299            }
 300            this.update_user_contacts(user_id).await?;
 301
 302            let handle_io = handle_io.fuse();
 303            futures::pin_mut!(handle_io);
 304            loop {
 305                let next_message = incoming_rx.next().fuse();
 306                futures::pin_mut!(next_message);
 307                futures::select_biased! {
 308                    result = handle_io => {
 309                        if let Err(error) = result {
 310                            tracing::error!(?error, %user_id, %login, %connection_id, %address, "error handling I/O");
 311                        }
 312                        break;
 313                    }
 314                    message = next_message => {
 315                        if let Some(message) = message {
 316                            let type_name = message.payload_type_name();
 317                            let span = tracing::info_span!("receive message", %user_id, %login, %connection_id, %address, type_name);
 318                            async {
 319                                if let Some(handler) = this.handlers.get(&message.payload_type_id()) {
 320                                    let notifications = this.notifications.clone();
 321                                    let is_background = message.is_background();
 322                                    let handle_message = (handler)(this.clone(), message);
 323                                    let handle_message = async move {
 324                                        handle_message.await;
 325                                        if let Some(mut notifications) = notifications {
 326                                            let _ = notifications.send(()).await;
 327                                        }
 328                                    };
 329                                    if is_background {
 330                                        executor.spawn_detached(handle_message);
 331                                    } else {
 332                                        handle_message.await;
 333                                    }
 334                                } else {
 335                                    tracing::error!(%user_id, %login, %connection_id, %address, "no message handler");
 336                                }
 337                            }.instrument(span).await;
 338                        } else {
 339                            tracing::info!(%user_id, %login, %connection_id, %address, "connection closed");
 340                            break;
 341                        }
 342                    }
 343                }
 344            }
 345
 346            tracing::info!(%user_id, %login, %connection_id, %address, "signing out");
 347            if let Err(error) = this.sign_out(connection_id).await {
 348                tracing::error!(%user_id, %login, %connection_id, %address, ?error, "error signing out");
 349            }
 350
 351            Ok(())
 352        }.instrument(span)
 353    }
 354
 355    #[instrument(skip(self), err)]
 356    async fn sign_out(self: &mut Arc<Self>, connection_id: ConnectionId) -> Result<()> {
 357        self.peer.disconnect(connection_id);
 358
 359        let removed_user_id = {
 360            let mut store = self.store_mut().await;
 361            let removed_connection = store.remove_connection(connection_id)?;
 362
 363            for (project_id, project) in removed_connection.hosted_projects {
 364                broadcast(connection_id, project.guests.keys().copied(), |conn_id| {
 365                    self.peer
 366                        .send(conn_id, proto::UnregisterProject { project_id })
 367                });
 368
 369                for (_, receipts) in project.join_requests {
 370                    for receipt in receipts {
 371                        self.peer.respond(
 372                            receipt,
 373                            proto::JoinProjectResponse {
 374                                variant: Some(proto::join_project_response::Variant::Decline(
 375                                    proto::join_project_response::Decline {
 376                                        reason: proto::join_project_response::decline::Reason::WentOffline as i32
 377                                    },
 378                                )),
 379                            },
 380                        )?;
 381                    }
 382                }
 383            }
 384
 385            for project_id in removed_connection.guest_project_ids {
 386                if let Some(project) = store.project(project_id).trace_err() {
 387                    broadcast(connection_id, project.connection_ids(), |conn_id| {
 388                        self.peer.send(
 389                            conn_id,
 390                            proto::RemoveProjectCollaborator {
 391                                project_id,
 392                                peer_id: connection_id.0,
 393                            },
 394                        )
 395                    });
 396                    if project.guests.is_empty() {
 397                        self.peer
 398                            .send(
 399                                project.host_connection_id,
 400                                proto::ProjectUnshared { project_id },
 401                            )
 402                            .trace_err();
 403                    }
 404                }
 405            }
 406
 407            removed_connection.user_id
 408        };
 409
 410        self.update_user_contacts(removed_user_id).await?;
 411
 412        Ok(())
 413    }
 414
 415    async fn ping(
 416        self: Arc<Server>,
 417        _: TypedEnvelope<proto::Ping>,
 418        response: Response<proto::Ping>,
 419    ) -> Result<()> {
 420        response.send(proto::Ack {})?;
 421        Ok(())
 422    }
 423
 424    async fn register_project(
 425        self: Arc<Server>,
 426        request: TypedEnvelope<proto::RegisterProject>,
 427        response: Response<proto::RegisterProject>,
 428    ) -> Result<()> {
 429        let user_id;
 430        let project_id;
 431        {
 432            let mut state = self.store_mut().await;
 433            user_id = state.user_id_for_connection(request.sender_id)?;
 434            project_id = state.register_project(request.sender_id, user_id);
 435        };
 436        self.update_user_contacts(user_id).await?;
 437        response.send(proto::RegisterProjectResponse { project_id })?;
 438        Ok(())
 439    }
 440
 441    async fn unregister_project(
 442        self: Arc<Server>,
 443        request: TypedEnvelope<proto::UnregisterProject>,
 444    ) -> Result<()> {
 445        let (user_id, project) = {
 446            let mut state = self.store_mut().await;
 447            let project =
 448                state.unregister_project(request.payload.project_id, request.sender_id)?;
 449            (state.user_id_for_connection(request.sender_id)?, project)
 450        };
 451        for (_, receipts) in project.join_requests {
 452            for receipt in receipts {
 453                self.peer.respond(
 454                    receipt,
 455                    proto::JoinProjectResponse {
 456                        variant: Some(proto::join_project_response::Variant::Decline(
 457                            proto::join_project_response::Decline {
 458                                reason: proto::join_project_response::decline::Reason::Closed
 459                                    as i32,
 460                            },
 461                        )),
 462                    },
 463                )?;
 464            }
 465        }
 466
 467        self.update_user_contacts(user_id).await?;
 468        Ok(())
 469    }
 470
 471    async fn update_user_contacts(self: &Arc<Server>, user_id: UserId) -> Result<()> {
 472        let contacts = self.app_state.db.get_contacts(user_id).await?;
 473        let store = self.store().await;
 474        let updated_contact = store.contact_for_user(user_id, false);
 475        for contact in contacts {
 476            if let db::Contact::Accepted {
 477                user_id: contact_user_id,
 478                ..
 479            } = contact
 480            {
 481                for contact_conn_id in store.connection_ids_for_user(contact_user_id) {
 482                    self.peer
 483                        .send(
 484                            contact_conn_id,
 485                            proto::UpdateContacts {
 486                                contacts: vec![updated_contact.clone()],
 487                                remove_contacts: Default::default(),
 488                                incoming_requests: Default::default(),
 489                                remove_incoming_requests: Default::default(),
 490                                outgoing_requests: Default::default(),
 491                                remove_outgoing_requests: Default::default(),
 492                            },
 493                        )
 494                        .trace_err();
 495                }
 496            }
 497        }
 498        Ok(())
 499    }
 500
 501    async fn join_project(
 502        self: Arc<Server>,
 503        request: TypedEnvelope<proto::JoinProject>,
 504        response: Response<proto::JoinProject>,
 505    ) -> Result<()> {
 506        let project_id = request.payload.project_id;
 507        let host_user_id;
 508        let guest_user_id;
 509        let host_connection_id;
 510        {
 511            let state = self.store().await;
 512            let project = state.project(project_id)?;
 513            host_user_id = project.host_user_id;
 514            host_connection_id = project.host_connection_id;
 515            guest_user_id = state.user_id_for_connection(request.sender_id)?;
 516        };
 517
 518        let has_contact = self
 519            .app_state
 520            .db
 521            .has_contact(guest_user_id, host_user_id)
 522            .await?;
 523        if !has_contact {
 524            return Err(anyhow!("no such project"))?;
 525        }
 526
 527        self.store_mut().await.request_join_project(
 528            guest_user_id,
 529            project_id,
 530            response.into_receipt(),
 531        )?;
 532        self.peer.send(
 533            host_connection_id,
 534            proto::RequestJoinProject {
 535                project_id,
 536                requester_id: guest_user_id.to_proto(),
 537            },
 538        )?;
 539        Ok(())
 540    }
 541
 542    async fn respond_to_join_project_request(
 543        self: Arc<Server>,
 544        request: TypedEnvelope<proto::RespondToJoinProjectRequest>,
 545    ) -> Result<()> {
 546        let host_user_id;
 547
 548        {
 549            let mut state = self.store_mut().await;
 550            let project_id = request.payload.project_id;
 551            let project = state.project(project_id)?;
 552            if project.host_connection_id != request.sender_id {
 553                Err(anyhow!("no such connection"))?;
 554            }
 555
 556            host_user_id = project.host_user_id;
 557            let guest_user_id = UserId::from_proto(request.payload.requester_id);
 558
 559            if !request.payload.allow {
 560                let receipts = state
 561                    .deny_join_project_request(request.sender_id, guest_user_id, project_id)
 562                    .ok_or_else(|| anyhow!("no such request"))?;
 563                for receipt in receipts {
 564                    self.peer.respond(
 565                        receipt,
 566                        proto::JoinProjectResponse {
 567                            variant: Some(proto::join_project_response::Variant::Decline(
 568                                proto::join_project_response::Decline {
 569                                    reason: proto::join_project_response::decline::Reason::Declined
 570                                        as i32,
 571                                },
 572                            )),
 573                        },
 574                    )?;
 575                }
 576                return Ok(());
 577            }
 578
 579            let (receipts_with_replica_ids, project) = state
 580                .accept_join_project_request(request.sender_id, guest_user_id, project_id)
 581                .ok_or_else(|| anyhow!("no such request"))?;
 582
 583            let peer_count = project.guests.len();
 584            let mut collaborators = Vec::with_capacity(peer_count);
 585            collaborators.push(proto::Collaborator {
 586                peer_id: project.host_connection_id.0,
 587                replica_id: 0,
 588                user_id: project.host_user_id.to_proto(),
 589            });
 590            let worktrees = project
 591                .worktrees
 592                .iter()
 593                .filter_map(|(id, shared_worktree)| {
 594                    let worktree = project.worktrees.get(&id)?;
 595                    Some(proto::Worktree {
 596                        id: *id,
 597                        root_name: worktree.root_name.clone(),
 598                        entries: shared_worktree.entries.values().cloned().collect(),
 599                        diagnostic_summaries: shared_worktree
 600                            .diagnostic_summaries
 601                            .values()
 602                            .cloned()
 603                            .collect(),
 604                        visible: worktree.visible,
 605                        scan_id: shared_worktree.scan_id,
 606                    })
 607                })
 608                .collect::<Vec<_>>();
 609
 610            // Add all guests other than the requesting user's own connections as collaborators
 611            for (peer_conn_id, (peer_replica_id, peer_user_id)) in &project.guests {
 612                if receipts_with_replica_ids
 613                    .iter()
 614                    .all(|(receipt, _)| receipt.sender_id != *peer_conn_id)
 615                {
 616                    collaborators.push(proto::Collaborator {
 617                        peer_id: peer_conn_id.0,
 618                        replica_id: *peer_replica_id as u32,
 619                        user_id: peer_user_id.to_proto(),
 620                    });
 621                }
 622            }
 623
 624            for conn_id in project.connection_ids() {
 625                for (receipt, replica_id) in &receipts_with_replica_ids {
 626                    if conn_id != receipt.sender_id {
 627                        self.peer.send(
 628                            conn_id,
 629                            proto::AddProjectCollaborator {
 630                                project_id,
 631                                collaborator: Some(proto::Collaborator {
 632                                    peer_id: receipt.sender_id.0,
 633                                    replica_id: *replica_id as u32,
 634                                    user_id: guest_user_id.to_proto(),
 635                                }),
 636                            },
 637                        )?;
 638                    }
 639                }
 640            }
 641
 642            for (receipt, replica_id) in receipts_with_replica_ids {
 643                self.peer.respond(
 644                    receipt,
 645                    proto::JoinProjectResponse {
 646                        variant: Some(proto::join_project_response::Variant::Accept(
 647                            proto::join_project_response::Accept {
 648                                worktrees: worktrees.clone(),
 649                                replica_id: replica_id as u32,
 650                                collaborators: collaborators.clone(),
 651                                language_servers: project.language_servers.clone(),
 652                            },
 653                        )),
 654                    },
 655                )?;
 656            }
 657        }
 658
 659        self.update_user_contacts(host_user_id).await?;
 660        Ok(())
 661    }
 662
 663    async fn leave_project(
 664        self: Arc<Server>,
 665        request: TypedEnvelope<proto::LeaveProject>,
 666    ) -> Result<()> {
 667        let sender_id = request.sender_id;
 668        let project_id = request.payload.project_id;
 669        let project;
 670        {
 671            let mut store = self.store_mut().await;
 672            project = store.leave_project(sender_id, project_id)?;
 673
 674            if project.remove_collaborator {
 675                broadcast(sender_id, project.connection_ids, |conn_id| {
 676                    self.peer.send(
 677                        conn_id,
 678                        proto::RemoveProjectCollaborator {
 679                            project_id,
 680                            peer_id: sender_id.0,
 681                        },
 682                    )
 683                });
 684            }
 685
 686            if let Some(requester_id) = project.cancel_request {
 687                self.peer.send(
 688                    project.host_connection_id,
 689                    proto::JoinProjectRequestCancelled {
 690                        project_id,
 691                        requester_id: requester_id.to_proto(),
 692                    },
 693                )?;
 694            }
 695
 696            if project.unshare {
 697                self.peer.send(
 698                    project.host_connection_id,
 699                    proto::ProjectUnshared { project_id },
 700                )?;
 701            }
 702        }
 703        self.update_user_contacts(project.host_user_id).await?;
 704        Ok(())
 705    }
 706
 707    async fn register_worktree(
 708        self: Arc<Server>,
 709        request: TypedEnvelope<proto::RegisterWorktree>,
 710        response: Response<proto::RegisterWorktree>,
 711    ) -> Result<()> {
 712        let host_user_id;
 713        {
 714            let mut state = self.store_mut().await;
 715            host_user_id = state.user_id_for_connection(request.sender_id)?;
 716
 717            let guest_connection_ids = state
 718                .read_project(request.payload.project_id, request.sender_id)?
 719                .guest_connection_ids();
 720            state.register_worktree(
 721                request.payload.project_id,
 722                request.payload.worktree_id,
 723                request.sender_id,
 724                Worktree {
 725                    root_name: request.payload.root_name.clone(),
 726                    visible: request.payload.visible,
 727                    ..Default::default()
 728                },
 729            )?;
 730
 731            broadcast(request.sender_id, guest_connection_ids, |connection_id| {
 732                self.peer
 733                    .forward_send(request.sender_id, connection_id, request.payload.clone())
 734            });
 735        }
 736        self.update_user_contacts(host_user_id).await?;
 737        response.send(proto::Ack {})?;
 738        Ok(())
 739    }
 740
 741    async fn unregister_worktree(
 742        self: Arc<Server>,
 743        request: TypedEnvelope<proto::UnregisterWorktree>,
 744    ) -> Result<()> {
 745        let host_user_id;
 746        let project_id = request.payload.project_id;
 747        let worktree_id = request.payload.worktree_id;
 748        {
 749            let mut state = self.store_mut().await;
 750            let (_, guest_connection_ids) =
 751                state.unregister_worktree(project_id, worktree_id, request.sender_id)?;
 752            host_user_id = state.user_id_for_connection(request.sender_id)?;
 753            broadcast(request.sender_id, guest_connection_ids, |conn_id| {
 754                self.peer.send(
 755                    conn_id,
 756                    proto::UnregisterWorktree {
 757                        project_id,
 758                        worktree_id,
 759                    },
 760                )
 761            });
 762        }
 763        self.update_user_contacts(host_user_id).await?;
 764        Ok(())
 765    }
 766
 767    async fn update_worktree(
 768        self: Arc<Server>,
 769        request: TypedEnvelope<proto::UpdateWorktree>,
 770        response: Response<proto::UpdateWorktree>,
 771    ) -> Result<()> {
 772        let connection_ids = self.store_mut().await.update_worktree(
 773            request.sender_id,
 774            request.payload.project_id,
 775            request.payload.worktree_id,
 776            &request.payload.removed_entries,
 777            &request.payload.updated_entries,
 778            request.payload.scan_id,
 779        )?;
 780
 781        broadcast(request.sender_id, connection_ids, |connection_id| {
 782            self.peer
 783                .forward_send(request.sender_id, connection_id, request.payload.clone())
 784        });
 785        response.send(proto::Ack {})?;
 786        Ok(())
 787    }
 788
 789    async fn update_diagnostic_summary(
 790        self: Arc<Server>,
 791        request: TypedEnvelope<proto::UpdateDiagnosticSummary>,
 792    ) -> Result<()> {
 793        let summary = request
 794            .payload
 795            .summary
 796            .clone()
 797            .ok_or_else(|| anyhow!("invalid summary"))?;
 798        let receiver_ids = self.store_mut().await.update_diagnostic_summary(
 799            request.payload.project_id,
 800            request.payload.worktree_id,
 801            request.sender_id,
 802            summary,
 803        )?;
 804
 805        broadcast(request.sender_id, receiver_ids, |connection_id| {
 806            self.peer
 807                .forward_send(request.sender_id, connection_id, request.payload.clone())
 808        });
 809        Ok(())
 810    }
 811
 812    async fn start_language_server(
 813        self: Arc<Server>,
 814        request: TypedEnvelope<proto::StartLanguageServer>,
 815    ) -> Result<()> {
 816        let receiver_ids = self.store_mut().await.start_language_server(
 817            request.payload.project_id,
 818            request.sender_id,
 819            request
 820                .payload
 821                .server
 822                .clone()
 823                .ok_or_else(|| anyhow!("invalid language server"))?,
 824        )?;
 825        broadcast(request.sender_id, receiver_ids, |connection_id| {
 826            self.peer
 827                .forward_send(request.sender_id, connection_id, request.payload.clone())
 828        });
 829        Ok(())
 830    }
 831
 832    async fn update_language_server(
 833        self: Arc<Server>,
 834        request: TypedEnvelope<proto::UpdateLanguageServer>,
 835    ) -> Result<()> {
 836        let receiver_ids = self
 837            .store()
 838            .await
 839            .project_connection_ids(request.payload.project_id, request.sender_id)?;
 840        broadcast(request.sender_id, receiver_ids, |connection_id| {
 841            self.peer
 842                .forward_send(request.sender_id, connection_id, request.payload.clone())
 843        });
 844        Ok(())
 845    }
 846
 847    async fn forward_project_request<T>(
 848        self: Arc<Server>,
 849        request: TypedEnvelope<T>,
 850        response: Response<T>,
 851    ) -> Result<()>
 852    where
 853        T: EntityMessage + RequestMessage,
 854    {
 855        let host_connection_id = self
 856            .store()
 857            .await
 858            .read_project(request.payload.remote_entity_id(), request.sender_id)?
 859            .host_connection_id;
 860
 861        response.send(
 862            self.peer
 863                .forward_request(request.sender_id, host_connection_id, request.payload)
 864                .await?,
 865        )?;
 866        Ok(())
 867    }
 868
 869    async fn save_buffer(
 870        self: Arc<Server>,
 871        request: TypedEnvelope<proto::SaveBuffer>,
 872        response: Response<proto::SaveBuffer>,
 873    ) -> Result<()> {
 874        let host = self
 875            .store()
 876            .await
 877            .read_project(request.payload.project_id, request.sender_id)?
 878            .host_connection_id;
 879        let response_payload = self
 880            .peer
 881            .forward_request(request.sender_id, host, request.payload.clone())
 882            .await?;
 883
 884        let mut guests = self
 885            .store()
 886            .await
 887            .read_project(request.payload.project_id, request.sender_id)?
 888            .connection_ids();
 889        guests.retain(|guest_connection_id| *guest_connection_id != request.sender_id);
 890        broadcast(host, guests, |conn_id| {
 891            self.peer
 892                .forward_send(host, conn_id, response_payload.clone())
 893        });
 894        response.send(response_payload)?;
 895        Ok(())
 896    }
 897
 898    async fn update_buffer(
 899        self: Arc<Server>,
 900        request: TypedEnvelope<proto::UpdateBuffer>,
 901        response: Response<proto::UpdateBuffer>,
 902    ) -> Result<()> {
 903        let receiver_ids = self
 904            .store()
 905            .await
 906            .project_connection_ids(request.payload.project_id, request.sender_id)?;
 907        broadcast(request.sender_id, receiver_ids, |connection_id| {
 908            self.peer
 909                .forward_send(request.sender_id, connection_id, request.payload.clone())
 910        });
 911        response.send(proto::Ack {})?;
 912        Ok(())
 913    }
 914
 915    async fn update_buffer_file(
 916        self: Arc<Server>,
 917        request: TypedEnvelope<proto::UpdateBufferFile>,
 918    ) -> Result<()> {
 919        let receiver_ids = self
 920            .store()
 921            .await
 922            .project_connection_ids(request.payload.project_id, request.sender_id)?;
 923        broadcast(request.sender_id, receiver_ids, |connection_id| {
 924            self.peer
 925                .forward_send(request.sender_id, connection_id, request.payload.clone())
 926        });
 927        Ok(())
 928    }
 929
 930    async fn buffer_reloaded(
 931        self: Arc<Server>,
 932        request: TypedEnvelope<proto::BufferReloaded>,
 933    ) -> Result<()> {
 934        let receiver_ids = self
 935            .store()
 936            .await
 937            .project_connection_ids(request.payload.project_id, request.sender_id)?;
 938        broadcast(request.sender_id, receiver_ids, |connection_id| {
 939            self.peer
 940                .forward_send(request.sender_id, connection_id, request.payload.clone())
 941        });
 942        Ok(())
 943    }
 944
 945    async fn buffer_saved(
 946        self: Arc<Server>,
 947        request: TypedEnvelope<proto::BufferSaved>,
 948    ) -> Result<()> {
 949        let receiver_ids = self
 950            .store()
 951            .await
 952            .project_connection_ids(request.payload.project_id, request.sender_id)?;
 953        broadcast(request.sender_id, receiver_ids, |connection_id| {
 954            self.peer
 955                .forward_send(request.sender_id, connection_id, request.payload.clone())
 956        });
 957        Ok(())
 958    }
 959
 960    async fn follow(
 961        self: Arc<Self>,
 962        request: TypedEnvelope<proto::Follow>,
 963        response: Response<proto::Follow>,
 964    ) -> Result<()> {
 965        let leader_id = ConnectionId(request.payload.leader_id);
 966        let follower_id = request.sender_id;
 967        if !self
 968            .store()
 969            .await
 970            .project_connection_ids(request.payload.project_id, follower_id)?
 971            .contains(&leader_id)
 972        {
 973            Err(anyhow!("no such peer"))?;
 974        }
 975        let mut response_payload = self
 976            .peer
 977            .forward_request(request.sender_id, leader_id, request.payload)
 978            .await?;
 979        response_payload
 980            .views
 981            .retain(|view| view.leader_id != Some(follower_id.0));
 982        response.send(response_payload)?;
 983        Ok(())
 984    }
 985
 986    async fn unfollow(self: Arc<Self>, request: TypedEnvelope<proto::Unfollow>) -> Result<()> {
 987        let leader_id = ConnectionId(request.payload.leader_id);
 988        if !self
 989            .store()
 990            .await
 991            .project_connection_ids(request.payload.project_id, request.sender_id)?
 992            .contains(&leader_id)
 993        {
 994            Err(anyhow!("no such peer"))?;
 995        }
 996        self.peer
 997            .forward_send(request.sender_id, leader_id, request.payload)?;
 998        Ok(())
 999    }
1000
1001    async fn update_followers(
1002        self: Arc<Self>,
1003        request: TypedEnvelope<proto::UpdateFollowers>,
1004    ) -> Result<()> {
1005        let connection_ids = self
1006            .store()
1007            .await
1008            .project_connection_ids(request.payload.project_id, request.sender_id)?;
1009        let leader_id = request
1010            .payload
1011            .variant
1012            .as_ref()
1013            .and_then(|variant| match variant {
1014                proto::update_followers::Variant::CreateView(payload) => payload.leader_id,
1015                proto::update_followers::Variant::UpdateView(payload) => payload.leader_id,
1016                proto::update_followers::Variant::UpdateActiveView(payload) => payload.leader_id,
1017            });
1018        for follower_id in &request.payload.follower_ids {
1019            let follower_id = ConnectionId(*follower_id);
1020            if connection_ids.contains(&follower_id) && Some(follower_id.0) != leader_id {
1021                self.peer
1022                    .forward_send(request.sender_id, follower_id, request.payload.clone())?;
1023            }
1024        }
1025        Ok(())
1026    }
1027
1028    async fn get_channels(
1029        self: Arc<Server>,
1030        request: TypedEnvelope<proto::GetChannels>,
1031        response: Response<proto::GetChannels>,
1032    ) -> Result<()> {
1033        let user_id = self
1034            .store()
1035            .await
1036            .user_id_for_connection(request.sender_id)?;
1037        let channels = self.app_state.db.get_accessible_channels(user_id).await?;
1038        response.send(proto::GetChannelsResponse {
1039            channels: channels
1040                .into_iter()
1041                .map(|chan| proto::Channel {
1042                    id: chan.id.to_proto(),
1043                    name: chan.name,
1044                })
1045                .collect(),
1046        })?;
1047        Ok(())
1048    }
1049
1050    async fn get_users(
1051        self: Arc<Server>,
1052        request: TypedEnvelope<proto::GetUsers>,
1053        response: Response<proto::GetUsers>,
1054    ) -> Result<()> {
1055        let user_ids = request
1056            .payload
1057            .user_ids
1058            .into_iter()
1059            .map(UserId::from_proto)
1060            .collect();
1061        let users = self
1062            .app_state
1063            .db
1064            .get_users_by_ids(user_ids)
1065            .await?
1066            .into_iter()
1067            .map(|user| proto::User {
1068                id: user.id.to_proto(),
1069                avatar_url: format!("https://github.com/{}.png?size=128", user.github_login),
1070                github_login: user.github_login,
1071            })
1072            .collect();
1073        response.send(proto::UsersResponse { users })?;
1074        Ok(())
1075    }
1076
1077    async fn fuzzy_search_users(
1078        self: Arc<Server>,
1079        request: TypedEnvelope<proto::FuzzySearchUsers>,
1080        response: Response<proto::FuzzySearchUsers>,
1081    ) -> Result<()> {
1082        let user_id = self
1083            .store()
1084            .await
1085            .user_id_for_connection(request.sender_id)?;
1086        let query = request.payload.query;
1087        let db = &self.app_state.db;
1088        let users = match query.len() {
1089            0 => vec![],
1090            1 | 2 => db
1091                .get_user_by_github_login(&query)
1092                .await?
1093                .into_iter()
1094                .collect(),
1095            _ => db.fuzzy_search_users(&query, 10).await?,
1096        };
1097        let users = users
1098            .into_iter()
1099            .filter(|user| user.id != user_id)
1100            .map(|user| proto::User {
1101                id: user.id.to_proto(),
1102                avatar_url: format!("https://github.com/{}.png?size=128", user.github_login),
1103                github_login: user.github_login,
1104            })
1105            .collect();
1106        response.send(proto::UsersResponse { users })?;
1107        Ok(())
1108    }
1109
1110    async fn request_contact(
1111        self: Arc<Server>,
1112        request: TypedEnvelope<proto::RequestContact>,
1113        response: Response<proto::RequestContact>,
1114    ) -> Result<()> {
1115        let requester_id = self
1116            .store()
1117            .await
1118            .user_id_for_connection(request.sender_id)?;
1119        let responder_id = UserId::from_proto(request.payload.responder_id);
1120        if requester_id == responder_id {
1121            return Err(anyhow!("cannot add yourself as a contact"))?;
1122        }
1123
1124        self.app_state
1125            .db
1126            .send_contact_request(requester_id, responder_id)
1127            .await?;
1128
1129        // Update outgoing contact requests of requester
1130        let mut update = proto::UpdateContacts::default();
1131        update.outgoing_requests.push(responder_id.to_proto());
1132        for connection_id in self.store().await.connection_ids_for_user(requester_id) {
1133            self.peer.send(connection_id, update.clone())?;
1134        }
1135
1136        // Update incoming contact requests of responder
1137        let mut update = proto::UpdateContacts::default();
1138        update
1139            .incoming_requests
1140            .push(proto::IncomingContactRequest {
1141                requester_id: requester_id.to_proto(),
1142                should_notify: true,
1143            });
1144        for connection_id in self.store().await.connection_ids_for_user(responder_id) {
1145            self.peer.send(connection_id, update.clone())?;
1146        }
1147
1148        response.send(proto::Ack {})?;
1149        Ok(())
1150    }
1151
1152    async fn respond_to_contact_request(
1153        self: Arc<Server>,
1154        request: TypedEnvelope<proto::RespondToContactRequest>,
1155        response: Response<proto::RespondToContactRequest>,
1156    ) -> Result<()> {
1157        let responder_id = self
1158            .store()
1159            .await
1160            .user_id_for_connection(request.sender_id)?;
1161        let requester_id = UserId::from_proto(request.payload.requester_id);
1162        if request.payload.response == proto::ContactRequestResponse::Dismiss as i32 {
1163            self.app_state
1164                .db
1165                .dismiss_contact_notification(responder_id, requester_id)
1166                .await?;
1167        } else {
1168            let accept = request.payload.response == proto::ContactRequestResponse::Accept as i32;
1169            self.app_state
1170                .db
1171                .respond_to_contact_request(responder_id, requester_id, accept)
1172                .await?;
1173
1174            let store = self.store().await;
1175            // Update responder with new contact
1176            let mut update = proto::UpdateContacts::default();
1177            if accept {
1178                update
1179                    .contacts
1180                    .push(store.contact_for_user(requester_id, false));
1181            }
1182            update
1183                .remove_incoming_requests
1184                .push(requester_id.to_proto());
1185            for connection_id in store.connection_ids_for_user(responder_id) {
1186                self.peer.send(connection_id, update.clone())?;
1187            }
1188
1189            // Update requester with new contact
1190            let mut update = proto::UpdateContacts::default();
1191            if accept {
1192                update
1193                    .contacts
1194                    .push(store.contact_for_user(responder_id, true));
1195            }
1196            update
1197                .remove_outgoing_requests
1198                .push(responder_id.to_proto());
1199            for connection_id in store.connection_ids_for_user(requester_id) {
1200                self.peer.send(connection_id, update.clone())?;
1201            }
1202        }
1203
1204        response.send(proto::Ack {})?;
1205        Ok(())
1206    }
1207
1208    async fn remove_contact(
1209        self: Arc<Server>,
1210        request: TypedEnvelope<proto::RemoveContact>,
1211        response: Response<proto::RemoveContact>,
1212    ) -> Result<()> {
1213        let requester_id = self
1214            .store()
1215            .await
1216            .user_id_for_connection(request.sender_id)?;
1217        let responder_id = UserId::from_proto(request.payload.user_id);
1218        self.app_state
1219            .db
1220            .remove_contact(requester_id, responder_id)
1221            .await?;
1222
1223        // Update outgoing contact requests of requester
1224        let mut update = proto::UpdateContacts::default();
1225        update
1226            .remove_outgoing_requests
1227            .push(responder_id.to_proto());
1228        for connection_id in self.store().await.connection_ids_for_user(requester_id) {
1229            self.peer.send(connection_id, update.clone())?;
1230        }
1231
1232        // Update incoming contact requests of responder
1233        let mut update = proto::UpdateContacts::default();
1234        update
1235            .remove_incoming_requests
1236            .push(requester_id.to_proto());
1237        for connection_id in self.store().await.connection_ids_for_user(responder_id) {
1238            self.peer.send(connection_id, update.clone())?;
1239        }
1240
1241        response.send(proto::Ack {})?;
1242        Ok(())
1243    }
1244
1245    async fn join_channel(
1246        self: Arc<Self>,
1247        request: TypedEnvelope<proto::JoinChannel>,
1248        response: Response<proto::JoinChannel>,
1249    ) -> Result<()> {
1250        let user_id = self
1251            .store()
1252            .await
1253            .user_id_for_connection(request.sender_id)?;
1254        let channel_id = ChannelId::from_proto(request.payload.channel_id);
1255        if !self
1256            .app_state
1257            .db
1258            .can_user_access_channel(user_id, channel_id)
1259            .await?
1260        {
1261            Err(anyhow!("access denied"))?;
1262        }
1263
1264        self.store_mut()
1265            .await
1266            .join_channel(request.sender_id, channel_id);
1267        let messages = self
1268            .app_state
1269            .db
1270            .get_channel_messages(channel_id, MESSAGE_COUNT_PER_PAGE, None)
1271            .await?
1272            .into_iter()
1273            .map(|msg| proto::ChannelMessage {
1274                id: msg.id.to_proto(),
1275                body: msg.body,
1276                timestamp: msg.sent_at.unix_timestamp() as u64,
1277                sender_id: msg.sender_id.to_proto(),
1278                nonce: Some(msg.nonce.as_u128().into()),
1279            })
1280            .collect::<Vec<_>>();
1281        response.send(proto::JoinChannelResponse {
1282            done: messages.len() < MESSAGE_COUNT_PER_PAGE,
1283            messages,
1284        })?;
1285        Ok(())
1286    }
1287
1288    async fn leave_channel(
1289        self: Arc<Self>,
1290        request: TypedEnvelope<proto::LeaveChannel>,
1291    ) -> Result<()> {
1292        let user_id = self
1293            .store()
1294            .await
1295            .user_id_for_connection(request.sender_id)?;
1296        let channel_id = ChannelId::from_proto(request.payload.channel_id);
1297        if !self
1298            .app_state
1299            .db
1300            .can_user_access_channel(user_id, channel_id)
1301            .await?
1302        {
1303            Err(anyhow!("access denied"))?;
1304        }
1305
1306        self.store_mut()
1307            .await
1308            .leave_channel(request.sender_id, channel_id);
1309
1310        Ok(())
1311    }
1312
1313    async fn send_channel_message(
1314        self: Arc<Self>,
1315        request: TypedEnvelope<proto::SendChannelMessage>,
1316        response: Response<proto::SendChannelMessage>,
1317    ) -> Result<()> {
1318        let channel_id = ChannelId::from_proto(request.payload.channel_id);
1319        let user_id;
1320        let connection_ids;
1321        {
1322            let state = self.store().await;
1323            user_id = state.user_id_for_connection(request.sender_id)?;
1324            connection_ids = state.channel_connection_ids(channel_id)?;
1325        }
1326
1327        // Validate the message body.
1328        let body = request.payload.body.trim().to_string();
1329        if body.len() > MAX_MESSAGE_LEN {
1330            return Err(anyhow!("message is too long"))?;
1331        }
1332        if body.is_empty() {
1333            return Err(anyhow!("message can't be blank"))?;
1334        }
1335
1336        let timestamp = OffsetDateTime::now_utc();
1337        let nonce = request
1338            .payload
1339            .nonce
1340            .ok_or_else(|| anyhow!("nonce can't be blank"))?;
1341
1342        let message_id = self
1343            .app_state
1344            .db
1345            .create_channel_message(channel_id, user_id, &body, timestamp, nonce.clone().into())
1346            .await?
1347            .to_proto();
1348        let message = proto::ChannelMessage {
1349            sender_id: user_id.to_proto(),
1350            id: message_id,
1351            body,
1352            timestamp: timestamp.unix_timestamp() as u64,
1353            nonce: Some(nonce),
1354        };
1355        broadcast(request.sender_id, connection_ids, |conn_id| {
1356            self.peer.send(
1357                conn_id,
1358                proto::ChannelMessageSent {
1359                    channel_id: channel_id.to_proto(),
1360                    message: Some(message.clone()),
1361                },
1362            )
1363        });
1364        response.send(proto::SendChannelMessageResponse {
1365            message: Some(message),
1366        })?;
1367        Ok(())
1368    }
1369
1370    async fn get_channel_messages(
1371        self: Arc<Self>,
1372        request: TypedEnvelope<proto::GetChannelMessages>,
1373        response: Response<proto::GetChannelMessages>,
1374    ) -> Result<()> {
1375        let user_id = self
1376            .store()
1377            .await
1378            .user_id_for_connection(request.sender_id)?;
1379        let channel_id = ChannelId::from_proto(request.payload.channel_id);
1380        if !self
1381            .app_state
1382            .db
1383            .can_user_access_channel(user_id, channel_id)
1384            .await?
1385        {
1386            Err(anyhow!("access denied"))?;
1387        }
1388
1389        let messages = self
1390            .app_state
1391            .db
1392            .get_channel_messages(
1393                channel_id,
1394                MESSAGE_COUNT_PER_PAGE,
1395                Some(MessageId::from_proto(request.payload.before_message_id)),
1396            )
1397            .await?
1398            .into_iter()
1399            .map(|msg| proto::ChannelMessage {
1400                id: msg.id.to_proto(),
1401                body: msg.body,
1402                timestamp: msg.sent_at.unix_timestamp() as u64,
1403                sender_id: msg.sender_id.to_proto(),
1404                nonce: Some(msg.nonce.as_u128().into()),
1405            })
1406            .collect::<Vec<_>>();
1407        response.send(proto::GetChannelMessagesResponse {
1408            done: messages.len() < MESSAGE_COUNT_PER_PAGE,
1409            messages,
1410        })?;
1411        Ok(())
1412    }
1413
1414    async fn store<'a>(self: &'a Arc<Self>) -> StoreReadGuard<'a> {
1415        #[cfg(test)]
1416        tokio::task::yield_now().await;
1417        let guard = self.store.read().await;
1418        #[cfg(test)]
1419        tokio::task::yield_now().await;
1420        StoreReadGuard {
1421            guard,
1422            _not_send: PhantomData,
1423        }
1424    }
1425
1426    async fn store_mut<'a>(self: &'a Arc<Self>) -> StoreWriteGuard<'a> {
1427        #[cfg(test)]
1428        tokio::task::yield_now().await;
1429        let guard = self.store.write().await;
1430        #[cfg(test)]
1431        tokio::task::yield_now().await;
1432        StoreWriteGuard {
1433            guard,
1434            _not_send: PhantomData,
1435        }
1436    }
1437}
1438
1439impl<'a> Deref for StoreReadGuard<'a> {
1440    type Target = Store;
1441
1442    fn deref(&self) -> &Self::Target {
1443        &*self.guard
1444    }
1445}
1446
1447impl<'a> Deref for StoreWriteGuard<'a> {
1448    type Target = Store;
1449
1450    fn deref(&self) -> &Self::Target {
1451        &*self.guard
1452    }
1453}
1454
1455impl<'a> DerefMut for StoreWriteGuard<'a> {
1456    fn deref_mut(&mut self) -> &mut Self::Target {
1457        &mut *self.guard
1458    }
1459}
1460
1461impl<'a> Drop for StoreWriteGuard<'a> {
1462    fn drop(&mut self) {
1463        #[cfg(test)]
1464        self.check_invariants();
1465
1466        let metrics = self.metrics();
1467        tracing::info!(
1468            connections = metrics.connections,
1469            registered_projects = metrics.registered_projects,
1470            shared_projects = metrics.shared_projects,
1471            "metrics"
1472        );
1473    }
1474}
1475
1476impl Executor for RealExecutor {
1477    type Sleep = Sleep;
1478
1479    fn spawn_detached<F: 'static + Send + Future<Output = ()>>(&self, future: F) {
1480        tokio::task::spawn(future);
1481    }
1482
1483    fn sleep(&self, duration: Duration) -> Self::Sleep {
1484        tokio::time::sleep(duration)
1485    }
1486}
1487
1488fn broadcast<F>(
1489    sender_id: ConnectionId,
1490    receiver_ids: impl IntoIterator<Item = ConnectionId>,
1491    mut f: F,
1492) where
1493    F: FnMut(ConnectionId) -> anyhow::Result<()>,
1494{
1495    for receiver_id in receiver_ids {
1496        if receiver_id != sender_id {
1497            f(receiver_id).trace_err();
1498        }
1499    }
1500}
1501
1502lazy_static! {
1503    static ref ZED_PROTOCOL_VERSION: HeaderName = HeaderName::from_static("x-zed-protocol-version");
1504}
1505
1506pub struct ProtocolVersion(u32);
1507
1508impl Header for ProtocolVersion {
1509    fn name() -> &'static HeaderName {
1510        &ZED_PROTOCOL_VERSION
1511    }
1512
1513    fn decode<'i, I>(values: &mut I) -> Result<Self, axum::headers::Error>
1514    where
1515        Self: Sized,
1516        I: Iterator<Item = &'i axum::http::HeaderValue>,
1517    {
1518        let version = values
1519            .next()
1520            .ok_or_else(|| axum::headers::Error::invalid())?
1521            .to_str()
1522            .map_err(|_| axum::headers::Error::invalid())?
1523            .parse()
1524            .map_err(|_| axum::headers::Error::invalid())?;
1525        Ok(Self(version))
1526    }
1527
1528    fn encode<E: Extend<axum::http::HeaderValue>>(&self, values: &mut E) {
1529        values.extend([self.0.to_string().parse().unwrap()]);
1530    }
1531}
1532
1533pub fn routes(app_state: Arc<AppState>) -> Router<Body> {
1534    let server = Server::new(app_state.clone(), None);
1535    Router::new()
1536        .route("/rpc", get(handle_websocket_request))
1537        .layer(
1538            ServiceBuilder::new()
1539                .layer(Extension(app_state))
1540                .layer(middleware::from_fn(auth::validate_header))
1541                .layer(Extension(server)),
1542        )
1543}
1544
1545pub async fn handle_websocket_request(
1546    TypedHeader(ProtocolVersion(protocol_version)): TypedHeader<ProtocolVersion>,
1547    ConnectInfo(socket_address): ConnectInfo<SocketAddr>,
1548    Extension(server): Extension<Arc<Server>>,
1549    Extension(user): Extension<User>,
1550    ws: WebSocketUpgrade,
1551) -> axum::response::Response {
1552    if protocol_version != rpc::PROTOCOL_VERSION {
1553        return (
1554            StatusCode::UPGRADE_REQUIRED,
1555            "client must be upgraded".to_string(),
1556        )
1557            .into_response();
1558    }
1559    let socket_address = socket_address.to_string();
1560    ws.on_upgrade(move |socket| {
1561        use util::ResultExt;
1562        let socket = socket
1563            .map_ok(to_tungstenite_message)
1564            .err_into()
1565            .with(|message| async move { Ok(to_axum_message(message)) });
1566        let connection = Connection::new(Box::pin(socket));
1567        async move {
1568            server
1569                .handle_connection(connection, socket_address, user, None, RealExecutor)
1570                .await
1571                .log_err();
1572        }
1573    })
1574}
1575
1576fn to_axum_message(message: TungsteniteMessage) -> AxumMessage {
1577    match message {
1578        TungsteniteMessage::Text(payload) => AxumMessage::Text(payload),
1579        TungsteniteMessage::Binary(payload) => AxumMessage::Binary(payload),
1580        TungsteniteMessage::Ping(payload) => AxumMessage::Ping(payload),
1581        TungsteniteMessage::Pong(payload) => AxumMessage::Pong(payload),
1582        TungsteniteMessage::Close(frame) => AxumMessage::Close(frame.map(|frame| AxumCloseFrame {
1583            code: frame.code.into(),
1584            reason: frame.reason,
1585        })),
1586    }
1587}
1588
1589fn to_tungstenite_message(message: AxumMessage) -> TungsteniteMessage {
1590    match message {
1591        AxumMessage::Text(payload) => TungsteniteMessage::Text(payload),
1592        AxumMessage::Binary(payload) => TungsteniteMessage::Binary(payload),
1593        AxumMessage::Ping(payload) => TungsteniteMessage::Ping(payload),
1594        AxumMessage::Pong(payload) => TungsteniteMessage::Pong(payload),
1595        AxumMessage::Close(frame) => {
1596            TungsteniteMessage::Close(frame.map(|frame| TungsteniteCloseFrame {
1597                code: frame.code.into(),
1598                reason: frame.reason,
1599            }))
1600        }
1601    }
1602}
1603
1604pub trait ResultExt {
1605    type Ok;
1606
1607    fn trace_err(self) -> Option<Self::Ok>;
1608}
1609
1610impl<T, E> ResultExt for Result<T, E>
1611where
1612    E: std::fmt::Debug,
1613{
1614    type Ok = T;
1615
1616    fn trace_err(self) -> Option<T> {
1617        match self {
1618            Ok(value) => Some(value),
1619            Err(error) => {
1620                tracing::error!("{:?}", error);
1621                None
1622            }
1623        }
1624    }
1625}
1626
1627#[cfg(test)]
1628mod tests {
1629    use super::*;
1630    use crate::{
1631        db::{tests::TestDb, UserId},
1632        AppState,
1633    };
1634    use ::rpc::Peer;
1635    use client::{
1636        self, test::FakeHttpClient, Channel, ChannelDetails, ChannelList, Client, Credentials,
1637        EstablishConnectionError, UserStore, RECEIVE_TIMEOUT,
1638    };
1639    use collections::{BTreeMap, HashSet};
1640    use editor::{
1641        self, ConfirmCodeAction, ConfirmCompletion, ConfirmRename, Editor, Input, Redo, Rename,
1642        ToOffset, ToggleCodeActions, Undo,
1643    };
1644    use gpui::{
1645        executor::{self, Deterministic},
1646        geometry::vector::vec2f,
1647        ModelHandle, TestAppContext, ViewHandle,
1648    };
1649    use language::{
1650        range_to_lsp, tree_sitter_rust, Diagnostic, DiagnosticEntry, FakeLspAdapter, Language,
1651        LanguageConfig, LanguageRegistry, OffsetRangeExt, Point, Rope,
1652    };
1653    use lsp::{self, FakeLanguageServer};
1654    use parking_lot::Mutex;
1655    use project::{
1656        fs::{FakeFs, Fs as _},
1657        search::SearchQuery,
1658        worktree::WorktreeHandle,
1659        DiagnosticSummary, Project, ProjectPath, WorktreeId,
1660    };
1661    use rand::prelude::*;
1662    use rpc::PeerId;
1663    use serde_json::json;
1664    use settings::Settings;
1665    use sqlx::types::time::OffsetDateTime;
1666    use std::{
1667        cell::RefCell,
1668        env,
1669        ops::Deref,
1670        path::{Path, PathBuf},
1671        rc::Rc,
1672        sync::{
1673            atomic::{AtomicBool, Ordering::SeqCst},
1674            Arc,
1675        },
1676        time::Duration,
1677    };
1678    use theme::ThemeRegistry;
1679    use workspace::{Item, SplitDirection, ToggleFollow, Workspace, WorkspaceParams};
1680
1681    #[cfg(test)]
1682    #[ctor::ctor]
1683    fn init_logger() {
1684        if std::env::var("RUST_LOG").is_ok() {
1685            env_logger::init();
1686        }
1687    }
1688
1689    #[gpui::test(iterations = 10)]
1690    async fn test_share_project(
1691        deterministic: Arc<Deterministic>,
1692        cx_a: &mut TestAppContext,
1693        cx_b: &mut TestAppContext,
1694        cx_b2: &mut TestAppContext,
1695    ) {
1696        let (window_b, _) = cx_b.add_window(|_| EmptyView);
1697        let lang_registry = Arc::new(LanguageRegistry::test());
1698        let fs = FakeFs::new(cx_a.background());
1699        cx_a.foreground().forbid_parking();
1700
1701        // Connect to a server as 2 clients.
1702        let mut server = TestServer::start(cx_a.foreground(), cx_a.background()).await;
1703        let client_a = server.create_client(cx_a, "user_a").await;
1704        let mut client_b = server.create_client(cx_b, "user_b").await;
1705        server
1706            .make_contacts(vec![(&client_a, cx_a), (&client_b, cx_b)])
1707            .await;
1708
1709        // Share a project as client A
1710        fs.insert_tree(
1711            "/a",
1712            json!({
1713                "a.txt": "a-contents",
1714                "b.txt": "b-contents",
1715            }),
1716        )
1717        .await;
1718        let project_a = cx_a.update(|cx| {
1719            Project::local(
1720                client_a.clone(),
1721                client_a.user_store.clone(),
1722                lang_registry.clone(),
1723                fs.clone(),
1724                cx,
1725            )
1726        });
1727        let project_id = project_a
1728            .read_with(cx_a, |project, _| project.next_remote_id())
1729            .await;
1730        let (worktree_a, _) = project_a
1731            .update(cx_a, |p, cx| {
1732                p.find_or_create_local_worktree("/a", true, cx)
1733            })
1734            .await
1735            .unwrap();
1736        let worktree_id = worktree_a.read_with(cx_a, |tree, _| tree.id());
1737        worktree_a
1738            .read_with(cx_a, |tree, _| tree.as_local().unwrap().scan_complete())
1739            .await;
1740
1741        // Join that project as client B
1742        let client_b_peer_id = client_b.peer_id;
1743        let project_b = client_b.build_remote_project(&project_a, cx_a, cx_b).await;
1744
1745        let replica_id_b = project_b.read_with(cx_b, |project, _| {
1746            assert_eq!(
1747                project
1748                    .collaborators()
1749                    .get(&client_a.peer_id)
1750                    .unwrap()
1751                    .user
1752                    .github_login,
1753                "user_a"
1754            );
1755            project.replica_id()
1756        });
1757        project_a
1758            .condition(&cx_a, |tree, _| {
1759                tree.collaborators()
1760                    .get(&client_b_peer_id)
1761                    .map_or(false, |collaborator| {
1762                        collaborator.replica_id == replica_id_b
1763                            && collaborator.user.github_login == "user_b"
1764                    })
1765            })
1766            .await;
1767
1768        // Open the same file as client B and client A.
1769        let buffer_b = project_b
1770            .update(cx_b, |p, cx| p.open_buffer((worktree_id, "b.txt"), cx))
1771            .await
1772            .unwrap();
1773        buffer_b.read_with(cx_b, |buf, _| assert_eq!(buf.text(), "b-contents"));
1774        project_a.read_with(cx_a, |project, cx| {
1775            assert!(project.has_open_buffer((worktree_id, "b.txt"), cx))
1776        });
1777        let buffer_a = project_a
1778            .update(cx_a, |p, cx| p.open_buffer((worktree_id, "b.txt"), cx))
1779            .await
1780            .unwrap();
1781
1782        let editor_b = cx_b.add_view(window_b, |cx| Editor::for_buffer(buffer_b, None, cx));
1783
1784        // TODO
1785        // // Create a selection set as client B and see that selection set as client A.
1786        // buffer_a
1787        //     .condition(&cx_a, |buffer, _| buffer.selection_sets().count() == 1)
1788        //     .await;
1789
1790        // Edit the buffer as client B and see that edit as client A.
1791        editor_b.update(cx_b, |editor, cx| {
1792            editor.handle_input(&Input("ok, ".into()), cx)
1793        });
1794        buffer_a
1795            .condition(&cx_a, |buffer, _| buffer.text() == "ok, b-contents")
1796            .await;
1797
1798        // TODO
1799        // // Remove the selection set as client B, see those selections disappear as client A.
1800        cx_b.update(move |_| drop(editor_b));
1801        // buffer_a
1802        //     .condition(&cx_a, |buffer, _| buffer.selection_sets().count() == 0)
1803        //     .await;
1804
1805        // Client B can join again on a different window because they are already a participant.
1806        let client_b2 = server.create_client(cx_b2, "user_b").await;
1807        let project_b2 = Project::remote(
1808            project_id,
1809            client_b2.client.clone(),
1810            client_b2.user_store.clone(),
1811            lang_registry.clone(),
1812            FakeFs::new(cx_b2.background()),
1813            &mut cx_b2.to_async(),
1814        )
1815        .await
1816        .unwrap();
1817        deterministic.run_until_parked();
1818        project_a.read_with(cx_a, |project, _| {
1819            assert_eq!(project.collaborators().len(), 2);
1820        });
1821        project_b.read_with(cx_b, |project, _| {
1822            assert_eq!(project.collaborators().len(), 2);
1823        });
1824        project_b2.read_with(cx_b2, |project, _| {
1825            assert_eq!(project.collaborators().len(), 2);
1826        });
1827
1828        // Dropping client B's first project removes only that from client A's collaborators.
1829        cx_b.update(move |_| {
1830            drop(client_b.project.take());
1831            drop(project_b);
1832        });
1833        deterministic.run_until_parked();
1834        project_a.read_with(cx_a, |project, _| {
1835            assert_eq!(project.collaborators().len(), 1);
1836        });
1837        project_b2.read_with(cx_b2, |project, _| {
1838            assert_eq!(project.collaborators().len(), 1);
1839        });
1840    }
1841
1842    #[gpui::test(iterations = 10)]
1843    async fn test_unshare_project(
1844        deterministic: Arc<Deterministic>,
1845        cx_a: &mut TestAppContext,
1846        cx_b: &mut TestAppContext,
1847    ) {
1848        let lang_registry = Arc::new(LanguageRegistry::test());
1849        let fs = FakeFs::new(cx_a.background());
1850        cx_a.foreground().forbid_parking();
1851
1852        // Connect to a server as 2 clients.
1853        let mut server = TestServer::start(cx_a.foreground(), cx_a.background()).await;
1854        let client_a = server.create_client(cx_a, "user_a").await;
1855        let mut client_b = server.create_client(cx_b, "user_b").await;
1856        server
1857            .make_contacts(vec![(&client_a, cx_a), (&client_b, cx_b)])
1858            .await;
1859
1860        // Share a project as client A
1861        fs.insert_tree(
1862            "/a",
1863            json!({
1864                "a.txt": "a-contents",
1865                "b.txt": "b-contents",
1866            }),
1867        )
1868        .await;
1869        let project_a = cx_a.update(|cx| {
1870            Project::local(
1871                client_a.clone(),
1872                client_a.user_store.clone(),
1873                lang_registry.clone(),
1874                fs.clone(),
1875                cx,
1876            )
1877        });
1878        let (worktree_a, _) = project_a
1879            .update(cx_a, |p, cx| {
1880                p.find_or_create_local_worktree("/a", true, cx)
1881            })
1882            .await
1883            .unwrap();
1884        worktree_a
1885            .read_with(cx_a, |tree, _| tree.as_local().unwrap().scan_complete())
1886            .await;
1887        let worktree_id = worktree_a.read_with(cx_a, |tree, _| tree.id());
1888
1889        // Join that project as client B
1890        let project_b = client_b.build_remote_project(&project_a, cx_a, cx_b).await;
1891        assert!(worktree_a.read_with(cx_a, |tree, _| tree.as_local().unwrap().is_shared()));
1892        project_b
1893            .update(cx_b, |p, cx| p.open_buffer((worktree_id, "a.txt"), cx))
1894            .await
1895            .unwrap();
1896
1897        // When client B leaves the project, it gets automatically unshared.
1898        cx_b.update(|_| {
1899            drop(client_b.project.take());
1900            drop(project_b);
1901        });
1902        deterministic.run_until_parked();
1903        assert!(worktree_a.read_with(cx_a, |tree, _| !tree.as_local().unwrap().is_shared()));
1904
1905        // When client B joins again, the project gets re-shared.
1906        let project_b2 = client_b.build_remote_project(&project_a, cx_a, cx_b).await;
1907        assert!(worktree_a.read_with(cx_a, |tree, _| tree.as_local().unwrap().is_shared()));
1908        project_b2
1909            .update(cx_b, |p, cx| p.open_buffer((worktree_id, "a.txt"), cx))
1910            .await
1911            .unwrap();
1912    }
1913
1914    #[gpui::test(iterations = 10)]
1915    async fn test_host_disconnect(
1916        deterministic: Arc<Deterministic>,
1917        cx_a: &mut TestAppContext,
1918        cx_b: &mut TestAppContext,
1919        cx_c: &mut TestAppContext,
1920    ) {
1921        let lang_registry = Arc::new(LanguageRegistry::test());
1922        let fs = FakeFs::new(cx_a.background());
1923        cx_a.foreground().forbid_parking();
1924
1925        // Connect to a server as 3 clients.
1926        let mut server = TestServer::start(cx_a.foreground(), cx_a.background()).await;
1927        let client_a = server.create_client(cx_a, "user_a").await;
1928        let mut client_b = server.create_client(cx_b, "user_b").await;
1929        let client_c = server.create_client(cx_c, "user_c").await;
1930        server
1931            .make_contacts(vec![
1932                (&client_a, cx_a),
1933                (&client_b, cx_b),
1934                (&client_c, cx_c),
1935            ])
1936            .await;
1937
1938        // Share a project as client A
1939        fs.insert_tree(
1940            "/a",
1941            json!({
1942                "a.txt": "a-contents",
1943                "b.txt": "b-contents",
1944            }),
1945        )
1946        .await;
1947        let project_a = cx_a.update(|cx| {
1948            Project::local(
1949                client_a.clone(),
1950                client_a.user_store.clone(),
1951                lang_registry.clone(),
1952                fs.clone(),
1953                cx,
1954            )
1955        });
1956        let project_id = project_a
1957            .read_with(cx_a, |project, _| project.next_remote_id())
1958            .await;
1959        let (worktree_a, _) = project_a
1960            .update(cx_a, |p, cx| {
1961                p.find_or_create_local_worktree("/a", true, cx)
1962            })
1963            .await
1964            .unwrap();
1965        worktree_a
1966            .read_with(cx_a, |tree, _| tree.as_local().unwrap().scan_complete())
1967            .await;
1968        let worktree_id = worktree_a.read_with(cx_a, |tree, _| tree.id());
1969
1970        // Join that project as client B
1971        let project_b = client_b.build_remote_project(&project_a, cx_a, cx_b).await;
1972        assert!(worktree_a.read_with(cx_a, |tree, _| tree.as_local().unwrap().is_shared()));
1973        project_b
1974            .update(cx_b, |p, cx| p.open_buffer((worktree_id, "a.txt"), cx))
1975            .await
1976            .unwrap();
1977
1978        // Request to join that project as client C
1979        let project_c = cx_c.spawn(|mut cx| {
1980            let client = client_c.client.clone();
1981            let user_store = client_c.user_store.clone();
1982            let lang_registry = lang_registry.clone();
1983            async move {
1984                Project::remote(
1985                    project_id,
1986                    client,
1987                    user_store,
1988                    lang_registry.clone(),
1989                    FakeFs::new(cx.background()),
1990                    &mut cx,
1991                )
1992                .await
1993            }
1994        });
1995        deterministic.run_until_parked();
1996
1997        // Drop client A's connection. Collaborators should disappear and the project should not be shown as shared.
1998        server.disconnect_client(client_a.current_user_id(cx_a));
1999        cx_a.foreground().advance_clock(rpc::RECEIVE_TIMEOUT);
2000        project_a
2001            .condition(cx_a, |project, _| project.collaborators().is_empty())
2002            .await;
2003        project_a.read_with(cx_a, |project, _| assert!(!project.is_shared()));
2004        project_b
2005            .condition(cx_b, |project, _| project.is_read_only())
2006            .await;
2007        assert!(worktree_a.read_with(cx_a, |tree, _| !tree.as_local().unwrap().is_shared()));
2008        cx_b.update(|_| {
2009            drop(project_b);
2010        });
2011        assert!(matches!(
2012            project_c.await.unwrap_err(),
2013            project::JoinProjectError::HostWentOffline
2014        ));
2015
2016        // Ensure guests can still join.
2017        let project_b2 = client_b.build_remote_project(&project_a, cx_a, cx_b).await;
2018        assert!(worktree_a.read_with(cx_a, |tree, _| tree.as_local().unwrap().is_shared()));
2019        project_b2
2020            .update(cx_b, |p, cx| p.open_buffer((worktree_id, "a.txt"), cx))
2021            .await
2022            .unwrap();
2023    }
2024
2025    #[gpui::test(iterations = 10)]
2026    async fn test_decline_join_request(
2027        deterministic: Arc<Deterministic>,
2028        cx_a: &mut TestAppContext,
2029        cx_b: &mut TestAppContext,
2030    ) {
2031        let lang_registry = Arc::new(LanguageRegistry::test());
2032        let fs = FakeFs::new(cx_a.background());
2033        cx_a.foreground().forbid_parking();
2034
2035        // Connect to a server as 2 clients.
2036        let mut server = TestServer::start(cx_a.foreground(), cx_a.background()).await;
2037        let client_a = server.create_client(cx_a, "user_a").await;
2038        let client_b = server.create_client(cx_b, "user_b").await;
2039        server
2040            .make_contacts(vec![(&client_a, cx_a), (&client_b, cx_b)])
2041            .await;
2042
2043        // Share a project as client A
2044        fs.insert_tree("/a", json!({})).await;
2045        let project_a = cx_a.update(|cx| {
2046            Project::local(
2047                client_a.clone(),
2048                client_a.user_store.clone(),
2049                lang_registry.clone(),
2050                fs.clone(),
2051                cx,
2052            )
2053        });
2054        let project_id = project_a
2055            .read_with(cx_a, |project, _| project.next_remote_id())
2056            .await;
2057        let (worktree_a, _) = project_a
2058            .update(cx_a, |p, cx| {
2059                p.find_or_create_local_worktree("/a", true, cx)
2060            })
2061            .await
2062            .unwrap();
2063        worktree_a
2064            .read_with(cx_a, |tree, _| tree.as_local().unwrap().scan_complete())
2065            .await;
2066
2067        // Request to join that project as client B
2068        let project_b = cx_b.spawn(|mut cx| {
2069            let client = client_b.client.clone();
2070            let user_store = client_b.user_store.clone();
2071            let lang_registry = lang_registry.clone();
2072            async move {
2073                Project::remote(
2074                    project_id,
2075                    client,
2076                    user_store,
2077                    lang_registry.clone(),
2078                    FakeFs::new(cx.background()),
2079                    &mut cx,
2080                )
2081                .await
2082            }
2083        });
2084        deterministic.run_until_parked();
2085        project_a.update(cx_a, |project, cx| {
2086            project.respond_to_join_request(client_b.user_id().unwrap(), false, cx)
2087        });
2088        assert!(matches!(
2089            project_b.await.unwrap_err(),
2090            project::JoinProjectError::HostDeclined
2091        ));
2092
2093        // Request to join the project again as client B
2094        let project_b = cx_b.spawn(|mut cx| {
2095            let client = client_b.client.clone();
2096            let user_store = client_b.user_store.clone();
2097            let lang_registry = lang_registry.clone();
2098            async move {
2099                Project::remote(
2100                    project_id,
2101                    client,
2102                    user_store,
2103                    lang_registry.clone(),
2104                    FakeFs::new(cx.background()),
2105                    &mut cx,
2106                )
2107                .await
2108            }
2109        });
2110
2111        // Close the project on the host
2112        deterministic.run_until_parked();
2113        cx_a.update(|_| drop(project_a));
2114        deterministic.run_until_parked();
2115        assert!(matches!(
2116            project_b.await.unwrap_err(),
2117            project::JoinProjectError::HostClosedProject
2118        ));
2119    }
2120
2121    #[gpui::test(iterations = 10)]
2122    async fn test_cancel_join_request(
2123        deterministic: Arc<Deterministic>,
2124        cx_a: &mut TestAppContext,
2125        cx_b: &mut TestAppContext,
2126    ) {
2127        let lang_registry = Arc::new(LanguageRegistry::test());
2128        let fs = FakeFs::new(cx_a.background());
2129        cx_a.foreground().forbid_parking();
2130
2131        // Connect to a server as 2 clients.
2132        let mut server = TestServer::start(cx_a.foreground(), cx_a.background()).await;
2133        let client_a = server.create_client(cx_a, "user_a").await;
2134        let client_b = server.create_client(cx_b, "user_b").await;
2135        server
2136            .make_contacts(vec![(&client_a, cx_a), (&client_b, cx_b)])
2137            .await;
2138
2139        // Share a project as client A
2140        fs.insert_tree("/a", json!({})).await;
2141        let project_a = cx_a.update(|cx| {
2142            Project::local(
2143                client_a.clone(),
2144                client_a.user_store.clone(),
2145                lang_registry.clone(),
2146                fs.clone(),
2147                cx,
2148            )
2149        });
2150        let project_id = project_a
2151            .read_with(cx_a, |project, _| project.next_remote_id())
2152            .await;
2153
2154        let project_a_events = Rc::new(RefCell::new(Vec::new()));
2155        let user_b = client_a
2156            .user_store
2157            .update(cx_a, |store, cx| {
2158                store.fetch_user(client_b.user_id().unwrap(), cx)
2159            })
2160            .await
2161            .unwrap();
2162        project_a.update(cx_a, {
2163            let project_a_events = project_a_events.clone();
2164            move |_, cx| {
2165                cx.subscribe(&cx.handle(), move |_, _, event, _| {
2166                    project_a_events.borrow_mut().push(event.clone());
2167                })
2168                .detach();
2169            }
2170        });
2171
2172        let (worktree_a, _) = project_a
2173            .update(cx_a, |p, cx| {
2174                p.find_or_create_local_worktree("/a", true, cx)
2175            })
2176            .await
2177            .unwrap();
2178        worktree_a
2179            .read_with(cx_a, |tree, _| tree.as_local().unwrap().scan_complete())
2180            .await;
2181
2182        // Request to join that project as client B
2183        let project_b = cx_b.spawn(|mut cx| {
2184            let client = client_b.client.clone();
2185            let user_store = client_b.user_store.clone();
2186            let lang_registry = lang_registry.clone();
2187            async move {
2188                Project::remote(
2189                    project_id,
2190                    client,
2191                    user_store,
2192                    lang_registry.clone(),
2193                    FakeFs::new(cx.background()),
2194                    &mut cx,
2195                )
2196                .await
2197            }
2198        });
2199        deterministic.run_until_parked();
2200        assert_eq!(
2201            &*project_a_events.borrow(),
2202            &[project::Event::ContactRequestedJoin(user_b.clone())]
2203        );
2204        project_a_events.borrow_mut().clear();
2205
2206        // Cancel the join request by leaving the project
2207        client_b
2208            .client
2209            .send(proto::LeaveProject { project_id })
2210            .unwrap();
2211        drop(project_b);
2212
2213        deterministic.run_until_parked();
2214        assert_eq!(
2215            &*project_a_events.borrow(),
2216            &[project::Event::ContactCancelledJoinRequest(user_b.clone())]
2217        );
2218    }
2219
2220    #[gpui::test(iterations = 10)]
2221    async fn test_propagate_saves_and_fs_changes(
2222        cx_a: &mut TestAppContext,
2223        cx_b: &mut TestAppContext,
2224        cx_c: &mut TestAppContext,
2225    ) {
2226        let lang_registry = Arc::new(LanguageRegistry::test());
2227        let fs = FakeFs::new(cx_a.background());
2228        cx_a.foreground().forbid_parking();
2229
2230        // Connect to a server as 3 clients.
2231        let mut server = TestServer::start(cx_a.foreground(), cx_a.background()).await;
2232        let client_a = server.create_client(cx_a, "user_a").await;
2233        let mut client_b = server.create_client(cx_b, "user_b").await;
2234        let mut client_c = server.create_client(cx_c, "user_c").await;
2235        server
2236            .make_contacts(vec![
2237                (&client_a, cx_a),
2238                (&client_b, cx_b),
2239                (&client_c, cx_c),
2240            ])
2241            .await;
2242
2243        // Share a worktree as client A.
2244        fs.insert_tree(
2245            "/a",
2246            json!({
2247                "file1": "",
2248                "file2": ""
2249            }),
2250        )
2251        .await;
2252        let project_a = cx_a.update(|cx| {
2253            Project::local(
2254                client_a.clone(),
2255                client_a.user_store.clone(),
2256                lang_registry.clone(),
2257                fs.clone(),
2258                cx,
2259            )
2260        });
2261        let (worktree_a, _) = project_a
2262            .update(cx_a, |p, cx| {
2263                p.find_or_create_local_worktree("/a", true, cx)
2264            })
2265            .await
2266            .unwrap();
2267        worktree_a
2268            .read_with(cx_a, |tree, _| tree.as_local().unwrap().scan_complete())
2269            .await;
2270        let worktree_id = worktree_a.read_with(cx_a, |tree, _| tree.id());
2271
2272        // Join that worktree as clients B and C.
2273        let project_b = client_b.build_remote_project(&project_a, cx_a, cx_b).await;
2274        let project_c = client_c.build_remote_project(&project_a, cx_a, cx_c).await;
2275        let worktree_b = project_b.read_with(cx_b, |p, cx| p.worktrees(cx).next().unwrap());
2276        let worktree_c = project_c.read_with(cx_c, |p, cx| p.worktrees(cx).next().unwrap());
2277
2278        // Open and edit a buffer as both guests B and C.
2279        let buffer_b = project_b
2280            .update(cx_b, |p, cx| p.open_buffer((worktree_id, "file1"), cx))
2281            .await
2282            .unwrap();
2283        let buffer_c = project_c
2284            .update(cx_c, |p, cx| p.open_buffer((worktree_id, "file1"), cx))
2285            .await
2286            .unwrap();
2287        buffer_b.update(cx_b, |buf, cx| buf.edit([(0..0, "i-am-b, ")], cx));
2288        buffer_c.update(cx_c, |buf, cx| buf.edit([(0..0, "i-am-c, ")], cx));
2289
2290        // Open and edit that buffer as the host.
2291        let buffer_a = project_a
2292            .update(cx_a, |p, cx| p.open_buffer((worktree_id, "file1"), cx))
2293            .await
2294            .unwrap();
2295
2296        buffer_a
2297            .condition(cx_a, |buf, _| buf.text() == "i-am-c, i-am-b, ")
2298            .await;
2299        buffer_a.update(cx_a, |buf, cx| {
2300            buf.edit([(buf.len()..buf.len(), "i-am-a")], cx)
2301        });
2302
2303        // Wait for edits to propagate
2304        buffer_a
2305            .condition(cx_a, |buf, _| buf.text() == "i-am-c, i-am-b, i-am-a")
2306            .await;
2307        buffer_b
2308            .condition(cx_b, |buf, _| buf.text() == "i-am-c, i-am-b, i-am-a")
2309            .await;
2310        buffer_c
2311            .condition(cx_c, |buf, _| buf.text() == "i-am-c, i-am-b, i-am-a")
2312            .await;
2313
2314        // Edit the buffer as the host and concurrently save as guest B.
2315        let save_b = buffer_b.update(cx_b, |buf, cx| buf.save(cx));
2316        buffer_a.update(cx_a, |buf, cx| buf.edit([(0..0, "hi-a, ")], cx));
2317        save_b.await.unwrap();
2318        assert_eq!(
2319            fs.load("/a/file1".as_ref()).await.unwrap(),
2320            "hi-a, i-am-c, i-am-b, i-am-a"
2321        );
2322        buffer_a.read_with(cx_a, |buf, _| assert!(!buf.is_dirty()));
2323        buffer_b.read_with(cx_b, |buf, _| assert!(!buf.is_dirty()));
2324        buffer_c.condition(cx_c, |buf, _| !buf.is_dirty()).await;
2325
2326        worktree_a.flush_fs_events(cx_a).await;
2327
2328        // Make changes on host's file system, see those changes on guest worktrees.
2329        fs.rename(
2330            "/a/file1".as_ref(),
2331            "/a/file1-renamed".as_ref(),
2332            Default::default(),
2333        )
2334        .await
2335        .unwrap();
2336
2337        fs.rename("/a/file2".as_ref(), "/a/file3".as_ref(), Default::default())
2338            .await
2339            .unwrap();
2340        fs.insert_file(Path::new("/a/file4"), "4".into()).await;
2341
2342        worktree_a
2343            .condition(&cx_a, |tree, _| {
2344                tree.paths()
2345                    .map(|p| p.to_string_lossy())
2346                    .collect::<Vec<_>>()
2347                    == ["file1-renamed", "file3", "file4"]
2348            })
2349            .await;
2350        worktree_b
2351            .condition(&cx_b, |tree, _| {
2352                tree.paths()
2353                    .map(|p| p.to_string_lossy())
2354                    .collect::<Vec<_>>()
2355                    == ["file1-renamed", "file3", "file4"]
2356            })
2357            .await;
2358        worktree_c
2359            .condition(&cx_c, |tree, _| {
2360                tree.paths()
2361                    .map(|p| p.to_string_lossy())
2362                    .collect::<Vec<_>>()
2363                    == ["file1-renamed", "file3", "file4"]
2364            })
2365            .await;
2366
2367        // Ensure buffer files are updated as well.
2368        buffer_a
2369            .condition(&cx_a, |buf, _| {
2370                buf.file().unwrap().path().to_str() == Some("file1-renamed")
2371            })
2372            .await;
2373        buffer_b
2374            .condition(&cx_b, |buf, _| {
2375                buf.file().unwrap().path().to_str() == Some("file1-renamed")
2376            })
2377            .await;
2378        buffer_c
2379            .condition(&cx_c, |buf, _| {
2380                buf.file().unwrap().path().to_str() == Some("file1-renamed")
2381            })
2382            .await;
2383    }
2384
2385    #[gpui::test(iterations = 10)]
2386    async fn test_fs_operations(
2387        executor: Arc<Deterministic>,
2388        cx_a: &mut TestAppContext,
2389        cx_b: &mut TestAppContext,
2390    ) {
2391        executor.forbid_parking();
2392        let fs = FakeFs::new(cx_a.background());
2393
2394        // Connect to a server as 2 clients.
2395        let mut server = TestServer::start(cx_a.foreground(), cx_a.background()).await;
2396        let mut client_a = server.create_client(cx_a, "user_a").await;
2397        let mut client_b = server.create_client(cx_b, "user_b").await;
2398        server
2399            .make_contacts(vec![(&client_a, cx_a), (&client_b, cx_b)])
2400            .await;
2401
2402        // Share a project as client A
2403        fs.insert_tree(
2404            "/dir",
2405            json!({
2406                "a.txt": "a-contents",
2407                "b.txt": "b-contents",
2408            }),
2409        )
2410        .await;
2411
2412        let (project_a, worktree_id) = client_a.build_local_project(fs, "/dir", cx_a).await;
2413        let project_b = client_b.build_remote_project(&project_a, cx_a, cx_b).await;
2414
2415        let worktree_a =
2416            project_a.read_with(cx_a, |project, cx| project.worktrees(cx).next().unwrap());
2417        let worktree_b =
2418            project_b.read_with(cx_b, |project, cx| project.worktrees(cx).next().unwrap());
2419
2420        let entry = project_b
2421            .update(cx_b, |project, cx| {
2422                project
2423                    .create_entry((worktree_id, "c.txt"), false, cx)
2424                    .unwrap()
2425            })
2426            .await
2427            .unwrap();
2428        worktree_a.read_with(cx_a, |worktree, _| {
2429            assert_eq!(
2430                worktree
2431                    .paths()
2432                    .map(|p| p.to_string_lossy())
2433                    .collect::<Vec<_>>(),
2434                ["a.txt", "b.txt", "c.txt"]
2435            );
2436        });
2437        worktree_b.read_with(cx_b, |worktree, _| {
2438            assert_eq!(
2439                worktree
2440                    .paths()
2441                    .map(|p| p.to_string_lossy())
2442                    .collect::<Vec<_>>(),
2443                ["a.txt", "b.txt", "c.txt"]
2444            );
2445        });
2446
2447        project_b
2448            .update(cx_b, |project, cx| {
2449                project.rename_entry(entry.id, Path::new("d.txt"), cx)
2450            })
2451            .unwrap()
2452            .await
2453            .unwrap();
2454        worktree_a.read_with(cx_a, |worktree, _| {
2455            assert_eq!(
2456                worktree
2457                    .paths()
2458                    .map(|p| p.to_string_lossy())
2459                    .collect::<Vec<_>>(),
2460                ["a.txt", "b.txt", "d.txt"]
2461            );
2462        });
2463        worktree_b.read_with(cx_b, |worktree, _| {
2464            assert_eq!(
2465                worktree
2466                    .paths()
2467                    .map(|p| p.to_string_lossy())
2468                    .collect::<Vec<_>>(),
2469                ["a.txt", "b.txt", "d.txt"]
2470            );
2471        });
2472
2473        let dir_entry = project_b
2474            .update(cx_b, |project, cx| {
2475                project
2476                    .create_entry((worktree_id, "DIR"), true, cx)
2477                    .unwrap()
2478            })
2479            .await
2480            .unwrap();
2481        worktree_a.read_with(cx_a, |worktree, _| {
2482            assert_eq!(
2483                worktree
2484                    .paths()
2485                    .map(|p| p.to_string_lossy())
2486                    .collect::<Vec<_>>(),
2487                ["DIR", "a.txt", "b.txt", "d.txt"]
2488            );
2489        });
2490        worktree_b.read_with(cx_b, |worktree, _| {
2491            assert_eq!(
2492                worktree
2493                    .paths()
2494                    .map(|p| p.to_string_lossy())
2495                    .collect::<Vec<_>>(),
2496                ["DIR", "a.txt", "b.txt", "d.txt"]
2497            );
2498        });
2499
2500        project_b
2501            .update(cx_b, |project, cx| {
2502                project.delete_entry(dir_entry.id, cx).unwrap()
2503            })
2504            .await
2505            .unwrap();
2506        worktree_a.read_with(cx_a, |worktree, _| {
2507            assert_eq!(
2508                worktree
2509                    .paths()
2510                    .map(|p| p.to_string_lossy())
2511                    .collect::<Vec<_>>(),
2512                ["a.txt", "b.txt", "d.txt"]
2513            );
2514        });
2515        worktree_b.read_with(cx_b, |worktree, _| {
2516            assert_eq!(
2517                worktree
2518                    .paths()
2519                    .map(|p| p.to_string_lossy())
2520                    .collect::<Vec<_>>(),
2521                ["a.txt", "b.txt", "d.txt"]
2522            );
2523        });
2524
2525        project_b
2526            .update(cx_b, |project, cx| {
2527                project.delete_entry(entry.id, cx).unwrap()
2528            })
2529            .await
2530            .unwrap();
2531        worktree_a.read_with(cx_a, |worktree, _| {
2532            assert_eq!(
2533                worktree
2534                    .paths()
2535                    .map(|p| p.to_string_lossy())
2536                    .collect::<Vec<_>>(),
2537                ["a.txt", "b.txt"]
2538            );
2539        });
2540        worktree_b.read_with(cx_b, |worktree, _| {
2541            assert_eq!(
2542                worktree
2543                    .paths()
2544                    .map(|p| p.to_string_lossy())
2545                    .collect::<Vec<_>>(),
2546                ["a.txt", "b.txt"]
2547            );
2548        });
2549    }
2550
2551    #[gpui::test(iterations = 10)]
2552    async fn test_buffer_conflict_after_save(cx_a: &mut TestAppContext, cx_b: &mut TestAppContext) {
2553        cx_a.foreground().forbid_parking();
2554        let lang_registry = Arc::new(LanguageRegistry::test());
2555        let fs = FakeFs::new(cx_a.background());
2556
2557        // Connect to a server as 2 clients.
2558        let mut server = TestServer::start(cx_a.foreground(), cx_a.background()).await;
2559        let client_a = server.create_client(cx_a, "user_a").await;
2560        let mut client_b = server.create_client(cx_b, "user_b").await;
2561        server
2562            .make_contacts(vec![(&client_a, cx_a), (&client_b, cx_b)])
2563            .await;
2564
2565        // Share a project as client A
2566        fs.insert_tree(
2567            "/dir",
2568            json!({
2569                "a.txt": "a-contents",
2570            }),
2571        )
2572        .await;
2573
2574        let project_a = cx_a.update(|cx| {
2575            Project::local(
2576                client_a.clone(),
2577                client_a.user_store.clone(),
2578                lang_registry.clone(),
2579                fs.clone(),
2580                cx,
2581            )
2582        });
2583        let (worktree_a, _) = project_a
2584            .update(cx_a, |p, cx| {
2585                p.find_or_create_local_worktree("/dir", true, cx)
2586            })
2587            .await
2588            .unwrap();
2589        worktree_a
2590            .read_with(cx_a, |tree, _| tree.as_local().unwrap().scan_complete())
2591            .await;
2592        let worktree_id = worktree_a.read_with(cx_a, |tree, _| tree.id());
2593
2594        // Join that project as client B
2595        let project_b = client_b.build_remote_project(&project_a, cx_a, cx_b).await;
2596
2597        // Open a buffer as client B
2598        let buffer_b = project_b
2599            .update(cx_b, |p, cx| p.open_buffer((worktree_id, "a.txt"), cx))
2600            .await
2601            .unwrap();
2602
2603        buffer_b.update(cx_b, |buf, cx| buf.edit([(0..0, "world ")], cx));
2604        buffer_b.read_with(cx_b, |buf, _| {
2605            assert!(buf.is_dirty());
2606            assert!(!buf.has_conflict());
2607        });
2608
2609        buffer_b.update(cx_b, |buf, cx| buf.save(cx)).await.unwrap();
2610        buffer_b
2611            .condition(&cx_b, |buffer_b, _| !buffer_b.is_dirty())
2612            .await;
2613        buffer_b.read_with(cx_b, |buf, _| {
2614            assert!(!buf.has_conflict());
2615        });
2616
2617        buffer_b.update(cx_b, |buf, cx| buf.edit([(0..0, "hello ")], cx));
2618        buffer_b.read_with(cx_b, |buf, _| {
2619            assert!(buf.is_dirty());
2620            assert!(!buf.has_conflict());
2621        });
2622    }
2623
2624    #[gpui::test(iterations = 10)]
2625    async fn test_buffer_reloading(cx_a: &mut TestAppContext, cx_b: &mut TestAppContext) {
2626        cx_a.foreground().forbid_parking();
2627        let lang_registry = Arc::new(LanguageRegistry::test());
2628        let fs = FakeFs::new(cx_a.background());
2629
2630        // Connect to a server as 2 clients.
2631        let mut server = TestServer::start(cx_a.foreground(), cx_a.background()).await;
2632        let client_a = server.create_client(cx_a, "user_a").await;
2633        let mut client_b = server.create_client(cx_b, "user_b").await;
2634        server
2635            .make_contacts(vec![(&client_a, cx_a), (&client_b, cx_b)])
2636            .await;
2637
2638        // Share a project as client A
2639        fs.insert_tree(
2640            "/dir",
2641            json!({
2642                "a.txt": "a-contents",
2643            }),
2644        )
2645        .await;
2646
2647        let project_a = cx_a.update(|cx| {
2648            Project::local(
2649                client_a.clone(),
2650                client_a.user_store.clone(),
2651                lang_registry.clone(),
2652                fs.clone(),
2653                cx,
2654            )
2655        });
2656        let (worktree_a, _) = project_a
2657            .update(cx_a, |p, cx| {
2658                p.find_or_create_local_worktree("/dir", true, cx)
2659            })
2660            .await
2661            .unwrap();
2662        worktree_a
2663            .read_with(cx_a, |tree, _| tree.as_local().unwrap().scan_complete())
2664            .await;
2665        let worktree_id = worktree_a.read_with(cx_a, |tree, _| tree.id());
2666
2667        // Join that project as client B
2668        let project_b = client_b.build_remote_project(&project_a, cx_a, cx_b).await;
2669        let _worktree_b = project_b.update(cx_b, |p, cx| p.worktrees(cx).next().unwrap());
2670
2671        // Open a buffer as client B
2672        let buffer_b = project_b
2673            .update(cx_b, |p, cx| p.open_buffer((worktree_id, "a.txt"), cx))
2674            .await
2675            .unwrap();
2676        buffer_b.read_with(cx_b, |buf, _| {
2677            assert!(!buf.is_dirty());
2678            assert!(!buf.has_conflict());
2679        });
2680
2681        fs.save(Path::new("/dir/a.txt"), &"new contents".into())
2682            .await
2683            .unwrap();
2684        buffer_b
2685            .condition(&cx_b, |buf, _| {
2686                buf.text() == "new contents" && !buf.is_dirty()
2687            })
2688            .await;
2689        buffer_b.read_with(cx_b, |buf, _| {
2690            assert!(!buf.has_conflict());
2691        });
2692    }
2693
2694    #[gpui::test(iterations = 10)]
2695    async fn test_editing_while_guest_opens_buffer(
2696        cx_a: &mut TestAppContext,
2697        cx_b: &mut TestAppContext,
2698    ) {
2699        cx_a.foreground().forbid_parking();
2700        let lang_registry = Arc::new(LanguageRegistry::test());
2701        let fs = FakeFs::new(cx_a.background());
2702
2703        // Connect to a server as 2 clients.
2704        let mut server = TestServer::start(cx_a.foreground(), cx_a.background()).await;
2705        let client_a = server.create_client(cx_a, "user_a").await;
2706        let mut client_b = server.create_client(cx_b, "user_b").await;
2707        server
2708            .make_contacts(vec![(&client_a, cx_a), (&client_b, cx_b)])
2709            .await;
2710
2711        // Share a project as client A
2712        fs.insert_tree(
2713            "/dir",
2714            json!({
2715                "a.txt": "a-contents",
2716            }),
2717        )
2718        .await;
2719        let project_a = cx_a.update(|cx| {
2720            Project::local(
2721                client_a.clone(),
2722                client_a.user_store.clone(),
2723                lang_registry.clone(),
2724                fs.clone(),
2725                cx,
2726            )
2727        });
2728        let (worktree_a, _) = project_a
2729            .update(cx_a, |p, cx| {
2730                p.find_or_create_local_worktree("/dir", true, cx)
2731            })
2732            .await
2733            .unwrap();
2734        worktree_a
2735            .read_with(cx_a, |tree, _| tree.as_local().unwrap().scan_complete())
2736            .await;
2737        let worktree_id = worktree_a.read_with(cx_a, |tree, _| tree.id());
2738
2739        // Join that project as client B
2740        let project_b = client_b.build_remote_project(&project_a, cx_a, cx_b).await;
2741
2742        // Open a buffer as client A
2743        let buffer_a = project_a
2744            .update(cx_a, |p, cx| p.open_buffer((worktree_id, "a.txt"), cx))
2745            .await
2746            .unwrap();
2747
2748        // Start opening the same buffer as client B
2749        let buffer_b = cx_b
2750            .background()
2751            .spawn(project_b.update(cx_b, |p, cx| p.open_buffer((worktree_id, "a.txt"), cx)));
2752
2753        // Edit the buffer as client A while client B is still opening it.
2754        cx_b.background().simulate_random_delay().await;
2755        buffer_a.update(cx_a, |buf, cx| buf.edit([(0..0, "X")], cx));
2756        cx_b.background().simulate_random_delay().await;
2757        buffer_a.update(cx_a, |buf, cx| buf.edit([(1..1, "Y")], cx));
2758
2759        let text = buffer_a.read_with(cx_a, |buf, _| buf.text());
2760        let buffer_b = buffer_b.await.unwrap();
2761        buffer_b.condition(&cx_b, |buf, _| buf.text() == text).await;
2762    }
2763
2764    #[gpui::test(iterations = 10)]
2765    async fn test_leaving_worktree_while_opening_buffer(
2766        cx_a: &mut TestAppContext,
2767        cx_b: &mut TestAppContext,
2768    ) {
2769        cx_a.foreground().forbid_parking();
2770        let lang_registry = Arc::new(LanguageRegistry::test());
2771        let fs = FakeFs::new(cx_a.background());
2772
2773        // Connect to a server as 2 clients.
2774        let mut server = TestServer::start(cx_a.foreground(), cx_a.background()).await;
2775        let client_a = server.create_client(cx_a, "user_a").await;
2776        let mut client_b = server.create_client(cx_b, "user_b").await;
2777        server
2778            .make_contacts(vec![(&client_a, cx_a), (&client_b, cx_b)])
2779            .await;
2780
2781        // Share a project as client A
2782        fs.insert_tree(
2783            "/dir",
2784            json!({
2785                "a.txt": "a-contents",
2786            }),
2787        )
2788        .await;
2789        let project_a = cx_a.update(|cx| {
2790            Project::local(
2791                client_a.clone(),
2792                client_a.user_store.clone(),
2793                lang_registry.clone(),
2794                fs.clone(),
2795                cx,
2796            )
2797        });
2798        let (worktree_a, _) = project_a
2799            .update(cx_a, |p, cx| {
2800                p.find_or_create_local_worktree("/dir", true, cx)
2801            })
2802            .await
2803            .unwrap();
2804        worktree_a
2805            .read_with(cx_a, |tree, _| tree.as_local().unwrap().scan_complete())
2806            .await;
2807        let worktree_id = worktree_a.read_with(cx_a, |tree, _| tree.id());
2808
2809        // Join that project as client B
2810        let project_b = client_b.build_remote_project(&project_a, cx_a, cx_b).await;
2811
2812        // See that a guest has joined as client A.
2813        project_a
2814            .condition(&cx_a, |p, _| p.collaborators().len() == 1)
2815            .await;
2816
2817        // Begin opening a buffer as client B, but leave the project before the open completes.
2818        let buffer_b = cx_b
2819            .background()
2820            .spawn(project_b.update(cx_b, |p, cx| p.open_buffer((worktree_id, "a.txt"), cx)));
2821        cx_b.update(|_| {
2822            drop(client_b.project.take());
2823            drop(project_b);
2824        });
2825        drop(buffer_b);
2826
2827        // See that the guest has left.
2828        project_a
2829            .condition(&cx_a, |p, _| p.collaborators().len() == 0)
2830            .await;
2831    }
2832
2833    #[gpui::test(iterations = 10)]
2834    async fn test_leaving_project(cx_a: &mut TestAppContext, cx_b: &mut TestAppContext) {
2835        cx_a.foreground().forbid_parking();
2836        let lang_registry = Arc::new(LanguageRegistry::test());
2837        let fs = FakeFs::new(cx_a.background());
2838
2839        // Connect to a server as 2 clients.
2840        let mut server = TestServer::start(cx_a.foreground(), cx_a.background()).await;
2841        let client_a = server.create_client(cx_a, "user_a").await;
2842        let mut client_b = server.create_client(cx_b, "user_b").await;
2843        server
2844            .make_contacts(vec![(&client_a, cx_a), (&client_b, cx_b)])
2845            .await;
2846
2847        // Share a project as client A
2848        fs.insert_tree(
2849            "/a",
2850            json!({
2851                "a.txt": "a-contents",
2852                "b.txt": "b-contents",
2853            }),
2854        )
2855        .await;
2856        let project_a = cx_a.update(|cx| {
2857            Project::local(
2858                client_a.clone(),
2859                client_a.user_store.clone(),
2860                lang_registry.clone(),
2861                fs.clone(),
2862                cx,
2863            )
2864        });
2865        let (worktree_a, _) = project_a
2866            .update(cx_a, |p, cx| {
2867                p.find_or_create_local_worktree("/a", true, cx)
2868            })
2869            .await
2870            .unwrap();
2871        worktree_a
2872            .read_with(cx_a, |tree, _| tree.as_local().unwrap().scan_complete())
2873            .await;
2874
2875        // Join that project as client B
2876        let _project_b = client_b.build_remote_project(&project_a, cx_a, cx_b).await;
2877
2878        // Client A sees that a guest has joined.
2879        project_a
2880            .condition(cx_a, |p, _| p.collaborators().len() == 1)
2881            .await;
2882
2883        // Drop client B's connection and ensure client A observes client B leaving the project.
2884        client_b.disconnect(&cx_b.to_async()).unwrap();
2885        project_a
2886            .condition(cx_a, |p, _| p.collaborators().len() == 0)
2887            .await;
2888
2889        // Rejoin the project as client B
2890        let _project_b = client_b.build_remote_project(&project_a, cx_a, cx_b).await;
2891
2892        // Client A sees that a guest has re-joined.
2893        project_a
2894            .condition(cx_a, |p, _| p.collaborators().len() == 1)
2895            .await;
2896
2897        // Simulate connection loss for client B and ensure client A observes client B leaving the project.
2898        client_b.wait_for_current_user(cx_b).await;
2899        server.disconnect_client(client_b.current_user_id(cx_b));
2900        cx_a.foreground().advance_clock(rpc::RECEIVE_TIMEOUT);
2901        project_a
2902            .condition(cx_a, |p, _| p.collaborators().len() == 0)
2903            .await;
2904    }
2905
2906    #[gpui::test(iterations = 10)]
2907    async fn test_collaborating_with_diagnostics(
2908        deterministic: Arc<Deterministic>,
2909        cx_a: &mut TestAppContext,
2910        cx_b: &mut TestAppContext,
2911        cx_c: &mut TestAppContext,
2912    ) {
2913        deterministic.forbid_parking();
2914        let lang_registry = Arc::new(LanguageRegistry::test());
2915        let fs = FakeFs::new(cx_a.background());
2916
2917        // Set up a fake language server.
2918        let mut language = Language::new(
2919            LanguageConfig {
2920                name: "Rust".into(),
2921                path_suffixes: vec!["rs".to_string()],
2922                ..Default::default()
2923            },
2924            Some(tree_sitter_rust::language()),
2925        );
2926        let mut fake_language_servers = language.set_fake_lsp_adapter(Default::default());
2927        lang_registry.add(Arc::new(language));
2928
2929        // Connect to a server as 2 clients.
2930        let mut server = TestServer::start(cx_a.foreground(), cx_a.background()).await;
2931        let client_a = server.create_client(cx_a, "user_a").await;
2932        let mut client_b = server.create_client(cx_b, "user_b").await;
2933        let mut client_c = server.create_client(cx_c, "user_c").await;
2934        server
2935            .make_contacts(vec![
2936                (&client_a, cx_a),
2937                (&client_b, cx_b),
2938                (&client_c, cx_c),
2939            ])
2940            .await;
2941
2942        // Share a project as client A
2943        fs.insert_tree(
2944            "/a",
2945            json!({
2946                "a.rs": "let one = two",
2947                "other.rs": "",
2948            }),
2949        )
2950        .await;
2951        let project_a = cx_a.update(|cx| {
2952            Project::local(
2953                client_a.clone(),
2954                client_a.user_store.clone(),
2955                lang_registry.clone(),
2956                fs.clone(),
2957                cx,
2958            )
2959        });
2960        let (worktree_a, _) = project_a
2961            .update(cx_a, |p, cx| {
2962                p.find_or_create_local_worktree("/a", true, cx)
2963            })
2964            .await
2965            .unwrap();
2966        worktree_a
2967            .read_with(cx_a, |tree, _| tree.as_local().unwrap().scan_complete())
2968            .await;
2969        let project_id = project_a.update(cx_a, |p, _| p.next_remote_id()).await;
2970        let worktree_id = worktree_a.read_with(cx_a, |tree, _| tree.id());
2971
2972        // Cause the language server to start.
2973        let _buffer = cx_a
2974            .background()
2975            .spawn(project_a.update(cx_a, |project, cx| {
2976                project.open_buffer(
2977                    ProjectPath {
2978                        worktree_id,
2979                        path: Path::new("other.rs").into(),
2980                    },
2981                    cx,
2982                )
2983            }))
2984            .await
2985            .unwrap();
2986
2987        // Join the worktree as client B.
2988        let project_b = client_b.build_remote_project(&project_a, cx_a, cx_b).await;
2989
2990        // Simulate a language server reporting errors for a file.
2991        let mut fake_language_server = fake_language_servers.next().await.unwrap();
2992        fake_language_server
2993            .receive_notification::<lsp::notification::DidOpenTextDocument>()
2994            .await;
2995        fake_language_server.notify::<lsp::notification::PublishDiagnostics>(
2996            lsp::PublishDiagnosticsParams {
2997                uri: lsp::Url::from_file_path("/a/a.rs").unwrap(),
2998                version: None,
2999                diagnostics: vec![lsp::Diagnostic {
3000                    severity: Some(lsp::DiagnosticSeverity::ERROR),
3001                    range: lsp::Range::new(lsp::Position::new(0, 4), lsp::Position::new(0, 7)),
3002                    message: "message 1".to_string(),
3003                    ..Default::default()
3004                }],
3005            },
3006        );
3007
3008        // Wait for server to see the diagnostics update.
3009        deterministic.run_until_parked();
3010        {
3011            let store = server.store.read().await;
3012            let project = store.project(project_id).unwrap();
3013            let worktree = project.worktrees.get(&worktree_id.to_proto()).unwrap();
3014            assert!(!worktree.diagnostic_summaries.is_empty());
3015        }
3016
3017        // Ensure client B observes the new diagnostics.
3018        project_b.read_with(cx_b, |project, cx| {
3019            assert_eq!(
3020                project.diagnostic_summaries(cx).collect::<Vec<_>>(),
3021                &[(
3022                    ProjectPath {
3023                        worktree_id,
3024                        path: Arc::from(Path::new("a.rs")),
3025                    },
3026                    DiagnosticSummary {
3027                        error_count: 1,
3028                        warning_count: 0,
3029                        ..Default::default()
3030                    },
3031                )]
3032            )
3033        });
3034
3035        // Join project as client C and observe the diagnostics.
3036        let project_c = client_c.build_remote_project(&project_a, cx_a, cx_c).await;
3037        project_c.read_with(cx_c, |project, cx| {
3038            assert_eq!(
3039                project.diagnostic_summaries(cx).collect::<Vec<_>>(),
3040                &[(
3041                    ProjectPath {
3042                        worktree_id,
3043                        path: Arc::from(Path::new("a.rs")),
3044                    },
3045                    DiagnosticSummary {
3046                        error_count: 1,
3047                        warning_count: 0,
3048                        ..Default::default()
3049                    },
3050                )]
3051            )
3052        });
3053
3054        // Simulate a language server reporting more errors for a file.
3055        fake_language_server.notify::<lsp::notification::PublishDiagnostics>(
3056            lsp::PublishDiagnosticsParams {
3057                uri: lsp::Url::from_file_path("/a/a.rs").unwrap(),
3058                version: None,
3059                diagnostics: vec![
3060                    lsp::Diagnostic {
3061                        severity: Some(lsp::DiagnosticSeverity::ERROR),
3062                        range: lsp::Range::new(lsp::Position::new(0, 4), lsp::Position::new(0, 7)),
3063                        message: "message 1".to_string(),
3064                        ..Default::default()
3065                    },
3066                    lsp::Diagnostic {
3067                        severity: Some(lsp::DiagnosticSeverity::WARNING),
3068                        range: lsp::Range::new(
3069                            lsp::Position::new(0, 10),
3070                            lsp::Position::new(0, 13),
3071                        ),
3072                        message: "message 2".to_string(),
3073                        ..Default::default()
3074                    },
3075                ],
3076            },
3077        );
3078
3079        // Clients B and C get the updated summaries
3080        deterministic.run_until_parked();
3081        project_b.read_with(cx_b, |project, cx| {
3082            assert_eq!(
3083                project.diagnostic_summaries(cx).collect::<Vec<_>>(),
3084                [(
3085                    ProjectPath {
3086                        worktree_id,
3087                        path: Arc::from(Path::new("a.rs")),
3088                    },
3089                    DiagnosticSummary {
3090                        error_count: 1,
3091                        warning_count: 1,
3092                        ..Default::default()
3093                    },
3094                )]
3095            );
3096        });
3097        project_c.read_with(cx_c, |project, cx| {
3098            assert_eq!(
3099                project.diagnostic_summaries(cx).collect::<Vec<_>>(),
3100                [(
3101                    ProjectPath {
3102                        worktree_id,
3103                        path: Arc::from(Path::new("a.rs")),
3104                    },
3105                    DiagnosticSummary {
3106                        error_count: 1,
3107                        warning_count: 1,
3108                        ..Default::default()
3109                    },
3110                )]
3111            );
3112        });
3113
3114        // Open the file with the errors on client B. They should be present.
3115        let buffer_b = cx_b
3116            .background()
3117            .spawn(project_b.update(cx_b, |p, cx| p.open_buffer((worktree_id, "a.rs"), cx)))
3118            .await
3119            .unwrap();
3120
3121        buffer_b.read_with(cx_b, |buffer, _| {
3122            assert_eq!(
3123                buffer
3124                    .snapshot()
3125                    .diagnostics_in_range::<_, Point>(0..buffer.len(), false)
3126                    .map(|entry| entry)
3127                    .collect::<Vec<_>>(),
3128                &[
3129                    DiagnosticEntry {
3130                        range: Point::new(0, 4)..Point::new(0, 7),
3131                        diagnostic: Diagnostic {
3132                            group_id: 0,
3133                            message: "message 1".to_string(),
3134                            severity: lsp::DiagnosticSeverity::ERROR,
3135                            is_primary: true,
3136                            ..Default::default()
3137                        }
3138                    },
3139                    DiagnosticEntry {
3140                        range: Point::new(0, 10)..Point::new(0, 13),
3141                        diagnostic: Diagnostic {
3142                            group_id: 1,
3143                            severity: lsp::DiagnosticSeverity::WARNING,
3144                            message: "message 2".to_string(),
3145                            is_primary: true,
3146                            ..Default::default()
3147                        }
3148                    }
3149                ]
3150            );
3151        });
3152
3153        // Simulate a language server reporting no errors for a file.
3154        fake_language_server.notify::<lsp::notification::PublishDiagnostics>(
3155            lsp::PublishDiagnosticsParams {
3156                uri: lsp::Url::from_file_path("/a/a.rs").unwrap(),
3157                version: None,
3158                diagnostics: vec![],
3159            },
3160        );
3161        deterministic.run_until_parked();
3162        project_a.read_with(cx_a, |project, cx| {
3163            assert_eq!(project.diagnostic_summaries(cx).collect::<Vec<_>>(), [])
3164        });
3165        project_b.read_with(cx_b, |project, cx| {
3166            assert_eq!(project.diagnostic_summaries(cx).collect::<Vec<_>>(), [])
3167        });
3168        project_c.read_with(cx_c, |project, cx| {
3169            assert_eq!(project.diagnostic_summaries(cx).collect::<Vec<_>>(), [])
3170        });
3171    }
3172
3173    #[gpui::test(iterations = 10)]
3174    async fn test_collaborating_with_completion(
3175        cx_a: &mut TestAppContext,
3176        cx_b: &mut TestAppContext,
3177    ) {
3178        cx_a.foreground().forbid_parking();
3179        let lang_registry = Arc::new(LanguageRegistry::test());
3180        let fs = FakeFs::new(cx_a.background());
3181
3182        // Set up a fake language server.
3183        let mut language = Language::new(
3184            LanguageConfig {
3185                name: "Rust".into(),
3186                path_suffixes: vec!["rs".to_string()],
3187                ..Default::default()
3188            },
3189            Some(tree_sitter_rust::language()),
3190        );
3191        let mut fake_language_servers = language.set_fake_lsp_adapter(FakeLspAdapter {
3192            capabilities: lsp::ServerCapabilities {
3193                completion_provider: Some(lsp::CompletionOptions {
3194                    trigger_characters: Some(vec![".".to_string()]),
3195                    ..Default::default()
3196                }),
3197                ..Default::default()
3198            },
3199            ..Default::default()
3200        });
3201        lang_registry.add(Arc::new(language));
3202
3203        // Connect to a server as 2 clients.
3204        let mut server = TestServer::start(cx_a.foreground(), cx_a.background()).await;
3205        let client_a = server.create_client(cx_a, "user_a").await;
3206        let mut client_b = server.create_client(cx_b, "user_b").await;
3207        server
3208            .make_contacts(vec![(&client_a, cx_a), (&client_b, cx_b)])
3209            .await;
3210
3211        // Share a project as client A
3212        fs.insert_tree(
3213            "/a",
3214            json!({
3215                "main.rs": "fn main() { a }",
3216                "other.rs": "",
3217            }),
3218        )
3219        .await;
3220        let project_a = cx_a.update(|cx| {
3221            Project::local(
3222                client_a.clone(),
3223                client_a.user_store.clone(),
3224                lang_registry.clone(),
3225                fs.clone(),
3226                cx,
3227            )
3228        });
3229        let (worktree_a, _) = project_a
3230            .update(cx_a, |p, cx| {
3231                p.find_or_create_local_worktree("/a", true, cx)
3232            })
3233            .await
3234            .unwrap();
3235        worktree_a
3236            .read_with(cx_a, |tree, _| tree.as_local().unwrap().scan_complete())
3237            .await;
3238        let worktree_id = worktree_a.read_with(cx_a, |tree, _| tree.id());
3239
3240        // Join the worktree as client B.
3241        let project_b = client_b.build_remote_project(&project_a, cx_a, cx_b).await;
3242
3243        // Open a file in an editor as the guest.
3244        let buffer_b = project_b
3245            .update(cx_b, |p, cx| p.open_buffer((worktree_id, "main.rs"), cx))
3246            .await
3247            .unwrap();
3248        let (window_b, _) = cx_b.add_window(|_| EmptyView);
3249        let editor_b = cx_b.add_view(window_b, |cx| {
3250            Editor::for_buffer(buffer_b.clone(), Some(project_b.clone()), cx)
3251        });
3252
3253        let fake_language_server = fake_language_servers.next().await.unwrap();
3254        buffer_b
3255            .condition(&cx_b, |buffer, _| !buffer.completion_triggers().is_empty())
3256            .await;
3257
3258        // Type a completion trigger character as the guest.
3259        editor_b.update(cx_b, |editor, cx| {
3260            editor.change_selections(None, cx, |s| s.select_ranges([13..13]));
3261            editor.handle_input(&Input(".".into()), cx);
3262            cx.focus(&editor_b);
3263        });
3264
3265        // Receive a completion request as the host's language server.
3266        // Return some completions from the host's language server.
3267        cx_a.foreground().start_waiting();
3268        fake_language_server
3269            .handle_request::<lsp::request::Completion, _, _>(|params, _| async move {
3270                assert_eq!(
3271                    params.text_document_position.text_document.uri,
3272                    lsp::Url::from_file_path("/a/main.rs").unwrap(),
3273                );
3274                assert_eq!(
3275                    params.text_document_position.position,
3276                    lsp::Position::new(0, 14),
3277                );
3278
3279                Ok(Some(lsp::CompletionResponse::Array(vec![
3280                    lsp::CompletionItem {
3281                        label: "first_method(…)".into(),
3282                        detail: Some("fn(&mut self, B) -> C".into()),
3283                        text_edit: Some(lsp::CompletionTextEdit::Edit(lsp::TextEdit {
3284                            new_text: "first_method($1)".to_string(),
3285                            range: lsp::Range::new(
3286                                lsp::Position::new(0, 14),
3287                                lsp::Position::new(0, 14),
3288                            ),
3289                        })),
3290                        insert_text_format: Some(lsp::InsertTextFormat::SNIPPET),
3291                        ..Default::default()
3292                    },
3293                    lsp::CompletionItem {
3294                        label: "second_method(…)".into(),
3295                        detail: Some("fn(&mut self, C) -> D<E>".into()),
3296                        text_edit: Some(lsp::CompletionTextEdit::Edit(lsp::TextEdit {
3297                            new_text: "second_method()".to_string(),
3298                            range: lsp::Range::new(
3299                                lsp::Position::new(0, 14),
3300                                lsp::Position::new(0, 14),
3301                            ),
3302                        })),
3303                        insert_text_format: Some(lsp::InsertTextFormat::SNIPPET),
3304                        ..Default::default()
3305                    },
3306                ])))
3307            })
3308            .next()
3309            .await
3310            .unwrap();
3311        cx_a.foreground().finish_waiting();
3312
3313        // Open the buffer on the host.
3314        let buffer_a = project_a
3315            .update(cx_a, |p, cx| p.open_buffer((worktree_id, "main.rs"), cx))
3316            .await
3317            .unwrap();
3318        buffer_a
3319            .condition(&cx_a, |buffer, _| buffer.text() == "fn main() { a. }")
3320            .await;
3321
3322        // Confirm a completion on the guest.
3323        editor_b
3324            .condition(&cx_b, |editor, _| editor.context_menu_visible())
3325            .await;
3326        editor_b.update(cx_b, |editor, cx| {
3327            editor.confirm_completion(&ConfirmCompletion { item_ix: Some(0) }, cx);
3328            assert_eq!(editor.text(cx), "fn main() { a.first_method() }");
3329        });
3330
3331        // Return a resolved completion from the host's language server.
3332        // The resolved completion has an additional text edit.
3333        fake_language_server.handle_request::<lsp::request::ResolveCompletionItem, _, _>(
3334            |params, _| async move {
3335                assert_eq!(params.label, "first_method(…)");
3336                Ok(lsp::CompletionItem {
3337                    label: "first_method(…)".into(),
3338                    detail: Some("fn(&mut self, B) -> C".into()),
3339                    text_edit: Some(lsp::CompletionTextEdit::Edit(lsp::TextEdit {
3340                        new_text: "first_method($1)".to_string(),
3341                        range: lsp::Range::new(
3342                            lsp::Position::new(0, 14),
3343                            lsp::Position::new(0, 14),
3344                        ),
3345                    })),
3346                    additional_text_edits: Some(vec![lsp::TextEdit {
3347                        new_text: "use d::SomeTrait;\n".to_string(),
3348                        range: lsp::Range::new(lsp::Position::new(0, 0), lsp::Position::new(0, 0)),
3349                    }]),
3350                    insert_text_format: Some(lsp::InsertTextFormat::SNIPPET),
3351                    ..Default::default()
3352                })
3353            },
3354        );
3355
3356        // The additional edit is applied.
3357        buffer_a
3358            .condition(&cx_a, |buffer, _| {
3359                buffer.text() == "use d::SomeTrait;\nfn main() { a.first_method() }"
3360            })
3361            .await;
3362        buffer_b
3363            .condition(&cx_b, |buffer, _| {
3364                buffer.text() == "use d::SomeTrait;\nfn main() { a.first_method() }"
3365            })
3366            .await;
3367    }
3368
3369    #[gpui::test(iterations = 10)]
3370    async fn test_reloading_buffer_manually(cx_a: &mut TestAppContext, cx_b: &mut TestAppContext) {
3371        cx_a.foreground().forbid_parking();
3372        let lang_registry = Arc::new(LanguageRegistry::test());
3373        let fs = FakeFs::new(cx_a.background());
3374
3375        // Connect to a server as 2 clients.
3376        let mut server = TestServer::start(cx_a.foreground(), cx_a.background()).await;
3377        let client_a = server.create_client(cx_a, "user_a").await;
3378        let mut client_b = server.create_client(cx_b, "user_b").await;
3379        server
3380            .make_contacts(vec![(&client_a, cx_a), (&client_b, cx_b)])
3381            .await;
3382
3383        // Share a project as client A
3384        fs.insert_tree(
3385            "/a",
3386            json!({
3387                "a.rs": "let one = 1;",
3388            }),
3389        )
3390        .await;
3391        let project_a = cx_a.update(|cx| {
3392            Project::local(
3393                client_a.clone(),
3394                client_a.user_store.clone(),
3395                lang_registry.clone(),
3396                fs.clone(),
3397                cx,
3398            )
3399        });
3400        let (worktree_a, _) = project_a
3401            .update(cx_a, |p, cx| {
3402                p.find_or_create_local_worktree("/a", true, cx)
3403            })
3404            .await
3405            .unwrap();
3406        worktree_a
3407            .read_with(cx_a, |tree, _| tree.as_local().unwrap().scan_complete())
3408            .await;
3409        let worktree_id = worktree_a.read_with(cx_a, |tree, _| tree.id());
3410        let buffer_a = project_a
3411            .update(cx_a, |p, cx| p.open_buffer((worktree_id, "a.rs"), cx))
3412            .await
3413            .unwrap();
3414
3415        // Join the worktree as client B.
3416        let project_b = client_b.build_remote_project(&project_a, cx_a, cx_b).await;
3417
3418        let buffer_b = cx_b
3419            .background()
3420            .spawn(project_b.update(cx_b, |p, cx| p.open_buffer((worktree_id, "a.rs"), cx)))
3421            .await
3422            .unwrap();
3423        buffer_b.update(cx_b, |buffer, cx| {
3424            buffer.edit([(4..7, "six")], cx);
3425            buffer.edit([(10..11, "6")], cx);
3426            assert_eq!(buffer.text(), "let six = 6;");
3427            assert!(buffer.is_dirty());
3428            assert!(!buffer.has_conflict());
3429        });
3430        buffer_a
3431            .condition(cx_a, |buffer, _| buffer.text() == "let six = 6;")
3432            .await;
3433
3434        fs.save(Path::new("/a/a.rs"), &Rope::from("let seven = 7;"))
3435            .await
3436            .unwrap();
3437        buffer_a
3438            .condition(cx_a, |buffer, _| buffer.has_conflict())
3439            .await;
3440        buffer_b
3441            .condition(cx_b, |buffer, _| buffer.has_conflict())
3442            .await;
3443
3444        project_b
3445            .update(cx_b, |project, cx| {
3446                project.reload_buffers(HashSet::from_iter([buffer_b.clone()]), true, cx)
3447            })
3448            .await
3449            .unwrap();
3450        buffer_a.read_with(cx_a, |buffer, _| {
3451            assert_eq!(buffer.text(), "let seven = 7;");
3452            assert!(!buffer.is_dirty());
3453            assert!(!buffer.has_conflict());
3454        });
3455        buffer_b.read_with(cx_b, |buffer, _| {
3456            assert_eq!(buffer.text(), "let seven = 7;");
3457            assert!(!buffer.is_dirty());
3458            assert!(!buffer.has_conflict());
3459        });
3460
3461        buffer_a.update(cx_a, |buffer, cx| {
3462            // Undoing on the host is a no-op when the reload was initiated by the guest.
3463            buffer.undo(cx);
3464            assert_eq!(buffer.text(), "let seven = 7;");
3465            assert!(!buffer.is_dirty());
3466            assert!(!buffer.has_conflict());
3467        });
3468        buffer_b.update(cx_b, |buffer, cx| {
3469            // Undoing on the guest rolls back the buffer to before it was reloaded but the conflict gets cleared.
3470            buffer.undo(cx);
3471            assert_eq!(buffer.text(), "let six = 6;");
3472            assert!(buffer.is_dirty());
3473            assert!(!buffer.has_conflict());
3474        });
3475    }
3476
3477    #[gpui::test(iterations = 10)]
3478    async fn test_formatting_buffer(cx_a: &mut TestAppContext, cx_b: &mut TestAppContext) {
3479        cx_a.foreground().forbid_parking();
3480        let lang_registry = Arc::new(LanguageRegistry::test());
3481        let fs = FakeFs::new(cx_a.background());
3482
3483        // Set up a fake language server.
3484        let mut language = Language::new(
3485            LanguageConfig {
3486                name: "Rust".into(),
3487                path_suffixes: vec!["rs".to_string()],
3488                ..Default::default()
3489            },
3490            Some(tree_sitter_rust::language()),
3491        );
3492        let mut fake_language_servers = language.set_fake_lsp_adapter(Default::default());
3493        lang_registry.add(Arc::new(language));
3494
3495        // Connect to a server as 2 clients.
3496        let mut server = TestServer::start(cx_a.foreground(), cx_a.background()).await;
3497        let client_a = server.create_client(cx_a, "user_a").await;
3498        let mut client_b = server.create_client(cx_b, "user_b").await;
3499        server
3500            .make_contacts(vec![(&client_a, cx_a), (&client_b, cx_b)])
3501            .await;
3502
3503        // Share a project as client A
3504        fs.insert_tree(
3505            "/a",
3506            json!({
3507                "a.rs": "let one = two",
3508            }),
3509        )
3510        .await;
3511        let project_a = cx_a.update(|cx| {
3512            Project::local(
3513                client_a.clone(),
3514                client_a.user_store.clone(),
3515                lang_registry.clone(),
3516                fs.clone(),
3517                cx,
3518            )
3519        });
3520        let (worktree_a, _) = project_a
3521            .update(cx_a, |p, cx| {
3522                p.find_or_create_local_worktree("/a", true, cx)
3523            })
3524            .await
3525            .unwrap();
3526        worktree_a
3527            .read_with(cx_a, |tree, _| tree.as_local().unwrap().scan_complete())
3528            .await;
3529        let worktree_id = worktree_a.read_with(cx_a, |tree, _| tree.id());
3530
3531        // Join the project as client B.
3532        let project_b = client_b.build_remote_project(&project_a, cx_a, cx_b).await;
3533
3534        let buffer_b = cx_b
3535            .background()
3536            .spawn(project_b.update(cx_b, |p, cx| p.open_buffer((worktree_id, "a.rs"), cx)))
3537            .await
3538            .unwrap();
3539
3540        let fake_language_server = fake_language_servers.next().await.unwrap();
3541        fake_language_server.handle_request::<lsp::request::Formatting, _, _>(|_, _| async move {
3542            Ok(Some(vec![
3543                lsp::TextEdit {
3544                    range: lsp::Range::new(lsp::Position::new(0, 4), lsp::Position::new(0, 4)),
3545                    new_text: "h".to_string(),
3546                },
3547                lsp::TextEdit {
3548                    range: lsp::Range::new(lsp::Position::new(0, 7), lsp::Position::new(0, 7)),
3549                    new_text: "y".to_string(),
3550                },
3551            ]))
3552        });
3553
3554        project_b
3555            .update(cx_b, |project, cx| {
3556                project.format(HashSet::from_iter([buffer_b.clone()]), true, cx)
3557            })
3558            .await
3559            .unwrap();
3560        assert_eq!(
3561            buffer_b.read_with(cx_b, |buffer, _| buffer.text()),
3562            "let honey = two"
3563        );
3564    }
3565
3566    #[gpui::test(iterations = 10)]
3567    async fn test_definition(cx_a: &mut TestAppContext, cx_b: &mut TestAppContext) {
3568        cx_a.foreground().forbid_parking();
3569        let lang_registry = Arc::new(LanguageRegistry::test());
3570        let fs = FakeFs::new(cx_a.background());
3571        fs.insert_tree(
3572            "/root-1",
3573            json!({
3574                "a.rs": "const ONE: usize = b::TWO + b::THREE;",
3575            }),
3576        )
3577        .await;
3578        fs.insert_tree(
3579            "/root-2",
3580            json!({
3581                "b.rs": "const TWO: usize = 2;\nconst THREE: usize = 3;",
3582            }),
3583        )
3584        .await;
3585
3586        // Set up a fake language server.
3587        let mut language = Language::new(
3588            LanguageConfig {
3589                name: "Rust".into(),
3590                path_suffixes: vec!["rs".to_string()],
3591                ..Default::default()
3592            },
3593            Some(tree_sitter_rust::language()),
3594        );
3595        let mut fake_language_servers = language.set_fake_lsp_adapter(Default::default());
3596        lang_registry.add(Arc::new(language));
3597
3598        // Connect to a server as 2 clients.
3599        let mut server = TestServer::start(cx_a.foreground(), cx_a.background()).await;
3600        let client_a = server.create_client(cx_a, "user_a").await;
3601        let mut client_b = server.create_client(cx_b, "user_b").await;
3602        server
3603            .make_contacts(vec![(&client_a, cx_a), (&client_b, cx_b)])
3604            .await;
3605
3606        // Share a project as client A
3607        let project_a = cx_a.update(|cx| {
3608            Project::local(
3609                client_a.clone(),
3610                client_a.user_store.clone(),
3611                lang_registry.clone(),
3612                fs.clone(),
3613                cx,
3614            )
3615        });
3616        let (worktree_a, _) = project_a
3617            .update(cx_a, |p, cx| {
3618                p.find_or_create_local_worktree("/root-1", true, cx)
3619            })
3620            .await
3621            .unwrap();
3622        worktree_a
3623            .read_with(cx_a, |tree, _| tree.as_local().unwrap().scan_complete())
3624            .await;
3625        let worktree_id = worktree_a.read_with(cx_a, |tree, _| tree.id());
3626
3627        // Join the project as client B.
3628        let project_b = client_b.build_remote_project(&project_a, cx_a, cx_b).await;
3629
3630        // Open the file on client B.
3631        let buffer_b = cx_b
3632            .background()
3633            .spawn(project_b.update(cx_b, |p, cx| p.open_buffer((worktree_id, "a.rs"), cx)))
3634            .await
3635            .unwrap();
3636
3637        // Request the definition of a symbol as the guest.
3638        let fake_language_server = fake_language_servers.next().await.unwrap();
3639        fake_language_server.handle_request::<lsp::request::GotoDefinition, _, _>(
3640            |_, _| async move {
3641                Ok(Some(lsp::GotoDefinitionResponse::Scalar(
3642                    lsp::Location::new(
3643                        lsp::Url::from_file_path("/root-2/b.rs").unwrap(),
3644                        lsp::Range::new(lsp::Position::new(0, 6), lsp::Position::new(0, 9)),
3645                    ),
3646                )))
3647            },
3648        );
3649
3650        let definitions_1 = project_b
3651            .update(cx_b, |p, cx| p.definition(&buffer_b, 23, cx))
3652            .await
3653            .unwrap();
3654        cx_b.read(|cx| {
3655            assert_eq!(definitions_1.len(), 1);
3656            assert_eq!(project_b.read(cx).worktrees(cx).count(), 2);
3657            let target_buffer = definitions_1[0].buffer.read(cx);
3658            assert_eq!(
3659                target_buffer.text(),
3660                "const TWO: usize = 2;\nconst THREE: usize = 3;"
3661            );
3662            assert_eq!(
3663                definitions_1[0].range.to_point(target_buffer),
3664                Point::new(0, 6)..Point::new(0, 9)
3665            );
3666        });
3667
3668        // Try getting more definitions for the same buffer, ensuring the buffer gets reused from
3669        // the previous call to `definition`.
3670        fake_language_server.handle_request::<lsp::request::GotoDefinition, _, _>(
3671            |_, _| async move {
3672                Ok(Some(lsp::GotoDefinitionResponse::Scalar(
3673                    lsp::Location::new(
3674                        lsp::Url::from_file_path("/root-2/b.rs").unwrap(),
3675                        lsp::Range::new(lsp::Position::new(1, 6), lsp::Position::new(1, 11)),
3676                    ),
3677                )))
3678            },
3679        );
3680
3681        let definitions_2 = project_b
3682            .update(cx_b, |p, cx| p.definition(&buffer_b, 33, cx))
3683            .await
3684            .unwrap();
3685        cx_b.read(|cx| {
3686            assert_eq!(definitions_2.len(), 1);
3687            assert_eq!(project_b.read(cx).worktrees(cx).count(), 2);
3688            let target_buffer = definitions_2[0].buffer.read(cx);
3689            assert_eq!(
3690                target_buffer.text(),
3691                "const TWO: usize = 2;\nconst THREE: usize = 3;"
3692            );
3693            assert_eq!(
3694                definitions_2[0].range.to_point(target_buffer),
3695                Point::new(1, 6)..Point::new(1, 11)
3696            );
3697        });
3698        assert_eq!(definitions_1[0].buffer, definitions_2[0].buffer);
3699    }
3700
3701    #[gpui::test(iterations = 10)]
3702    async fn test_references(cx_a: &mut TestAppContext, cx_b: &mut TestAppContext) {
3703        cx_a.foreground().forbid_parking();
3704        let lang_registry = Arc::new(LanguageRegistry::test());
3705        let fs = FakeFs::new(cx_a.background());
3706        fs.insert_tree(
3707            "/root-1",
3708            json!({
3709                "one.rs": "const ONE: usize = 1;",
3710                "two.rs": "const TWO: usize = one::ONE + one::ONE;",
3711            }),
3712        )
3713        .await;
3714        fs.insert_tree(
3715            "/root-2",
3716            json!({
3717                "three.rs": "const THREE: usize = two::TWO + one::ONE;",
3718            }),
3719        )
3720        .await;
3721
3722        // Set up a fake language server.
3723        let mut language = Language::new(
3724            LanguageConfig {
3725                name: "Rust".into(),
3726                path_suffixes: vec!["rs".to_string()],
3727                ..Default::default()
3728            },
3729            Some(tree_sitter_rust::language()),
3730        );
3731        let mut fake_language_servers = language.set_fake_lsp_adapter(Default::default());
3732        lang_registry.add(Arc::new(language));
3733
3734        // Connect to a server as 2 clients.
3735        let mut server = TestServer::start(cx_a.foreground(), cx_a.background()).await;
3736        let client_a = server.create_client(cx_a, "user_a").await;
3737        let mut client_b = server.create_client(cx_b, "user_b").await;
3738        server
3739            .make_contacts(vec![(&client_a, cx_a), (&client_b, cx_b)])
3740            .await;
3741
3742        // Share a project as client A
3743        let project_a = cx_a.update(|cx| {
3744            Project::local(
3745                client_a.clone(),
3746                client_a.user_store.clone(),
3747                lang_registry.clone(),
3748                fs.clone(),
3749                cx,
3750            )
3751        });
3752        let (worktree_a, _) = project_a
3753            .update(cx_a, |p, cx| {
3754                p.find_or_create_local_worktree("/root-1", true, cx)
3755            })
3756            .await
3757            .unwrap();
3758        worktree_a
3759            .read_with(cx_a, |tree, _| tree.as_local().unwrap().scan_complete())
3760            .await;
3761        let worktree_id = worktree_a.read_with(cx_a, |tree, _| tree.id());
3762
3763        // Join the worktree as client B.
3764        let project_b = client_b.build_remote_project(&project_a, cx_a, cx_b).await;
3765
3766        // Open the file on client B.
3767        let buffer_b = cx_b
3768            .background()
3769            .spawn(project_b.update(cx_b, |p, cx| p.open_buffer((worktree_id, "one.rs"), cx)))
3770            .await
3771            .unwrap();
3772
3773        // Request references to a symbol as the guest.
3774        let fake_language_server = fake_language_servers.next().await.unwrap();
3775        fake_language_server.handle_request::<lsp::request::References, _, _>(
3776            |params, _| async move {
3777                assert_eq!(
3778                    params.text_document_position.text_document.uri.as_str(),
3779                    "file:///root-1/one.rs"
3780                );
3781                Ok(Some(vec![
3782                    lsp::Location {
3783                        uri: lsp::Url::from_file_path("/root-1/two.rs").unwrap(),
3784                        range: lsp::Range::new(
3785                            lsp::Position::new(0, 24),
3786                            lsp::Position::new(0, 27),
3787                        ),
3788                    },
3789                    lsp::Location {
3790                        uri: lsp::Url::from_file_path("/root-1/two.rs").unwrap(),
3791                        range: lsp::Range::new(
3792                            lsp::Position::new(0, 35),
3793                            lsp::Position::new(0, 38),
3794                        ),
3795                    },
3796                    lsp::Location {
3797                        uri: lsp::Url::from_file_path("/root-2/three.rs").unwrap(),
3798                        range: lsp::Range::new(
3799                            lsp::Position::new(0, 37),
3800                            lsp::Position::new(0, 40),
3801                        ),
3802                    },
3803                ]))
3804            },
3805        );
3806
3807        let references = project_b
3808            .update(cx_b, |p, cx| p.references(&buffer_b, 7, cx))
3809            .await
3810            .unwrap();
3811        cx_b.read(|cx| {
3812            assert_eq!(references.len(), 3);
3813            assert_eq!(project_b.read(cx).worktrees(cx).count(), 2);
3814
3815            let two_buffer = references[0].buffer.read(cx);
3816            let three_buffer = references[2].buffer.read(cx);
3817            assert_eq!(
3818                two_buffer.file().unwrap().path().as_ref(),
3819                Path::new("two.rs")
3820            );
3821            assert_eq!(references[1].buffer, references[0].buffer);
3822            assert_eq!(
3823                three_buffer.file().unwrap().full_path(cx),
3824                Path::new("three.rs")
3825            );
3826
3827            assert_eq!(references[0].range.to_offset(&two_buffer), 24..27);
3828            assert_eq!(references[1].range.to_offset(&two_buffer), 35..38);
3829            assert_eq!(references[2].range.to_offset(&three_buffer), 37..40);
3830        });
3831    }
3832
3833    #[gpui::test(iterations = 10)]
3834    async fn test_project_search(cx_a: &mut TestAppContext, cx_b: &mut TestAppContext) {
3835        cx_a.foreground().forbid_parking();
3836        let lang_registry = Arc::new(LanguageRegistry::test());
3837        let fs = FakeFs::new(cx_a.background());
3838        fs.insert_tree(
3839            "/root-1",
3840            json!({
3841                "a": "hello world",
3842                "b": "goodnight moon",
3843                "c": "a world of goo",
3844                "d": "world champion of clown world",
3845            }),
3846        )
3847        .await;
3848        fs.insert_tree(
3849            "/root-2",
3850            json!({
3851                "e": "disney world is fun",
3852            }),
3853        )
3854        .await;
3855
3856        // Connect to a server as 2 clients.
3857        let mut server = TestServer::start(cx_a.foreground(), cx_a.background()).await;
3858        let client_a = server.create_client(cx_a, "user_a").await;
3859        let mut client_b = server.create_client(cx_b, "user_b").await;
3860        server
3861            .make_contacts(vec![(&client_a, cx_a), (&client_b, cx_b)])
3862            .await;
3863
3864        // Share a project as client A
3865        let project_a = cx_a.update(|cx| {
3866            Project::local(
3867                client_a.clone(),
3868                client_a.user_store.clone(),
3869                lang_registry.clone(),
3870                fs.clone(),
3871                cx,
3872            )
3873        });
3874
3875        let (worktree_1, _) = project_a
3876            .update(cx_a, |p, cx| {
3877                p.find_or_create_local_worktree("/root-1", true, cx)
3878            })
3879            .await
3880            .unwrap();
3881        worktree_1
3882            .read_with(cx_a, |tree, _| tree.as_local().unwrap().scan_complete())
3883            .await;
3884        let (worktree_2, _) = project_a
3885            .update(cx_a, |p, cx| {
3886                p.find_or_create_local_worktree("/root-2", true, cx)
3887            })
3888            .await
3889            .unwrap();
3890        worktree_2
3891            .read_with(cx_a, |tree, _| tree.as_local().unwrap().scan_complete())
3892            .await;
3893
3894        // Join the worktree as client B.
3895        let project_b = client_b.build_remote_project(&project_a, cx_a, cx_b).await;
3896        let results = project_b
3897            .update(cx_b, |project, cx| {
3898                project.search(SearchQuery::text("world", false, false), cx)
3899            })
3900            .await
3901            .unwrap();
3902
3903        let mut ranges_by_path = results
3904            .into_iter()
3905            .map(|(buffer, ranges)| {
3906                buffer.read_with(cx_b, |buffer, cx| {
3907                    let path = buffer.file().unwrap().full_path(cx);
3908                    let offset_ranges = ranges
3909                        .into_iter()
3910                        .map(|range| range.to_offset(buffer))
3911                        .collect::<Vec<_>>();
3912                    (path, offset_ranges)
3913                })
3914            })
3915            .collect::<Vec<_>>();
3916        ranges_by_path.sort_by_key(|(path, _)| path.clone());
3917
3918        assert_eq!(
3919            ranges_by_path,
3920            &[
3921                (PathBuf::from("root-1/a"), vec![6..11]),
3922                (PathBuf::from("root-1/c"), vec![2..7]),
3923                (PathBuf::from("root-1/d"), vec![0..5, 24..29]),
3924                (PathBuf::from("root-2/e"), vec![7..12]),
3925            ]
3926        );
3927    }
3928
3929    #[gpui::test(iterations = 10)]
3930    async fn test_document_highlights(cx_a: &mut TestAppContext, cx_b: &mut TestAppContext) {
3931        cx_a.foreground().forbid_parking();
3932        let lang_registry = Arc::new(LanguageRegistry::test());
3933        let fs = FakeFs::new(cx_a.background());
3934        fs.insert_tree(
3935            "/root-1",
3936            json!({
3937                "main.rs": "fn double(number: i32) -> i32 { number + number }",
3938            }),
3939        )
3940        .await;
3941
3942        // Set up a fake language server.
3943        let mut language = Language::new(
3944            LanguageConfig {
3945                name: "Rust".into(),
3946                path_suffixes: vec!["rs".to_string()],
3947                ..Default::default()
3948            },
3949            Some(tree_sitter_rust::language()),
3950        );
3951        let mut fake_language_servers = language.set_fake_lsp_adapter(Default::default());
3952        lang_registry.add(Arc::new(language));
3953
3954        // Connect to a server as 2 clients.
3955        let mut server = TestServer::start(cx_a.foreground(), cx_a.background()).await;
3956        let client_a = server.create_client(cx_a, "user_a").await;
3957        let mut client_b = server.create_client(cx_b, "user_b").await;
3958        server
3959            .make_contacts(vec![(&client_a, cx_a), (&client_b, cx_b)])
3960            .await;
3961
3962        // Share a project as client A
3963        let project_a = cx_a.update(|cx| {
3964            Project::local(
3965                client_a.clone(),
3966                client_a.user_store.clone(),
3967                lang_registry.clone(),
3968                fs.clone(),
3969                cx,
3970            )
3971        });
3972        let (worktree_a, _) = project_a
3973            .update(cx_a, |p, cx| {
3974                p.find_or_create_local_worktree("/root-1", true, cx)
3975            })
3976            .await
3977            .unwrap();
3978        worktree_a
3979            .read_with(cx_a, |tree, _| tree.as_local().unwrap().scan_complete())
3980            .await;
3981        let worktree_id = worktree_a.read_with(cx_a, |tree, _| tree.id());
3982
3983        // Join the worktree as client B.
3984        let project_b = client_b.build_remote_project(&project_a, cx_a, cx_b).await;
3985
3986        // Open the file on client B.
3987        let buffer_b = cx_b
3988            .background()
3989            .spawn(project_b.update(cx_b, |p, cx| p.open_buffer((worktree_id, "main.rs"), cx)))
3990            .await
3991            .unwrap();
3992
3993        // Request document highlights as the guest.
3994        let fake_language_server = fake_language_servers.next().await.unwrap();
3995        fake_language_server.handle_request::<lsp::request::DocumentHighlightRequest, _, _>(
3996            |params, _| async move {
3997                assert_eq!(
3998                    params
3999                        .text_document_position_params
4000                        .text_document
4001                        .uri
4002                        .as_str(),
4003                    "file:///root-1/main.rs"
4004                );
4005                assert_eq!(
4006                    params.text_document_position_params.position,
4007                    lsp::Position::new(0, 34)
4008                );
4009                Ok(Some(vec![
4010                    lsp::DocumentHighlight {
4011                        kind: Some(lsp::DocumentHighlightKind::WRITE),
4012                        range: lsp::Range::new(
4013                            lsp::Position::new(0, 10),
4014                            lsp::Position::new(0, 16),
4015                        ),
4016                    },
4017                    lsp::DocumentHighlight {
4018                        kind: Some(lsp::DocumentHighlightKind::READ),
4019                        range: lsp::Range::new(
4020                            lsp::Position::new(0, 32),
4021                            lsp::Position::new(0, 38),
4022                        ),
4023                    },
4024                    lsp::DocumentHighlight {
4025                        kind: Some(lsp::DocumentHighlightKind::READ),
4026                        range: lsp::Range::new(
4027                            lsp::Position::new(0, 41),
4028                            lsp::Position::new(0, 47),
4029                        ),
4030                    },
4031                ]))
4032            },
4033        );
4034
4035        let highlights = project_b
4036            .update(cx_b, |p, cx| p.document_highlights(&buffer_b, 34, cx))
4037            .await
4038            .unwrap();
4039        buffer_b.read_with(cx_b, |buffer, _| {
4040            let snapshot = buffer.snapshot();
4041
4042            let highlights = highlights
4043                .into_iter()
4044                .map(|highlight| (highlight.kind, highlight.range.to_offset(&snapshot)))
4045                .collect::<Vec<_>>();
4046            assert_eq!(
4047                highlights,
4048                &[
4049                    (lsp::DocumentHighlightKind::WRITE, 10..16),
4050                    (lsp::DocumentHighlightKind::READ, 32..38),
4051                    (lsp::DocumentHighlightKind::READ, 41..47)
4052                ]
4053            )
4054        });
4055    }
4056
4057    #[gpui::test(iterations = 10)]
4058    async fn test_project_symbols(cx_a: &mut TestAppContext, cx_b: &mut TestAppContext) {
4059        cx_a.foreground().forbid_parking();
4060        let lang_registry = Arc::new(LanguageRegistry::test());
4061        let fs = FakeFs::new(cx_a.background());
4062        fs.insert_tree(
4063            "/code",
4064            json!({
4065                "crate-1": {
4066                    "one.rs": "const ONE: usize = 1;",
4067                },
4068                "crate-2": {
4069                    "two.rs": "const TWO: usize = 2; const THREE: usize = 3;",
4070                },
4071                "private": {
4072                    "passwords.txt": "the-password",
4073                }
4074            }),
4075        )
4076        .await;
4077
4078        // Set up a fake language server.
4079        let mut language = Language::new(
4080            LanguageConfig {
4081                name: "Rust".into(),
4082                path_suffixes: vec!["rs".to_string()],
4083                ..Default::default()
4084            },
4085            Some(tree_sitter_rust::language()),
4086        );
4087        let mut fake_language_servers = language.set_fake_lsp_adapter(Default::default());
4088        lang_registry.add(Arc::new(language));
4089
4090        // Connect to a server as 2 clients.
4091        let mut server = TestServer::start(cx_a.foreground(), cx_a.background()).await;
4092        let client_a = server.create_client(cx_a, "user_a").await;
4093        let mut client_b = server.create_client(cx_b, "user_b").await;
4094        server
4095            .make_contacts(vec![(&client_a, cx_a), (&client_b, cx_b)])
4096            .await;
4097
4098        // Share a project as client A
4099        let project_a = cx_a.update(|cx| {
4100            Project::local(
4101                client_a.clone(),
4102                client_a.user_store.clone(),
4103                lang_registry.clone(),
4104                fs.clone(),
4105                cx,
4106            )
4107        });
4108        let (worktree_a, _) = project_a
4109            .update(cx_a, |p, cx| {
4110                p.find_or_create_local_worktree("/code/crate-1", true, cx)
4111            })
4112            .await
4113            .unwrap();
4114        worktree_a
4115            .read_with(cx_a, |tree, _| tree.as_local().unwrap().scan_complete())
4116            .await;
4117        let worktree_id = worktree_a.read_with(cx_a, |tree, _| tree.id());
4118
4119        // Join the worktree as client B.
4120        let project_b = client_b.build_remote_project(&project_a, cx_a, cx_b).await;
4121
4122        // Cause the language server to start.
4123        let _buffer = cx_b
4124            .background()
4125            .spawn(project_b.update(cx_b, |p, cx| p.open_buffer((worktree_id, "one.rs"), cx)))
4126            .await
4127            .unwrap();
4128
4129        let fake_language_server = fake_language_servers.next().await.unwrap();
4130        fake_language_server.handle_request::<lsp::request::WorkspaceSymbol, _, _>(
4131            |_, _| async move {
4132                #[allow(deprecated)]
4133                Ok(Some(vec![lsp::SymbolInformation {
4134                    name: "TWO".into(),
4135                    location: lsp::Location {
4136                        uri: lsp::Url::from_file_path("/code/crate-2/two.rs").unwrap(),
4137                        range: lsp::Range::new(lsp::Position::new(0, 6), lsp::Position::new(0, 9)),
4138                    },
4139                    kind: lsp::SymbolKind::CONSTANT,
4140                    tags: None,
4141                    container_name: None,
4142                    deprecated: None,
4143                }]))
4144            },
4145        );
4146
4147        // Request the definition of a symbol as the guest.
4148        let symbols = project_b
4149            .update(cx_b, |p, cx| p.symbols("two", cx))
4150            .await
4151            .unwrap();
4152        assert_eq!(symbols.len(), 1);
4153        assert_eq!(symbols[0].name, "TWO");
4154
4155        // Open one of the returned symbols.
4156        let buffer_b_2 = project_b
4157            .update(cx_b, |project, cx| {
4158                project.open_buffer_for_symbol(&symbols[0], cx)
4159            })
4160            .await
4161            .unwrap();
4162        buffer_b_2.read_with(cx_b, |buffer, _| {
4163            assert_eq!(
4164                buffer.file().unwrap().path().as_ref(),
4165                Path::new("../crate-2/two.rs")
4166            );
4167        });
4168
4169        // Attempt to craft a symbol and violate host's privacy by opening an arbitrary file.
4170        let mut fake_symbol = symbols[0].clone();
4171        fake_symbol.path = Path::new("/code/secrets").into();
4172        let error = project_b
4173            .update(cx_b, |project, cx| {
4174                project.open_buffer_for_symbol(&fake_symbol, cx)
4175            })
4176            .await
4177            .unwrap_err();
4178        assert!(error.to_string().contains("invalid symbol signature"));
4179    }
4180
4181    #[gpui::test(iterations = 10)]
4182    async fn test_open_buffer_while_getting_definition_pointing_to_it(
4183        cx_a: &mut TestAppContext,
4184        cx_b: &mut TestAppContext,
4185        mut rng: StdRng,
4186    ) {
4187        cx_a.foreground().forbid_parking();
4188        let lang_registry = Arc::new(LanguageRegistry::test());
4189        let fs = FakeFs::new(cx_a.background());
4190        fs.insert_tree(
4191            "/root",
4192            json!({
4193                "a.rs": "const ONE: usize = b::TWO;",
4194                "b.rs": "const TWO: usize = 2",
4195            }),
4196        )
4197        .await;
4198
4199        // Set up a fake language server.
4200        let mut language = Language::new(
4201            LanguageConfig {
4202                name: "Rust".into(),
4203                path_suffixes: vec!["rs".to_string()],
4204                ..Default::default()
4205            },
4206            Some(tree_sitter_rust::language()),
4207        );
4208        let mut fake_language_servers = language.set_fake_lsp_adapter(Default::default());
4209        lang_registry.add(Arc::new(language));
4210
4211        // Connect to a server as 2 clients.
4212        let mut server = TestServer::start(cx_a.foreground(), cx_a.background()).await;
4213        let client_a = server.create_client(cx_a, "user_a").await;
4214        let mut client_b = server.create_client(cx_b, "user_b").await;
4215        server
4216            .make_contacts(vec![(&client_a, cx_a), (&client_b, cx_b)])
4217            .await;
4218
4219        // Share a project as client A
4220        let project_a = cx_a.update(|cx| {
4221            Project::local(
4222                client_a.clone(),
4223                client_a.user_store.clone(),
4224                lang_registry.clone(),
4225                fs.clone(),
4226                cx,
4227            )
4228        });
4229
4230        let (worktree_a, _) = project_a
4231            .update(cx_a, |p, cx| {
4232                p.find_or_create_local_worktree("/root", true, cx)
4233            })
4234            .await
4235            .unwrap();
4236        worktree_a
4237            .read_with(cx_a, |tree, _| tree.as_local().unwrap().scan_complete())
4238            .await;
4239        let worktree_id = worktree_a.read_with(cx_a, |tree, _| tree.id());
4240
4241        // Join the project as client B.
4242        let project_b = client_b.build_remote_project(&project_a, cx_a, cx_b).await;
4243
4244        let buffer_b1 = cx_b
4245            .background()
4246            .spawn(project_b.update(cx_b, |p, cx| p.open_buffer((worktree_id, "a.rs"), cx)))
4247            .await
4248            .unwrap();
4249
4250        let fake_language_server = fake_language_servers.next().await.unwrap();
4251        fake_language_server.handle_request::<lsp::request::GotoDefinition, _, _>(
4252            |_, _| async move {
4253                Ok(Some(lsp::GotoDefinitionResponse::Scalar(
4254                    lsp::Location::new(
4255                        lsp::Url::from_file_path("/root/b.rs").unwrap(),
4256                        lsp::Range::new(lsp::Position::new(0, 6), lsp::Position::new(0, 9)),
4257                    ),
4258                )))
4259            },
4260        );
4261
4262        let definitions;
4263        let buffer_b2;
4264        if rng.gen() {
4265            definitions = project_b.update(cx_b, |p, cx| p.definition(&buffer_b1, 23, cx));
4266            buffer_b2 = project_b.update(cx_b, |p, cx| p.open_buffer((worktree_id, "b.rs"), cx));
4267        } else {
4268            buffer_b2 = project_b.update(cx_b, |p, cx| p.open_buffer((worktree_id, "b.rs"), cx));
4269            definitions = project_b.update(cx_b, |p, cx| p.definition(&buffer_b1, 23, cx));
4270        }
4271
4272        let buffer_b2 = buffer_b2.await.unwrap();
4273        let definitions = definitions.await.unwrap();
4274        assert_eq!(definitions.len(), 1);
4275        assert_eq!(definitions[0].buffer, buffer_b2);
4276    }
4277
4278    #[gpui::test(iterations = 10)]
4279    async fn test_collaborating_with_code_actions(
4280        cx_a: &mut TestAppContext,
4281        cx_b: &mut TestAppContext,
4282    ) {
4283        cx_a.foreground().forbid_parking();
4284        let lang_registry = Arc::new(LanguageRegistry::test());
4285        let fs = FakeFs::new(cx_a.background());
4286        cx_b.update(|cx| editor::init(cx));
4287
4288        // Set up a fake language server.
4289        let mut language = Language::new(
4290            LanguageConfig {
4291                name: "Rust".into(),
4292                path_suffixes: vec!["rs".to_string()],
4293                ..Default::default()
4294            },
4295            Some(tree_sitter_rust::language()),
4296        );
4297        let mut fake_language_servers = language.set_fake_lsp_adapter(Default::default());
4298        lang_registry.add(Arc::new(language));
4299
4300        // Connect to a server as 2 clients.
4301        let mut server = TestServer::start(cx_a.foreground(), cx_a.background()).await;
4302        let client_a = server.create_client(cx_a, "user_a").await;
4303        let mut client_b = server.create_client(cx_b, "user_b").await;
4304        server
4305            .make_contacts(vec![(&client_a, cx_a), (&client_b, cx_b)])
4306            .await;
4307
4308        // Share a project as client A
4309        fs.insert_tree(
4310            "/a",
4311            json!({
4312                "main.rs": "mod other;\nfn main() { let foo = other::foo(); }",
4313                "other.rs": "pub fn foo() -> usize { 4 }",
4314            }),
4315        )
4316        .await;
4317        let project_a = cx_a.update(|cx| {
4318            Project::local(
4319                client_a.clone(),
4320                client_a.user_store.clone(),
4321                lang_registry.clone(),
4322                fs.clone(),
4323                cx,
4324            )
4325        });
4326        let (worktree_a, _) = project_a
4327            .update(cx_a, |p, cx| {
4328                p.find_or_create_local_worktree("/a", true, cx)
4329            })
4330            .await
4331            .unwrap();
4332        worktree_a
4333            .read_with(cx_a, |tree, _| tree.as_local().unwrap().scan_complete())
4334            .await;
4335        let worktree_id = worktree_a.read_with(cx_a, |tree, _| tree.id());
4336
4337        // Join the project as client B.
4338        let project_b = client_b.build_remote_project(&project_a, cx_a, cx_b).await;
4339        let mut params = cx_b.update(WorkspaceParams::test);
4340        params.languages = lang_registry.clone();
4341        params.project = project_b.clone();
4342        params.client = client_b.client.clone();
4343        params.user_store = client_b.user_store.clone();
4344
4345        let (_window_b, workspace_b) = cx_b.add_window(|cx| Workspace::new(&params, cx));
4346        let editor_b = workspace_b
4347            .update(cx_b, |workspace, cx| {
4348                workspace.open_path((worktree_id, "main.rs"), true, cx)
4349            })
4350            .await
4351            .unwrap()
4352            .downcast::<Editor>()
4353            .unwrap();
4354
4355        let mut fake_language_server = fake_language_servers.next().await.unwrap();
4356        fake_language_server
4357            .handle_request::<lsp::request::CodeActionRequest, _, _>(|params, _| async move {
4358                assert_eq!(
4359                    params.text_document.uri,
4360                    lsp::Url::from_file_path("/a/main.rs").unwrap(),
4361                );
4362                assert_eq!(params.range.start, lsp::Position::new(0, 0));
4363                assert_eq!(params.range.end, lsp::Position::new(0, 0));
4364                Ok(None)
4365            })
4366            .next()
4367            .await;
4368
4369        // Move cursor to a location that contains code actions.
4370        editor_b.update(cx_b, |editor, cx| {
4371            editor.change_selections(None, cx, |s| {
4372                s.select_ranges([Point::new(1, 31)..Point::new(1, 31)])
4373            });
4374            cx.focus(&editor_b);
4375        });
4376
4377        fake_language_server
4378            .handle_request::<lsp::request::CodeActionRequest, _, _>(|params, _| async move {
4379                assert_eq!(
4380                    params.text_document.uri,
4381                    lsp::Url::from_file_path("/a/main.rs").unwrap(),
4382                );
4383                assert_eq!(params.range.start, lsp::Position::new(1, 31));
4384                assert_eq!(params.range.end, lsp::Position::new(1, 31));
4385
4386                Ok(Some(vec![lsp::CodeActionOrCommand::CodeAction(
4387                    lsp::CodeAction {
4388                        title: "Inline into all callers".to_string(),
4389                        edit: Some(lsp::WorkspaceEdit {
4390                            changes: Some(
4391                                [
4392                                    (
4393                                        lsp::Url::from_file_path("/a/main.rs").unwrap(),
4394                                        vec![lsp::TextEdit::new(
4395                                            lsp::Range::new(
4396                                                lsp::Position::new(1, 22),
4397                                                lsp::Position::new(1, 34),
4398                                            ),
4399                                            "4".to_string(),
4400                                        )],
4401                                    ),
4402                                    (
4403                                        lsp::Url::from_file_path("/a/other.rs").unwrap(),
4404                                        vec![lsp::TextEdit::new(
4405                                            lsp::Range::new(
4406                                                lsp::Position::new(0, 0),
4407                                                lsp::Position::new(0, 27),
4408                                            ),
4409                                            "".to_string(),
4410                                        )],
4411                                    ),
4412                                ]
4413                                .into_iter()
4414                                .collect(),
4415                            ),
4416                            ..Default::default()
4417                        }),
4418                        data: Some(json!({
4419                            "codeActionParams": {
4420                                "range": {
4421                                    "start": {"line": 1, "column": 31},
4422                                    "end": {"line": 1, "column": 31},
4423                                }
4424                            }
4425                        })),
4426                        ..Default::default()
4427                    },
4428                )]))
4429            })
4430            .next()
4431            .await;
4432
4433        // Toggle code actions and wait for them to display.
4434        editor_b.update(cx_b, |editor, cx| {
4435            editor.toggle_code_actions(
4436                &ToggleCodeActions {
4437                    deployed_from_indicator: false,
4438                },
4439                cx,
4440            );
4441        });
4442        editor_b
4443            .condition(&cx_b, |editor, _| editor.context_menu_visible())
4444            .await;
4445
4446        fake_language_server.remove_request_handler::<lsp::request::CodeActionRequest>();
4447
4448        // Confirming the code action will trigger a resolve request.
4449        let confirm_action = workspace_b
4450            .update(cx_b, |workspace, cx| {
4451                Editor::confirm_code_action(workspace, &ConfirmCodeAction { item_ix: Some(0) }, cx)
4452            })
4453            .unwrap();
4454        fake_language_server.handle_request::<lsp::request::CodeActionResolveRequest, _, _>(
4455            |_, _| async move {
4456                Ok(lsp::CodeAction {
4457                    title: "Inline into all callers".to_string(),
4458                    edit: Some(lsp::WorkspaceEdit {
4459                        changes: Some(
4460                            [
4461                                (
4462                                    lsp::Url::from_file_path("/a/main.rs").unwrap(),
4463                                    vec![lsp::TextEdit::new(
4464                                        lsp::Range::new(
4465                                            lsp::Position::new(1, 22),
4466                                            lsp::Position::new(1, 34),
4467                                        ),
4468                                        "4".to_string(),
4469                                    )],
4470                                ),
4471                                (
4472                                    lsp::Url::from_file_path("/a/other.rs").unwrap(),
4473                                    vec![lsp::TextEdit::new(
4474                                        lsp::Range::new(
4475                                            lsp::Position::new(0, 0),
4476                                            lsp::Position::new(0, 27),
4477                                        ),
4478                                        "".to_string(),
4479                                    )],
4480                                ),
4481                            ]
4482                            .into_iter()
4483                            .collect(),
4484                        ),
4485                        ..Default::default()
4486                    }),
4487                    ..Default::default()
4488                })
4489            },
4490        );
4491
4492        // After the action is confirmed, an editor containing both modified files is opened.
4493        confirm_action.await.unwrap();
4494        let code_action_editor = workspace_b.read_with(cx_b, |workspace, cx| {
4495            workspace
4496                .active_item(cx)
4497                .unwrap()
4498                .downcast::<Editor>()
4499                .unwrap()
4500        });
4501        code_action_editor.update(cx_b, |editor, cx| {
4502            assert_eq!(editor.text(cx), "mod other;\nfn main() { let foo = 4; }\n");
4503            editor.undo(&Undo, cx);
4504            assert_eq!(
4505                editor.text(cx),
4506                "mod other;\nfn main() { let foo = other::foo(); }\npub fn foo() -> usize { 4 }"
4507            );
4508            editor.redo(&Redo, cx);
4509            assert_eq!(editor.text(cx), "mod other;\nfn main() { let foo = 4; }\n");
4510        });
4511    }
4512
4513    #[gpui::test(iterations = 10)]
4514    async fn test_collaborating_with_renames(cx_a: &mut TestAppContext, cx_b: &mut TestAppContext) {
4515        cx_a.foreground().forbid_parking();
4516        let lang_registry = Arc::new(LanguageRegistry::test());
4517        let fs = FakeFs::new(cx_a.background());
4518        cx_b.update(|cx| editor::init(cx));
4519
4520        // Set up a fake language server.
4521        let mut language = Language::new(
4522            LanguageConfig {
4523                name: "Rust".into(),
4524                path_suffixes: vec!["rs".to_string()],
4525                ..Default::default()
4526            },
4527            Some(tree_sitter_rust::language()),
4528        );
4529        let mut fake_language_servers = language.set_fake_lsp_adapter(FakeLspAdapter {
4530            capabilities: lsp::ServerCapabilities {
4531                rename_provider: Some(lsp::OneOf::Right(lsp::RenameOptions {
4532                    prepare_provider: Some(true),
4533                    work_done_progress_options: Default::default(),
4534                })),
4535                ..Default::default()
4536            },
4537            ..Default::default()
4538        });
4539        lang_registry.add(Arc::new(language));
4540
4541        // Connect to a server as 2 clients.
4542        let mut server = TestServer::start(cx_a.foreground(), cx_a.background()).await;
4543        let client_a = server.create_client(cx_a, "user_a").await;
4544        let mut client_b = server.create_client(cx_b, "user_b").await;
4545        server
4546            .make_contacts(vec![(&client_a, cx_a), (&client_b, cx_b)])
4547            .await;
4548
4549        // Share a project as client A
4550        fs.insert_tree(
4551            "/dir",
4552            json!({
4553                "one.rs": "const ONE: usize = 1;",
4554                "two.rs": "const TWO: usize = one::ONE + one::ONE;"
4555            }),
4556        )
4557        .await;
4558        let project_a = cx_a.update(|cx| {
4559            Project::local(
4560                client_a.clone(),
4561                client_a.user_store.clone(),
4562                lang_registry.clone(),
4563                fs.clone(),
4564                cx,
4565            )
4566        });
4567        let (worktree_a, _) = project_a
4568            .update(cx_a, |p, cx| {
4569                p.find_or_create_local_worktree("/dir", true, cx)
4570            })
4571            .await
4572            .unwrap();
4573        worktree_a
4574            .read_with(cx_a, |tree, _| tree.as_local().unwrap().scan_complete())
4575            .await;
4576        let worktree_id = worktree_a.read_with(cx_a, |tree, _| tree.id());
4577
4578        // Join the worktree as client B.
4579        let project_b = client_b.build_remote_project(&project_a, cx_a, cx_b).await;
4580        let mut params = cx_b.update(WorkspaceParams::test);
4581        params.languages = lang_registry.clone();
4582        params.project = project_b.clone();
4583        params.client = client_b.client.clone();
4584        params.user_store = client_b.user_store.clone();
4585
4586        let (_window_b, workspace_b) = cx_b.add_window(|cx| Workspace::new(&params, cx));
4587        let editor_b = workspace_b
4588            .update(cx_b, |workspace, cx| {
4589                workspace.open_path((worktree_id, "one.rs"), true, cx)
4590            })
4591            .await
4592            .unwrap()
4593            .downcast::<Editor>()
4594            .unwrap();
4595        let fake_language_server = fake_language_servers.next().await.unwrap();
4596
4597        // Move cursor to a location that can be renamed.
4598        let prepare_rename = editor_b.update(cx_b, |editor, cx| {
4599            editor.change_selections(None, cx, |s| s.select_ranges([7..7]));
4600            editor.rename(&Rename, cx).unwrap()
4601        });
4602
4603        fake_language_server
4604            .handle_request::<lsp::request::PrepareRenameRequest, _, _>(|params, _| async move {
4605                assert_eq!(params.text_document.uri.as_str(), "file:///dir/one.rs");
4606                assert_eq!(params.position, lsp::Position::new(0, 7));
4607                Ok(Some(lsp::PrepareRenameResponse::Range(lsp::Range::new(
4608                    lsp::Position::new(0, 6),
4609                    lsp::Position::new(0, 9),
4610                ))))
4611            })
4612            .next()
4613            .await
4614            .unwrap();
4615        prepare_rename.await.unwrap();
4616        editor_b.update(cx_b, |editor, cx| {
4617            let rename = editor.pending_rename().unwrap();
4618            let buffer = editor.buffer().read(cx).snapshot(cx);
4619            assert_eq!(
4620                rename.range.start.to_offset(&buffer)..rename.range.end.to_offset(&buffer),
4621                6..9
4622            );
4623            rename.editor.update(cx, |rename_editor, cx| {
4624                rename_editor.buffer().update(cx, |rename_buffer, cx| {
4625                    rename_buffer.edit([(0..3, "THREE")], cx);
4626                });
4627            });
4628        });
4629
4630        let confirm_rename = workspace_b.update(cx_b, |workspace, cx| {
4631            Editor::confirm_rename(workspace, &ConfirmRename, cx).unwrap()
4632        });
4633        fake_language_server
4634            .handle_request::<lsp::request::Rename, _, _>(|params, _| async move {
4635                assert_eq!(
4636                    params.text_document_position.text_document.uri.as_str(),
4637                    "file:///dir/one.rs"
4638                );
4639                assert_eq!(
4640                    params.text_document_position.position,
4641                    lsp::Position::new(0, 6)
4642                );
4643                assert_eq!(params.new_name, "THREE");
4644                Ok(Some(lsp::WorkspaceEdit {
4645                    changes: Some(
4646                        [
4647                            (
4648                                lsp::Url::from_file_path("/dir/one.rs").unwrap(),
4649                                vec![lsp::TextEdit::new(
4650                                    lsp::Range::new(
4651                                        lsp::Position::new(0, 6),
4652                                        lsp::Position::new(0, 9),
4653                                    ),
4654                                    "THREE".to_string(),
4655                                )],
4656                            ),
4657                            (
4658                                lsp::Url::from_file_path("/dir/two.rs").unwrap(),
4659                                vec![
4660                                    lsp::TextEdit::new(
4661                                        lsp::Range::new(
4662                                            lsp::Position::new(0, 24),
4663                                            lsp::Position::new(0, 27),
4664                                        ),
4665                                        "THREE".to_string(),
4666                                    ),
4667                                    lsp::TextEdit::new(
4668                                        lsp::Range::new(
4669                                            lsp::Position::new(0, 35),
4670                                            lsp::Position::new(0, 38),
4671                                        ),
4672                                        "THREE".to_string(),
4673                                    ),
4674                                ],
4675                            ),
4676                        ]
4677                        .into_iter()
4678                        .collect(),
4679                    ),
4680                    ..Default::default()
4681                }))
4682            })
4683            .next()
4684            .await
4685            .unwrap();
4686        confirm_rename.await.unwrap();
4687
4688        let rename_editor = workspace_b.read_with(cx_b, |workspace, cx| {
4689            workspace
4690                .active_item(cx)
4691                .unwrap()
4692                .downcast::<Editor>()
4693                .unwrap()
4694        });
4695        rename_editor.update(cx_b, |editor, cx| {
4696            assert_eq!(
4697                editor.text(cx),
4698                "const THREE: usize = 1;\nconst TWO: usize = one::THREE + one::THREE;"
4699            );
4700            editor.undo(&Undo, cx);
4701            assert_eq!(
4702                editor.text(cx),
4703                "const ONE: usize = 1;\nconst TWO: usize = one::ONE + one::ONE;"
4704            );
4705            editor.redo(&Redo, cx);
4706            assert_eq!(
4707                editor.text(cx),
4708                "const THREE: usize = 1;\nconst TWO: usize = one::THREE + one::THREE;"
4709            );
4710        });
4711
4712        // Ensure temporary rename edits cannot be undone/redone.
4713        editor_b.update(cx_b, |editor, cx| {
4714            editor.undo(&Undo, cx);
4715            assert_eq!(editor.text(cx), "const ONE: usize = 1;");
4716            editor.undo(&Undo, cx);
4717            assert_eq!(editor.text(cx), "const ONE: usize = 1;");
4718            editor.redo(&Redo, cx);
4719            assert_eq!(editor.text(cx), "const THREE: usize = 1;");
4720        })
4721    }
4722
4723    #[gpui::test(iterations = 10)]
4724    async fn test_basic_chat(cx_a: &mut TestAppContext, cx_b: &mut TestAppContext) {
4725        cx_a.foreground().forbid_parking();
4726
4727        // Connect to a server as 2 clients.
4728        let mut server = TestServer::start(cx_a.foreground(), cx_a.background()).await;
4729        let client_a = server.create_client(cx_a, "user_a").await;
4730        let client_b = server.create_client(cx_b, "user_b").await;
4731
4732        // Create an org that includes these 2 users.
4733        let db = &server.app_state.db;
4734        let org_id = db.create_org("Test Org", "test-org").await.unwrap();
4735        db.add_org_member(org_id, client_a.current_user_id(&cx_a), false)
4736            .await
4737            .unwrap();
4738        db.add_org_member(org_id, client_b.current_user_id(&cx_b), false)
4739            .await
4740            .unwrap();
4741
4742        // Create a channel that includes all the users.
4743        let channel_id = db.create_org_channel(org_id, "test-channel").await.unwrap();
4744        db.add_channel_member(channel_id, client_a.current_user_id(&cx_a), false)
4745            .await
4746            .unwrap();
4747        db.add_channel_member(channel_id, client_b.current_user_id(&cx_b), false)
4748            .await
4749            .unwrap();
4750        db.create_channel_message(
4751            channel_id,
4752            client_b.current_user_id(&cx_b),
4753            "hello A, it's B.",
4754            OffsetDateTime::now_utc(),
4755            1,
4756        )
4757        .await
4758        .unwrap();
4759
4760        let channels_a = cx_a
4761            .add_model(|cx| ChannelList::new(client_a.user_store.clone(), client_a.clone(), cx));
4762        channels_a
4763            .condition(cx_a, |list, _| list.available_channels().is_some())
4764            .await;
4765        channels_a.read_with(cx_a, |list, _| {
4766            assert_eq!(
4767                list.available_channels().unwrap(),
4768                &[ChannelDetails {
4769                    id: channel_id.to_proto(),
4770                    name: "test-channel".to_string()
4771                }]
4772            )
4773        });
4774        let channel_a = channels_a.update(cx_a, |this, cx| {
4775            this.get_channel(channel_id.to_proto(), cx).unwrap()
4776        });
4777        channel_a.read_with(cx_a, |channel, _| assert!(channel.messages().is_empty()));
4778        channel_a
4779            .condition(&cx_a, |channel, _| {
4780                channel_messages(channel)
4781                    == [("user_b".to_string(), "hello A, it's B.".to_string(), false)]
4782            })
4783            .await;
4784
4785        let channels_b = cx_b
4786            .add_model(|cx| ChannelList::new(client_b.user_store.clone(), client_b.clone(), cx));
4787        channels_b
4788            .condition(cx_b, |list, _| list.available_channels().is_some())
4789            .await;
4790        channels_b.read_with(cx_b, |list, _| {
4791            assert_eq!(
4792                list.available_channels().unwrap(),
4793                &[ChannelDetails {
4794                    id: channel_id.to_proto(),
4795                    name: "test-channel".to_string()
4796                }]
4797            )
4798        });
4799
4800        let channel_b = channels_b.update(cx_b, |this, cx| {
4801            this.get_channel(channel_id.to_proto(), cx).unwrap()
4802        });
4803        channel_b.read_with(cx_b, |channel, _| assert!(channel.messages().is_empty()));
4804        channel_b
4805            .condition(&cx_b, |channel, _| {
4806                channel_messages(channel)
4807                    == [("user_b".to_string(), "hello A, it's B.".to_string(), false)]
4808            })
4809            .await;
4810
4811        channel_a
4812            .update(cx_a, |channel, cx| {
4813                channel
4814                    .send_message("oh, hi B.".to_string(), cx)
4815                    .unwrap()
4816                    .detach();
4817                let task = channel.send_message("sup".to_string(), cx).unwrap();
4818                assert_eq!(
4819                    channel_messages(channel),
4820                    &[
4821                        ("user_b".to_string(), "hello A, it's B.".to_string(), false),
4822                        ("user_a".to_string(), "oh, hi B.".to_string(), true),
4823                        ("user_a".to_string(), "sup".to_string(), true)
4824                    ]
4825                );
4826                task
4827            })
4828            .await
4829            .unwrap();
4830
4831        channel_b
4832            .condition(&cx_b, |channel, _| {
4833                channel_messages(channel)
4834                    == [
4835                        ("user_b".to_string(), "hello A, it's B.".to_string(), false),
4836                        ("user_a".to_string(), "oh, hi B.".to_string(), false),
4837                        ("user_a".to_string(), "sup".to_string(), false),
4838                    ]
4839            })
4840            .await;
4841
4842        assert_eq!(
4843            server
4844                .state()
4845                .await
4846                .channel(channel_id)
4847                .unwrap()
4848                .connection_ids
4849                .len(),
4850            2
4851        );
4852        cx_b.update(|_| drop(channel_b));
4853        server
4854            .condition(|state| state.channel(channel_id).unwrap().connection_ids.len() == 1)
4855            .await;
4856
4857        cx_a.update(|_| drop(channel_a));
4858        server
4859            .condition(|state| state.channel(channel_id).is_none())
4860            .await;
4861    }
4862
4863    #[gpui::test(iterations = 10)]
4864    async fn test_chat_message_validation(cx_a: &mut TestAppContext) {
4865        cx_a.foreground().forbid_parking();
4866
4867        let mut server = TestServer::start(cx_a.foreground(), cx_a.background()).await;
4868        let client_a = server.create_client(cx_a, "user_a").await;
4869
4870        let db = &server.app_state.db;
4871        let org_id = db.create_org("Test Org", "test-org").await.unwrap();
4872        let channel_id = db.create_org_channel(org_id, "test-channel").await.unwrap();
4873        db.add_org_member(org_id, client_a.current_user_id(&cx_a), false)
4874            .await
4875            .unwrap();
4876        db.add_channel_member(channel_id, client_a.current_user_id(&cx_a), false)
4877            .await
4878            .unwrap();
4879
4880        let channels_a = cx_a
4881            .add_model(|cx| ChannelList::new(client_a.user_store.clone(), client_a.clone(), cx));
4882        channels_a
4883            .condition(cx_a, |list, _| list.available_channels().is_some())
4884            .await;
4885        let channel_a = channels_a.update(cx_a, |this, cx| {
4886            this.get_channel(channel_id.to_proto(), cx).unwrap()
4887        });
4888
4889        // Messages aren't allowed to be too long.
4890        channel_a
4891            .update(cx_a, |channel, cx| {
4892                let long_body = "this is long.\n".repeat(1024);
4893                channel.send_message(long_body, cx).unwrap()
4894            })
4895            .await
4896            .unwrap_err();
4897
4898        // Messages aren't allowed to be blank.
4899        channel_a.update(cx_a, |channel, cx| {
4900            channel.send_message(String::new(), cx).unwrap_err()
4901        });
4902
4903        // Leading and trailing whitespace are trimmed.
4904        channel_a
4905            .update(cx_a, |channel, cx| {
4906                channel
4907                    .send_message("\n surrounded by whitespace  \n".to_string(), cx)
4908                    .unwrap()
4909            })
4910            .await
4911            .unwrap();
4912        assert_eq!(
4913            db.get_channel_messages(channel_id, 10, None)
4914                .await
4915                .unwrap()
4916                .iter()
4917                .map(|m| &m.body)
4918                .collect::<Vec<_>>(),
4919            &["surrounded by whitespace"]
4920        );
4921    }
4922
4923    #[gpui::test(iterations = 10)]
4924    async fn test_chat_reconnection(cx_a: &mut TestAppContext, cx_b: &mut TestAppContext) {
4925        cx_a.foreground().forbid_parking();
4926
4927        // Connect to a server as 2 clients.
4928        let mut server = TestServer::start(cx_a.foreground(), cx_a.background()).await;
4929        let client_a = server.create_client(cx_a, "user_a").await;
4930        let client_b = server.create_client(cx_b, "user_b").await;
4931        let mut status_b = client_b.status();
4932
4933        // Create an org that includes these 2 users.
4934        let db = &server.app_state.db;
4935        let org_id = db.create_org("Test Org", "test-org").await.unwrap();
4936        db.add_org_member(org_id, client_a.current_user_id(&cx_a), false)
4937            .await
4938            .unwrap();
4939        db.add_org_member(org_id, client_b.current_user_id(&cx_b), false)
4940            .await
4941            .unwrap();
4942
4943        // Create a channel that includes all the users.
4944        let channel_id = db.create_org_channel(org_id, "test-channel").await.unwrap();
4945        db.add_channel_member(channel_id, client_a.current_user_id(&cx_a), false)
4946            .await
4947            .unwrap();
4948        db.add_channel_member(channel_id, client_b.current_user_id(&cx_b), false)
4949            .await
4950            .unwrap();
4951        db.create_channel_message(
4952            channel_id,
4953            client_b.current_user_id(&cx_b),
4954            "hello A, it's B.",
4955            OffsetDateTime::now_utc(),
4956            2,
4957        )
4958        .await
4959        .unwrap();
4960
4961        let channels_a = cx_a
4962            .add_model(|cx| ChannelList::new(client_a.user_store.clone(), client_a.clone(), cx));
4963        channels_a
4964            .condition(cx_a, |list, _| list.available_channels().is_some())
4965            .await;
4966
4967        channels_a.read_with(cx_a, |list, _| {
4968            assert_eq!(
4969                list.available_channels().unwrap(),
4970                &[ChannelDetails {
4971                    id: channel_id.to_proto(),
4972                    name: "test-channel".to_string()
4973                }]
4974            )
4975        });
4976        let channel_a = channels_a.update(cx_a, |this, cx| {
4977            this.get_channel(channel_id.to_proto(), cx).unwrap()
4978        });
4979        channel_a.read_with(cx_a, |channel, _| assert!(channel.messages().is_empty()));
4980        channel_a
4981            .condition(&cx_a, |channel, _| {
4982                channel_messages(channel)
4983                    == [("user_b".to_string(), "hello A, it's B.".to_string(), false)]
4984            })
4985            .await;
4986
4987        let channels_b = cx_b
4988            .add_model(|cx| ChannelList::new(client_b.user_store.clone(), client_b.clone(), cx));
4989        channels_b
4990            .condition(cx_b, |list, _| list.available_channels().is_some())
4991            .await;
4992        channels_b.read_with(cx_b, |list, _| {
4993            assert_eq!(
4994                list.available_channels().unwrap(),
4995                &[ChannelDetails {
4996                    id: channel_id.to_proto(),
4997                    name: "test-channel".to_string()
4998                }]
4999            )
5000        });
5001
5002        let channel_b = channels_b.update(cx_b, |this, cx| {
5003            this.get_channel(channel_id.to_proto(), cx).unwrap()
5004        });
5005        channel_b.read_with(cx_b, |channel, _| assert!(channel.messages().is_empty()));
5006        channel_b
5007            .condition(&cx_b, |channel, _| {
5008                channel_messages(channel)
5009                    == [("user_b".to_string(), "hello A, it's B.".to_string(), false)]
5010            })
5011            .await;
5012
5013        // Disconnect client B, ensuring we can still access its cached channel data.
5014        server.forbid_connections();
5015        server.disconnect_client(client_b.current_user_id(&cx_b));
5016        cx_b.foreground().advance_clock(rpc::RECEIVE_TIMEOUT);
5017        while !matches!(
5018            status_b.next().await,
5019            Some(client::Status::ReconnectionError { .. })
5020        ) {}
5021
5022        channels_b.read_with(cx_b, |channels, _| {
5023            assert_eq!(
5024                channels.available_channels().unwrap(),
5025                [ChannelDetails {
5026                    id: channel_id.to_proto(),
5027                    name: "test-channel".to_string()
5028                }]
5029            )
5030        });
5031        channel_b.read_with(cx_b, |channel, _| {
5032            assert_eq!(
5033                channel_messages(channel),
5034                [("user_b".to_string(), "hello A, it's B.".to_string(), false)]
5035            )
5036        });
5037
5038        // Send a message from client B while it is disconnected.
5039        channel_b
5040            .update(cx_b, |channel, cx| {
5041                let task = channel
5042                    .send_message("can you see this?".to_string(), cx)
5043                    .unwrap();
5044                assert_eq!(
5045                    channel_messages(channel),
5046                    &[
5047                        ("user_b".to_string(), "hello A, it's B.".to_string(), false),
5048                        ("user_b".to_string(), "can you see this?".to_string(), true)
5049                    ]
5050                );
5051                task
5052            })
5053            .await
5054            .unwrap_err();
5055
5056        // Send a message from client A while B is disconnected.
5057        channel_a
5058            .update(cx_a, |channel, cx| {
5059                channel
5060                    .send_message("oh, hi B.".to_string(), cx)
5061                    .unwrap()
5062                    .detach();
5063                let task = channel.send_message("sup".to_string(), cx).unwrap();
5064                assert_eq!(
5065                    channel_messages(channel),
5066                    &[
5067                        ("user_b".to_string(), "hello A, it's B.".to_string(), false),
5068                        ("user_a".to_string(), "oh, hi B.".to_string(), true),
5069                        ("user_a".to_string(), "sup".to_string(), true)
5070                    ]
5071                );
5072                task
5073            })
5074            .await
5075            .unwrap();
5076
5077        // Give client B a chance to reconnect.
5078        server.allow_connections();
5079        cx_b.foreground().advance_clock(Duration::from_secs(10));
5080
5081        // Verify that B sees the new messages upon reconnection, as well as the message client B
5082        // sent while offline.
5083        channel_b
5084            .condition(&cx_b, |channel, _| {
5085                channel_messages(channel)
5086                    == [
5087                        ("user_b".to_string(), "hello A, it's B.".to_string(), false),
5088                        ("user_a".to_string(), "oh, hi B.".to_string(), false),
5089                        ("user_a".to_string(), "sup".to_string(), false),
5090                        ("user_b".to_string(), "can you see this?".to_string(), false),
5091                    ]
5092            })
5093            .await;
5094
5095        // Ensure client A and B can communicate normally after reconnection.
5096        channel_a
5097            .update(cx_a, |channel, cx| {
5098                channel.send_message("you online?".to_string(), cx).unwrap()
5099            })
5100            .await
5101            .unwrap();
5102        channel_b
5103            .condition(&cx_b, |channel, _| {
5104                channel_messages(channel)
5105                    == [
5106                        ("user_b".to_string(), "hello A, it's B.".to_string(), false),
5107                        ("user_a".to_string(), "oh, hi B.".to_string(), false),
5108                        ("user_a".to_string(), "sup".to_string(), false),
5109                        ("user_b".to_string(), "can you see this?".to_string(), false),
5110                        ("user_a".to_string(), "you online?".to_string(), false),
5111                    ]
5112            })
5113            .await;
5114
5115        channel_b
5116            .update(cx_b, |channel, cx| {
5117                channel.send_message("yep".to_string(), cx).unwrap()
5118            })
5119            .await
5120            .unwrap();
5121        channel_a
5122            .condition(&cx_a, |channel, _| {
5123                channel_messages(channel)
5124                    == [
5125                        ("user_b".to_string(), "hello A, it's B.".to_string(), false),
5126                        ("user_a".to_string(), "oh, hi B.".to_string(), false),
5127                        ("user_a".to_string(), "sup".to_string(), false),
5128                        ("user_b".to_string(), "can you see this?".to_string(), false),
5129                        ("user_a".to_string(), "you online?".to_string(), false),
5130                        ("user_b".to_string(), "yep".to_string(), false),
5131                    ]
5132            })
5133            .await;
5134    }
5135
5136    #[gpui::test(iterations = 10)]
5137    async fn test_contacts(
5138        deterministic: Arc<Deterministic>,
5139        cx_a: &mut TestAppContext,
5140        cx_b: &mut TestAppContext,
5141        cx_c: &mut TestAppContext,
5142    ) {
5143        cx_a.foreground().forbid_parking();
5144
5145        // Connect to a server as 3 clients.
5146        let mut server = TestServer::start(cx_a.foreground(), cx_a.background()).await;
5147        let mut client_a = server.create_client(cx_a, "user_a").await;
5148        let mut client_b = server.create_client(cx_b, "user_b").await;
5149        let client_c = server.create_client(cx_c, "user_c").await;
5150        server
5151            .make_contacts(vec![
5152                (&client_a, cx_a),
5153                (&client_b, cx_b),
5154                (&client_c, cx_c),
5155            ])
5156            .await;
5157
5158        deterministic.run_until_parked();
5159        for (client, cx) in [(&client_a, &cx_a), (&client_b, &cx_b), (&client_c, &cx_c)] {
5160            client.user_store.read_with(*cx, |store, _| {
5161                assert_eq!(
5162                    contacts(store),
5163                    [
5164                        ("user_a", true, vec![]),
5165                        ("user_b", true, vec![]),
5166                        ("user_c", true, vec![])
5167                    ],
5168                    "{} has the wrong contacts",
5169                    client.username
5170                )
5171            });
5172        }
5173
5174        // Share a project as client A.
5175        let fs = FakeFs::new(cx_a.background());
5176        fs.create_dir(Path::new("/a")).await.unwrap();
5177        let (project_a, _) = client_a.build_local_project(fs, "/a", cx_a).await;
5178
5179        deterministic.run_until_parked();
5180        for (client, cx) in [(&client_a, &cx_a), (&client_b, &cx_b), (&client_c, &cx_c)] {
5181            client.user_store.read_with(*cx, |store, _| {
5182                assert_eq!(
5183                    contacts(store),
5184                    [
5185                        ("user_a", true, vec![("a", vec![])]),
5186                        ("user_b", true, vec![]),
5187                        ("user_c", true, vec![])
5188                    ],
5189                    "{} has the wrong contacts",
5190                    client.username
5191                )
5192            });
5193        }
5194
5195        let _project_b = client_b.build_remote_project(&project_a, cx_a, cx_b).await;
5196
5197        deterministic.run_until_parked();
5198        for (client, cx) in [(&client_a, &cx_a), (&client_b, &cx_b), (&client_c, &cx_c)] {
5199            client.user_store.read_with(*cx, |store, _| {
5200                assert_eq!(
5201                    contacts(store),
5202                    [
5203                        ("user_a", true, vec![("a", vec!["user_b"])]),
5204                        ("user_b", true, vec![]),
5205                        ("user_c", true, vec![])
5206                    ],
5207                    "{} has the wrong contacts",
5208                    client.username
5209                )
5210            });
5211        }
5212
5213        // Add a local project as client B
5214        let fs = FakeFs::new(cx_b.background());
5215        fs.create_dir(Path::new("/b")).await.unwrap();
5216        let (_project_b, _) = client_b.build_local_project(fs, "/b", cx_a).await;
5217
5218        deterministic.run_until_parked();
5219        for (client, cx) in [(&client_a, &cx_a), (&client_b, &cx_b), (&client_c, &cx_c)] {
5220            client.user_store.read_with(*cx, |store, _| {
5221                assert_eq!(
5222                    contacts(store),
5223                    [
5224                        ("user_a", true, vec![("a", vec!["user_b"])]),
5225                        ("user_b", true, vec![("b", vec![])]),
5226                        ("user_c", true, vec![])
5227                    ],
5228                    "{} has the wrong contacts",
5229                    client.username
5230                )
5231            });
5232        }
5233
5234        project_a
5235            .condition(&cx_a, |project, _| {
5236                project.collaborators().contains_key(&client_b.peer_id)
5237            })
5238            .await;
5239
5240        client_a.project.take();
5241        cx_a.update(move |_| drop(project_a));
5242        deterministic.run_until_parked();
5243        for (client, cx) in [(&client_a, &cx_a), (&client_b, &cx_b), (&client_c, &cx_c)] {
5244            client.user_store.read_with(*cx, |store, _| {
5245                assert_eq!(
5246                    contacts(store),
5247                    [
5248                        ("user_a", true, vec![]),
5249                        ("user_b", true, vec![("b", vec![])]),
5250                        ("user_c", true, vec![])
5251                    ],
5252                    "{} has the wrong contacts",
5253                    client.username
5254                )
5255            });
5256        }
5257
5258        server.disconnect_client(client_c.current_user_id(cx_c));
5259        server.forbid_connections();
5260        deterministic.advance_clock(rpc::RECEIVE_TIMEOUT);
5261        for (client, cx) in [(&client_a, &cx_a), (&client_b, &cx_b)] {
5262            client.user_store.read_with(*cx, |store, _| {
5263                assert_eq!(
5264                    contacts(store),
5265                    [
5266                        ("user_a", true, vec![]),
5267                        ("user_b", true, vec![("b", vec![])]),
5268                        ("user_c", false, vec![])
5269                    ],
5270                    "{} has the wrong contacts",
5271                    client.username
5272                )
5273            });
5274        }
5275        client_c
5276            .user_store
5277            .read_with(cx_c, |store, _| assert_eq!(contacts(store), []));
5278
5279        server.allow_connections();
5280        client_c
5281            .authenticate_and_connect(false, &cx_c.to_async())
5282            .await
5283            .unwrap();
5284
5285        deterministic.run_until_parked();
5286        for (client, cx) in [(&client_a, &cx_a), (&client_b, &cx_b), (&client_c, &cx_c)] {
5287            client.user_store.read_with(*cx, |store, _| {
5288                assert_eq!(
5289                    contacts(store),
5290                    [
5291                        ("user_a", true, vec![]),
5292                        ("user_b", true, vec![("b", vec![])]),
5293                        ("user_c", true, vec![])
5294                    ],
5295                    "{} has the wrong contacts",
5296                    client.username
5297                )
5298            });
5299        }
5300
5301        fn contacts(user_store: &UserStore) -> Vec<(&str, bool, Vec<(&str, Vec<&str>)>)> {
5302            user_store
5303                .contacts()
5304                .iter()
5305                .map(|contact| {
5306                    let projects = contact
5307                        .projects
5308                        .iter()
5309                        .map(|p| {
5310                            (
5311                                p.worktree_root_names[0].as_str(),
5312                                p.guests.iter().map(|p| p.github_login.as_str()).collect(),
5313                            )
5314                        })
5315                        .collect();
5316                    (contact.user.github_login.as_str(), contact.online, projects)
5317                })
5318                .collect()
5319        }
5320    }
5321
5322    #[gpui::test(iterations = 10)]
5323    async fn test_contact_requests(
5324        executor: Arc<Deterministic>,
5325        cx_a: &mut TestAppContext,
5326        cx_a2: &mut TestAppContext,
5327        cx_b: &mut TestAppContext,
5328        cx_b2: &mut TestAppContext,
5329        cx_c: &mut TestAppContext,
5330        cx_c2: &mut TestAppContext,
5331    ) {
5332        cx_a.foreground().forbid_parking();
5333
5334        // Connect to a server as 3 clients.
5335        let mut server = TestServer::start(cx_a.foreground(), cx_a.background()).await;
5336        let client_a = server.create_client(cx_a, "user_a").await;
5337        let client_a2 = server.create_client(cx_a2, "user_a").await;
5338        let client_b = server.create_client(cx_b, "user_b").await;
5339        let client_b2 = server.create_client(cx_b2, "user_b").await;
5340        let client_c = server.create_client(cx_c, "user_c").await;
5341        let client_c2 = server.create_client(cx_c2, "user_c").await;
5342
5343        assert_eq!(client_a.user_id().unwrap(), client_a2.user_id().unwrap());
5344        assert_eq!(client_b.user_id().unwrap(), client_b2.user_id().unwrap());
5345        assert_eq!(client_c.user_id().unwrap(), client_c2.user_id().unwrap());
5346
5347        // User A and User C request that user B become their contact.
5348        client_a
5349            .user_store
5350            .update(cx_a, |store, cx| {
5351                store.request_contact(client_b.user_id().unwrap(), cx)
5352            })
5353            .await
5354            .unwrap();
5355        client_c
5356            .user_store
5357            .update(cx_c, |store, cx| {
5358                store.request_contact(client_b.user_id().unwrap(), cx)
5359            })
5360            .await
5361            .unwrap();
5362        executor.run_until_parked();
5363
5364        // All users see the pending request appear in all their clients.
5365        assert_eq!(
5366            client_a.summarize_contacts(&cx_a).outgoing_requests,
5367            &["user_b"]
5368        );
5369        assert_eq!(
5370            client_a2.summarize_contacts(&cx_a2).outgoing_requests,
5371            &["user_b"]
5372        );
5373        assert_eq!(
5374            client_b.summarize_contacts(&cx_b).incoming_requests,
5375            &["user_a", "user_c"]
5376        );
5377        assert_eq!(
5378            client_b2.summarize_contacts(&cx_b2).incoming_requests,
5379            &["user_a", "user_c"]
5380        );
5381        assert_eq!(
5382            client_c.summarize_contacts(&cx_c).outgoing_requests,
5383            &["user_b"]
5384        );
5385        assert_eq!(
5386            client_c2.summarize_contacts(&cx_c2).outgoing_requests,
5387            &["user_b"]
5388        );
5389
5390        // Contact requests are present upon connecting (tested here via disconnect/reconnect)
5391        disconnect_and_reconnect(&client_a, cx_a).await;
5392        disconnect_and_reconnect(&client_b, cx_b).await;
5393        disconnect_and_reconnect(&client_c, cx_c).await;
5394        executor.run_until_parked();
5395        assert_eq!(
5396            client_a.summarize_contacts(&cx_a).outgoing_requests,
5397            &["user_b"]
5398        );
5399        assert_eq!(
5400            client_b.summarize_contacts(&cx_b).incoming_requests,
5401            &["user_a", "user_c"]
5402        );
5403        assert_eq!(
5404            client_c.summarize_contacts(&cx_c).outgoing_requests,
5405            &["user_b"]
5406        );
5407
5408        // User B accepts the request from user A.
5409        client_b
5410            .user_store
5411            .update(cx_b, |store, cx| {
5412                store.respond_to_contact_request(client_a.user_id().unwrap(), true, cx)
5413            })
5414            .await
5415            .unwrap();
5416
5417        executor.run_until_parked();
5418
5419        // User B sees user A as their contact now in all client, and the incoming request from them is removed.
5420        let contacts_b = client_b.summarize_contacts(&cx_b);
5421        assert_eq!(contacts_b.current, &["user_a", "user_b"]);
5422        assert_eq!(contacts_b.incoming_requests, &["user_c"]);
5423        let contacts_b2 = client_b2.summarize_contacts(&cx_b2);
5424        assert_eq!(contacts_b2.current, &["user_a", "user_b"]);
5425        assert_eq!(contacts_b2.incoming_requests, &["user_c"]);
5426
5427        // User A sees user B as their contact now in all clients, and the outgoing request to them is removed.
5428        let contacts_a = client_a.summarize_contacts(&cx_a);
5429        assert_eq!(contacts_a.current, &["user_a", "user_b"]);
5430        assert!(contacts_a.outgoing_requests.is_empty());
5431        let contacts_a2 = client_a2.summarize_contacts(&cx_a2);
5432        assert_eq!(contacts_a2.current, &["user_a", "user_b"]);
5433        assert!(contacts_a2.outgoing_requests.is_empty());
5434
5435        // Contacts are present upon connecting (tested here via disconnect/reconnect)
5436        disconnect_and_reconnect(&client_a, cx_a).await;
5437        disconnect_and_reconnect(&client_b, cx_b).await;
5438        disconnect_and_reconnect(&client_c, cx_c).await;
5439        executor.run_until_parked();
5440        assert_eq!(
5441            client_a.summarize_contacts(&cx_a).current,
5442            &["user_a", "user_b"]
5443        );
5444        assert_eq!(
5445            client_b.summarize_contacts(&cx_b).current,
5446            &["user_a", "user_b"]
5447        );
5448        assert_eq!(
5449            client_b.summarize_contacts(&cx_b).incoming_requests,
5450            &["user_c"]
5451        );
5452        assert_eq!(client_c.summarize_contacts(&cx_c).current, &["user_c"]);
5453        assert_eq!(
5454            client_c.summarize_contacts(&cx_c).outgoing_requests,
5455            &["user_b"]
5456        );
5457
5458        // User B rejects the request from user C.
5459        client_b
5460            .user_store
5461            .update(cx_b, |store, cx| {
5462                store.respond_to_contact_request(client_c.user_id().unwrap(), false, cx)
5463            })
5464            .await
5465            .unwrap();
5466
5467        executor.run_until_parked();
5468
5469        // User B doesn't see user C as their contact, and the incoming request from them is removed.
5470        let contacts_b = client_b.summarize_contacts(&cx_b);
5471        assert_eq!(contacts_b.current, &["user_a", "user_b"]);
5472        assert!(contacts_b.incoming_requests.is_empty());
5473        let contacts_b2 = client_b2.summarize_contacts(&cx_b2);
5474        assert_eq!(contacts_b2.current, &["user_a", "user_b"]);
5475        assert!(contacts_b2.incoming_requests.is_empty());
5476
5477        // User C doesn't see user B as their contact, and the outgoing request to them is removed.
5478        let contacts_c = client_c.summarize_contacts(&cx_c);
5479        assert_eq!(contacts_c.current, &["user_c"]);
5480        assert!(contacts_c.outgoing_requests.is_empty());
5481        let contacts_c2 = client_c2.summarize_contacts(&cx_c2);
5482        assert_eq!(contacts_c2.current, &["user_c"]);
5483        assert!(contacts_c2.outgoing_requests.is_empty());
5484
5485        // Incoming/outgoing requests are not present upon connecting (tested here via disconnect/reconnect)
5486        disconnect_and_reconnect(&client_a, cx_a).await;
5487        disconnect_and_reconnect(&client_b, cx_b).await;
5488        disconnect_and_reconnect(&client_c, cx_c).await;
5489        executor.run_until_parked();
5490        assert_eq!(
5491            client_a.summarize_contacts(&cx_a).current,
5492            &["user_a", "user_b"]
5493        );
5494        assert_eq!(
5495            client_b.summarize_contacts(&cx_b).current,
5496            &["user_a", "user_b"]
5497        );
5498        assert!(client_b
5499            .summarize_contacts(&cx_b)
5500            .incoming_requests
5501            .is_empty());
5502        assert_eq!(client_c.summarize_contacts(&cx_c).current, &["user_c"]);
5503        assert!(client_c
5504            .summarize_contacts(&cx_c)
5505            .outgoing_requests
5506            .is_empty());
5507
5508        async fn disconnect_and_reconnect(client: &TestClient, cx: &mut TestAppContext) {
5509            client.disconnect(&cx.to_async()).unwrap();
5510            client.clear_contacts(cx).await;
5511            client
5512                .authenticate_and_connect(false, &cx.to_async())
5513                .await
5514                .unwrap();
5515        }
5516    }
5517
5518    #[gpui::test(iterations = 10)]
5519    async fn test_following(cx_a: &mut TestAppContext, cx_b: &mut TestAppContext) {
5520        cx_a.foreground().forbid_parking();
5521        let fs = FakeFs::new(cx_a.background());
5522
5523        // 2 clients connect to a server.
5524        let mut server = TestServer::start(cx_a.foreground(), cx_a.background()).await;
5525        let mut client_a = server.create_client(cx_a, "user_a").await;
5526        let mut client_b = server.create_client(cx_b, "user_b").await;
5527        server
5528            .make_contacts(vec![(&client_a, cx_a), (&client_b, cx_b)])
5529            .await;
5530        cx_a.update(editor::init);
5531        cx_b.update(editor::init);
5532
5533        // Client A shares a project.
5534        fs.insert_tree(
5535            "/a",
5536            json!({
5537                "1.txt": "one",
5538                "2.txt": "two",
5539                "3.txt": "three",
5540            }),
5541        )
5542        .await;
5543        let (project_a, worktree_id) = client_a.build_local_project(fs.clone(), "/a", cx_a).await;
5544
5545        // Client B joins the project.
5546        let project_b = client_b.build_remote_project(&project_a, cx_a, cx_b).await;
5547
5548        // Client A opens some editors.
5549        let workspace_a = client_a.build_workspace(&project_a, cx_a);
5550        let pane_a = workspace_a.read_with(cx_a, |workspace, _| workspace.active_pane().clone());
5551        let editor_a1 = workspace_a
5552            .update(cx_a, |workspace, cx| {
5553                workspace.open_path((worktree_id, "1.txt"), true, cx)
5554            })
5555            .await
5556            .unwrap()
5557            .downcast::<Editor>()
5558            .unwrap();
5559        let editor_a2 = workspace_a
5560            .update(cx_a, |workspace, cx| {
5561                workspace.open_path((worktree_id, "2.txt"), true, cx)
5562            })
5563            .await
5564            .unwrap()
5565            .downcast::<Editor>()
5566            .unwrap();
5567
5568        // Client B opens an editor.
5569        let workspace_b = client_b.build_workspace(&project_b, cx_b);
5570        let editor_b1 = workspace_b
5571            .update(cx_b, |workspace, cx| {
5572                workspace.open_path((worktree_id, "1.txt"), true, cx)
5573            })
5574            .await
5575            .unwrap()
5576            .downcast::<Editor>()
5577            .unwrap();
5578
5579        let client_a_id = project_b.read_with(cx_b, |project, _| {
5580            project.collaborators().values().next().unwrap().peer_id
5581        });
5582        let client_b_id = project_a.read_with(cx_a, |project, _| {
5583            project.collaborators().values().next().unwrap().peer_id
5584        });
5585
5586        // When client B starts following client A, all visible view states are replicated to client B.
5587        editor_a1.update(cx_a, |editor, cx| {
5588            editor.change_selections(None, cx, |s| s.select_ranges([0..1]))
5589        });
5590        editor_a2.update(cx_a, |editor, cx| {
5591            editor.change_selections(None, cx, |s| s.select_ranges([2..3]))
5592        });
5593        workspace_b
5594            .update(cx_b, |workspace, cx| {
5595                workspace
5596                    .toggle_follow(&ToggleFollow(client_a_id), cx)
5597                    .unwrap()
5598            })
5599            .await
5600            .unwrap();
5601
5602        let editor_b2 = workspace_b.read_with(cx_b, |workspace, cx| {
5603            workspace
5604                .active_item(cx)
5605                .unwrap()
5606                .downcast::<Editor>()
5607                .unwrap()
5608        });
5609        assert!(cx_b.read(|cx| editor_b2.is_focused(cx)));
5610        assert_eq!(
5611            editor_b2.read_with(cx_b, |editor, cx| editor.project_path(cx)),
5612            Some((worktree_id, "2.txt").into())
5613        );
5614        assert_eq!(
5615            editor_b2.read_with(cx_b, |editor, cx| editor.selections.ranges(cx)),
5616            vec![2..3]
5617        );
5618        assert_eq!(
5619            editor_b1.read_with(cx_b, |editor, cx| editor.selections.ranges(cx)),
5620            vec![0..1]
5621        );
5622
5623        // When client A activates a different editor, client B does so as well.
5624        workspace_a.update(cx_a, |workspace, cx| {
5625            workspace.activate_item(&editor_a1, cx)
5626        });
5627        workspace_b
5628            .condition(cx_b, |workspace, cx| {
5629                workspace.active_item(cx).unwrap().id() == editor_b1.id()
5630            })
5631            .await;
5632
5633        // When client A navigates back and forth, client B does so as well.
5634        workspace_a
5635            .update(cx_a, |workspace, cx| {
5636                workspace::Pane::go_back(workspace, None, cx)
5637            })
5638            .await;
5639        workspace_b
5640            .condition(cx_b, |workspace, cx| {
5641                workspace.active_item(cx).unwrap().id() == editor_b2.id()
5642            })
5643            .await;
5644
5645        workspace_a
5646            .update(cx_a, |workspace, cx| {
5647                workspace::Pane::go_forward(workspace, None, cx)
5648            })
5649            .await;
5650        workspace_b
5651            .condition(cx_b, |workspace, cx| {
5652                workspace.active_item(cx).unwrap().id() == editor_b1.id()
5653            })
5654            .await;
5655
5656        // Changes to client A's editor are reflected on client B.
5657        editor_a1.update(cx_a, |editor, cx| {
5658            editor.change_selections(None, cx, |s| s.select_ranges([1..1, 2..2]));
5659        });
5660        editor_b1
5661            .condition(cx_b, |editor, cx| {
5662                editor.selections.ranges(cx) == vec![1..1, 2..2]
5663            })
5664            .await;
5665
5666        editor_a1.update(cx_a, |editor, cx| editor.set_text("TWO", cx));
5667        editor_b1
5668            .condition(cx_b, |editor, cx| editor.text(cx) == "TWO")
5669            .await;
5670
5671        editor_a1.update(cx_a, |editor, cx| {
5672            editor.change_selections(None, cx, |s| s.select_ranges([3..3]));
5673            editor.set_scroll_position(vec2f(0., 100.), cx);
5674        });
5675        editor_b1
5676            .condition(cx_b, |editor, cx| {
5677                editor.selections.ranges(cx) == vec![3..3]
5678            })
5679            .await;
5680
5681        // After unfollowing, client B stops receiving updates from client A.
5682        workspace_b.update(cx_b, |workspace, cx| {
5683            workspace.unfollow(&workspace.active_pane().clone(), cx)
5684        });
5685        workspace_a.update(cx_a, |workspace, cx| {
5686            workspace.activate_item(&editor_a2, cx)
5687        });
5688        cx_a.foreground().run_until_parked();
5689        assert_eq!(
5690            workspace_b.read_with(cx_b, |workspace, cx| workspace
5691                .active_item(cx)
5692                .unwrap()
5693                .id()),
5694            editor_b1.id()
5695        );
5696
5697        // Client A starts following client B.
5698        workspace_a
5699            .update(cx_a, |workspace, cx| {
5700                workspace
5701                    .toggle_follow(&ToggleFollow(client_b_id), cx)
5702                    .unwrap()
5703            })
5704            .await
5705            .unwrap();
5706        assert_eq!(
5707            workspace_a.read_with(cx_a, |workspace, _| workspace.leader_for_pane(&pane_a)),
5708            Some(client_b_id)
5709        );
5710        assert_eq!(
5711            workspace_a.read_with(cx_a, |workspace, cx| workspace
5712                .active_item(cx)
5713                .unwrap()
5714                .id()),
5715            editor_a1.id()
5716        );
5717
5718        // Following interrupts when client B disconnects.
5719        client_b.disconnect(&cx_b.to_async()).unwrap();
5720        cx_a.foreground().run_until_parked();
5721        assert_eq!(
5722            workspace_a.read_with(cx_a, |workspace, _| workspace.leader_for_pane(&pane_a)),
5723            None
5724        );
5725    }
5726
5727    #[gpui::test(iterations = 10)]
5728    async fn test_peers_following_each_other(cx_a: &mut TestAppContext, cx_b: &mut TestAppContext) {
5729        cx_a.foreground().forbid_parking();
5730        let fs = FakeFs::new(cx_a.background());
5731
5732        // 2 clients connect to a server.
5733        let mut server = TestServer::start(cx_a.foreground(), cx_a.background()).await;
5734        let mut client_a = server.create_client(cx_a, "user_a").await;
5735        let mut client_b = server.create_client(cx_b, "user_b").await;
5736        server
5737            .make_contacts(vec![(&client_a, cx_a), (&client_b, cx_b)])
5738            .await;
5739        cx_a.update(editor::init);
5740        cx_b.update(editor::init);
5741
5742        // Client A shares a project.
5743        fs.insert_tree(
5744            "/a",
5745            json!({
5746                "1.txt": "one",
5747                "2.txt": "two",
5748                "3.txt": "three",
5749                "4.txt": "four",
5750            }),
5751        )
5752        .await;
5753        let (project_a, worktree_id) = client_a.build_local_project(fs.clone(), "/a", cx_a).await;
5754
5755        // Client B joins the project.
5756        let project_b = client_b.build_remote_project(&project_a, cx_a, cx_b).await;
5757
5758        // Client A opens some editors.
5759        let workspace_a = client_a.build_workspace(&project_a, cx_a);
5760        let pane_a1 = workspace_a.read_with(cx_a, |workspace, _| workspace.active_pane().clone());
5761        let _editor_a1 = workspace_a
5762            .update(cx_a, |workspace, cx| {
5763                workspace.open_path((worktree_id, "1.txt"), true, cx)
5764            })
5765            .await
5766            .unwrap()
5767            .downcast::<Editor>()
5768            .unwrap();
5769
5770        // Client B opens an editor.
5771        let workspace_b = client_b.build_workspace(&project_b, cx_b);
5772        let pane_b1 = workspace_b.read_with(cx_b, |workspace, _| workspace.active_pane().clone());
5773        let _editor_b1 = workspace_b
5774            .update(cx_b, |workspace, cx| {
5775                workspace.open_path((worktree_id, "2.txt"), true, cx)
5776            })
5777            .await
5778            .unwrap()
5779            .downcast::<Editor>()
5780            .unwrap();
5781
5782        // Clients A and B follow each other in split panes
5783        workspace_a
5784            .update(cx_a, |workspace, cx| {
5785                workspace.split_pane(workspace.active_pane().clone(), SplitDirection::Right, cx);
5786                assert_ne!(*workspace.active_pane(), pane_a1);
5787                let leader_id = *project_a.read(cx).collaborators().keys().next().unwrap();
5788                workspace
5789                    .toggle_follow(&workspace::ToggleFollow(leader_id), cx)
5790                    .unwrap()
5791            })
5792            .await
5793            .unwrap();
5794        workspace_b
5795            .update(cx_b, |workspace, cx| {
5796                workspace.split_pane(workspace.active_pane().clone(), SplitDirection::Right, cx);
5797                assert_ne!(*workspace.active_pane(), pane_b1);
5798                let leader_id = *project_b.read(cx).collaborators().keys().next().unwrap();
5799                workspace
5800                    .toggle_follow(&workspace::ToggleFollow(leader_id), cx)
5801                    .unwrap()
5802            })
5803            .await
5804            .unwrap();
5805
5806        workspace_a
5807            .update(cx_a, |workspace, cx| {
5808                workspace.activate_next_pane(cx);
5809                assert_eq!(*workspace.active_pane(), pane_a1);
5810                workspace.open_path((worktree_id, "3.txt"), true, cx)
5811            })
5812            .await
5813            .unwrap();
5814        workspace_b
5815            .update(cx_b, |workspace, cx| {
5816                workspace.activate_next_pane(cx);
5817                assert_eq!(*workspace.active_pane(), pane_b1);
5818                workspace.open_path((worktree_id, "4.txt"), true, cx)
5819            })
5820            .await
5821            .unwrap();
5822        cx_a.foreground().run_until_parked();
5823
5824        // Ensure leader updates don't change the active pane of followers
5825        workspace_a.read_with(cx_a, |workspace, _| {
5826            assert_eq!(*workspace.active_pane(), pane_a1);
5827        });
5828        workspace_b.read_with(cx_b, |workspace, _| {
5829            assert_eq!(*workspace.active_pane(), pane_b1);
5830        });
5831
5832        // Ensure peers following each other doesn't cause an infinite loop.
5833        assert_eq!(
5834            workspace_a.read_with(cx_a, |workspace, cx| workspace
5835                .active_item(cx)
5836                .unwrap()
5837                .project_path(cx)),
5838            Some((worktree_id, "3.txt").into())
5839        );
5840        workspace_a.update(cx_a, |workspace, cx| {
5841            assert_eq!(
5842                workspace.active_item(cx).unwrap().project_path(cx),
5843                Some((worktree_id, "3.txt").into())
5844            );
5845            workspace.activate_next_pane(cx);
5846            assert_eq!(
5847                workspace.active_item(cx).unwrap().project_path(cx),
5848                Some((worktree_id, "4.txt").into())
5849            );
5850        });
5851        workspace_b.update(cx_b, |workspace, cx| {
5852            assert_eq!(
5853                workspace.active_item(cx).unwrap().project_path(cx),
5854                Some((worktree_id, "4.txt").into())
5855            );
5856            workspace.activate_next_pane(cx);
5857            assert_eq!(
5858                workspace.active_item(cx).unwrap().project_path(cx),
5859                Some((worktree_id, "3.txt").into())
5860            );
5861        });
5862    }
5863
5864    #[gpui::test(iterations = 10)]
5865    async fn test_auto_unfollowing(cx_a: &mut TestAppContext, cx_b: &mut TestAppContext) {
5866        cx_a.foreground().forbid_parking();
5867        let fs = FakeFs::new(cx_a.background());
5868
5869        // 2 clients connect to a server.
5870        let mut server = TestServer::start(cx_a.foreground(), cx_a.background()).await;
5871        let mut client_a = server.create_client(cx_a, "user_a").await;
5872        let mut client_b = server.create_client(cx_b, "user_b").await;
5873        server
5874            .make_contacts(vec![(&client_a, cx_a), (&client_b, cx_b)])
5875            .await;
5876        cx_a.update(editor::init);
5877        cx_b.update(editor::init);
5878
5879        // Client A shares a project.
5880        fs.insert_tree(
5881            "/a",
5882            json!({
5883                "1.txt": "one",
5884                "2.txt": "two",
5885                "3.txt": "three",
5886            }),
5887        )
5888        .await;
5889        let (project_a, worktree_id) = client_a.build_local_project(fs.clone(), "/a", cx_a).await;
5890
5891        // Client B joins the project.
5892        let project_b = client_b.build_remote_project(&project_a, cx_a, cx_b).await;
5893
5894        // Client A opens some editors.
5895        let workspace_a = client_a.build_workspace(&project_a, cx_a);
5896        let _editor_a1 = workspace_a
5897            .update(cx_a, |workspace, cx| {
5898                workspace.open_path((worktree_id, "1.txt"), true, cx)
5899            })
5900            .await
5901            .unwrap()
5902            .downcast::<Editor>()
5903            .unwrap();
5904
5905        // Client B starts following client A.
5906        let workspace_b = client_b.build_workspace(&project_b, cx_b);
5907        let pane_b = workspace_b.read_with(cx_b, |workspace, _| workspace.active_pane().clone());
5908        let leader_id = project_b.read_with(cx_b, |project, _| {
5909            project.collaborators().values().next().unwrap().peer_id
5910        });
5911        workspace_b
5912            .update(cx_b, |workspace, cx| {
5913                workspace
5914                    .toggle_follow(&ToggleFollow(leader_id), cx)
5915                    .unwrap()
5916            })
5917            .await
5918            .unwrap();
5919        assert_eq!(
5920            workspace_b.read_with(cx_b, |workspace, _| workspace.leader_for_pane(&pane_b)),
5921            Some(leader_id)
5922        );
5923        let editor_b2 = workspace_b.read_with(cx_b, |workspace, cx| {
5924            workspace
5925                .active_item(cx)
5926                .unwrap()
5927                .downcast::<Editor>()
5928                .unwrap()
5929        });
5930
5931        // When client B moves, it automatically stops following client A.
5932        editor_b2.update(cx_b, |editor, cx| editor.move_right(&editor::MoveRight, cx));
5933        assert_eq!(
5934            workspace_b.read_with(cx_b, |workspace, _| workspace.leader_for_pane(&pane_b)),
5935            None
5936        );
5937
5938        workspace_b
5939            .update(cx_b, |workspace, cx| {
5940                workspace
5941                    .toggle_follow(&ToggleFollow(leader_id), cx)
5942                    .unwrap()
5943            })
5944            .await
5945            .unwrap();
5946        assert_eq!(
5947            workspace_b.read_with(cx_b, |workspace, _| workspace.leader_for_pane(&pane_b)),
5948            Some(leader_id)
5949        );
5950
5951        // When client B edits, it automatically stops following client A.
5952        editor_b2.update(cx_b, |editor, cx| editor.insert("X", cx));
5953        assert_eq!(
5954            workspace_b.read_with(cx_b, |workspace, _| workspace.leader_for_pane(&pane_b)),
5955            None
5956        );
5957
5958        workspace_b
5959            .update(cx_b, |workspace, cx| {
5960                workspace
5961                    .toggle_follow(&ToggleFollow(leader_id), cx)
5962                    .unwrap()
5963            })
5964            .await
5965            .unwrap();
5966        assert_eq!(
5967            workspace_b.read_with(cx_b, |workspace, _| workspace.leader_for_pane(&pane_b)),
5968            Some(leader_id)
5969        );
5970
5971        // When client B scrolls, it automatically stops following client A.
5972        editor_b2.update(cx_b, |editor, cx| {
5973            editor.set_scroll_position(vec2f(0., 3.), cx)
5974        });
5975        assert_eq!(
5976            workspace_b.read_with(cx_b, |workspace, _| workspace.leader_for_pane(&pane_b)),
5977            None
5978        );
5979
5980        workspace_b
5981            .update(cx_b, |workspace, cx| {
5982                workspace
5983                    .toggle_follow(&ToggleFollow(leader_id), cx)
5984                    .unwrap()
5985            })
5986            .await
5987            .unwrap();
5988        assert_eq!(
5989            workspace_b.read_with(cx_b, |workspace, _| workspace.leader_for_pane(&pane_b)),
5990            Some(leader_id)
5991        );
5992
5993        // When client B activates a different pane, it continues following client A in the original pane.
5994        workspace_b.update(cx_b, |workspace, cx| {
5995            workspace.split_pane(pane_b.clone(), SplitDirection::Right, cx)
5996        });
5997        assert_eq!(
5998            workspace_b.read_with(cx_b, |workspace, _| workspace.leader_for_pane(&pane_b)),
5999            Some(leader_id)
6000        );
6001
6002        workspace_b.update(cx_b, |workspace, cx| workspace.activate_next_pane(cx));
6003        assert_eq!(
6004            workspace_b.read_with(cx_b, |workspace, _| workspace.leader_for_pane(&pane_b)),
6005            Some(leader_id)
6006        );
6007
6008        // When client B activates a different item in the original pane, it automatically stops following client A.
6009        workspace_b
6010            .update(cx_b, |workspace, cx| {
6011                workspace.open_path((worktree_id, "2.txt"), true, cx)
6012            })
6013            .await
6014            .unwrap();
6015        assert_eq!(
6016            workspace_b.read_with(cx_b, |workspace, _| workspace.leader_for_pane(&pane_b)),
6017            None
6018        );
6019    }
6020
6021    #[gpui::test(iterations = 100)]
6022    async fn test_random_collaboration(
6023        cx: &mut TestAppContext,
6024        deterministic: Arc<Deterministic>,
6025        rng: StdRng,
6026    ) {
6027        cx.foreground().forbid_parking();
6028        let max_peers = env::var("MAX_PEERS")
6029            .map(|i| i.parse().expect("invalid `MAX_PEERS` variable"))
6030            .unwrap_or(5);
6031        assert!(max_peers <= 5);
6032
6033        let max_operations = env::var("OPERATIONS")
6034            .map(|i| i.parse().expect("invalid `OPERATIONS` variable"))
6035            .unwrap_or(10);
6036
6037        let rng = Arc::new(Mutex::new(rng));
6038
6039        let guest_lang_registry = Arc::new(LanguageRegistry::test());
6040        let host_language_registry = Arc::new(LanguageRegistry::test());
6041
6042        let fs = FakeFs::new(cx.background());
6043        fs.insert_tree("/_collab", json!({"init": ""})).await;
6044
6045        let mut server = TestServer::start(cx.foreground(), cx.background()).await;
6046        let db = server.app_state.db.clone();
6047        let host_user_id = db.create_user("host", false).await.unwrap();
6048        for username in ["guest-1", "guest-2", "guest-3", "guest-4"] {
6049            let guest_user_id = db.create_user(username, false).await.unwrap();
6050            server
6051                .app_state
6052                .db
6053                .send_contact_request(guest_user_id, host_user_id)
6054                .await
6055                .unwrap();
6056            server
6057                .app_state
6058                .db
6059                .respond_to_contact_request(host_user_id, guest_user_id, true)
6060                .await
6061                .unwrap();
6062        }
6063
6064        let mut clients = Vec::new();
6065        let mut user_ids = Vec::new();
6066        let mut op_start_signals = Vec::new();
6067
6068        let mut next_entity_id = 100000;
6069        let mut host_cx = TestAppContext::new(
6070            cx.foreground_platform(),
6071            cx.platform(),
6072            deterministic.build_foreground(next_entity_id),
6073            deterministic.build_background(),
6074            cx.font_cache(),
6075            cx.leak_detector(),
6076            next_entity_id,
6077        );
6078        let host = server.create_client(&mut host_cx, "host").await;
6079        let host_project = host_cx.update(|cx| {
6080            Project::local(
6081                host.client.clone(),
6082                host.user_store.clone(),
6083                host_language_registry.clone(),
6084                fs.clone(),
6085                cx,
6086            )
6087        });
6088        let host_project_id = host_project
6089            .update(&mut host_cx, |p, _| p.next_remote_id())
6090            .await;
6091
6092        let (collab_worktree, _) = host_project
6093            .update(&mut host_cx, |project, cx| {
6094                project.find_or_create_local_worktree("/_collab", true, cx)
6095            })
6096            .await
6097            .unwrap();
6098        collab_worktree
6099            .read_with(&host_cx, |tree, _| tree.as_local().unwrap().scan_complete())
6100            .await;
6101
6102        // Set up fake language servers.
6103        let mut language = Language::new(
6104            LanguageConfig {
6105                name: "Rust".into(),
6106                path_suffixes: vec!["rs".to_string()],
6107                ..Default::default()
6108            },
6109            None,
6110        );
6111        let _fake_servers = language.set_fake_lsp_adapter(FakeLspAdapter {
6112            name: "the-fake-language-server",
6113            capabilities: lsp::LanguageServer::full_capabilities(),
6114            initializer: Some(Box::new({
6115                let rng = rng.clone();
6116                let fs = fs.clone();
6117                let project = host_project.downgrade();
6118                move |fake_server: &mut FakeLanguageServer| {
6119                    fake_server.handle_request::<lsp::request::Completion, _, _>(
6120                        |_, _| async move {
6121                            Ok(Some(lsp::CompletionResponse::Array(vec![
6122                                lsp::CompletionItem {
6123                                    text_edit: Some(lsp::CompletionTextEdit::Edit(lsp::TextEdit {
6124                                        range: lsp::Range::new(
6125                                            lsp::Position::new(0, 0),
6126                                            lsp::Position::new(0, 0),
6127                                        ),
6128                                        new_text: "the-new-text".to_string(),
6129                                    })),
6130                                    ..Default::default()
6131                                },
6132                            ])))
6133                        },
6134                    );
6135
6136                    fake_server.handle_request::<lsp::request::CodeActionRequest, _, _>(
6137                        |_, _| async move {
6138                            Ok(Some(vec![lsp::CodeActionOrCommand::CodeAction(
6139                                lsp::CodeAction {
6140                                    title: "the-code-action".to_string(),
6141                                    ..Default::default()
6142                                },
6143                            )]))
6144                        },
6145                    );
6146
6147                    fake_server.handle_request::<lsp::request::PrepareRenameRequest, _, _>(
6148                        |params, _| async move {
6149                            Ok(Some(lsp::PrepareRenameResponse::Range(lsp::Range::new(
6150                                params.position,
6151                                params.position,
6152                            ))))
6153                        },
6154                    );
6155
6156                    fake_server.handle_request::<lsp::request::GotoDefinition, _, _>({
6157                        let fs = fs.clone();
6158                        let rng = rng.clone();
6159                        move |_, _| {
6160                            let fs = fs.clone();
6161                            let rng = rng.clone();
6162                            async move {
6163                                let files = fs.files().await;
6164                                let mut rng = rng.lock();
6165                                let count = rng.gen_range::<usize, _>(1..3);
6166                                let files = (0..count)
6167                                    .map(|_| files.choose(&mut *rng).unwrap())
6168                                    .collect::<Vec<_>>();
6169                                log::info!("LSP: Returning definitions in files {:?}", &files);
6170                                Ok(Some(lsp::GotoDefinitionResponse::Array(
6171                                    files
6172                                        .into_iter()
6173                                        .map(|file| lsp::Location {
6174                                            uri: lsp::Url::from_file_path(file).unwrap(),
6175                                            range: Default::default(),
6176                                        })
6177                                        .collect(),
6178                                )))
6179                            }
6180                        }
6181                    });
6182
6183                    fake_server.handle_request::<lsp::request::DocumentHighlightRequest, _, _>({
6184                        let rng = rng.clone();
6185                        let project = project.clone();
6186                        move |params, mut cx| {
6187                            let highlights = if let Some(project) = project.upgrade(&cx) {
6188                                project.update(&mut cx, |project, cx| {
6189                                    let path = params
6190                                        .text_document_position_params
6191                                        .text_document
6192                                        .uri
6193                                        .to_file_path()
6194                                        .unwrap();
6195                                    let (worktree, relative_path) =
6196                                        project.find_local_worktree(&path, cx)?;
6197                                    let project_path =
6198                                        ProjectPath::from((worktree.read(cx).id(), relative_path));
6199                                    let buffer =
6200                                        project.get_open_buffer(&project_path, cx)?.read(cx);
6201
6202                                    let mut highlights = Vec::new();
6203                                    let highlight_count = rng.lock().gen_range(1..=5);
6204                                    let mut prev_end = 0;
6205                                    for _ in 0..highlight_count {
6206                                        let range =
6207                                            buffer.random_byte_range(prev_end, &mut *rng.lock());
6208
6209                                        highlights.push(lsp::DocumentHighlight {
6210                                            range: range_to_lsp(range.to_point_utf16(buffer)),
6211                                            kind: Some(lsp::DocumentHighlightKind::READ),
6212                                        });
6213                                        prev_end = range.end;
6214                                    }
6215                                    Some(highlights)
6216                                })
6217                            } else {
6218                                None
6219                            };
6220                            async move { Ok(highlights) }
6221                        }
6222                    });
6223                }
6224            })),
6225            ..Default::default()
6226        });
6227        host_language_registry.add(Arc::new(language));
6228
6229        let op_start_signal = futures::channel::mpsc::unbounded();
6230        user_ids.push(host.current_user_id(&host_cx));
6231        op_start_signals.push(op_start_signal.0);
6232        clients.push(host_cx.foreground().spawn(host.simulate_host(
6233            host_project,
6234            op_start_signal.1,
6235            rng.clone(),
6236            host_cx,
6237        )));
6238
6239        let disconnect_host_at = if rng.lock().gen_bool(0.2) {
6240            rng.lock().gen_range(0..max_operations)
6241        } else {
6242            max_operations
6243        };
6244        let mut available_guests = vec![
6245            "guest-1".to_string(),
6246            "guest-2".to_string(),
6247            "guest-3".to_string(),
6248            "guest-4".to_string(),
6249        ];
6250        let mut operations = 0;
6251        while operations < max_operations {
6252            if operations == disconnect_host_at {
6253                server.disconnect_client(user_ids[0]);
6254                cx.foreground().advance_clock(RECEIVE_TIMEOUT);
6255                drop(op_start_signals);
6256                let mut clients = futures::future::join_all(clients).await;
6257                cx.foreground().run_until_parked();
6258
6259                let (host, mut host_cx, host_err) = clients.remove(0);
6260                if let Some(host_err) = host_err {
6261                    log::error!("host error - {:?}", host_err);
6262                }
6263                host.project
6264                    .as_ref()
6265                    .unwrap()
6266                    .read_with(&host_cx, |project, _| assert!(!project.is_shared()));
6267                for (guest, mut guest_cx, guest_err) in clients {
6268                    if let Some(guest_err) = guest_err {
6269                        log::error!("{} error - {:?}", guest.username, guest_err);
6270                    }
6271
6272                    let contacts = server
6273                        .app_state
6274                        .db
6275                        .get_contacts(guest.current_user_id(&guest_cx))
6276                        .await
6277                        .unwrap();
6278                    let contacts = server
6279                        .store
6280                        .read()
6281                        .await
6282                        .build_initial_contacts_update(contacts)
6283                        .contacts;
6284                    assert!(!contacts
6285                        .iter()
6286                        .flat_map(|contact| &contact.projects)
6287                        .any(|project| project.id == host_project_id));
6288                    guest
6289                        .project
6290                        .as_ref()
6291                        .unwrap()
6292                        .read_with(&guest_cx, |project, _| assert!(project.is_read_only()));
6293                    guest_cx.update(|_| drop(guest));
6294                }
6295                host_cx.update(|_| drop(host));
6296
6297                return;
6298            }
6299
6300            let distribution = rng.lock().gen_range(0..100);
6301            match distribution {
6302                0..=19 if !available_guests.is_empty() => {
6303                    let guest_ix = rng.lock().gen_range(0..available_guests.len());
6304                    let guest_username = available_guests.remove(guest_ix);
6305                    log::info!("Adding new connection for {}", guest_username);
6306                    next_entity_id += 100000;
6307                    let mut guest_cx = TestAppContext::new(
6308                        cx.foreground_platform(),
6309                        cx.platform(),
6310                        deterministic.build_foreground(next_entity_id),
6311                        deterministic.build_background(),
6312                        cx.font_cache(),
6313                        cx.leak_detector(),
6314                        next_entity_id,
6315                    );
6316                    let guest = server.create_client(&mut guest_cx, &guest_username).await;
6317                    let guest_project = Project::remote(
6318                        host_project_id,
6319                        guest.client.clone(),
6320                        guest.user_store.clone(),
6321                        guest_lang_registry.clone(),
6322                        FakeFs::new(cx.background()),
6323                        &mut guest_cx.to_async(),
6324                    )
6325                    .await
6326                    .unwrap();
6327                    let op_start_signal = futures::channel::mpsc::unbounded();
6328                    user_ids.push(guest.current_user_id(&guest_cx));
6329                    op_start_signals.push(op_start_signal.0);
6330                    clients.push(guest_cx.foreground().spawn(guest.simulate_guest(
6331                        guest_username.clone(),
6332                        guest_project,
6333                        op_start_signal.1,
6334                        rng.clone(),
6335                        guest_cx,
6336                    )));
6337
6338                    log::info!("Added connection for {}", guest_username);
6339                    operations += 1;
6340                }
6341                20..=29 if clients.len() > 1 => {
6342                    let guest_ix = rng.lock().gen_range(1..clients.len());
6343                    log::info!("Removing guest {}", user_ids[guest_ix]);
6344                    let removed_guest_id = user_ids.remove(guest_ix);
6345                    let guest = clients.remove(guest_ix);
6346                    op_start_signals.remove(guest_ix);
6347                    server.forbid_connections();
6348                    server.disconnect_client(removed_guest_id);
6349                    cx.foreground().advance_clock(RECEIVE_TIMEOUT);
6350                    let (guest, mut guest_cx, guest_err) = guest.await;
6351                    server.allow_connections();
6352
6353                    if let Some(guest_err) = guest_err {
6354                        log::error!("{} error - {:?}", guest.username, guest_err);
6355                    }
6356                    guest
6357                        .project
6358                        .as_ref()
6359                        .unwrap()
6360                        .read_with(&guest_cx, |project, _| assert!(project.is_read_only()));
6361                    for user_id in &user_ids {
6362                        let contacts = server.app_state.db.get_contacts(*user_id).await.unwrap();
6363                        let contacts = server
6364                            .store
6365                            .read()
6366                            .await
6367                            .build_initial_contacts_update(contacts)
6368                            .contacts;
6369                        for contact in contacts {
6370                            if contact.online {
6371                                assert_ne!(
6372                                    contact.user_id, removed_guest_id.0 as u64,
6373                                    "removed guest is still a contact of another peer"
6374                                );
6375                            }
6376                            for project in contact.projects {
6377                                for project_guest_id in project.guests {
6378                                    assert_ne!(
6379                                        project_guest_id, removed_guest_id.0 as u64,
6380                                        "removed guest appears as still participating on a project"
6381                                    );
6382                                }
6383                            }
6384                        }
6385                    }
6386
6387                    log::info!("{} removed", guest.username);
6388                    available_guests.push(guest.username.clone());
6389                    guest_cx.update(|_| drop(guest));
6390
6391                    operations += 1;
6392                }
6393                _ => {
6394                    while operations < max_operations && rng.lock().gen_bool(0.7) {
6395                        op_start_signals
6396                            .choose(&mut *rng.lock())
6397                            .unwrap()
6398                            .unbounded_send(())
6399                            .unwrap();
6400                        operations += 1;
6401                    }
6402
6403                    if rng.lock().gen_bool(0.8) {
6404                        cx.foreground().run_until_parked();
6405                    }
6406                }
6407            }
6408        }
6409
6410        drop(op_start_signals);
6411        let mut clients = futures::future::join_all(clients).await;
6412        cx.foreground().run_until_parked();
6413
6414        let (host_client, mut host_cx, host_err) = clients.remove(0);
6415        if let Some(host_err) = host_err {
6416            panic!("host error - {:?}", host_err);
6417        }
6418        let host_project = host_client.project.as_ref().unwrap();
6419        let host_worktree_snapshots = host_project.read_with(&host_cx, |project, cx| {
6420            project
6421                .worktrees(cx)
6422                .map(|worktree| {
6423                    let snapshot = worktree.read(cx).snapshot();
6424                    (snapshot.id(), snapshot)
6425                })
6426                .collect::<BTreeMap<_, _>>()
6427        });
6428
6429        host_client
6430            .project
6431            .as_ref()
6432            .unwrap()
6433            .read_with(&host_cx, |project, cx| project.check_invariants(cx));
6434
6435        for (guest_client, mut guest_cx, guest_err) in clients.into_iter() {
6436            if let Some(guest_err) = guest_err {
6437                panic!("{} error - {:?}", guest_client.username, guest_err);
6438            }
6439            let worktree_snapshots =
6440                guest_client
6441                    .project
6442                    .as_ref()
6443                    .unwrap()
6444                    .read_with(&guest_cx, |project, cx| {
6445                        project
6446                            .worktrees(cx)
6447                            .map(|worktree| {
6448                                let worktree = worktree.read(cx);
6449                                (worktree.id(), worktree.snapshot())
6450                            })
6451                            .collect::<BTreeMap<_, _>>()
6452                    });
6453
6454            assert_eq!(
6455                worktree_snapshots.keys().collect::<Vec<_>>(),
6456                host_worktree_snapshots.keys().collect::<Vec<_>>(),
6457                "{} has different worktrees than the host",
6458                guest_client.username
6459            );
6460            for (id, host_snapshot) in &host_worktree_snapshots {
6461                let guest_snapshot = &worktree_snapshots[id];
6462                assert_eq!(
6463                    guest_snapshot.root_name(),
6464                    host_snapshot.root_name(),
6465                    "{} has different root name than the host for worktree {}",
6466                    guest_client.username,
6467                    id
6468                );
6469                assert_eq!(
6470                    guest_snapshot.entries(false).collect::<Vec<_>>(),
6471                    host_snapshot.entries(false).collect::<Vec<_>>(),
6472                    "{} has different snapshot than the host for worktree {}",
6473                    guest_client.username,
6474                    id
6475                );
6476                assert_eq!(guest_snapshot.scan_id(), host_snapshot.scan_id());
6477            }
6478
6479            guest_client
6480                .project
6481                .as_ref()
6482                .unwrap()
6483                .read_with(&guest_cx, |project, cx| project.check_invariants(cx));
6484
6485            for guest_buffer in &guest_client.buffers {
6486                let buffer_id = guest_buffer.read_with(&guest_cx, |buffer, _| buffer.remote_id());
6487                let host_buffer = host_project.read_with(&host_cx, |project, cx| {
6488                    project.buffer_for_id(buffer_id, cx).expect(&format!(
6489                        "host does not have buffer for guest:{}, peer:{}, id:{}",
6490                        guest_client.username, guest_client.peer_id, buffer_id
6491                    ))
6492                });
6493                let path = host_buffer
6494                    .read_with(&host_cx, |buffer, cx| buffer.file().unwrap().full_path(cx));
6495
6496                assert_eq!(
6497                    guest_buffer.read_with(&guest_cx, |buffer, _| buffer.deferred_ops_len()),
6498                    0,
6499                    "{}, buffer {}, path {:?} has deferred operations",
6500                    guest_client.username,
6501                    buffer_id,
6502                    path,
6503                );
6504                assert_eq!(
6505                    guest_buffer.read_with(&guest_cx, |buffer, _| buffer.text()),
6506                    host_buffer.read_with(&host_cx, |buffer, _| buffer.text()),
6507                    "{}, buffer {}, path {:?}, differs from the host's buffer",
6508                    guest_client.username,
6509                    buffer_id,
6510                    path
6511                );
6512            }
6513
6514            guest_cx.update(|_| drop(guest_client));
6515        }
6516
6517        host_cx.update(|_| drop(host_client));
6518    }
6519
6520    struct TestServer {
6521        peer: Arc<Peer>,
6522        app_state: Arc<AppState>,
6523        server: Arc<Server>,
6524        foreground: Rc<executor::Foreground>,
6525        notifications: mpsc::UnboundedReceiver<()>,
6526        connection_killers: Arc<Mutex<HashMap<UserId, Arc<AtomicBool>>>>,
6527        forbid_connections: Arc<AtomicBool>,
6528        _test_db: TestDb,
6529    }
6530
6531    impl TestServer {
6532        async fn start(
6533            foreground: Rc<executor::Foreground>,
6534            background: Arc<executor::Background>,
6535        ) -> Self {
6536            let test_db = TestDb::fake(background);
6537            let app_state = Self::build_app_state(&test_db).await;
6538            let peer = Peer::new();
6539            let notifications = mpsc::unbounded();
6540            let server = Server::new(app_state.clone(), Some(notifications.0));
6541            Self {
6542                peer,
6543                app_state,
6544                server,
6545                foreground,
6546                notifications: notifications.1,
6547                connection_killers: Default::default(),
6548                forbid_connections: Default::default(),
6549                _test_db: test_db,
6550            }
6551        }
6552
6553        async fn create_client(&mut self, cx: &mut TestAppContext, name: &str) -> TestClient {
6554            cx.update(|cx| {
6555                let settings = Settings::test(cx);
6556                cx.set_global(settings);
6557            });
6558
6559            let http = FakeHttpClient::with_404_response();
6560            let user_id =
6561                if let Ok(Some(user)) = self.app_state.db.get_user_by_github_login(name).await {
6562                    user.id
6563                } else {
6564                    self.app_state.db.create_user(name, false).await.unwrap()
6565                };
6566            let client_name = name.to_string();
6567            let mut client = Client::new(http.clone());
6568            let server = self.server.clone();
6569            let db = self.app_state.db.clone();
6570            let connection_killers = self.connection_killers.clone();
6571            let forbid_connections = self.forbid_connections.clone();
6572            let (connection_id_tx, mut connection_id_rx) = mpsc::channel(16);
6573
6574            Arc::get_mut(&mut client)
6575                .unwrap()
6576                .override_authenticate(move |cx| {
6577                    cx.spawn(|_| async move {
6578                        let access_token = "the-token".to_string();
6579                        Ok(Credentials {
6580                            user_id: user_id.0 as u64,
6581                            access_token,
6582                        })
6583                    })
6584                })
6585                .override_establish_connection(move |credentials, cx| {
6586                    assert_eq!(credentials.user_id, user_id.0 as u64);
6587                    assert_eq!(credentials.access_token, "the-token");
6588
6589                    let server = server.clone();
6590                    let db = db.clone();
6591                    let connection_killers = connection_killers.clone();
6592                    let forbid_connections = forbid_connections.clone();
6593                    let client_name = client_name.clone();
6594                    let connection_id_tx = connection_id_tx.clone();
6595                    cx.spawn(move |cx| async move {
6596                        if forbid_connections.load(SeqCst) {
6597                            Err(EstablishConnectionError::other(anyhow!(
6598                                "server is forbidding connections"
6599                            )))
6600                        } else {
6601                            let (client_conn, server_conn, killed) =
6602                                Connection::in_memory(cx.background());
6603                            connection_killers.lock().insert(user_id, killed);
6604                            let user = db.get_user_by_id(user_id).await.unwrap().unwrap();
6605                            cx.background()
6606                                .spawn(server.handle_connection(
6607                                    server_conn,
6608                                    client_name,
6609                                    user,
6610                                    Some(connection_id_tx),
6611                                    cx.background(),
6612                                ))
6613                                .detach();
6614                            Ok(client_conn)
6615                        }
6616                    })
6617                });
6618
6619            Channel::init(&client);
6620            Project::init(&client);
6621            cx.update(|cx| {
6622                workspace::init(&client, cx);
6623            });
6624
6625            let user_store = cx.add_model(|cx| UserStore::new(client.clone(), http, cx));
6626            client
6627                .authenticate_and_connect(false, &cx.to_async())
6628                .await
6629                .unwrap();
6630            let peer_id = PeerId(connection_id_rx.next().await.unwrap().0);
6631
6632            let client = TestClient {
6633                client,
6634                peer_id,
6635                username: name.to_string(),
6636                user_store,
6637                language_registry: Arc::new(LanguageRegistry::test()),
6638                project: Default::default(),
6639                buffers: Default::default(),
6640            };
6641            client.wait_for_current_user(cx).await;
6642            client
6643        }
6644
6645        fn disconnect_client(&self, user_id: UserId) {
6646            self.connection_killers
6647                .lock()
6648                .remove(&user_id)
6649                .unwrap()
6650                .store(true, SeqCst);
6651        }
6652
6653        fn forbid_connections(&self) {
6654            self.forbid_connections.store(true, SeqCst);
6655        }
6656
6657        fn allow_connections(&self) {
6658            self.forbid_connections.store(false, SeqCst);
6659        }
6660
6661        async fn make_contacts(&self, mut clients: Vec<(&TestClient, &mut TestAppContext)>) {
6662            while let Some((client_a, cx_a)) = clients.pop() {
6663                for (client_b, cx_b) in &mut clients {
6664                    client_a
6665                        .user_store
6666                        .update(cx_a, |store, cx| {
6667                            store.request_contact(client_b.user_id().unwrap(), cx)
6668                        })
6669                        .await
6670                        .unwrap();
6671                    cx_a.foreground().run_until_parked();
6672                    client_b
6673                        .user_store
6674                        .update(*cx_b, |store, cx| {
6675                            store.respond_to_contact_request(client_a.user_id().unwrap(), true, cx)
6676                        })
6677                        .await
6678                        .unwrap();
6679                }
6680            }
6681        }
6682
6683        async fn build_app_state(test_db: &TestDb) -> Arc<AppState> {
6684            Arc::new(AppState {
6685                db: test_db.db().clone(),
6686                api_token: Default::default(),
6687                invite_link_prefix: Default::default(),
6688            })
6689        }
6690
6691        async fn state<'a>(&'a self) -> RwLockReadGuard<'a, Store> {
6692            self.server.store.read().await
6693        }
6694
6695        async fn condition<F>(&mut self, mut predicate: F)
6696        where
6697            F: FnMut(&Store) -> bool,
6698        {
6699            assert!(
6700                self.foreground.parking_forbidden(),
6701                "you must call forbid_parking to use server conditions so we don't block indefinitely"
6702            );
6703            while !(predicate)(&*self.server.store.read().await) {
6704                self.foreground.start_waiting();
6705                self.notifications.next().await;
6706                self.foreground.finish_waiting();
6707            }
6708        }
6709    }
6710
6711    impl Deref for TestServer {
6712        type Target = Server;
6713
6714        fn deref(&self) -> &Self::Target {
6715            &self.server
6716        }
6717    }
6718
6719    impl Drop for TestServer {
6720        fn drop(&mut self) {
6721            self.peer.reset();
6722        }
6723    }
6724
6725    struct TestClient {
6726        client: Arc<Client>,
6727        username: String,
6728        pub peer_id: PeerId,
6729        pub user_store: ModelHandle<UserStore>,
6730        language_registry: Arc<LanguageRegistry>,
6731        project: Option<ModelHandle<Project>>,
6732        buffers: HashSet<ModelHandle<language::Buffer>>,
6733    }
6734
6735    impl Deref for TestClient {
6736        type Target = Arc<Client>;
6737
6738        fn deref(&self) -> &Self::Target {
6739            &self.client
6740        }
6741    }
6742
6743    struct ContactsSummary {
6744        pub current: Vec<String>,
6745        pub outgoing_requests: Vec<String>,
6746        pub incoming_requests: Vec<String>,
6747    }
6748
6749    impl TestClient {
6750        pub fn current_user_id(&self, cx: &TestAppContext) -> UserId {
6751            UserId::from_proto(
6752                self.user_store
6753                    .read_with(cx, |user_store, _| user_store.current_user().unwrap().id),
6754            )
6755        }
6756
6757        async fn wait_for_current_user(&self, cx: &TestAppContext) {
6758            let mut authed_user = self
6759                .user_store
6760                .read_with(cx, |user_store, _| user_store.watch_current_user());
6761            while authed_user.next().await.unwrap().is_none() {}
6762        }
6763
6764        async fn clear_contacts(&self, cx: &mut TestAppContext) {
6765            self.user_store
6766                .update(cx, |store, _| store.clear_contacts())
6767                .await;
6768        }
6769
6770        fn summarize_contacts(&self, cx: &TestAppContext) -> ContactsSummary {
6771            self.user_store.read_with(cx, |store, _| ContactsSummary {
6772                current: store
6773                    .contacts()
6774                    .iter()
6775                    .map(|contact| contact.user.github_login.clone())
6776                    .collect(),
6777                outgoing_requests: store
6778                    .outgoing_contact_requests()
6779                    .iter()
6780                    .map(|user| user.github_login.clone())
6781                    .collect(),
6782                incoming_requests: store
6783                    .incoming_contact_requests()
6784                    .iter()
6785                    .map(|user| user.github_login.clone())
6786                    .collect(),
6787            })
6788        }
6789
6790        async fn build_local_project(
6791            &mut self,
6792            fs: Arc<FakeFs>,
6793            root_path: impl AsRef<Path>,
6794            cx: &mut TestAppContext,
6795        ) -> (ModelHandle<Project>, WorktreeId) {
6796            let project = cx.update(|cx| {
6797                Project::local(
6798                    self.client.clone(),
6799                    self.user_store.clone(),
6800                    self.language_registry.clone(),
6801                    fs,
6802                    cx,
6803                )
6804            });
6805            self.project = Some(project.clone());
6806            let (worktree, _) = project
6807                .update(cx, |p, cx| {
6808                    p.find_or_create_local_worktree(root_path, true, cx)
6809                })
6810                .await
6811                .unwrap();
6812            worktree
6813                .read_with(cx, |tree, _| tree.as_local().unwrap().scan_complete())
6814                .await;
6815            project
6816                .update(cx, |project, _| project.next_remote_id())
6817                .await;
6818            (project, worktree.read_with(cx, |tree, _| tree.id()))
6819        }
6820
6821        async fn build_remote_project(
6822            &mut self,
6823            host_project: &ModelHandle<Project>,
6824            host_cx: &mut TestAppContext,
6825            guest_cx: &mut TestAppContext,
6826        ) -> ModelHandle<Project> {
6827            let host_project_id = host_project
6828                .read_with(host_cx, |project, _| project.next_remote_id())
6829                .await;
6830            let guest_user_id = self.user_id().unwrap();
6831            let languages =
6832                host_project.read_with(host_cx, |project, _| project.languages().clone());
6833            let project_b = guest_cx.spawn(|mut cx| {
6834                let user_store = self.user_store.clone();
6835                let guest_client = self.client.clone();
6836                async move {
6837                    Project::remote(
6838                        host_project_id,
6839                        guest_client,
6840                        user_store.clone(),
6841                        languages,
6842                        FakeFs::new(cx.background()),
6843                        &mut cx,
6844                    )
6845                    .await
6846                    .unwrap()
6847                }
6848            });
6849            host_cx.foreground().run_until_parked();
6850            host_project.update(host_cx, |project, cx| {
6851                project.respond_to_join_request(guest_user_id, true, cx)
6852            });
6853            let project = project_b.await;
6854            self.project = Some(project.clone());
6855            project
6856        }
6857
6858        fn build_workspace(
6859            &self,
6860            project: &ModelHandle<Project>,
6861            cx: &mut TestAppContext,
6862        ) -> ViewHandle<Workspace> {
6863            let (window_id, _) = cx.add_window(|_| EmptyView);
6864            cx.add_view(window_id, |cx| {
6865                let fs = project.read(cx).fs().clone();
6866                Workspace::new(
6867                    &WorkspaceParams {
6868                        fs,
6869                        project: project.clone(),
6870                        user_store: self.user_store.clone(),
6871                        languages: self.language_registry.clone(),
6872                        themes: ThemeRegistry::new((), cx.font_cache().clone()),
6873                        channel_list: cx.add_model(|cx| {
6874                            ChannelList::new(self.user_store.clone(), self.client.clone(), cx)
6875                        }),
6876                        client: self.client.clone(),
6877                    },
6878                    cx,
6879                )
6880            })
6881        }
6882
6883        async fn simulate_host(
6884            mut self,
6885            project: ModelHandle<Project>,
6886            op_start_signal: futures::channel::mpsc::UnboundedReceiver<()>,
6887            rng: Arc<Mutex<StdRng>>,
6888            mut cx: TestAppContext,
6889        ) -> (Self, TestAppContext, Option<anyhow::Error>) {
6890            async fn simulate_host_internal(
6891                client: &mut TestClient,
6892                project: ModelHandle<Project>,
6893                mut op_start_signal: futures::channel::mpsc::UnboundedReceiver<()>,
6894                rng: Arc<Mutex<StdRng>>,
6895                cx: &mut TestAppContext,
6896            ) -> anyhow::Result<()> {
6897                let fs = project.read_with(cx, |project, _| project.fs().clone());
6898
6899                cx.update(|cx| {
6900                    cx.subscribe(&project, move |project, event, cx| {
6901                        if let project::Event::ContactRequestedJoin(user) = event {
6902                            log::info!("Host: accepting join request from {}", user.github_login);
6903                            project.update(cx, |project, cx| {
6904                                project.respond_to_join_request(user.id, true, cx)
6905                            });
6906                        }
6907                    })
6908                    .detach();
6909                });
6910
6911                while op_start_signal.next().await.is_some() {
6912                    let distribution = rng.lock().gen_range::<usize, _>(0..100);
6913                    let files = fs.as_fake().files().await;
6914                    match distribution {
6915                        0..=19 if !files.is_empty() => {
6916                            let path = files.choose(&mut *rng.lock()).unwrap();
6917                            let mut path = path.as_path();
6918                            while let Some(parent_path) = path.parent() {
6919                                path = parent_path;
6920                                if rng.lock().gen() {
6921                                    break;
6922                                }
6923                            }
6924
6925                            log::info!("Host: find/create local worktree {:?}", path);
6926                            let find_or_create_worktree = project.update(cx, |project, cx| {
6927                                project.find_or_create_local_worktree(path, true, cx)
6928                            });
6929                            if rng.lock().gen() {
6930                                cx.background().spawn(find_or_create_worktree).detach();
6931                            } else {
6932                                find_or_create_worktree.await?;
6933                            }
6934                        }
6935                        20..=79 if !files.is_empty() => {
6936                            let buffer = if client.buffers.is_empty() || rng.lock().gen() {
6937                                let file = files.choose(&mut *rng.lock()).unwrap();
6938                                let (worktree, path) = project
6939                                    .update(cx, |project, cx| {
6940                                        project.find_or_create_local_worktree(
6941                                            file.clone(),
6942                                            true,
6943                                            cx,
6944                                        )
6945                                    })
6946                                    .await?;
6947                                let project_path =
6948                                    worktree.read_with(cx, |worktree, _| (worktree.id(), path));
6949                                log::info!(
6950                                    "Host: opening path {:?}, worktree {}, relative_path {:?}",
6951                                    file,
6952                                    project_path.0,
6953                                    project_path.1
6954                                );
6955                                let buffer = project
6956                                    .update(cx, |project, cx| project.open_buffer(project_path, cx))
6957                                    .await
6958                                    .unwrap();
6959                                client.buffers.insert(buffer.clone());
6960                                buffer
6961                            } else {
6962                                client
6963                                    .buffers
6964                                    .iter()
6965                                    .choose(&mut *rng.lock())
6966                                    .unwrap()
6967                                    .clone()
6968                            };
6969
6970                            if rng.lock().gen_bool(0.1) {
6971                                cx.update(|cx| {
6972                                    log::info!(
6973                                        "Host: dropping buffer {:?}",
6974                                        buffer.read(cx).file().unwrap().full_path(cx)
6975                                    );
6976                                    client.buffers.remove(&buffer);
6977                                    drop(buffer);
6978                                });
6979                            } else {
6980                                buffer.update(cx, |buffer, cx| {
6981                                    log::info!(
6982                                        "Host: updating buffer {:?} ({})",
6983                                        buffer.file().unwrap().full_path(cx),
6984                                        buffer.remote_id()
6985                                    );
6986
6987                                    if rng.lock().gen_bool(0.7) {
6988                                        buffer.randomly_edit(&mut *rng.lock(), 5, cx);
6989                                    } else {
6990                                        buffer.randomly_undo_redo(&mut *rng.lock(), cx);
6991                                    }
6992                                });
6993                            }
6994                        }
6995                        _ => loop {
6996                            let path_component_count = rng.lock().gen_range::<usize, _>(1..=5);
6997                            let mut path = PathBuf::new();
6998                            path.push("/");
6999                            for _ in 0..path_component_count {
7000                                let letter = rng.lock().gen_range(b'a'..=b'z');
7001                                path.push(std::str::from_utf8(&[letter]).unwrap());
7002                            }
7003                            path.set_extension("rs");
7004                            let parent_path = path.parent().unwrap();
7005
7006                            log::info!("Host: creating file {:?}", path,);
7007
7008                            if fs.create_dir(&parent_path).await.is_ok()
7009                                && fs.create_file(&path, Default::default()).await.is_ok()
7010                            {
7011                                break;
7012                            } else {
7013                                log::info!("Host: cannot create file");
7014                            }
7015                        },
7016                    }
7017
7018                    cx.background().simulate_random_delay().await;
7019                }
7020
7021                Ok(())
7022            }
7023
7024            let result =
7025                simulate_host_internal(&mut self, project.clone(), op_start_signal, rng, &mut cx)
7026                    .await;
7027            log::info!("Host done");
7028            self.project = Some(project);
7029            (self, cx, result.err())
7030        }
7031
7032        pub async fn simulate_guest(
7033            mut self,
7034            guest_username: String,
7035            project: ModelHandle<Project>,
7036            op_start_signal: futures::channel::mpsc::UnboundedReceiver<()>,
7037            rng: Arc<Mutex<StdRng>>,
7038            mut cx: TestAppContext,
7039        ) -> (Self, TestAppContext, Option<anyhow::Error>) {
7040            async fn simulate_guest_internal(
7041                client: &mut TestClient,
7042                guest_username: &str,
7043                project: ModelHandle<Project>,
7044                mut op_start_signal: futures::channel::mpsc::UnboundedReceiver<()>,
7045                rng: Arc<Mutex<StdRng>>,
7046                cx: &mut TestAppContext,
7047            ) -> anyhow::Result<()> {
7048                while op_start_signal.next().await.is_some() {
7049                    let buffer = if client.buffers.is_empty() || rng.lock().gen() {
7050                        let worktree = if let Some(worktree) =
7051                            project.read_with(cx, |project, cx| {
7052                                project
7053                                    .worktrees(&cx)
7054                                    .filter(|worktree| {
7055                                        let worktree = worktree.read(cx);
7056                                        worktree.is_visible()
7057                                            && worktree.entries(false).any(|e| e.is_file())
7058                                    })
7059                                    .choose(&mut *rng.lock())
7060                            }) {
7061                            worktree
7062                        } else {
7063                            cx.background().simulate_random_delay().await;
7064                            continue;
7065                        };
7066
7067                        let (worktree_root_name, project_path) =
7068                            worktree.read_with(cx, |worktree, _| {
7069                                let entry = worktree
7070                                    .entries(false)
7071                                    .filter(|e| e.is_file())
7072                                    .choose(&mut *rng.lock())
7073                                    .unwrap();
7074                                (
7075                                    worktree.root_name().to_string(),
7076                                    (worktree.id(), entry.path.clone()),
7077                                )
7078                            });
7079                        log::info!(
7080                            "{}: opening path {:?} in worktree {} ({})",
7081                            guest_username,
7082                            project_path.1,
7083                            project_path.0,
7084                            worktree_root_name,
7085                        );
7086                        let buffer = project
7087                            .update(cx, |project, cx| {
7088                                project.open_buffer(project_path.clone(), cx)
7089                            })
7090                            .await?;
7091                        log::info!(
7092                            "{}: opened path {:?} in worktree {} ({}) with buffer id {}",
7093                            guest_username,
7094                            project_path.1,
7095                            project_path.0,
7096                            worktree_root_name,
7097                            buffer.read_with(cx, |buffer, _| buffer.remote_id())
7098                        );
7099                        client.buffers.insert(buffer.clone());
7100                        buffer
7101                    } else {
7102                        client
7103                            .buffers
7104                            .iter()
7105                            .choose(&mut *rng.lock())
7106                            .unwrap()
7107                            .clone()
7108                    };
7109
7110                    let choice = rng.lock().gen_range(0..100);
7111                    match choice {
7112                        0..=9 => {
7113                            cx.update(|cx| {
7114                                log::info!(
7115                                    "{}: dropping buffer {:?}",
7116                                    guest_username,
7117                                    buffer.read(cx).file().unwrap().full_path(cx)
7118                                );
7119                                client.buffers.remove(&buffer);
7120                                drop(buffer);
7121                            });
7122                        }
7123                        10..=19 => {
7124                            let completions = project.update(cx, |project, cx| {
7125                                log::info!(
7126                                    "{}: requesting completions for buffer {} ({:?})",
7127                                    guest_username,
7128                                    buffer.read(cx).remote_id(),
7129                                    buffer.read(cx).file().unwrap().full_path(cx)
7130                                );
7131                                let offset = rng.lock().gen_range(0..=buffer.read(cx).len());
7132                                project.completions(&buffer, offset, cx)
7133                            });
7134                            let completions = cx.background().spawn(async move {
7135                                completions
7136                                    .await
7137                                    .map_err(|err| anyhow!("completions request failed: {:?}", err))
7138                            });
7139                            if rng.lock().gen_bool(0.3) {
7140                                log::info!("{}: detaching completions request", guest_username);
7141                                cx.update(|cx| completions.detach_and_log_err(cx));
7142                            } else {
7143                                completions.await?;
7144                            }
7145                        }
7146                        20..=29 => {
7147                            let code_actions = project.update(cx, |project, cx| {
7148                                log::info!(
7149                                    "{}: requesting code actions for buffer {} ({:?})",
7150                                    guest_username,
7151                                    buffer.read(cx).remote_id(),
7152                                    buffer.read(cx).file().unwrap().full_path(cx)
7153                                );
7154                                let range = buffer.read(cx).random_byte_range(0, &mut *rng.lock());
7155                                project.code_actions(&buffer, range, cx)
7156                            });
7157                            let code_actions = cx.background().spawn(async move {
7158                                code_actions.await.map_err(|err| {
7159                                    anyhow!("code actions request failed: {:?}", err)
7160                                })
7161                            });
7162                            if rng.lock().gen_bool(0.3) {
7163                                log::info!("{}: detaching code actions request", guest_username);
7164                                cx.update(|cx| code_actions.detach_and_log_err(cx));
7165                            } else {
7166                                code_actions.await?;
7167                            }
7168                        }
7169                        30..=39 if buffer.read_with(cx, |buffer, _| buffer.is_dirty()) => {
7170                            let (requested_version, save) = buffer.update(cx, |buffer, cx| {
7171                                log::info!(
7172                                    "{}: saving buffer {} ({:?})",
7173                                    guest_username,
7174                                    buffer.remote_id(),
7175                                    buffer.file().unwrap().full_path(cx)
7176                                );
7177                                (buffer.version(), buffer.save(cx))
7178                            });
7179                            let save = cx.background().spawn(async move {
7180                                let (saved_version, _) = save
7181                                    .await
7182                                    .map_err(|err| anyhow!("save request failed: {:?}", err))?;
7183                                assert!(saved_version.observed_all(&requested_version));
7184                                Ok::<_, anyhow::Error>(())
7185                            });
7186                            if rng.lock().gen_bool(0.3) {
7187                                log::info!("{}: detaching save request", guest_username);
7188                                cx.update(|cx| save.detach_and_log_err(cx));
7189                            } else {
7190                                save.await?;
7191                            }
7192                        }
7193                        40..=44 => {
7194                            let prepare_rename = project.update(cx, |project, cx| {
7195                                log::info!(
7196                                    "{}: preparing rename for buffer {} ({:?})",
7197                                    guest_username,
7198                                    buffer.read(cx).remote_id(),
7199                                    buffer.read(cx).file().unwrap().full_path(cx)
7200                                );
7201                                let offset = rng.lock().gen_range(0..=buffer.read(cx).len());
7202                                project.prepare_rename(buffer, offset, cx)
7203                            });
7204                            let prepare_rename = cx.background().spawn(async move {
7205                                prepare_rename.await.map_err(|err| {
7206                                    anyhow!("prepare rename request failed: {:?}", err)
7207                                })
7208                            });
7209                            if rng.lock().gen_bool(0.3) {
7210                                log::info!("{}: detaching prepare rename request", guest_username);
7211                                cx.update(|cx| prepare_rename.detach_and_log_err(cx));
7212                            } else {
7213                                prepare_rename.await?;
7214                            }
7215                        }
7216                        45..=49 => {
7217                            let definitions = project.update(cx, |project, cx| {
7218                                log::info!(
7219                                    "{}: requesting definitions for buffer {} ({:?})",
7220                                    guest_username,
7221                                    buffer.read(cx).remote_id(),
7222                                    buffer.read(cx).file().unwrap().full_path(cx)
7223                                );
7224                                let offset = rng.lock().gen_range(0..=buffer.read(cx).len());
7225                                project.definition(&buffer, offset, cx)
7226                            });
7227                            let definitions = cx.background().spawn(async move {
7228                                definitions
7229                                    .await
7230                                    .map_err(|err| anyhow!("definitions request failed: {:?}", err))
7231                            });
7232                            if rng.lock().gen_bool(0.3) {
7233                                log::info!("{}: detaching definitions request", guest_username);
7234                                cx.update(|cx| definitions.detach_and_log_err(cx));
7235                            } else {
7236                                client
7237                                    .buffers
7238                                    .extend(definitions.await?.into_iter().map(|loc| loc.buffer));
7239                            }
7240                        }
7241                        50..=54 => {
7242                            let highlights = project.update(cx, |project, cx| {
7243                                log::info!(
7244                                    "{}: requesting highlights for buffer {} ({:?})",
7245                                    guest_username,
7246                                    buffer.read(cx).remote_id(),
7247                                    buffer.read(cx).file().unwrap().full_path(cx)
7248                                );
7249                                let offset = rng.lock().gen_range(0..=buffer.read(cx).len());
7250                                project.document_highlights(&buffer, offset, cx)
7251                            });
7252                            let highlights = cx.background().spawn(async move {
7253                                highlights
7254                                    .await
7255                                    .map_err(|err| anyhow!("highlights request failed: {:?}", err))
7256                            });
7257                            if rng.lock().gen_bool(0.3) {
7258                                log::info!("{}: detaching highlights request", guest_username);
7259                                cx.update(|cx| highlights.detach_and_log_err(cx));
7260                            } else {
7261                                highlights.await?;
7262                            }
7263                        }
7264                        55..=59 => {
7265                            let search = project.update(cx, |project, cx| {
7266                                let query = rng.lock().gen_range('a'..='z');
7267                                log::info!("{}: project-wide search {:?}", guest_username, query);
7268                                project.search(SearchQuery::text(query, false, false), cx)
7269                            });
7270                            let search = cx.background().spawn(async move {
7271                                search
7272                                    .await
7273                                    .map_err(|err| anyhow!("search request failed: {:?}", err))
7274                            });
7275                            if rng.lock().gen_bool(0.3) {
7276                                log::info!("{}: detaching search request", guest_username);
7277                                cx.update(|cx| search.detach_and_log_err(cx));
7278                            } else {
7279                                client.buffers.extend(search.await?.into_keys());
7280                            }
7281                        }
7282                        60..=69 => {
7283                            let worktree = project
7284                                .read_with(cx, |project, cx| {
7285                                    project
7286                                        .worktrees(&cx)
7287                                        .filter(|worktree| {
7288                                            let worktree = worktree.read(cx);
7289                                            worktree.is_visible()
7290                                                && worktree.entries(false).any(|e| e.is_file())
7291                                                && worktree
7292                                                    .root_entry()
7293                                                    .map_or(false, |e| e.is_dir())
7294                                        })
7295                                        .choose(&mut *rng.lock())
7296                                })
7297                                .unwrap();
7298                            let (worktree_id, worktree_root_name) = worktree
7299                                .read_with(cx, |worktree, _| {
7300                                    (worktree.id(), worktree.root_name().to_string())
7301                                });
7302
7303                            let mut new_name = String::new();
7304                            for _ in 0..10 {
7305                                let letter = rng.lock().gen_range('a'..='z');
7306                                new_name.push(letter);
7307                            }
7308                            let mut new_path = PathBuf::new();
7309                            new_path.push(new_name);
7310                            new_path.set_extension("rs");
7311                            log::info!(
7312                                "{}: creating {:?} in worktree {} ({})",
7313                                guest_username,
7314                                new_path,
7315                                worktree_id,
7316                                worktree_root_name,
7317                            );
7318                            project
7319                                .update(cx, |project, cx| {
7320                                    project.create_entry((worktree_id, new_path), false, cx)
7321                                })
7322                                .unwrap()
7323                                .await?;
7324                        }
7325                        _ => {
7326                            buffer.update(cx, |buffer, cx| {
7327                                log::info!(
7328                                    "{}: updating buffer {} ({:?})",
7329                                    guest_username,
7330                                    buffer.remote_id(),
7331                                    buffer.file().unwrap().full_path(cx)
7332                                );
7333                                if rng.lock().gen_bool(0.7) {
7334                                    buffer.randomly_edit(&mut *rng.lock(), 5, cx);
7335                                } else {
7336                                    buffer.randomly_undo_redo(&mut *rng.lock(), cx);
7337                                }
7338                            });
7339                        }
7340                    }
7341                    cx.background().simulate_random_delay().await;
7342                }
7343                Ok(())
7344            }
7345
7346            let result = simulate_guest_internal(
7347                &mut self,
7348                &guest_username,
7349                project.clone(),
7350                op_start_signal,
7351                rng,
7352                &mut cx,
7353            )
7354            .await;
7355            log::info!("{}: done", guest_username);
7356
7357            self.project = Some(project);
7358            (self, cx, result.err())
7359        }
7360    }
7361
7362    impl Drop for TestClient {
7363        fn drop(&mut self) {
7364            self.client.tear_down();
7365        }
7366    }
7367
7368    impl Executor for Arc<gpui::executor::Background> {
7369        type Sleep = gpui::executor::Timer;
7370
7371        fn spawn_detached<F: 'static + Send + Future<Output = ()>>(&self, future: F) {
7372            self.spawn(future).detach();
7373        }
7374
7375        fn sleep(&self, duration: Duration) -> Self::Sleep {
7376            self.as_ref().timer(duration)
7377        }
7378    }
7379
7380    fn channel_messages(channel: &Channel) -> Vec<(String, String, bool)> {
7381        channel
7382            .messages()
7383            .cursor::<()>()
7384            .map(|m| {
7385                (
7386                    m.sender.github_login.clone(),
7387                    m.body.clone(),
7388                    m.is_pending(),
7389                )
7390            })
7391            .collect()
7392    }
7393
7394    struct EmptyView;
7395
7396    impl gpui::Entity for EmptyView {
7397        type Event = ();
7398    }
7399
7400    impl gpui::View for EmptyView {
7401        fn ui_name() -> &'static str {
7402            "empty view"
7403        }
7404
7405        fn render(&mut self, _: &mut gpui::RenderContext<Self>) -> gpui::ElementBox {
7406            gpui::Element::boxed(gpui::elements::Empty::new())
7407        }
7408    }
7409}