rpc.rs

   1mod store;
   2
   3use crate::{
   4    auth,
   5    db::{self, ChannelId, MessageId, User, UserId},
   6    AppState, Result,
   7};
   8use anyhow::anyhow;
   9use async_tungstenite::tungstenite::{
  10    protocol::CloseFrame as TungsteniteCloseFrame, Message as TungsteniteMessage,
  11};
  12use axum::{
  13    body::Body,
  14    extract::{
  15        ws::{CloseFrame as AxumCloseFrame, Message as AxumMessage},
  16        ConnectInfo, WebSocketUpgrade,
  17    },
  18    headers::{Header, HeaderName},
  19    http::StatusCode,
  20    middleware,
  21    response::IntoResponse,
  22    routing::get,
  23    Extension, Router, TypedHeader,
  24};
  25use collections::HashMap;
  26use futures::{channel::mpsc, future::BoxFuture, FutureExt, SinkExt, StreamExt, TryStreamExt};
  27use lazy_static::lazy_static;
  28use rpc::{
  29    proto::{self, AnyTypedEnvelope, EntityMessage, EnvelopedMessage, RequestMessage},
  30    Connection, ConnectionId, Peer, Receipt, TypedEnvelope,
  31};
  32use std::{
  33    any::TypeId,
  34    future::Future,
  35    marker::PhantomData,
  36    net::SocketAddr,
  37    ops::{Deref, DerefMut},
  38    rc::Rc,
  39    sync::{
  40        atomic::{AtomicBool, Ordering::SeqCst},
  41        Arc,
  42    },
  43    time::Duration,
  44};
  45use store::{Store, Worktree};
  46use time::OffsetDateTime;
  47use tokio::{
  48    sync::{RwLock, RwLockReadGuard, RwLockWriteGuard},
  49    time::Sleep,
  50};
  51use tower::ServiceBuilder;
  52use tracing::{info_span, instrument, Instrument};
  53
/// Type-erased message handler as stored in `Server::handlers`: it takes the server and a
/// boxed envelope and returns a boxed future that resolves to `()`.
type MessageHandler =
    Box<dyn Send + Sync + Fn(Arc<Server>, Box<dyn AnyTypedEnvelope>) -> BoxFuture<'static, ()>>;
  56
/// Handle given to request handlers for answering a request of type `R`.
struct Response<R> {
    /// Server whose peer the response is sent through.
    server: Arc<Server>,
    /// Receipt identifying the request being answered.
    receipt: Receipt<R>,
    /// Shared flag that is set once the handler has sent a response or taken the receipt.
    responded: Arc<AtomicBool>,
}
  62
  63impl<R: RequestMessage> Response<R> {
  64    fn send(self, payload: R::Response) -> Result<()> {
  65        self.responded.store(true, SeqCst);
  66        self.server.peer.respond(self.receipt, payload)?;
  67        Ok(())
  68    }
  69
  70    fn into_receipt(self) -> Receipt<R> {
  71        self.responded.store(true, SeqCst);
  72        self.receipt
  73    }
  74}
  75
/// Shared state of the RPC server.
pub struct Server {
    /// Peer used to send messages and responses to connections.
    peer: Arc<Peer>,
    /// In-memory store, guarded by an async read/write lock.
    store: RwLock<Store>,
    /// Application state; the code in this file reaches the database through it.
    app_state: Arc<AppState>,
    /// Message handlers keyed by the `TypeId` of the message type they were registered for.
    handlers: HashMap<TypeId, MessageHandler>,
    /// Optional channel that is sent a `()` after each handled message.
    notifications: Option<mpsc::UnboundedSender<()>>,
}
  83
/// Abstraction over the async runtime used while handling a connection.
pub trait Executor: Send + Clone {
    /// Future returned by [`Executor::sleep`].
    type Sleep: Send + Future;
    /// Hands `future` to the executor; nothing is returned, so the caller does not wait on it.
    fn spawn_detached<F: 'static + Send + Future<Output = ()>>(&self, future: F);
    /// Returns a future for a delay of `duration`.
    fn sleep(&self, duration: Duration) -> Self::Sleep;
}
  89
/// Unit-struct executor type. Its `Executor` implementation is not part of this section of
/// the file; presumably it is the one used outside of tests — confirm.
#[derive(Clone)]
pub struct RealExecutor;
  92
// Limits for channel messages. The code that applies them is outside this section of the
// file. NOTE(review): presumably a page size and a maximum body length — confirm the unit
// (bytes vs. characters) of MAX_MESSAGE_LEN at its use site.
const MESSAGE_COUNT_PER_PAGE: usize = 100;
const MAX_MESSAGE_LEN: usize = 1024;
  95
/// Read guard over the server's `Store`.
struct StoreReadGuard<'a> {
    /// The underlying lock guard.
    guard: RwLockReadGuard<'a, Store>,
    /// `Rc` is not `Send`, so this marker makes the whole guard `!Send`
    /// (presumably so it cannot be held across an `.await` in a `Send` future — confirm).
    _not_send: PhantomData<Rc<()>>,
}
 100
/// Write guard over the server's `Store`.
struct StoreWriteGuard<'a> {
    /// The underlying lock guard.
    guard: RwLockWriteGuard<'a, Store>,
    /// `Rc` is not `Send`, so this marker makes the whole guard `!Send`
    /// (presumably so it cannot be held across an `.await` in a `Send` future — confirm).
    _not_send: PhantomData<Rc<()>>,
}
 105
 106impl Server {
    /// Creates a server with a fresh peer and a default (empty) store, registers every
    /// message and request handler, and returns it wrapped in an `Arc`.
    ///
    /// `notifications`, when provided, is sent a `()` after each handled message
    /// (see `handle_connection`).
    ///
    /// # Panics
    ///
    /// Panics if two handlers are registered for the same message type.
    pub fn new(
        app_state: Arc<AppState>,
        notifications: Option<mpsc::UnboundedSender<()>>,
    ) -> Arc<Self> {
        let mut server = Self {
            peer: Peer::new(),
            app_state,
            store: Default::default(),
            handlers: Default::default(),
            notifications,
        };

        // `add_request_handler` is used for messages that expect a response,
        // `add_message_handler` for messages that do not.
        server
            .add_request_handler(Server::ping)
            .add_request_handler(Server::register_project)
            .add_message_handler(Server::unregister_project)
            .add_request_handler(Server::join_project)
            .add_message_handler(Server::leave_project)
            .add_message_handler(Server::respond_to_join_project_request)
            .add_request_handler(Server::register_worktree)
            .add_message_handler(Server::unregister_worktree)
            .add_request_handler(Server::update_worktree)
            .add_message_handler(Server::start_language_server)
            .add_message_handler(Server::update_language_server)
            .add_message_handler(Server::update_diagnostic_summary)
            .add_request_handler(Server::forward_project_request::<proto::GetDefinition>)
            .add_request_handler(Server::forward_project_request::<proto::GetReferences>)
            .add_request_handler(Server::forward_project_request::<proto::SearchProject>)
            .add_request_handler(Server::forward_project_request::<proto::GetDocumentHighlights>)
            .add_request_handler(Server::forward_project_request::<proto::GetProjectSymbols>)
            .add_request_handler(Server::forward_project_request::<proto::OpenBufferForSymbol>)
            .add_request_handler(Server::forward_project_request::<proto::OpenBufferById>)
            .add_request_handler(Server::forward_project_request::<proto::OpenBufferByPath>)
            .add_request_handler(Server::forward_project_request::<proto::GetCompletions>)
            .add_request_handler(
                Server::forward_project_request::<proto::ApplyCompletionAdditionalEdits>,
            )
            .add_request_handler(Server::forward_project_request::<proto::GetCodeActions>)
            .add_request_handler(Server::forward_project_request::<proto::ApplyCodeAction>)
            .add_request_handler(Server::forward_project_request::<proto::PrepareRename>)
            .add_request_handler(Server::forward_project_request::<proto::PerformRename>)
            .add_request_handler(Server::forward_project_request::<proto::ReloadBuffers>)
            .add_request_handler(Server::forward_project_request::<proto::FormatBuffers>)
            .add_request_handler(Server::forward_project_request::<proto::CreateProjectEntry>)
            .add_request_handler(Server::forward_project_request::<proto::RenameProjectEntry>)
            .add_request_handler(Server::forward_project_request::<proto::DeleteProjectEntry>)
            .add_request_handler(Server::update_buffer)
            .add_message_handler(Server::update_buffer_file)
            .add_message_handler(Server::buffer_reloaded)
            .add_message_handler(Server::buffer_saved)
            .add_request_handler(Server::save_buffer)
            .add_request_handler(Server::get_channels)
            .add_request_handler(Server::get_users)
            .add_request_handler(Server::fuzzy_search_users)
            .add_request_handler(Server::request_contact)
            .add_request_handler(Server::remove_contact)
            .add_request_handler(Server::respond_to_contact_request)
            .add_request_handler(Server::join_channel)
            .add_message_handler(Server::leave_channel)
            .add_request_handler(Server::send_channel_message)
            .add_request_handler(Server::follow)
            .add_message_handler(Server::unfollow)
            .add_message_handler(Server::update_followers)
            .add_request_handler(Server::get_channel_messages);

        Arc::new(server)
    }
 174
 175    fn add_message_handler<F, Fut, M>(&mut self, handler: F) -> &mut Self
 176    where
 177        F: 'static + Send + Sync + Fn(Arc<Self>, TypedEnvelope<M>) -> Fut,
 178        Fut: 'static + Send + Future<Output = Result<()>>,
 179        M: EnvelopedMessage,
 180    {
 181        let prev_handler = self.handlers.insert(
 182            TypeId::of::<M>(),
 183            Box::new(move |server, envelope| {
 184                let envelope = envelope.into_any().downcast::<TypedEnvelope<M>>().unwrap();
 185                let span = info_span!(
 186                    "handle message",
 187                    payload_type = envelope.payload_type_name(),
 188                    payload = format!("{:?}", envelope.payload).as_str(),
 189                );
 190                let future = (handler)(server, *envelope);
 191                async move {
 192                    if let Err(error) = future.await {
 193                        tracing::error!(%error, "error handling message");
 194                    }
 195                }
 196                .instrument(span)
 197                .boxed()
 198            }),
 199        );
 200        if prev_handler.is_some() {
 201            panic!("registered a handler for the same message twice");
 202        }
 203        self
 204    }
 205
 206    /// Handle a request while holding a lock to the store. This is useful when we're registering
 207    /// a connection but we want to respond on the connection before anybody else can send on it.
 208    fn add_request_handler<F, Fut, M>(&mut self, handler: F) -> &mut Self
 209    where
 210        F: 'static + Send + Sync + Fn(Arc<Self>, TypedEnvelope<M>, Response<M>) -> Fut,
 211        Fut: Send + Future<Output = Result<()>>,
 212        M: RequestMessage,
 213    {
 214        let handler = Arc::new(handler);
 215        self.add_message_handler(move |server, envelope| {
 216            let receipt = envelope.receipt();
 217            let handler = handler.clone();
 218            async move {
 219                let responded = Arc::new(AtomicBool::default());
 220                let response = Response {
 221                    server: server.clone(),
 222                    responded: responded.clone(),
 223                    receipt: envelope.receipt(),
 224                };
 225                match (handler)(server.clone(), envelope, response).await {
 226                    Ok(()) => {
 227                        if responded.load(std::sync::atomic::Ordering::SeqCst) {
 228                            Ok(())
 229                        } else {
 230                            Err(anyhow!("handler did not send a response"))?
 231                        }
 232                    }
 233                    Err(error) => {
 234                        server.peer.respond_with_error(
 235                            receipt,
 236                            proto::Error {
 237                                message: error.to_string(),
 238                            },
 239                        )?;
 240                        Err(error)
 241                    }
 242                }
 243            }
 244        })
 245    }
 246
    /// Drives a single client connection from registration until sign-out.
    ///
    /// The returned future registers `connection` with the peer, optionally reports the new
    /// connection id through `send_connection_id`, records the connection in the store and
    /// sends the user's initial contacts update. It then loops, dispatching incoming messages
    /// to the registered handlers until the I/O future finishes or the incoming stream ends,
    /// and finally signs the connection out.
    ///
    /// NOTE(review): the `?` early returns during setup (contact lookup, initial send,
    /// contact update) skip the `sign_out` call at the bottom — confirm this is intended.
    pub fn handle_connection<E: Executor>(
        self: &Arc<Self>,
        connection: Connection,
        address: String,
        user: User,
        mut send_connection_id: Option<mpsc::Sender<ConnectionId>>,
        executor: E,
    ) -> impl Future<Output = Result<()>> {
        // `mut` because `sign_out` takes `&mut Arc<Self>`.
        let mut this = self.clone();
        let user_id = user.id;
        let login = user.github_login;
        let span = info_span!("handle connection", %user_id, %login, %address);
        async move {
            let (connection_id, handle_io, mut incoming_rx) = this
                .peer
                .add_connection(connection, {
                    // Timer factory handed to the peer, backed by the executor's `sleep`.
                    let executor = executor.clone();
                    move |duration| {
                        let timer = executor.sleep(duration);
                        async move {
                            timer.await;
                        }
                    }
                })
                .await;

            tracing::info!(%user_id, %login, %connection_id, %address, "connection opened");

            // A failure to report the connection id is deliberately ignored.
            if let Some(send_connection_id) = send_connection_id.as_mut() {
                let _ = send_connection_id.send(connection_id).await;
            }

            let contacts = this.app_state.db.get_contacts(user_id).await?;

            {
                // The initial contacts update is sent while the store write lock is held.
                let mut store = this.store_mut().await;
                store.add_connection(connection_id, user_id);
                this.peer.send(connection_id, store.build_initial_contacts_update(contacts))?;
            }
            this.update_user_contacts(user_id).await?;

            let handle_io = handle_io.fuse();
            futures::pin_mut!(handle_io);
            loop {
                let next_message = incoming_rx.next().fuse();
                futures::pin_mut!(next_message);
                // `select_biased!` polls its branches in order, so I/O completion is
                // checked before the next incoming message.
                futures::select_biased! {
                    result = handle_io => {
                        if let Err(error) = result {
                            tracing::error!(?error, %user_id, %login, %connection_id, %address, "error handling I/O");
                        }
                        break;
                    }
                    message = next_message => {
                        if let Some(message) = message {
                            let type_name = message.payload_type_name();
                            let span = tracing::info_span!("receive message", %user_id, %login, %connection_id, %address, type_name);
                            async {
                                if let Some(handler) = this.handlers.get(&message.payload_type_id()) {
                                    let notifications = this.notifications.clone();
                                    let is_background = message.is_background();
                                    let handle_message = (handler)(this.clone(), message);
                                    let handle_message = async move {
                                        handle_message.await;
                                        if let Some(mut notifications) = notifications {
                                            let _ = notifications.send(()).await;
                                        }
                                    };
                                    // Background messages are spawned on the executor; all
                                    // others are awaited here before the next message is read.
                                    if is_background {
                                        executor.spawn_detached(handle_message);
                                    } else {
                                        handle_message.await;
                                    }
                                } else {
                                    tracing::error!(%user_id, %login, %connection_id, %address, "no message handler");
                                }
                            }.instrument(span).await;
                        } else {
                            tracing::info!(%user_id, %login, %connection_id, %address, "connection closed");
                            break;
                        }
                    }
                }
            }

            tracing::info!(%user_id, %login, %connection_id, %address, "signing out");
            // A sign-out failure is logged; the future still resolves to `Ok(())`.
            if let Err(error) = this.sign_out(connection_id).await {
                tracing::error!(%user_id, %login, %connection_id, %address, ?error, "error signing out");
            }

            Ok(())
        }.instrument(span)
    }
 340
 341    #[instrument(skip(self), err)]
 342    async fn sign_out(self: &mut Arc<Self>, connection_id: ConnectionId) -> Result<()> {
 343        self.peer.disconnect(connection_id);
 344
 345        let removed_user_id = {
 346            let mut store = self.store_mut().await;
 347            let removed_connection = store.remove_connection(connection_id)?;
 348
 349            for (project_id, project) in removed_connection.hosted_projects {
 350                broadcast(connection_id, project.guests.keys().copied(), |conn_id| {
 351                    self.peer
 352                        .send(conn_id, proto::UnregisterProject { project_id })
 353                });
 354
 355                for (_, receipts) in project.join_requests {
 356                    for receipt in receipts {
 357                        self.peer.respond(
 358                            receipt,
 359                            proto::JoinProjectResponse {
 360                                variant: Some(proto::join_project_response::Variant::Decline(
 361                                    proto::join_project_response::Decline {
 362                                        reason: proto::join_project_response::decline::Reason::WentOffline as i32
 363                                    },
 364                                )),
 365                            },
 366                        )?;
 367                    }
 368                }
 369            }
 370
 371            for project_id in removed_connection.guest_project_ids {
 372                if let Some(project) = store.project(project_id).trace_err() {
 373                    broadcast(connection_id, project.connection_ids(), |conn_id| {
 374                        self.peer.send(
 375                            conn_id,
 376                            proto::RemoveProjectCollaborator {
 377                                project_id,
 378                                peer_id: connection_id.0,
 379                            },
 380                        )
 381                    });
 382                    if project.guests.is_empty() {
 383                        self.peer
 384                            .send(
 385                                project.host_connection_id,
 386                                proto::ProjectUnshared { project_id },
 387                            )
 388                            .trace_err();
 389                    }
 390                }
 391            }
 392
 393            removed_connection.user_id
 394        };
 395
 396        self.update_user_contacts(removed_user_id).await?;
 397
 398        Ok(())
 399    }
 400
 401    async fn ping(
 402        self: Arc<Server>,
 403        _: TypedEnvelope<proto::Ping>,
 404        response: Response<proto::Ping>,
 405    ) -> Result<()> {
 406        response.send(proto::Ack {})?;
 407        Ok(())
 408    }
 409
 410    async fn register_project(
 411        self: Arc<Server>,
 412        request: TypedEnvelope<proto::RegisterProject>,
 413        response: Response<proto::RegisterProject>,
 414    ) -> Result<()> {
 415        let user_id;
 416        let project_id;
 417        {
 418            let mut state = self.store_mut().await;
 419            user_id = state.user_id_for_connection(request.sender_id)?;
 420            project_id = state.register_project(request.sender_id, user_id);
 421        };
 422        self.update_user_contacts(user_id).await?;
 423        response.send(proto::RegisterProjectResponse { project_id })?;
 424        Ok(())
 425    }
 426
 427    async fn unregister_project(
 428        self: Arc<Server>,
 429        request: TypedEnvelope<proto::UnregisterProject>,
 430    ) -> Result<()> {
 431        let (user_id, project) = {
 432            let mut state = self.store_mut().await;
 433            let project =
 434                state.unregister_project(request.payload.project_id, request.sender_id)?;
 435            (state.user_id_for_connection(request.sender_id)?, project)
 436        };
 437
 438        broadcast(
 439            request.sender_id,
 440            project.guests.keys().copied(),
 441            |conn_id| {
 442                self.peer.send(
 443                    conn_id,
 444                    proto::UnregisterProject {
 445                        project_id: request.payload.project_id,
 446                    },
 447                )
 448            },
 449        );
 450        for (_, receipts) in project.join_requests {
 451            for receipt in receipts {
 452                self.peer.respond(
 453                    receipt,
 454                    proto::JoinProjectResponse {
 455                        variant: Some(proto::join_project_response::Variant::Decline(
 456                            proto::join_project_response::Decline {
 457                                reason: proto::join_project_response::decline::Reason::Closed
 458                                    as i32,
 459                            },
 460                        )),
 461                    },
 462                )?;
 463            }
 464        }
 465
 466        self.update_user_contacts(user_id).await?;
 467        Ok(())
 468    }
 469
 470    async fn update_user_contacts(self: &Arc<Server>, user_id: UserId) -> Result<()> {
 471        let contacts = self.app_state.db.get_contacts(user_id).await?;
 472        let store = self.store().await;
 473        let updated_contact = store.contact_for_user(user_id, false);
 474        for contact in contacts {
 475            if let db::Contact::Accepted {
 476                user_id: contact_user_id,
 477                ..
 478            } = contact
 479            {
 480                for contact_conn_id in store.connection_ids_for_user(contact_user_id) {
 481                    self.peer
 482                        .send(
 483                            contact_conn_id,
 484                            proto::UpdateContacts {
 485                                contacts: vec![updated_contact.clone()],
 486                                remove_contacts: Default::default(),
 487                                incoming_requests: Default::default(),
 488                                remove_incoming_requests: Default::default(),
 489                                outgoing_requests: Default::default(),
 490                                remove_outgoing_requests: Default::default(),
 491                            },
 492                        )
 493                        .trace_err();
 494                }
 495            }
 496        }
 497        Ok(())
 498    }
 499
 500    async fn join_project(
 501        self: Arc<Server>,
 502        request: TypedEnvelope<proto::JoinProject>,
 503        response: Response<proto::JoinProject>,
 504    ) -> Result<()> {
 505        let project_id = request.payload.project_id;
 506        let host_user_id;
 507        let guest_user_id;
 508        let host_connection_id;
 509        {
 510            let state = self.store().await;
 511            let project = state.project(project_id)?;
 512            host_user_id = project.host_user_id;
 513            host_connection_id = project.host_connection_id;
 514            guest_user_id = state.user_id_for_connection(request.sender_id)?;
 515        };
 516
 517        let has_contact = self
 518            .app_state
 519            .db
 520            .has_contact(guest_user_id, host_user_id)
 521            .await?;
 522        if !has_contact {
 523            return Err(anyhow!("no such project"))?;
 524        }
 525
 526        self.store_mut().await.request_join_project(
 527            guest_user_id,
 528            project_id,
 529            response.into_receipt(),
 530        )?;
 531        self.peer.send(
 532            host_connection_id,
 533            proto::RequestJoinProject {
 534                project_id,
 535                requester_id: guest_user_id.to_proto(),
 536            },
 537        )?;
 538        Ok(())
 539    }
 540
 541    async fn respond_to_join_project_request(
 542        self: Arc<Server>,
 543        request: TypedEnvelope<proto::RespondToJoinProjectRequest>,
 544    ) -> Result<()> {
 545        let host_user_id;
 546
 547        {
 548            let mut state = self.store_mut().await;
 549            let project_id = request.payload.project_id;
 550            let project = state.project(project_id)?;
 551            if project.host_connection_id != request.sender_id {
 552                Err(anyhow!("no such connection"))?;
 553            }
 554
 555            host_user_id = project.host_user_id;
 556            let guest_user_id = UserId::from_proto(request.payload.requester_id);
 557
 558            if !request.payload.allow {
 559                let receipts = state
 560                    .deny_join_project_request(request.sender_id, guest_user_id, project_id)
 561                    .ok_or_else(|| anyhow!("no such request"))?;
 562                for receipt in receipts {
 563                    self.peer.respond(
 564                        receipt,
 565                        proto::JoinProjectResponse {
 566                            variant: Some(proto::join_project_response::Variant::Decline(
 567                                proto::join_project_response::Decline {
 568                                    reason: proto::join_project_response::decline::Reason::Declined
 569                                        as i32,
 570                                },
 571                            )),
 572                        },
 573                    )?;
 574                }
 575                return Ok(());
 576            }
 577
 578            let (receipts_with_replica_ids, project) = state
 579                .accept_join_project_request(request.sender_id, guest_user_id, project_id)
 580                .ok_or_else(|| anyhow!("no such request"))?;
 581
 582            let peer_count = project.guests.len();
 583            let mut collaborators = Vec::with_capacity(peer_count);
 584            collaborators.push(proto::Collaborator {
 585                peer_id: project.host_connection_id.0,
 586                replica_id: 0,
 587                user_id: project.host_user_id.to_proto(),
 588            });
 589            let worktrees = project
 590                .worktrees
 591                .iter()
 592                .filter_map(|(id, shared_worktree)| {
 593                    let worktree = project.worktrees.get(&id)?;
 594                    Some(proto::Worktree {
 595                        id: *id,
 596                        root_name: worktree.root_name.clone(),
 597                        entries: shared_worktree.entries.values().cloned().collect(),
 598                        diagnostic_summaries: shared_worktree
 599                            .diagnostic_summaries
 600                            .values()
 601                            .cloned()
 602                            .collect(),
 603                        visible: worktree.visible,
 604                        scan_id: shared_worktree.scan_id,
 605                    })
 606                })
 607                .collect::<Vec<_>>();
 608
 609            // Add all guests other than the requesting user's own connections as collaborators
 610            for (peer_conn_id, (peer_replica_id, peer_user_id)) in &project.guests {
 611                if receipts_with_replica_ids
 612                    .iter()
 613                    .all(|(receipt, _)| receipt.sender_id != *peer_conn_id)
 614                {
 615                    collaborators.push(proto::Collaborator {
 616                        peer_id: peer_conn_id.0,
 617                        replica_id: *peer_replica_id as u32,
 618                        user_id: peer_user_id.to_proto(),
 619                    });
 620                }
 621            }
 622
 623            for conn_id in project.connection_ids() {
 624                for (receipt, replica_id) in &receipts_with_replica_ids {
 625                    if conn_id != receipt.sender_id {
 626                        self.peer.send(
 627                            conn_id,
 628                            proto::AddProjectCollaborator {
 629                                project_id,
 630                                collaborator: Some(proto::Collaborator {
 631                                    peer_id: receipt.sender_id.0,
 632                                    replica_id: *replica_id as u32,
 633                                    user_id: guest_user_id.to_proto(),
 634                                }),
 635                            },
 636                        )?;
 637                    }
 638                }
 639            }
 640
 641            for (receipt, replica_id) in receipts_with_replica_ids {
 642                self.peer.respond(
 643                    receipt,
 644                    proto::JoinProjectResponse {
 645                        variant: Some(proto::join_project_response::Variant::Accept(
 646                            proto::join_project_response::Accept {
 647                                worktrees: worktrees.clone(),
 648                                replica_id: replica_id as u32,
 649                                collaborators: collaborators.clone(),
 650                                language_servers: project.language_servers.clone(),
 651                            },
 652                        )),
 653                    },
 654                )?;
 655            }
 656        }
 657
 658        self.update_user_contacts(host_user_id).await?;
 659        Ok(())
 660    }
 661
 662    async fn leave_project(
 663        self: Arc<Server>,
 664        request: TypedEnvelope<proto::LeaveProject>,
 665    ) -> Result<()> {
 666        let sender_id = request.sender_id;
 667        let project_id = request.payload.project_id;
 668        let project;
 669        {
 670            let mut store = self.store_mut().await;
 671            project = store.leave_project(sender_id, project_id)?;
 672
 673            if project.remove_collaborator {
 674                broadcast(sender_id, project.connection_ids, |conn_id| {
 675                    self.peer.send(
 676                        conn_id,
 677                        proto::RemoveProjectCollaborator {
 678                            project_id,
 679                            peer_id: sender_id.0,
 680                        },
 681                    )
 682                });
 683            }
 684
 685            if let Some(requester_id) = project.cancel_request {
 686                self.peer.send(
 687                    project.host_connection_id,
 688                    proto::JoinProjectRequestCancelled {
 689                        project_id,
 690                        requester_id: requester_id.to_proto(),
 691                    },
 692                )?;
 693            }
 694
 695            if project.unshare {
 696                self.peer.send(
 697                    project.host_connection_id,
 698                    proto::ProjectUnshared { project_id },
 699                )?;
 700            }
 701        }
 702        self.update_user_contacts(project.host_user_id).await?;
 703        Ok(())
 704    }
 705
 706    async fn register_worktree(
 707        self: Arc<Server>,
 708        request: TypedEnvelope<proto::RegisterWorktree>,
 709        response: Response<proto::RegisterWorktree>,
 710    ) -> Result<()> {
 711        let host_user_id;
 712        {
 713            let mut state = self.store_mut().await;
 714            host_user_id = state.user_id_for_connection(request.sender_id)?;
 715
 716            let guest_connection_ids = state
 717                .read_project(request.payload.project_id, request.sender_id)?
 718                .guest_connection_ids();
 719            state.register_worktree(
 720                request.payload.project_id,
 721                request.payload.worktree_id,
 722                request.sender_id,
 723                Worktree {
 724                    root_name: request.payload.root_name.clone(),
 725                    visible: request.payload.visible,
 726                    ..Default::default()
 727                },
 728            )?;
 729
 730            broadcast(request.sender_id, guest_connection_ids, |connection_id| {
 731                self.peer
 732                    .forward_send(request.sender_id, connection_id, request.payload.clone())
 733            });
 734        }
 735        self.update_user_contacts(host_user_id).await?;
 736        response.send(proto::Ack {})?;
 737        Ok(())
 738    }
 739
 740    async fn unregister_worktree(
 741        self: Arc<Server>,
 742        request: TypedEnvelope<proto::UnregisterWorktree>,
 743    ) -> Result<()> {
 744        let host_user_id;
 745        let project_id = request.payload.project_id;
 746        let worktree_id = request.payload.worktree_id;
 747        {
 748            let mut state = self.store_mut().await;
 749            let (_, guest_connection_ids) =
 750                state.unregister_worktree(project_id, worktree_id, request.sender_id)?;
 751            host_user_id = state.user_id_for_connection(request.sender_id)?;
 752            broadcast(request.sender_id, guest_connection_ids, |conn_id| {
 753                self.peer.send(
 754                    conn_id,
 755                    proto::UnregisterWorktree {
 756                        project_id,
 757                        worktree_id,
 758                    },
 759                )
 760            });
 761        }
 762        self.update_user_contacts(host_user_id).await?;
 763        Ok(())
 764    }
 765
 766    async fn update_worktree(
 767        self: Arc<Server>,
 768        request: TypedEnvelope<proto::UpdateWorktree>,
 769        response: Response<proto::UpdateWorktree>,
 770    ) -> Result<()> {
 771        let connection_ids = self.store_mut().await.update_worktree(
 772            request.sender_id,
 773            request.payload.project_id,
 774            request.payload.worktree_id,
 775            &request.payload.removed_entries,
 776            &request.payload.updated_entries,
 777            request.payload.scan_id,
 778        )?;
 779
 780        broadcast(request.sender_id, connection_ids, |connection_id| {
 781            self.peer
 782                .forward_send(request.sender_id, connection_id, request.payload.clone())
 783        });
 784        response.send(proto::Ack {})?;
 785        Ok(())
 786    }
 787
 788    async fn update_diagnostic_summary(
 789        self: Arc<Server>,
 790        request: TypedEnvelope<proto::UpdateDiagnosticSummary>,
 791    ) -> Result<()> {
 792        let summary = request
 793            .payload
 794            .summary
 795            .clone()
 796            .ok_or_else(|| anyhow!("invalid summary"))?;
 797        let receiver_ids = self.store_mut().await.update_diagnostic_summary(
 798            request.payload.project_id,
 799            request.payload.worktree_id,
 800            request.sender_id,
 801            summary,
 802        )?;
 803
 804        broadcast(request.sender_id, receiver_ids, |connection_id| {
 805            self.peer
 806                .forward_send(request.sender_id, connection_id, request.payload.clone())
 807        });
 808        Ok(())
 809    }
 810
 811    async fn start_language_server(
 812        self: Arc<Server>,
 813        request: TypedEnvelope<proto::StartLanguageServer>,
 814    ) -> Result<()> {
 815        let receiver_ids = self.store_mut().await.start_language_server(
 816            request.payload.project_id,
 817            request.sender_id,
 818            request
 819                .payload
 820                .server
 821                .clone()
 822                .ok_or_else(|| anyhow!("invalid language server"))?,
 823        )?;
 824        broadcast(request.sender_id, receiver_ids, |connection_id| {
 825            self.peer
 826                .forward_send(request.sender_id, connection_id, request.payload.clone())
 827        });
 828        Ok(())
 829    }
 830
 831    async fn update_language_server(
 832        self: Arc<Server>,
 833        request: TypedEnvelope<proto::UpdateLanguageServer>,
 834    ) -> Result<()> {
 835        let receiver_ids = self
 836            .store()
 837            .await
 838            .project_connection_ids(request.payload.project_id, request.sender_id)?;
 839        broadcast(request.sender_id, receiver_ids, |connection_id| {
 840            self.peer
 841                .forward_send(request.sender_id, connection_id, request.payload.clone())
 842        });
 843        Ok(())
 844    }
 845
 846    async fn forward_project_request<T>(
 847        self: Arc<Server>,
 848        request: TypedEnvelope<T>,
 849        response: Response<T>,
 850    ) -> Result<()>
 851    where
 852        T: EntityMessage + RequestMessage,
 853    {
 854        let host_connection_id = self
 855            .store()
 856            .await
 857            .read_project(request.payload.remote_entity_id(), request.sender_id)?
 858            .host_connection_id;
 859
 860        response.send(
 861            self.peer
 862                .forward_request(request.sender_id, host_connection_id, request.payload)
 863                .await?,
 864        )?;
 865        Ok(())
 866    }
 867
 868    async fn save_buffer(
 869        self: Arc<Server>,
 870        request: TypedEnvelope<proto::SaveBuffer>,
 871        response: Response<proto::SaveBuffer>,
 872    ) -> Result<()> {
 873        let host = self
 874            .store()
 875            .await
 876            .read_project(request.payload.project_id, request.sender_id)?
 877            .host_connection_id;
 878        let response_payload = self
 879            .peer
 880            .forward_request(request.sender_id, host, request.payload.clone())
 881            .await?;
 882
 883        let mut guests = self
 884            .store()
 885            .await
 886            .read_project(request.payload.project_id, request.sender_id)?
 887            .connection_ids();
 888        guests.retain(|guest_connection_id| *guest_connection_id != request.sender_id);
 889        broadcast(host, guests, |conn_id| {
 890            self.peer
 891                .forward_send(host, conn_id, response_payload.clone())
 892        });
 893        response.send(response_payload)?;
 894        Ok(())
 895    }
 896
 897    async fn update_buffer(
 898        self: Arc<Server>,
 899        request: TypedEnvelope<proto::UpdateBuffer>,
 900        response: Response<proto::UpdateBuffer>,
 901    ) -> Result<()> {
 902        let receiver_ids = self
 903            .store()
 904            .await
 905            .project_connection_ids(request.payload.project_id, request.sender_id)?;
 906        broadcast(request.sender_id, receiver_ids, |connection_id| {
 907            self.peer
 908                .forward_send(request.sender_id, connection_id, request.payload.clone())
 909        });
 910        response.send(proto::Ack {})?;
 911        Ok(())
 912    }
 913
 914    async fn update_buffer_file(
 915        self: Arc<Server>,
 916        request: TypedEnvelope<proto::UpdateBufferFile>,
 917    ) -> Result<()> {
 918        let receiver_ids = self
 919            .store()
 920            .await
 921            .project_connection_ids(request.payload.project_id, request.sender_id)?;
 922        broadcast(request.sender_id, receiver_ids, |connection_id| {
 923            self.peer
 924                .forward_send(request.sender_id, connection_id, request.payload.clone())
 925        });
 926        Ok(())
 927    }
 928
 929    async fn buffer_reloaded(
 930        self: Arc<Server>,
 931        request: TypedEnvelope<proto::BufferReloaded>,
 932    ) -> Result<()> {
 933        let receiver_ids = self
 934            .store()
 935            .await
 936            .project_connection_ids(request.payload.project_id, request.sender_id)?;
 937        broadcast(request.sender_id, receiver_ids, |connection_id| {
 938            self.peer
 939                .forward_send(request.sender_id, connection_id, request.payload.clone())
 940        });
 941        Ok(())
 942    }
 943
 944    async fn buffer_saved(
 945        self: Arc<Server>,
 946        request: TypedEnvelope<proto::BufferSaved>,
 947    ) -> Result<()> {
 948        let receiver_ids = self
 949            .store()
 950            .await
 951            .project_connection_ids(request.payload.project_id, request.sender_id)?;
 952        broadcast(request.sender_id, receiver_ids, |connection_id| {
 953            self.peer
 954                .forward_send(request.sender_id, connection_id, request.payload.clone())
 955        });
 956        Ok(())
 957    }
 958
 959    async fn follow(
 960        self: Arc<Self>,
 961        request: TypedEnvelope<proto::Follow>,
 962        response: Response<proto::Follow>,
 963    ) -> Result<()> {
 964        let leader_id = ConnectionId(request.payload.leader_id);
 965        let follower_id = request.sender_id;
 966        if !self
 967            .store()
 968            .await
 969            .project_connection_ids(request.payload.project_id, follower_id)?
 970            .contains(&leader_id)
 971        {
 972            Err(anyhow!("no such peer"))?;
 973        }
 974        let mut response_payload = self
 975            .peer
 976            .forward_request(request.sender_id, leader_id, request.payload)
 977            .await?;
 978        response_payload
 979            .views
 980            .retain(|view| view.leader_id != Some(follower_id.0));
 981        response.send(response_payload)?;
 982        Ok(())
 983    }
 984
 985    async fn unfollow(self: Arc<Self>, request: TypedEnvelope<proto::Unfollow>) -> Result<()> {
 986        let leader_id = ConnectionId(request.payload.leader_id);
 987        if !self
 988            .store()
 989            .await
 990            .project_connection_ids(request.payload.project_id, request.sender_id)?
 991            .contains(&leader_id)
 992        {
 993            Err(anyhow!("no such peer"))?;
 994        }
 995        self.peer
 996            .forward_send(request.sender_id, leader_id, request.payload)?;
 997        Ok(())
 998    }
 999
1000    async fn update_followers(
1001        self: Arc<Self>,
1002        request: TypedEnvelope<proto::UpdateFollowers>,
1003    ) -> Result<()> {
1004        let connection_ids = self
1005            .store()
1006            .await
1007            .project_connection_ids(request.payload.project_id, request.sender_id)?;
1008        let leader_id = request
1009            .payload
1010            .variant
1011            .as_ref()
1012            .and_then(|variant| match variant {
1013                proto::update_followers::Variant::CreateView(payload) => payload.leader_id,
1014                proto::update_followers::Variant::UpdateView(payload) => payload.leader_id,
1015                proto::update_followers::Variant::UpdateActiveView(payload) => payload.leader_id,
1016            });
1017        for follower_id in &request.payload.follower_ids {
1018            let follower_id = ConnectionId(*follower_id);
1019            if connection_ids.contains(&follower_id) && Some(follower_id.0) != leader_id {
1020                self.peer
1021                    .forward_send(request.sender_id, follower_id, request.payload.clone())?;
1022            }
1023        }
1024        Ok(())
1025    }
1026
1027    async fn get_channels(
1028        self: Arc<Server>,
1029        request: TypedEnvelope<proto::GetChannels>,
1030        response: Response<proto::GetChannels>,
1031    ) -> Result<()> {
1032        let user_id = self
1033            .store()
1034            .await
1035            .user_id_for_connection(request.sender_id)?;
1036        let channels = self.app_state.db.get_accessible_channels(user_id).await?;
1037        response.send(proto::GetChannelsResponse {
1038            channels: channels
1039                .into_iter()
1040                .map(|chan| proto::Channel {
1041                    id: chan.id.to_proto(),
1042                    name: chan.name,
1043                })
1044                .collect(),
1045        })?;
1046        Ok(())
1047    }
1048
1049    async fn get_users(
1050        self: Arc<Server>,
1051        request: TypedEnvelope<proto::GetUsers>,
1052        response: Response<proto::GetUsers>,
1053    ) -> Result<()> {
1054        let user_ids = request
1055            .payload
1056            .user_ids
1057            .into_iter()
1058            .map(UserId::from_proto)
1059            .collect();
1060        let users = self
1061            .app_state
1062            .db
1063            .get_users_by_ids(user_ids)
1064            .await?
1065            .into_iter()
1066            .map(|user| proto::User {
1067                id: user.id.to_proto(),
1068                avatar_url: format!("https://github.com/{}.png?size=128", user.github_login),
1069                github_login: user.github_login,
1070            })
1071            .collect();
1072        response.send(proto::UsersResponse { users })?;
1073        Ok(())
1074    }
1075
1076    async fn fuzzy_search_users(
1077        self: Arc<Server>,
1078        request: TypedEnvelope<proto::FuzzySearchUsers>,
1079        response: Response<proto::FuzzySearchUsers>,
1080    ) -> Result<()> {
1081        let user_id = self
1082            .store()
1083            .await
1084            .user_id_for_connection(request.sender_id)?;
1085        let query = request.payload.query;
1086        let db = &self.app_state.db;
1087        let users = match query.len() {
1088            0 => vec![],
1089            1 | 2 => db
1090                .get_user_by_github_login(&query)
1091                .await?
1092                .into_iter()
1093                .collect(),
1094            _ => db.fuzzy_search_users(&query, 10).await?,
1095        };
1096        let users = users
1097            .into_iter()
1098            .filter(|user| user.id != user_id)
1099            .map(|user| proto::User {
1100                id: user.id.to_proto(),
1101                avatar_url: format!("https://github.com/{}.png?size=128", user.github_login),
1102                github_login: user.github_login,
1103            })
1104            .collect();
1105        response.send(proto::UsersResponse { users })?;
1106        Ok(())
1107    }
1108
1109    async fn request_contact(
1110        self: Arc<Server>,
1111        request: TypedEnvelope<proto::RequestContact>,
1112        response: Response<proto::RequestContact>,
1113    ) -> Result<()> {
1114        let requester_id = self
1115            .store()
1116            .await
1117            .user_id_for_connection(request.sender_id)?;
1118        let responder_id = UserId::from_proto(request.payload.responder_id);
1119        if requester_id == responder_id {
1120            return Err(anyhow!("cannot add yourself as a contact"))?;
1121        }
1122
1123        self.app_state
1124            .db
1125            .send_contact_request(requester_id, responder_id)
1126            .await?;
1127
1128        // Update outgoing contact requests of requester
1129        let mut update = proto::UpdateContacts::default();
1130        update.outgoing_requests.push(responder_id.to_proto());
1131        for connection_id in self.store().await.connection_ids_for_user(requester_id) {
1132            self.peer.send(connection_id, update.clone())?;
1133        }
1134
1135        // Update incoming contact requests of responder
1136        let mut update = proto::UpdateContacts::default();
1137        update
1138            .incoming_requests
1139            .push(proto::IncomingContactRequest {
1140                requester_id: requester_id.to_proto(),
1141                should_notify: true,
1142            });
1143        for connection_id in self.store().await.connection_ids_for_user(responder_id) {
1144            self.peer.send(connection_id, update.clone())?;
1145        }
1146
1147        response.send(proto::Ack {})?;
1148        Ok(())
1149    }
1150
1151    async fn respond_to_contact_request(
1152        self: Arc<Server>,
1153        request: TypedEnvelope<proto::RespondToContactRequest>,
1154        response: Response<proto::RespondToContactRequest>,
1155    ) -> Result<()> {
1156        let responder_id = self
1157            .store()
1158            .await
1159            .user_id_for_connection(request.sender_id)?;
1160        let requester_id = UserId::from_proto(request.payload.requester_id);
1161        if request.payload.response == proto::ContactRequestResponse::Dismiss as i32 {
1162            self.app_state
1163                .db
1164                .dismiss_contact_notification(responder_id, requester_id)
1165                .await?;
1166        } else {
1167            let accept = request.payload.response == proto::ContactRequestResponse::Accept as i32;
1168            self.app_state
1169                .db
1170                .respond_to_contact_request(responder_id, requester_id, accept)
1171                .await?;
1172
1173            let store = self.store().await;
1174            // Update responder with new contact
1175            let mut update = proto::UpdateContacts::default();
1176            if accept {
1177                update
1178                    .contacts
1179                    .push(store.contact_for_user(requester_id, false));
1180            }
1181            update
1182                .remove_incoming_requests
1183                .push(requester_id.to_proto());
1184            for connection_id in store.connection_ids_for_user(responder_id) {
1185                self.peer.send(connection_id, update.clone())?;
1186            }
1187
1188            // Update requester with new contact
1189            let mut update = proto::UpdateContacts::default();
1190            if accept {
1191                update
1192                    .contacts
1193                    .push(store.contact_for_user(responder_id, true));
1194            }
1195            update
1196                .remove_outgoing_requests
1197                .push(responder_id.to_proto());
1198            for connection_id in store.connection_ids_for_user(requester_id) {
1199                self.peer.send(connection_id, update.clone())?;
1200            }
1201        }
1202
1203        response.send(proto::Ack {})?;
1204        Ok(())
1205    }
1206
1207    async fn remove_contact(
1208        self: Arc<Server>,
1209        request: TypedEnvelope<proto::RemoveContact>,
1210        response: Response<proto::RemoveContact>,
1211    ) -> Result<()> {
1212        let requester_id = self
1213            .store()
1214            .await
1215            .user_id_for_connection(request.sender_id)?;
1216        let responder_id = UserId::from_proto(request.payload.user_id);
1217        self.app_state
1218            .db
1219            .remove_contact(requester_id, responder_id)
1220            .await?;
1221
1222        // Update outgoing contact requests of requester
1223        let mut update = proto::UpdateContacts::default();
1224        update
1225            .remove_outgoing_requests
1226            .push(responder_id.to_proto());
1227        for connection_id in self.store().await.connection_ids_for_user(requester_id) {
1228            self.peer.send(connection_id, update.clone())?;
1229        }
1230
1231        // Update incoming contact requests of responder
1232        let mut update = proto::UpdateContacts::default();
1233        update
1234            .remove_incoming_requests
1235            .push(requester_id.to_proto());
1236        for connection_id in self.store().await.connection_ids_for_user(responder_id) {
1237            self.peer.send(connection_id, update.clone())?;
1238        }
1239
1240        response.send(proto::Ack {})?;
1241        Ok(())
1242    }
1243
1244    async fn join_channel(
1245        self: Arc<Self>,
1246        request: TypedEnvelope<proto::JoinChannel>,
1247        response: Response<proto::JoinChannel>,
1248    ) -> Result<()> {
1249        let user_id = self
1250            .store()
1251            .await
1252            .user_id_for_connection(request.sender_id)?;
1253        let channel_id = ChannelId::from_proto(request.payload.channel_id);
1254        if !self
1255            .app_state
1256            .db
1257            .can_user_access_channel(user_id, channel_id)
1258            .await?
1259        {
1260            Err(anyhow!("access denied"))?;
1261        }
1262
1263        self.store_mut()
1264            .await
1265            .join_channel(request.sender_id, channel_id);
1266        let messages = self
1267            .app_state
1268            .db
1269            .get_channel_messages(channel_id, MESSAGE_COUNT_PER_PAGE, None)
1270            .await?
1271            .into_iter()
1272            .map(|msg| proto::ChannelMessage {
1273                id: msg.id.to_proto(),
1274                body: msg.body,
1275                timestamp: msg.sent_at.unix_timestamp() as u64,
1276                sender_id: msg.sender_id.to_proto(),
1277                nonce: Some(msg.nonce.as_u128().into()),
1278            })
1279            .collect::<Vec<_>>();
1280        response.send(proto::JoinChannelResponse {
1281            done: messages.len() < MESSAGE_COUNT_PER_PAGE,
1282            messages,
1283        })?;
1284        Ok(())
1285    }
1286
1287    async fn leave_channel(
1288        self: Arc<Self>,
1289        request: TypedEnvelope<proto::LeaveChannel>,
1290    ) -> Result<()> {
1291        let user_id = self
1292            .store()
1293            .await
1294            .user_id_for_connection(request.sender_id)?;
1295        let channel_id = ChannelId::from_proto(request.payload.channel_id);
1296        if !self
1297            .app_state
1298            .db
1299            .can_user_access_channel(user_id, channel_id)
1300            .await?
1301        {
1302            Err(anyhow!("access denied"))?;
1303        }
1304
1305        self.store_mut()
1306            .await
1307            .leave_channel(request.sender_id, channel_id);
1308
1309        Ok(())
1310    }
1311
1312    async fn send_channel_message(
1313        self: Arc<Self>,
1314        request: TypedEnvelope<proto::SendChannelMessage>,
1315        response: Response<proto::SendChannelMessage>,
1316    ) -> Result<()> {
1317        let channel_id = ChannelId::from_proto(request.payload.channel_id);
1318        let user_id;
1319        let connection_ids;
1320        {
1321            let state = self.store().await;
1322            user_id = state.user_id_for_connection(request.sender_id)?;
1323            connection_ids = state.channel_connection_ids(channel_id)?;
1324        }
1325
1326        // Validate the message body.
1327        let body = request.payload.body.trim().to_string();
1328        if body.len() > MAX_MESSAGE_LEN {
1329            return Err(anyhow!("message is too long"))?;
1330        }
1331        if body.is_empty() {
1332            return Err(anyhow!("message can't be blank"))?;
1333        }
1334
1335        let timestamp = OffsetDateTime::now_utc();
1336        let nonce = request
1337            .payload
1338            .nonce
1339            .ok_or_else(|| anyhow!("nonce can't be blank"))?;
1340
1341        let message_id = self
1342            .app_state
1343            .db
1344            .create_channel_message(channel_id, user_id, &body, timestamp, nonce.clone().into())
1345            .await?
1346            .to_proto();
1347        let message = proto::ChannelMessage {
1348            sender_id: user_id.to_proto(),
1349            id: message_id,
1350            body,
1351            timestamp: timestamp.unix_timestamp() as u64,
1352            nonce: Some(nonce),
1353        };
1354        broadcast(request.sender_id, connection_ids, |conn_id| {
1355            self.peer.send(
1356                conn_id,
1357                proto::ChannelMessageSent {
1358                    channel_id: channel_id.to_proto(),
1359                    message: Some(message.clone()),
1360                },
1361            )
1362        });
1363        response.send(proto::SendChannelMessageResponse {
1364            message: Some(message),
1365        })?;
1366        Ok(())
1367    }
1368
1369    async fn get_channel_messages(
1370        self: Arc<Self>,
1371        request: TypedEnvelope<proto::GetChannelMessages>,
1372        response: Response<proto::GetChannelMessages>,
1373    ) -> Result<()> {
1374        let user_id = self
1375            .store()
1376            .await
1377            .user_id_for_connection(request.sender_id)?;
1378        let channel_id = ChannelId::from_proto(request.payload.channel_id);
1379        if !self
1380            .app_state
1381            .db
1382            .can_user_access_channel(user_id, channel_id)
1383            .await?
1384        {
1385            Err(anyhow!("access denied"))?;
1386        }
1387
1388        let messages = self
1389            .app_state
1390            .db
1391            .get_channel_messages(
1392                channel_id,
1393                MESSAGE_COUNT_PER_PAGE,
1394                Some(MessageId::from_proto(request.payload.before_message_id)),
1395            )
1396            .await?
1397            .into_iter()
1398            .map(|msg| proto::ChannelMessage {
1399                id: msg.id.to_proto(),
1400                body: msg.body,
1401                timestamp: msg.sent_at.unix_timestamp() as u64,
1402                sender_id: msg.sender_id.to_proto(),
1403                nonce: Some(msg.nonce.as_u128().into()),
1404            })
1405            .collect::<Vec<_>>();
1406        response.send(proto::GetChannelMessagesResponse {
1407            done: messages.len() < MESSAGE_COUNT_PER_PAGE,
1408            messages,
1409        })?;
1410        Ok(())
1411    }
1412
1413    async fn store<'a>(self: &'a Arc<Self>) -> StoreReadGuard<'a> {
1414        #[cfg(test)]
1415        tokio::task::yield_now().await;
1416        let guard = self.store.read().await;
1417        #[cfg(test)]
1418        tokio::task::yield_now().await;
1419        StoreReadGuard {
1420            guard,
1421            _not_send: PhantomData,
1422        }
1423    }
1424
1425    async fn store_mut<'a>(self: &'a Arc<Self>) -> StoreWriteGuard<'a> {
1426        #[cfg(test)]
1427        tokio::task::yield_now().await;
1428        let guard = self.store.write().await;
1429        #[cfg(test)]
1430        tokio::task::yield_now().await;
1431        StoreWriteGuard {
1432            guard,
1433            _not_send: PhantomData,
1434        }
1435    }
1436}
1437
1438impl<'a> Deref for StoreReadGuard<'a> {
1439    type Target = Store;
1440
1441    fn deref(&self) -> &Self::Target {
1442        &*self.guard
1443    }
1444}
1445
1446impl<'a> Deref for StoreWriteGuard<'a> {
1447    type Target = Store;
1448
1449    fn deref(&self) -> &Self::Target {
1450        &*self.guard
1451    }
1452}
1453
1454impl<'a> DerefMut for StoreWriteGuard<'a> {
1455    fn deref_mut(&mut self) -> &mut Self::Target {
1456        &mut *self.guard
1457    }
1458}
1459
1460impl<'a> Drop for StoreWriteGuard<'a> {
1461    fn drop(&mut self) {
1462        #[cfg(test)]
1463        self.check_invariants();
1464
1465        let metrics = self.metrics();
1466        tracing::info!(
1467            connections = metrics.connections,
1468            registered_projects = metrics.registered_projects,
1469            shared_projects = metrics.shared_projects,
1470            "metrics"
1471        );
1472    }
1473}
1474
1475impl Executor for RealExecutor {
1476    type Sleep = Sleep;
1477
1478    fn spawn_detached<F: 'static + Send + Future<Output = ()>>(&self, future: F) {
1479        tokio::task::spawn(future);
1480    }
1481
1482    fn sleep(&self, duration: Duration) -> Self::Sleep {
1483        tokio::time::sleep(duration)
1484    }
1485}
1486
1487fn broadcast<F>(
1488    sender_id: ConnectionId,
1489    receiver_ids: impl IntoIterator<Item = ConnectionId>,
1490    mut f: F,
1491) where
1492    F: FnMut(ConnectionId) -> anyhow::Result<()>,
1493{
1494    for receiver_id in receiver_ids {
1495        if receiver_id != sender_id {
1496            f(receiver_id).trace_err();
1497        }
1498    }
1499}
1500
lazy_static! {
    // Name of the HTTP header that `ProtocolVersion` is decoded from. It is
    // initialized lazily, on first use.
    static ref ZED_PROTOCOL_VERSION: HeaderName = HeaderName::from_static("x-zed-protocol-version");
}
1504
/// Typed representation of the `x-zed-protocol-version` header: the numeric
/// protocol version, which `handle_websocket_request` compares against
/// `rpc::PROTOCOL_VERSION`.
pub struct ProtocolVersion(u32);
1506
1507impl Header for ProtocolVersion {
1508    fn name() -> &'static HeaderName {
1509        &ZED_PROTOCOL_VERSION
1510    }
1511
1512    fn decode<'i, I>(values: &mut I) -> Result<Self, axum::headers::Error>
1513    where
1514        Self: Sized,
1515        I: Iterator<Item = &'i axum::http::HeaderValue>,
1516    {
1517        let version = values
1518            .next()
1519            .ok_or_else(|| axum::headers::Error::invalid())?
1520            .to_str()
1521            .map_err(|_| axum::headers::Error::invalid())?
1522            .parse()
1523            .map_err(|_| axum::headers::Error::invalid())?;
1524        Ok(Self(version))
1525    }
1526
1527    fn encode<E: Extend<axum::http::HeaderValue>>(&self, values: &mut E) {
1528        values.extend([self.0.to_string().parse().unwrap()]);
1529    }
1530}
1531
1532pub fn routes(app_state: Arc<AppState>) -> Router<Body> {
1533    let server = Server::new(app_state.clone(), None);
1534    Router::new()
1535        .route("/rpc", get(handle_websocket_request))
1536        .layer(
1537            ServiceBuilder::new()
1538                .layer(Extension(app_state))
1539                .layer(middleware::from_fn(auth::validate_header))
1540                .layer(Extension(server)),
1541        )
1542}
1543
1544pub async fn handle_websocket_request(
1545    TypedHeader(ProtocolVersion(protocol_version)): TypedHeader<ProtocolVersion>,
1546    ConnectInfo(socket_address): ConnectInfo<SocketAddr>,
1547    Extension(server): Extension<Arc<Server>>,
1548    Extension(user): Extension<User>,
1549    ws: WebSocketUpgrade,
1550) -> axum::response::Response {
1551    if protocol_version != rpc::PROTOCOL_VERSION {
1552        return (
1553            StatusCode::UPGRADE_REQUIRED,
1554            "client must be upgraded".to_string(),
1555        )
1556            .into_response();
1557    }
1558    let socket_address = socket_address.to_string();
1559    ws.on_upgrade(move |socket| {
1560        use util::ResultExt;
1561        let socket = socket
1562            .map_ok(to_tungstenite_message)
1563            .err_into()
1564            .with(|message| async move { Ok(to_axum_message(message)) });
1565        let connection = Connection::new(Box::pin(socket));
1566        async move {
1567            server
1568                .handle_connection(connection, socket_address, user, None, RealExecutor)
1569                .await
1570                .log_err();
1571        }
1572    })
1573}
1574
1575fn to_axum_message(message: TungsteniteMessage) -> AxumMessage {
1576    match message {
1577        TungsteniteMessage::Text(payload) => AxumMessage::Text(payload),
1578        TungsteniteMessage::Binary(payload) => AxumMessage::Binary(payload),
1579        TungsteniteMessage::Ping(payload) => AxumMessage::Ping(payload),
1580        TungsteniteMessage::Pong(payload) => AxumMessage::Pong(payload),
1581        TungsteniteMessage::Close(frame) => AxumMessage::Close(frame.map(|frame| AxumCloseFrame {
1582            code: frame.code.into(),
1583            reason: frame.reason,
1584        })),
1585    }
1586}
1587
1588fn to_tungstenite_message(message: AxumMessage) -> TungsteniteMessage {
1589    match message {
1590        AxumMessage::Text(payload) => TungsteniteMessage::Text(payload),
1591        AxumMessage::Binary(payload) => TungsteniteMessage::Binary(payload),
1592        AxumMessage::Ping(payload) => TungsteniteMessage::Ping(payload),
1593        AxumMessage::Pong(payload) => TungsteniteMessage::Pong(payload),
1594        AxumMessage::Close(frame) => {
1595            TungsteniteMessage::Close(frame.map(|frame| TungsteniteCloseFrame {
1596                code: frame.code.into(),
1597                reason: frame.reason,
1598            }))
1599        }
1600    }
1601}
1602
/// Extension trait for turning a fallible value into an `Option`.
pub trait ResultExt {
    /// The success type carried by the implementing value.
    type Ok;

    /// Consumes `self` and returns the success value, if any.
    ///
    /// The implementation in this file for `Result` logs the error through
    /// `tracing::error!` and returns `None` on failure.
    fn trace_err(self) -> Option<Self::Ok>;
}
1608
1609impl<T, E> ResultExt for Result<T, E>
1610where
1611    E: std::fmt::Debug,
1612{
1613    type Ok = T;
1614
1615    fn trace_err(self) -> Option<T> {
1616        match self {
1617            Ok(value) => Some(value),
1618            Err(error) => {
1619                tracing::error!("{:?}", error);
1620                None
1621            }
1622        }
1623    }
1624}
1625
1626#[cfg(test)]
1627mod tests {
1628    use super::*;
1629    use crate::{
1630        db::{tests::TestDb, UserId},
1631        AppState,
1632    };
1633    use ::rpc::Peer;
1634    use client::{
1635        self, test::FakeHttpClient, Channel, ChannelDetails, ChannelList, Client, Credentials,
1636        EstablishConnectionError, UserStore, RECEIVE_TIMEOUT,
1637    };
1638    use collections::{BTreeMap, HashSet};
1639    use editor::{
1640        self, ConfirmCodeAction, ConfirmCompletion, ConfirmRename, Editor, Input, Redo, Rename,
1641        ToOffset, ToggleCodeActions, Undo,
1642    };
1643    use gpui::{
1644        executor::{self, Deterministic},
1645        geometry::vector::vec2f,
1646        ModelHandle, Task, TestAppContext, ViewHandle,
1647    };
1648    use language::{
1649        range_to_lsp, tree_sitter_rust, Diagnostic, DiagnosticEntry, FakeLspAdapter, Language,
1650        LanguageConfig, LanguageRegistry, OffsetRangeExt, Point, Rope,
1651    };
1652    use lsp::{self, FakeLanguageServer};
1653    use parking_lot::Mutex;
1654    use project::{
1655        fs::{FakeFs, Fs as _},
1656        search::SearchQuery,
1657        worktree::WorktreeHandle,
1658        DiagnosticSummary, Project, ProjectPath, WorktreeId,
1659    };
1660    use rand::prelude::*;
1661    use rpc::PeerId;
1662    use serde_json::json;
1663    use settings::Settings;
1664    use sqlx::types::time::OffsetDateTime;
1665    use std::{
1666        cell::RefCell,
1667        env,
1668        ops::Deref,
1669        path::{Path, PathBuf},
1670        rc::Rc,
1671        sync::{
1672            atomic::{AtomicBool, Ordering::SeqCst},
1673            Arc,
1674        },
1675        time::Duration,
1676    };
1677    use theme::ThemeRegistry;
1678    use workspace::{Item, SplitDirection, ToggleFollow, Workspace};
1679
1680    #[cfg(test)]
1681    #[ctor::ctor]
1682    fn init_logger() {
1683        if std::env::var("RUST_LOG").is_ok() {
1684            env_logger::init();
1685        }
1686    }
1687
1688    #[gpui::test(iterations = 10)]
1689    async fn test_share_project(
1690        deterministic: Arc<Deterministic>,
1691        cx_a: &mut TestAppContext,
1692        cx_b: &mut TestAppContext,
1693        cx_b2: &mut TestAppContext,
1694    ) {
1695        let (window_b, _) = cx_b.add_window(|_| EmptyView);
1696        let lang_registry = Arc::new(LanguageRegistry::test());
1697        let fs = FakeFs::new(cx_a.background());
1698        cx_a.foreground().forbid_parking();
1699
1700        // Connect to a server as 2 clients.
1701        let mut server = TestServer::start(cx_a.foreground(), cx_a.background()).await;
1702        let client_a = server.create_client(cx_a, "user_a").await;
1703        let mut client_b = server.create_client(cx_b, "user_b").await;
1704        server
1705            .make_contacts(vec![(&client_a, cx_a), (&client_b, cx_b)])
1706            .await;
1707
1708        // Share a project as client A
1709        fs.insert_tree(
1710            "/a",
1711            json!({
1712                "a.txt": "a-contents",
1713                "b.txt": "b-contents",
1714            }),
1715        )
1716        .await;
1717        let project_a = cx_a.update(|cx| {
1718            Project::local(
1719                client_a.clone(),
1720                client_a.user_store.clone(),
1721                lang_registry.clone(),
1722                fs.clone(),
1723                cx,
1724            )
1725        });
1726        let project_id = project_a
1727            .read_with(cx_a, |project, _| project.next_remote_id())
1728            .await;
1729        let (worktree_a, _) = project_a
1730            .update(cx_a, |p, cx| {
1731                p.find_or_create_local_worktree("/a", true, cx)
1732            })
1733            .await
1734            .unwrap();
1735        let worktree_id = worktree_a.read_with(cx_a, |tree, _| tree.id());
1736        worktree_a
1737            .read_with(cx_a, |tree, _| tree.as_local().unwrap().scan_complete())
1738            .await;
1739
1740        // Join that project as client B
1741        let client_b_peer_id = client_b.peer_id;
1742        let project_b = client_b.build_remote_project(&project_a, cx_a, cx_b).await;
1743
1744        let replica_id_b = project_b.read_with(cx_b, |project, _| {
1745            assert_eq!(
1746                project
1747                    .collaborators()
1748                    .get(&client_a.peer_id)
1749                    .unwrap()
1750                    .user
1751                    .github_login,
1752                "user_a"
1753            );
1754            project.replica_id()
1755        });
1756        project_a
1757            .condition(&cx_a, |tree, _| {
1758                tree.collaborators()
1759                    .get(&client_b_peer_id)
1760                    .map_or(false, |collaborator| {
1761                        collaborator.replica_id == replica_id_b
1762                            && collaborator.user.github_login == "user_b"
1763                    })
1764            })
1765            .await;
1766
1767        // Open the same file as client B and client A.
1768        let buffer_b = project_b
1769            .update(cx_b, |p, cx| p.open_buffer((worktree_id, "b.txt"), cx))
1770            .await
1771            .unwrap();
1772        buffer_b.read_with(cx_b, |buf, _| assert_eq!(buf.text(), "b-contents"));
1773        project_a.read_with(cx_a, |project, cx| {
1774            assert!(project.has_open_buffer((worktree_id, "b.txt"), cx))
1775        });
1776        let buffer_a = project_a
1777            .update(cx_a, |p, cx| p.open_buffer((worktree_id, "b.txt"), cx))
1778            .await
1779            .unwrap();
1780
1781        let editor_b = cx_b.add_view(window_b, |cx| Editor::for_buffer(buffer_b, None, cx));
1782
1783        // TODO
1784        // // Create a selection set as client B and see that selection set as client A.
1785        // buffer_a
1786        //     .condition(&cx_a, |buffer, _| buffer.selection_sets().count() == 1)
1787        //     .await;
1788
1789        // Edit the buffer as client B and see that edit as client A.
1790        editor_b.update(cx_b, |editor, cx| {
1791            editor.handle_input(&Input("ok, ".into()), cx)
1792        });
1793        buffer_a
1794            .condition(&cx_a, |buffer, _| buffer.text() == "ok, b-contents")
1795            .await;
1796
1797        // TODO
1798        // // Remove the selection set as client B, see those selections disappear as client A.
1799        cx_b.update(move |_| drop(editor_b));
1800        // buffer_a
1801        //     .condition(&cx_a, |buffer, _| buffer.selection_sets().count() == 0)
1802        //     .await;
1803
1804        // Client B can join again on a different window because they are already a participant.
1805        let client_b2 = server.create_client(cx_b2, "user_b").await;
1806        let project_b2 = Project::remote(
1807            project_id,
1808            client_b2.client.clone(),
1809            client_b2.user_store.clone(),
1810            lang_registry.clone(),
1811            FakeFs::new(cx_b2.background()),
1812            &mut cx_b2.to_async(),
1813        )
1814        .await
1815        .unwrap();
1816        deterministic.run_until_parked();
1817        project_a.read_with(cx_a, |project, _| {
1818            assert_eq!(project.collaborators().len(), 2);
1819        });
1820        project_b.read_with(cx_b, |project, _| {
1821            assert_eq!(project.collaborators().len(), 2);
1822        });
1823        project_b2.read_with(cx_b2, |project, _| {
1824            assert_eq!(project.collaborators().len(), 2);
1825        });
1826
1827        // Dropping client B's first project removes only that from client A's collaborators.
1828        cx_b.update(move |_| {
1829            drop(client_b.project.take());
1830            drop(project_b);
1831        });
1832        deterministic.run_until_parked();
1833        project_a.read_with(cx_a, |project, _| {
1834            assert_eq!(project.collaborators().len(), 1);
1835        });
1836        project_b2.read_with(cx_b2, |project, _| {
1837            assert_eq!(project.collaborators().len(), 1);
1838        });
1839    }
1840
1841    #[gpui::test(iterations = 10)]
1842    async fn test_unshare_project(
1843        deterministic: Arc<Deterministic>,
1844        cx_a: &mut TestAppContext,
1845        cx_b: &mut TestAppContext,
1846    ) {
1847        let lang_registry = Arc::new(LanguageRegistry::test());
1848        let fs = FakeFs::new(cx_a.background());
1849        cx_a.foreground().forbid_parking();
1850
1851        // Connect to a server as 2 clients.
1852        let mut server = TestServer::start(cx_a.foreground(), cx_a.background()).await;
1853        let client_a = server.create_client(cx_a, "user_a").await;
1854        let mut client_b = server.create_client(cx_b, "user_b").await;
1855        server
1856            .make_contacts(vec![(&client_a, cx_a), (&client_b, cx_b)])
1857            .await;
1858
1859        // Share a project as client A
1860        fs.insert_tree(
1861            "/a",
1862            json!({
1863                "a.txt": "a-contents",
1864                "b.txt": "b-contents",
1865            }),
1866        )
1867        .await;
1868        let project_a = cx_a.update(|cx| {
1869            Project::local(
1870                client_a.clone(),
1871                client_a.user_store.clone(),
1872                lang_registry.clone(),
1873                fs.clone(),
1874                cx,
1875            )
1876        });
1877        let (worktree_a, _) = project_a
1878            .update(cx_a, |p, cx| {
1879                p.find_or_create_local_worktree("/a", true, cx)
1880            })
1881            .await
1882            .unwrap();
1883        worktree_a
1884            .read_with(cx_a, |tree, _| tree.as_local().unwrap().scan_complete())
1885            .await;
1886        let worktree_id = worktree_a.read_with(cx_a, |tree, _| tree.id());
1887
1888        // Join that project as client B
1889        let project_b = client_b.build_remote_project(&project_a, cx_a, cx_b).await;
1890        assert!(worktree_a.read_with(cx_a, |tree, _| tree.as_local().unwrap().is_shared()));
1891        project_b
1892            .update(cx_b, |p, cx| p.open_buffer((worktree_id, "a.txt"), cx))
1893            .await
1894            .unwrap();
1895
1896        // When client B leaves the project, it gets automatically unshared.
1897        cx_b.update(|_| {
1898            drop(client_b.project.take());
1899            drop(project_b);
1900        });
1901        deterministic.run_until_parked();
1902        assert!(worktree_a.read_with(cx_a, |tree, _| !tree.as_local().unwrap().is_shared()));
1903
1904        // When client B joins again, the project gets re-shared.
1905        let project_b2 = client_b.build_remote_project(&project_a, cx_a, cx_b).await;
1906        assert!(worktree_a.read_with(cx_a, |tree, _| tree.as_local().unwrap().is_shared()));
1907        project_b2
1908            .update(cx_b, |p, cx| p.open_buffer((worktree_id, "a.txt"), cx))
1909            .await
1910            .unwrap();
1911
1912        // When client A (the host) leaves, the project gets unshared and guests are notified.
1913        cx_a.update(|_| drop(project_a));
1914        deterministic.run_until_parked();
1915        project_b2.read_with(cx_b, |project, _| {
1916            assert!(project.is_read_only());
1917            assert!(project.collaborators().is_empty());
1918        });
1919    }
1920
1921    #[gpui::test(iterations = 10)]
1922    async fn test_host_disconnect(
1923        deterministic: Arc<Deterministic>,
1924        cx_a: &mut TestAppContext,
1925        cx_b: &mut TestAppContext,
1926        cx_c: &mut TestAppContext,
1927    ) {
1928        let lang_registry = Arc::new(LanguageRegistry::test());
1929        let fs = FakeFs::new(cx_a.background());
1930        cx_a.foreground().forbid_parking();
1931
1932        // Connect to a server as 3 clients.
1933        let mut server = TestServer::start(cx_a.foreground(), cx_a.background()).await;
1934        let client_a = server.create_client(cx_a, "user_a").await;
1935        let mut client_b = server.create_client(cx_b, "user_b").await;
1936        let client_c = server.create_client(cx_c, "user_c").await;
1937        server
1938            .make_contacts(vec![
1939                (&client_a, cx_a),
1940                (&client_b, cx_b),
1941                (&client_c, cx_c),
1942            ])
1943            .await;
1944
1945        // Share a project as client A
1946        fs.insert_tree(
1947            "/a",
1948            json!({
1949                "a.txt": "a-contents",
1950                "b.txt": "b-contents",
1951            }),
1952        )
1953        .await;
1954        let project_a = cx_a.update(|cx| {
1955            Project::local(
1956                client_a.clone(),
1957                client_a.user_store.clone(),
1958                lang_registry.clone(),
1959                fs.clone(),
1960                cx,
1961            )
1962        });
1963        let project_id = project_a
1964            .read_with(cx_a, |project, _| project.next_remote_id())
1965            .await;
1966        let (worktree_a, _) = project_a
1967            .update(cx_a, |p, cx| {
1968                p.find_or_create_local_worktree("/a", true, cx)
1969            })
1970            .await
1971            .unwrap();
1972        worktree_a
1973            .read_with(cx_a, |tree, _| tree.as_local().unwrap().scan_complete())
1974            .await;
1975        let worktree_id = worktree_a.read_with(cx_a, |tree, _| tree.id());
1976
1977        // Join that project as client B
1978        let project_b = client_b.build_remote_project(&project_a, cx_a, cx_b).await;
1979        assert!(worktree_a.read_with(cx_a, |tree, _| tree.as_local().unwrap().is_shared()));
1980        project_b
1981            .update(cx_b, |p, cx| p.open_buffer((worktree_id, "a.txt"), cx))
1982            .await
1983            .unwrap();
1984
1985        // Request to join that project as client C
1986        let project_c = cx_c.spawn(|mut cx| {
1987            let client = client_c.client.clone();
1988            let user_store = client_c.user_store.clone();
1989            let lang_registry = lang_registry.clone();
1990            async move {
1991                Project::remote(
1992                    project_id,
1993                    client,
1994                    user_store,
1995                    lang_registry.clone(),
1996                    FakeFs::new(cx.background()),
1997                    &mut cx,
1998                )
1999                .await
2000            }
2001        });
2002        deterministic.run_until_parked();
2003
2004        // Drop client A's connection. Collaborators should disappear and the project should not be shown as shared.
2005        server.disconnect_client(client_a.current_user_id(cx_a));
2006        cx_a.foreground().advance_clock(rpc::RECEIVE_TIMEOUT);
2007        project_a
2008            .condition(cx_a, |project, _| project.collaborators().is_empty())
2009            .await;
2010        project_a.read_with(cx_a, |project, _| assert!(!project.is_shared()));
2011        project_b
2012            .condition(cx_b, |project, _| project.is_read_only())
2013            .await;
2014        assert!(worktree_a.read_with(cx_a, |tree, _| !tree.as_local().unwrap().is_shared()));
2015        cx_b.update(|_| {
2016            drop(project_b);
2017        });
2018        assert!(matches!(
2019            project_c.await.unwrap_err(),
2020            project::JoinProjectError::HostWentOffline
2021        ));
2022
2023        // Ensure guests can still join.
2024        let project_b2 = client_b.build_remote_project(&project_a, cx_a, cx_b).await;
2025        assert!(worktree_a.read_with(cx_a, |tree, _| tree.as_local().unwrap().is_shared()));
2026        project_b2
2027            .update(cx_b, |p, cx| p.open_buffer((worktree_id, "a.txt"), cx))
2028            .await
2029            .unwrap();
2030    }
2031
2032    #[gpui::test(iterations = 10)]
2033    async fn test_decline_join_request(
2034        deterministic: Arc<Deterministic>,
2035        cx_a: &mut TestAppContext,
2036        cx_b: &mut TestAppContext,
2037    ) {
2038        let lang_registry = Arc::new(LanguageRegistry::test());
2039        let fs = FakeFs::new(cx_a.background());
2040        cx_a.foreground().forbid_parking();
2041
2042        // Connect to a server as 2 clients.
2043        let mut server = TestServer::start(cx_a.foreground(), cx_a.background()).await;
2044        let client_a = server.create_client(cx_a, "user_a").await;
2045        let client_b = server.create_client(cx_b, "user_b").await;
2046        server
2047            .make_contacts(vec![(&client_a, cx_a), (&client_b, cx_b)])
2048            .await;
2049
2050        // Share a project as client A
2051        fs.insert_tree("/a", json!({})).await;
2052        let project_a = cx_a.update(|cx| {
2053            Project::local(
2054                client_a.clone(),
2055                client_a.user_store.clone(),
2056                lang_registry.clone(),
2057                fs.clone(),
2058                cx,
2059            )
2060        });
2061        let project_id = project_a
2062            .read_with(cx_a, |project, _| project.next_remote_id())
2063            .await;
2064        let (worktree_a, _) = project_a
2065            .update(cx_a, |p, cx| {
2066                p.find_or_create_local_worktree("/a", true, cx)
2067            })
2068            .await
2069            .unwrap();
2070        worktree_a
2071            .read_with(cx_a, |tree, _| tree.as_local().unwrap().scan_complete())
2072            .await;
2073
2074        // Request to join that project as client B
2075        let project_b = cx_b.spawn(|mut cx| {
2076            let client = client_b.client.clone();
2077            let user_store = client_b.user_store.clone();
2078            let lang_registry = lang_registry.clone();
2079            async move {
2080                Project::remote(
2081                    project_id,
2082                    client,
2083                    user_store,
2084                    lang_registry.clone(),
2085                    FakeFs::new(cx.background()),
2086                    &mut cx,
2087                )
2088                .await
2089            }
2090        });
2091        deterministic.run_until_parked();
2092        project_a.update(cx_a, |project, cx| {
2093            project.respond_to_join_request(client_b.user_id().unwrap(), false, cx)
2094        });
2095        assert!(matches!(
2096            project_b.await.unwrap_err(),
2097            project::JoinProjectError::HostDeclined
2098        ));
2099
2100        // Request to join the project again as client B
2101        let project_b = cx_b.spawn(|mut cx| {
2102            let client = client_b.client.clone();
2103            let user_store = client_b.user_store.clone();
2104            let lang_registry = lang_registry.clone();
2105            async move {
2106                Project::remote(
2107                    project_id,
2108                    client,
2109                    user_store,
2110                    lang_registry.clone(),
2111                    FakeFs::new(cx.background()),
2112                    &mut cx,
2113                )
2114                .await
2115            }
2116        });
2117
2118        // Close the project on the host
2119        deterministic.run_until_parked();
2120        cx_a.update(|_| drop(project_a));
2121        deterministic.run_until_parked();
2122        assert!(matches!(
2123            project_b.await.unwrap_err(),
2124            project::JoinProjectError::HostClosedProject
2125        ));
2126    }
2127
2128    #[gpui::test(iterations = 10)]
2129    async fn test_cancel_join_request(
2130        deterministic: Arc<Deterministic>,
2131        cx_a: &mut TestAppContext,
2132        cx_b: &mut TestAppContext,
2133    ) {
2134        let lang_registry = Arc::new(LanguageRegistry::test());
2135        let fs = FakeFs::new(cx_a.background());
2136        cx_a.foreground().forbid_parking();
2137
2138        // Connect to a server as 2 clients.
2139        let mut server = TestServer::start(cx_a.foreground(), cx_a.background()).await;
2140        let client_a = server.create_client(cx_a, "user_a").await;
2141        let client_b = server.create_client(cx_b, "user_b").await;
2142        server
2143            .make_contacts(vec![(&client_a, cx_a), (&client_b, cx_b)])
2144            .await;
2145
2146        // Share a project as client A
2147        fs.insert_tree("/a", json!({})).await;
2148        let project_a = cx_a.update(|cx| {
2149            Project::local(
2150                client_a.clone(),
2151                client_a.user_store.clone(),
2152                lang_registry.clone(),
2153                fs.clone(),
2154                cx,
2155            )
2156        });
2157        let project_id = project_a
2158            .read_with(cx_a, |project, _| project.next_remote_id())
2159            .await;
2160
2161        let project_a_events = Rc::new(RefCell::new(Vec::new()));
2162        let user_b = client_a
2163            .user_store
2164            .update(cx_a, |store, cx| {
2165                store.fetch_user(client_b.user_id().unwrap(), cx)
2166            })
2167            .await
2168            .unwrap();
2169        project_a.update(cx_a, {
2170            let project_a_events = project_a_events.clone();
2171            move |_, cx| {
2172                cx.subscribe(&cx.handle(), move |_, _, event, _| {
2173                    project_a_events.borrow_mut().push(event.clone());
2174                })
2175                .detach();
2176            }
2177        });
2178
2179        let (worktree_a, _) = project_a
2180            .update(cx_a, |p, cx| {
2181                p.find_or_create_local_worktree("/a", true, cx)
2182            })
2183            .await
2184            .unwrap();
2185        worktree_a
2186            .read_with(cx_a, |tree, _| tree.as_local().unwrap().scan_complete())
2187            .await;
2188
2189        // Request to join that project as client B
2190        let project_b = cx_b.spawn(|mut cx| {
2191            let client = client_b.client.clone();
2192            let user_store = client_b.user_store.clone();
2193            let lang_registry = lang_registry.clone();
2194            async move {
2195                Project::remote(
2196                    project_id,
2197                    client,
2198                    user_store,
2199                    lang_registry.clone(),
2200                    FakeFs::new(cx.background()),
2201                    &mut cx,
2202                )
2203                .await
2204            }
2205        });
2206        deterministic.run_until_parked();
2207        assert_eq!(
2208            &*project_a_events.borrow(),
2209            &[project::Event::ContactRequestedJoin(user_b.clone())]
2210        );
2211        project_a_events.borrow_mut().clear();
2212
2213        // Cancel the join request by leaving the project
2214        client_b
2215            .client
2216            .send(proto::LeaveProject { project_id })
2217            .unwrap();
2218        drop(project_b);
2219
2220        deterministic.run_until_parked();
2221        assert_eq!(
2222            &*project_a_events.borrow(),
2223            &[project::Event::ContactCancelledJoinRequest(user_b.clone())]
2224        );
2225    }
2226
2227    #[gpui::test(iterations = 10)]
2228    async fn test_propagate_saves_and_fs_changes(
2229        cx_a: &mut TestAppContext,
2230        cx_b: &mut TestAppContext,
2231        cx_c: &mut TestAppContext,
2232    ) {
2233        let lang_registry = Arc::new(LanguageRegistry::test());
2234        let fs = FakeFs::new(cx_a.background());
2235        cx_a.foreground().forbid_parking();
2236
2237        // Connect to a server as 3 clients.
2238        let mut server = TestServer::start(cx_a.foreground(), cx_a.background()).await;
2239        let client_a = server.create_client(cx_a, "user_a").await;
2240        let mut client_b = server.create_client(cx_b, "user_b").await;
2241        let mut client_c = server.create_client(cx_c, "user_c").await;
2242        server
2243            .make_contacts(vec![
2244                (&client_a, cx_a),
2245                (&client_b, cx_b),
2246                (&client_c, cx_c),
2247            ])
2248            .await;
2249
2250        // Share a worktree as client A.
2251        fs.insert_tree(
2252            "/a",
2253            json!({
2254                "file1": "",
2255                "file2": ""
2256            }),
2257        )
2258        .await;
2259        let project_a = cx_a.update(|cx| {
2260            Project::local(
2261                client_a.clone(),
2262                client_a.user_store.clone(),
2263                lang_registry.clone(),
2264                fs.clone(),
2265                cx,
2266            )
2267        });
2268        let (worktree_a, _) = project_a
2269            .update(cx_a, |p, cx| {
2270                p.find_or_create_local_worktree("/a", true, cx)
2271            })
2272            .await
2273            .unwrap();
2274        worktree_a
2275            .read_with(cx_a, |tree, _| tree.as_local().unwrap().scan_complete())
2276            .await;
2277        let worktree_id = worktree_a.read_with(cx_a, |tree, _| tree.id());
2278
2279        // Join that worktree as clients B and C.
2280        let project_b = client_b.build_remote_project(&project_a, cx_a, cx_b).await;
2281        let project_c = client_c.build_remote_project(&project_a, cx_a, cx_c).await;
2282        let worktree_b = project_b.read_with(cx_b, |p, cx| p.worktrees(cx).next().unwrap());
2283        let worktree_c = project_c.read_with(cx_c, |p, cx| p.worktrees(cx).next().unwrap());
2284
2285        // Open and edit a buffer as both guests B and C.
2286        let buffer_b = project_b
2287            .update(cx_b, |p, cx| p.open_buffer((worktree_id, "file1"), cx))
2288            .await
2289            .unwrap();
2290        let buffer_c = project_c
2291            .update(cx_c, |p, cx| p.open_buffer((worktree_id, "file1"), cx))
2292            .await
2293            .unwrap();
2294        buffer_b.update(cx_b, |buf, cx| buf.edit([(0..0, "i-am-b, ")], cx));
2295        buffer_c.update(cx_c, |buf, cx| buf.edit([(0..0, "i-am-c, ")], cx));
2296
2297        // Open and edit that buffer as the host.
2298        let buffer_a = project_a
2299            .update(cx_a, |p, cx| p.open_buffer((worktree_id, "file1"), cx))
2300            .await
2301            .unwrap();
2302
2303        buffer_a
2304            .condition(cx_a, |buf, _| buf.text() == "i-am-c, i-am-b, ")
2305            .await;
2306        buffer_a.update(cx_a, |buf, cx| {
2307            buf.edit([(buf.len()..buf.len(), "i-am-a")], cx)
2308        });
2309
2310        // Wait for edits to propagate
2311        buffer_a
2312            .condition(cx_a, |buf, _| buf.text() == "i-am-c, i-am-b, i-am-a")
2313            .await;
2314        buffer_b
2315            .condition(cx_b, |buf, _| buf.text() == "i-am-c, i-am-b, i-am-a")
2316            .await;
2317        buffer_c
2318            .condition(cx_c, |buf, _| buf.text() == "i-am-c, i-am-b, i-am-a")
2319            .await;
2320
2321        // Edit the buffer as the host and concurrently save as guest B.
2322        let save_b = buffer_b.update(cx_b, |buf, cx| buf.save(cx));
2323        buffer_a.update(cx_a, |buf, cx| buf.edit([(0..0, "hi-a, ")], cx));
2324        save_b.await.unwrap();
2325        assert_eq!(
2326            fs.load("/a/file1".as_ref()).await.unwrap(),
2327            "hi-a, i-am-c, i-am-b, i-am-a"
2328        );
2329        buffer_a.read_with(cx_a, |buf, _| assert!(!buf.is_dirty()));
2330        buffer_b.read_with(cx_b, |buf, _| assert!(!buf.is_dirty()));
2331        buffer_c.condition(cx_c, |buf, _| !buf.is_dirty()).await;
2332
2333        worktree_a.flush_fs_events(cx_a).await;
2334
2335        // Make changes on host's file system, see those changes on guest worktrees.
2336        fs.rename(
2337            "/a/file1".as_ref(),
2338            "/a/file1-renamed".as_ref(),
2339            Default::default(),
2340        )
2341        .await
2342        .unwrap();
2343
2344        fs.rename("/a/file2".as_ref(), "/a/file3".as_ref(), Default::default())
2345            .await
2346            .unwrap();
2347        fs.insert_file(Path::new("/a/file4"), "4".into()).await;
2348
2349        worktree_a
2350            .condition(&cx_a, |tree, _| {
2351                tree.paths()
2352                    .map(|p| p.to_string_lossy())
2353                    .collect::<Vec<_>>()
2354                    == ["file1-renamed", "file3", "file4"]
2355            })
2356            .await;
2357        worktree_b
2358            .condition(&cx_b, |tree, _| {
2359                tree.paths()
2360                    .map(|p| p.to_string_lossy())
2361                    .collect::<Vec<_>>()
2362                    == ["file1-renamed", "file3", "file4"]
2363            })
2364            .await;
2365        worktree_c
2366            .condition(&cx_c, |tree, _| {
2367                tree.paths()
2368                    .map(|p| p.to_string_lossy())
2369                    .collect::<Vec<_>>()
2370                    == ["file1-renamed", "file3", "file4"]
2371            })
2372            .await;
2373
2374        // Ensure buffer files are updated as well.
2375        buffer_a
2376            .condition(&cx_a, |buf, _| {
2377                buf.file().unwrap().path().to_str() == Some("file1-renamed")
2378            })
2379            .await;
2380        buffer_b
2381            .condition(&cx_b, |buf, _| {
2382                buf.file().unwrap().path().to_str() == Some("file1-renamed")
2383            })
2384            .await;
2385        buffer_c
2386            .condition(&cx_c, |buf, _| {
2387                buf.file().unwrap().path().to_str() == Some("file1-renamed")
2388            })
2389            .await;
2390    }
2391
2392    #[gpui::test(iterations = 10)]
2393    async fn test_fs_operations(
2394        executor: Arc<Deterministic>,
2395        cx_a: &mut TestAppContext,
2396        cx_b: &mut TestAppContext,
2397    ) {
2398        executor.forbid_parking();
2399        let fs = FakeFs::new(cx_a.background());
2400
2401        // Connect to a server as 2 clients.
2402        let mut server = TestServer::start(cx_a.foreground(), cx_a.background()).await;
2403        let mut client_a = server.create_client(cx_a, "user_a").await;
2404        let mut client_b = server.create_client(cx_b, "user_b").await;
2405        server
2406            .make_contacts(vec![(&client_a, cx_a), (&client_b, cx_b)])
2407            .await;
2408
2409        // Share a project as client A
2410        fs.insert_tree(
2411            "/dir",
2412            json!({
2413                "a.txt": "a-contents",
2414                "b.txt": "b-contents",
2415            }),
2416        )
2417        .await;
2418
2419        let (project_a, worktree_id) = client_a.build_local_project(fs, "/dir", cx_a).await;
2420        let project_b = client_b.build_remote_project(&project_a, cx_a, cx_b).await;
2421
2422        let worktree_a =
2423            project_a.read_with(cx_a, |project, cx| project.worktrees(cx).next().unwrap());
2424        let worktree_b =
2425            project_b.read_with(cx_b, |project, cx| project.worktrees(cx).next().unwrap());
2426
2427        let entry = project_b
2428            .update(cx_b, |project, cx| {
2429                project
2430                    .create_entry((worktree_id, "c.txt"), false, cx)
2431                    .unwrap()
2432            })
2433            .await
2434            .unwrap();
2435        worktree_a.read_with(cx_a, |worktree, _| {
2436            assert_eq!(
2437                worktree
2438                    .paths()
2439                    .map(|p| p.to_string_lossy())
2440                    .collect::<Vec<_>>(),
2441                ["a.txt", "b.txt", "c.txt"]
2442            );
2443        });
2444        worktree_b.read_with(cx_b, |worktree, _| {
2445            assert_eq!(
2446                worktree
2447                    .paths()
2448                    .map(|p| p.to_string_lossy())
2449                    .collect::<Vec<_>>(),
2450                ["a.txt", "b.txt", "c.txt"]
2451            );
2452        });
2453
2454        project_b
2455            .update(cx_b, |project, cx| {
2456                project.rename_entry(entry.id, Path::new("d.txt"), cx)
2457            })
2458            .unwrap()
2459            .await
2460            .unwrap();
2461        worktree_a.read_with(cx_a, |worktree, _| {
2462            assert_eq!(
2463                worktree
2464                    .paths()
2465                    .map(|p| p.to_string_lossy())
2466                    .collect::<Vec<_>>(),
2467                ["a.txt", "b.txt", "d.txt"]
2468            );
2469        });
2470        worktree_b.read_with(cx_b, |worktree, _| {
2471            assert_eq!(
2472                worktree
2473                    .paths()
2474                    .map(|p| p.to_string_lossy())
2475                    .collect::<Vec<_>>(),
2476                ["a.txt", "b.txt", "d.txt"]
2477            );
2478        });
2479
2480        let dir_entry = project_b
2481            .update(cx_b, |project, cx| {
2482                project
2483                    .create_entry((worktree_id, "DIR"), true, cx)
2484                    .unwrap()
2485            })
2486            .await
2487            .unwrap();
2488        worktree_a.read_with(cx_a, |worktree, _| {
2489            assert_eq!(
2490                worktree
2491                    .paths()
2492                    .map(|p| p.to_string_lossy())
2493                    .collect::<Vec<_>>(),
2494                ["DIR", "a.txt", "b.txt", "d.txt"]
2495            );
2496        });
2497        worktree_b.read_with(cx_b, |worktree, _| {
2498            assert_eq!(
2499                worktree
2500                    .paths()
2501                    .map(|p| p.to_string_lossy())
2502                    .collect::<Vec<_>>(),
2503                ["DIR", "a.txt", "b.txt", "d.txt"]
2504            );
2505        });
2506
2507        project_b
2508            .update(cx_b, |project, cx| {
2509                project.delete_entry(dir_entry.id, cx).unwrap()
2510            })
2511            .await
2512            .unwrap();
2513        worktree_a.read_with(cx_a, |worktree, _| {
2514            assert_eq!(
2515                worktree
2516                    .paths()
2517                    .map(|p| p.to_string_lossy())
2518                    .collect::<Vec<_>>(),
2519                ["a.txt", "b.txt", "d.txt"]
2520            );
2521        });
2522        worktree_b.read_with(cx_b, |worktree, _| {
2523            assert_eq!(
2524                worktree
2525                    .paths()
2526                    .map(|p| p.to_string_lossy())
2527                    .collect::<Vec<_>>(),
2528                ["a.txt", "b.txt", "d.txt"]
2529            );
2530        });
2531
2532        project_b
2533            .update(cx_b, |project, cx| {
2534                project.delete_entry(entry.id, cx).unwrap()
2535            })
2536            .await
2537            .unwrap();
2538        worktree_a.read_with(cx_a, |worktree, _| {
2539            assert_eq!(
2540                worktree
2541                    .paths()
2542                    .map(|p| p.to_string_lossy())
2543                    .collect::<Vec<_>>(),
2544                ["a.txt", "b.txt"]
2545            );
2546        });
2547        worktree_b.read_with(cx_b, |worktree, _| {
2548            assert_eq!(
2549                worktree
2550                    .paths()
2551                    .map(|p| p.to_string_lossy())
2552                    .collect::<Vec<_>>(),
2553                ["a.txt", "b.txt"]
2554            );
2555        });
2556    }
2557
2558    #[gpui::test(iterations = 10)]
2559    async fn test_buffer_conflict_after_save(cx_a: &mut TestAppContext, cx_b: &mut TestAppContext) {
2560        cx_a.foreground().forbid_parking();
2561        let lang_registry = Arc::new(LanguageRegistry::test());
2562        let fs = FakeFs::new(cx_a.background());
2563
2564        // Connect to a server as 2 clients.
2565        let mut server = TestServer::start(cx_a.foreground(), cx_a.background()).await;
2566        let client_a = server.create_client(cx_a, "user_a").await;
2567        let mut client_b = server.create_client(cx_b, "user_b").await;
2568        server
2569            .make_contacts(vec![(&client_a, cx_a), (&client_b, cx_b)])
2570            .await;
2571
2572        // Share a project as client A
2573        fs.insert_tree(
2574            "/dir",
2575            json!({
2576                "a.txt": "a-contents",
2577            }),
2578        )
2579        .await;
2580
2581        let project_a = cx_a.update(|cx| {
2582            Project::local(
2583                client_a.clone(),
2584                client_a.user_store.clone(),
2585                lang_registry.clone(),
2586                fs.clone(),
2587                cx,
2588            )
2589        });
2590        let (worktree_a, _) = project_a
2591            .update(cx_a, |p, cx| {
2592                p.find_or_create_local_worktree("/dir", true, cx)
2593            })
2594            .await
2595            .unwrap();
2596        worktree_a
2597            .read_with(cx_a, |tree, _| tree.as_local().unwrap().scan_complete())
2598            .await;
2599        let worktree_id = worktree_a.read_with(cx_a, |tree, _| tree.id());
2600
2601        // Join that project as client B
2602        let project_b = client_b.build_remote_project(&project_a, cx_a, cx_b).await;
2603
2604        // Open a buffer as client B
2605        let buffer_b = project_b
2606            .update(cx_b, |p, cx| p.open_buffer((worktree_id, "a.txt"), cx))
2607            .await
2608            .unwrap();
2609
2610        buffer_b.update(cx_b, |buf, cx| buf.edit([(0..0, "world ")], cx));
2611        buffer_b.read_with(cx_b, |buf, _| {
2612            assert!(buf.is_dirty());
2613            assert!(!buf.has_conflict());
2614        });
2615
2616        buffer_b.update(cx_b, |buf, cx| buf.save(cx)).await.unwrap();
2617        buffer_b
2618            .condition(&cx_b, |buffer_b, _| !buffer_b.is_dirty())
2619            .await;
2620        buffer_b.read_with(cx_b, |buf, _| {
2621            assert!(!buf.has_conflict());
2622        });
2623
2624        buffer_b.update(cx_b, |buf, cx| buf.edit([(0..0, "hello ")], cx));
2625        buffer_b.read_with(cx_b, |buf, _| {
2626            assert!(buf.is_dirty());
2627            assert!(!buf.has_conflict());
2628        });
2629    }
2630
2631    #[gpui::test(iterations = 10)]
2632    async fn test_buffer_reloading(cx_a: &mut TestAppContext, cx_b: &mut TestAppContext) {
2633        cx_a.foreground().forbid_parking();
2634        let lang_registry = Arc::new(LanguageRegistry::test());
2635        let fs = FakeFs::new(cx_a.background());
2636
2637        // Connect to a server as 2 clients.
2638        let mut server = TestServer::start(cx_a.foreground(), cx_a.background()).await;
2639        let client_a = server.create_client(cx_a, "user_a").await;
2640        let mut client_b = server.create_client(cx_b, "user_b").await;
2641        server
2642            .make_contacts(vec![(&client_a, cx_a), (&client_b, cx_b)])
2643            .await;
2644
2645        // Share a project as client A
2646        fs.insert_tree(
2647            "/dir",
2648            json!({
2649                "a.txt": "a-contents",
2650            }),
2651        )
2652        .await;
2653
2654        let project_a = cx_a.update(|cx| {
2655            Project::local(
2656                client_a.clone(),
2657                client_a.user_store.clone(),
2658                lang_registry.clone(),
2659                fs.clone(),
2660                cx,
2661            )
2662        });
2663        let (worktree_a, _) = project_a
2664            .update(cx_a, |p, cx| {
2665                p.find_or_create_local_worktree("/dir", true, cx)
2666            })
2667            .await
2668            .unwrap();
2669        worktree_a
2670            .read_with(cx_a, |tree, _| tree.as_local().unwrap().scan_complete())
2671            .await;
2672        let worktree_id = worktree_a.read_with(cx_a, |tree, _| tree.id());
2673
2674        // Join that project as client B
2675        let project_b = client_b.build_remote_project(&project_a, cx_a, cx_b).await;
2676        let _worktree_b = project_b.update(cx_b, |p, cx| p.worktrees(cx).next().unwrap());
2677
2678        // Open a buffer as client B
2679        let buffer_b = project_b
2680            .update(cx_b, |p, cx| p.open_buffer((worktree_id, "a.txt"), cx))
2681            .await
2682            .unwrap();
2683        buffer_b.read_with(cx_b, |buf, _| {
2684            assert!(!buf.is_dirty());
2685            assert!(!buf.has_conflict());
2686        });
2687
2688        fs.save(Path::new("/dir/a.txt"), &"new contents".into())
2689            .await
2690            .unwrap();
2691        buffer_b
2692            .condition(&cx_b, |buf, _| {
2693                buf.text() == "new contents" && !buf.is_dirty()
2694            })
2695            .await;
2696        buffer_b.read_with(cx_b, |buf, _| {
2697            assert!(!buf.has_conflict());
2698        });
2699    }
2700
2701    #[gpui::test(iterations = 10)]
2702    async fn test_editing_while_guest_opens_buffer(
2703        cx_a: &mut TestAppContext,
2704        cx_b: &mut TestAppContext,
2705    ) {
2706        cx_a.foreground().forbid_parking();
2707        let lang_registry = Arc::new(LanguageRegistry::test());
2708        let fs = FakeFs::new(cx_a.background());
2709
2710        // Connect to a server as 2 clients.
2711        let mut server = TestServer::start(cx_a.foreground(), cx_a.background()).await;
2712        let client_a = server.create_client(cx_a, "user_a").await;
2713        let mut client_b = server.create_client(cx_b, "user_b").await;
2714        server
2715            .make_contacts(vec![(&client_a, cx_a), (&client_b, cx_b)])
2716            .await;
2717
2718        // Share a project as client A
2719        fs.insert_tree(
2720            "/dir",
2721            json!({
2722                "a.txt": "a-contents",
2723            }),
2724        )
2725        .await;
2726        let project_a = cx_a.update(|cx| {
2727            Project::local(
2728                client_a.clone(),
2729                client_a.user_store.clone(),
2730                lang_registry.clone(),
2731                fs.clone(),
2732                cx,
2733            )
2734        });
2735        let (worktree_a, _) = project_a
2736            .update(cx_a, |p, cx| {
2737                p.find_or_create_local_worktree("/dir", true, cx)
2738            })
2739            .await
2740            .unwrap();
2741        worktree_a
2742            .read_with(cx_a, |tree, _| tree.as_local().unwrap().scan_complete())
2743            .await;
2744        let worktree_id = worktree_a.read_with(cx_a, |tree, _| tree.id());
2745
2746        // Join that project as client B
2747        let project_b = client_b.build_remote_project(&project_a, cx_a, cx_b).await;
2748
2749        // Open a buffer as client A
2750        let buffer_a = project_a
2751            .update(cx_a, |p, cx| p.open_buffer((worktree_id, "a.txt"), cx))
2752            .await
2753            .unwrap();
2754
2755        // Start opening the same buffer as client B
2756        let buffer_b = cx_b
2757            .background()
2758            .spawn(project_b.update(cx_b, |p, cx| p.open_buffer((worktree_id, "a.txt"), cx)));
2759
2760        // Edit the buffer as client A while client B is still opening it.
2761        cx_b.background().simulate_random_delay().await;
2762        buffer_a.update(cx_a, |buf, cx| buf.edit([(0..0, "X")], cx));
2763        cx_b.background().simulate_random_delay().await;
2764        buffer_a.update(cx_a, |buf, cx| buf.edit([(1..1, "Y")], cx));
2765
2766        let text = buffer_a.read_with(cx_a, |buf, _| buf.text());
2767        let buffer_b = buffer_b.await.unwrap();
2768        buffer_b.condition(&cx_b, |buf, _| buf.text() == text).await;
2769    }
2770
2771    #[gpui::test(iterations = 10)]
2772    async fn test_leaving_worktree_while_opening_buffer(
2773        cx_a: &mut TestAppContext,
2774        cx_b: &mut TestAppContext,
2775    ) {
2776        cx_a.foreground().forbid_parking();
2777        let lang_registry = Arc::new(LanguageRegistry::test());
2778        let fs = FakeFs::new(cx_a.background());
2779
2780        // Connect to a server as 2 clients.
2781        let mut server = TestServer::start(cx_a.foreground(), cx_a.background()).await;
2782        let client_a = server.create_client(cx_a, "user_a").await;
2783        let mut client_b = server.create_client(cx_b, "user_b").await;
2784        server
2785            .make_contacts(vec![(&client_a, cx_a), (&client_b, cx_b)])
2786            .await;
2787
2788        // Share a project as client A
2789        fs.insert_tree(
2790            "/dir",
2791            json!({
2792                "a.txt": "a-contents",
2793            }),
2794        )
2795        .await;
2796        let project_a = cx_a.update(|cx| {
2797            Project::local(
2798                client_a.clone(),
2799                client_a.user_store.clone(),
2800                lang_registry.clone(),
2801                fs.clone(),
2802                cx,
2803            )
2804        });
2805        let (worktree_a, _) = project_a
2806            .update(cx_a, |p, cx| {
2807                p.find_or_create_local_worktree("/dir", true, cx)
2808            })
2809            .await
2810            .unwrap();
2811        worktree_a
2812            .read_with(cx_a, |tree, _| tree.as_local().unwrap().scan_complete())
2813            .await;
2814        let worktree_id = worktree_a.read_with(cx_a, |tree, _| tree.id());
2815
2816        // Join that project as client B
2817        let project_b = client_b.build_remote_project(&project_a, cx_a, cx_b).await;
2818
2819        // See that a guest has joined as client A.
2820        project_a
2821            .condition(&cx_a, |p, _| p.collaborators().len() == 1)
2822            .await;
2823
2824        // Begin opening a buffer as client B, but leave the project before the open completes.
2825        let buffer_b = cx_b
2826            .background()
2827            .spawn(project_b.update(cx_b, |p, cx| p.open_buffer((worktree_id, "a.txt"), cx)));
2828        cx_b.update(|_| {
2829            drop(client_b.project.take());
2830            drop(project_b);
2831        });
2832        drop(buffer_b);
2833
2834        // See that the guest has left.
2835        project_a
2836            .condition(&cx_a, |p, _| p.collaborators().len() == 0)
2837            .await;
2838    }
2839
2840    #[gpui::test(iterations = 10)]
2841    async fn test_leaving_project(cx_a: &mut TestAppContext, cx_b: &mut TestAppContext) {
2842        cx_a.foreground().forbid_parking();
2843        let lang_registry = Arc::new(LanguageRegistry::test());
2844        let fs = FakeFs::new(cx_a.background());
2845
2846        // Connect to a server as 2 clients.
2847        let mut server = TestServer::start(cx_a.foreground(), cx_a.background()).await;
2848        let client_a = server.create_client(cx_a, "user_a").await;
2849        let mut client_b = server.create_client(cx_b, "user_b").await;
2850        server
2851            .make_contacts(vec![(&client_a, cx_a), (&client_b, cx_b)])
2852            .await;
2853
2854        // Share a project as client A
2855        fs.insert_tree(
2856            "/a",
2857            json!({
2858                "a.txt": "a-contents",
2859                "b.txt": "b-contents",
2860            }),
2861        )
2862        .await;
2863        let project_a = cx_a.update(|cx| {
2864            Project::local(
2865                client_a.clone(),
2866                client_a.user_store.clone(),
2867                lang_registry.clone(),
2868                fs.clone(),
2869                cx,
2870            )
2871        });
2872        let (worktree_a, _) = project_a
2873            .update(cx_a, |p, cx| {
2874                p.find_or_create_local_worktree("/a", true, cx)
2875            })
2876            .await
2877            .unwrap();
2878        worktree_a
2879            .read_with(cx_a, |tree, _| tree.as_local().unwrap().scan_complete())
2880            .await;
2881
2882        // Join that project as client B
2883        let _project_b = client_b.build_remote_project(&project_a, cx_a, cx_b).await;
2884
2885        // Client A sees that a guest has joined.
2886        project_a
2887            .condition(cx_a, |p, _| p.collaborators().len() == 1)
2888            .await;
2889
2890        // Drop client B's connection and ensure client A observes client B leaving the project.
2891        client_b.disconnect(&cx_b.to_async()).unwrap();
2892        project_a
2893            .condition(cx_a, |p, _| p.collaborators().len() == 0)
2894            .await;
2895
2896        // Rejoin the project as client B
2897        let _project_b = client_b.build_remote_project(&project_a, cx_a, cx_b).await;
2898
2899        // Client A sees that a guest has re-joined.
2900        project_a
2901            .condition(cx_a, |p, _| p.collaborators().len() == 1)
2902            .await;
2903
2904        // Simulate connection loss for client B and ensure client A observes client B leaving the project.
2905        client_b.wait_for_current_user(cx_b).await;
2906        server.disconnect_client(client_b.current_user_id(cx_b));
2907        cx_a.foreground().advance_clock(rpc::RECEIVE_TIMEOUT);
2908        project_a
2909            .condition(cx_a, |p, _| p.collaborators().len() == 0)
2910            .await;
2911    }
2912
2913    #[gpui::test(iterations = 10)]
2914    async fn test_collaborating_with_diagnostics(
2915        deterministic: Arc<Deterministic>,
2916        cx_a: &mut TestAppContext,
2917        cx_b: &mut TestAppContext,
2918        cx_c: &mut TestAppContext,
2919    ) {
2920        deterministic.forbid_parking();
2921        let lang_registry = Arc::new(LanguageRegistry::test());
2922        let fs = FakeFs::new(cx_a.background());
2923
2924        // Set up a fake language server.
2925        let mut language = Language::new(
2926            LanguageConfig {
2927                name: "Rust".into(),
2928                path_suffixes: vec!["rs".to_string()],
2929                ..Default::default()
2930            },
2931            Some(tree_sitter_rust::language()),
2932        );
2933        let mut fake_language_servers = language.set_fake_lsp_adapter(Default::default());
2934        lang_registry.add(Arc::new(language));
2935
2936        // Connect to a server as 2 clients.
2937        let mut server = TestServer::start(cx_a.foreground(), cx_a.background()).await;
2938        let client_a = server.create_client(cx_a, "user_a").await;
2939        let mut client_b = server.create_client(cx_b, "user_b").await;
2940        let mut client_c = server.create_client(cx_c, "user_c").await;
2941        server
2942            .make_contacts(vec![
2943                (&client_a, cx_a),
2944                (&client_b, cx_b),
2945                (&client_c, cx_c),
2946            ])
2947            .await;
2948
2949        // Share a project as client A
2950        fs.insert_tree(
2951            "/a",
2952            json!({
2953                "a.rs": "let one = two",
2954                "other.rs": "",
2955            }),
2956        )
2957        .await;
2958        let project_a = cx_a.update(|cx| {
2959            Project::local(
2960                client_a.clone(),
2961                client_a.user_store.clone(),
2962                lang_registry.clone(),
2963                fs.clone(),
2964                cx,
2965            )
2966        });
2967        let (worktree_a, _) = project_a
2968            .update(cx_a, |p, cx| {
2969                p.find_or_create_local_worktree("/a", true, cx)
2970            })
2971            .await
2972            .unwrap();
2973        worktree_a
2974            .read_with(cx_a, |tree, _| tree.as_local().unwrap().scan_complete())
2975            .await;
2976        let project_id = project_a.update(cx_a, |p, _| p.next_remote_id()).await;
2977        let worktree_id = worktree_a.read_with(cx_a, |tree, _| tree.id());
2978
2979        // Cause the language server to start.
2980        let _buffer = cx_a
2981            .background()
2982            .spawn(project_a.update(cx_a, |project, cx| {
2983                project.open_buffer(
2984                    ProjectPath {
2985                        worktree_id,
2986                        path: Path::new("other.rs").into(),
2987                    },
2988                    cx,
2989                )
2990            }))
2991            .await
2992            .unwrap();
2993
2994        // Join the worktree as client B.
2995        let project_b = client_b.build_remote_project(&project_a, cx_a, cx_b).await;
2996
2997        // Simulate a language server reporting errors for a file.
2998        let mut fake_language_server = fake_language_servers.next().await.unwrap();
2999        fake_language_server
3000            .receive_notification::<lsp::notification::DidOpenTextDocument>()
3001            .await;
3002        fake_language_server.notify::<lsp::notification::PublishDiagnostics>(
3003            lsp::PublishDiagnosticsParams {
3004                uri: lsp::Url::from_file_path("/a/a.rs").unwrap(),
3005                version: None,
3006                diagnostics: vec![lsp::Diagnostic {
3007                    severity: Some(lsp::DiagnosticSeverity::ERROR),
3008                    range: lsp::Range::new(lsp::Position::new(0, 4), lsp::Position::new(0, 7)),
3009                    message: "message 1".to_string(),
3010                    ..Default::default()
3011                }],
3012            },
3013        );
3014
3015        // Wait for server to see the diagnostics update.
3016        deterministic.run_until_parked();
3017        {
3018            let store = server.store.read().await;
3019            let project = store.project(project_id).unwrap();
3020            let worktree = project.worktrees.get(&worktree_id.to_proto()).unwrap();
3021            assert!(!worktree.diagnostic_summaries.is_empty());
3022        }
3023
3024        // Ensure client B observes the new diagnostics.
3025        project_b.read_with(cx_b, |project, cx| {
3026            assert_eq!(
3027                project.diagnostic_summaries(cx).collect::<Vec<_>>(),
3028                &[(
3029                    ProjectPath {
3030                        worktree_id,
3031                        path: Arc::from(Path::new("a.rs")),
3032                    },
3033                    DiagnosticSummary {
3034                        error_count: 1,
3035                        warning_count: 0,
3036                        ..Default::default()
3037                    },
3038                )]
3039            )
3040        });
3041
3042        // Join project as client C and observe the diagnostics.
3043        let project_c = client_c.build_remote_project(&project_a, cx_a, cx_c).await;
3044        project_c.read_with(cx_c, |project, cx| {
3045            assert_eq!(
3046                project.diagnostic_summaries(cx).collect::<Vec<_>>(),
3047                &[(
3048                    ProjectPath {
3049                        worktree_id,
3050                        path: Arc::from(Path::new("a.rs")),
3051                    },
3052                    DiagnosticSummary {
3053                        error_count: 1,
3054                        warning_count: 0,
3055                        ..Default::default()
3056                    },
3057                )]
3058            )
3059        });
3060
3061        // Simulate a language server reporting more errors for a file.
3062        fake_language_server.notify::<lsp::notification::PublishDiagnostics>(
3063            lsp::PublishDiagnosticsParams {
3064                uri: lsp::Url::from_file_path("/a/a.rs").unwrap(),
3065                version: None,
3066                diagnostics: vec![
3067                    lsp::Diagnostic {
3068                        severity: Some(lsp::DiagnosticSeverity::ERROR),
3069                        range: lsp::Range::new(lsp::Position::new(0, 4), lsp::Position::new(0, 7)),
3070                        message: "message 1".to_string(),
3071                        ..Default::default()
3072                    },
3073                    lsp::Diagnostic {
3074                        severity: Some(lsp::DiagnosticSeverity::WARNING),
3075                        range: lsp::Range::new(
3076                            lsp::Position::new(0, 10),
3077                            lsp::Position::new(0, 13),
3078                        ),
3079                        message: "message 2".to_string(),
3080                        ..Default::default()
3081                    },
3082                ],
3083            },
3084        );
3085
3086        // Clients B and C get the updated summaries
3087        deterministic.run_until_parked();
3088        project_b.read_with(cx_b, |project, cx| {
3089            assert_eq!(
3090                project.diagnostic_summaries(cx).collect::<Vec<_>>(),
3091                [(
3092                    ProjectPath {
3093                        worktree_id,
3094                        path: Arc::from(Path::new("a.rs")),
3095                    },
3096                    DiagnosticSummary {
3097                        error_count: 1,
3098                        warning_count: 1,
3099                        ..Default::default()
3100                    },
3101                )]
3102            );
3103        });
3104        project_c.read_with(cx_c, |project, cx| {
3105            assert_eq!(
3106                project.diagnostic_summaries(cx).collect::<Vec<_>>(),
3107                [(
3108                    ProjectPath {
3109                        worktree_id,
3110                        path: Arc::from(Path::new("a.rs")),
3111                    },
3112                    DiagnosticSummary {
3113                        error_count: 1,
3114                        warning_count: 1,
3115                        ..Default::default()
3116                    },
3117                )]
3118            );
3119        });
3120
3121        // Open the file with the errors on client B. They should be present.
3122        let buffer_b = cx_b
3123            .background()
3124            .spawn(project_b.update(cx_b, |p, cx| p.open_buffer((worktree_id, "a.rs"), cx)))
3125            .await
3126            .unwrap();
3127
3128        buffer_b.read_with(cx_b, |buffer, _| {
3129            assert_eq!(
3130                buffer
3131                    .snapshot()
3132                    .diagnostics_in_range::<_, Point>(0..buffer.len(), false)
3133                    .map(|entry| entry)
3134                    .collect::<Vec<_>>(),
3135                &[
3136                    DiagnosticEntry {
3137                        range: Point::new(0, 4)..Point::new(0, 7),
3138                        diagnostic: Diagnostic {
3139                            group_id: 0,
3140                            message: "message 1".to_string(),
3141                            severity: lsp::DiagnosticSeverity::ERROR,
3142                            is_primary: true,
3143                            ..Default::default()
3144                        }
3145                    },
3146                    DiagnosticEntry {
3147                        range: Point::new(0, 10)..Point::new(0, 13),
3148                        diagnostic: Diagnostic {
3149                            group_id: 1,
3150                            severity: lsp::DiagnosticSeverity::WARNING,
3151                            message: "message 2".to_string(),
3152                            is_primary: true,
3153                            ..Default::default()
3154                        }
3155                    }
3156                ]
3157            );
3158        });
3159
3160        // Simulate a language server reporting no errors for a file.
3161        fake_language_server.notify::<lsp::notification::PublishDiagnostics>(
3162            lsp::PublishDiagnosticsParams {
3163                uri: lsp::Url::from_file_path("/a/a.rs").unwrap(),
3164                version: None,
3165                diagnostics: vec![],
3166            },
3167        );
3168        deterministic.run_until_parked();
3169        project_a.read_with(cx_a, |project, cx| {
3170            assert_eq!(project.diagnostic_summaries(cx).collect::<Vec<_>>(), [])
3171        });
3172        project_b.read_with(cx_b, |project, cx| {
3173            assert_eq!(project.diagnostic_summaries(cx).collect::<Vec<_>>(), [])
3174        });
3175        project_c.read_with(cx_c, |project, cx| {
3176            assert_eq!(project.diagnostic_summaries(cx).collect::<Vec<_>>(), [])
3177        });
3178    }
3179
3180    #[gpui::test(iterations = 10)]
3181    async fn test_collaborating_with_completion(
3182        cx_a: &mut TestAppContext,
3183        cx_b: &mut TestAppContext,
3184    ) {
3185        cx_a.foreground().forbid_parking();
3186        let lang_registry = Arc::new(LanguageRegistry::test());
3187        let fs = FakeFs::new(cx_a.background());
3188
3189        // Set up a fake language server.
3190        let mut language = Language::new(
3191            LanguageConfig {
3192                name: "Rust".into(),
3193                path_suffixes: vec!["rs".to_string()],
3194                ..Default::default()
3195            },
3196            Some(tree_sitter_rust::language()),
3197        );
3198        let mut fake_language_servers = language.set_fake_lsp_adapter(FakeLspAdapter {
3199            capabilities: lsp::ServerCapabilities {
3200                completion_provider: Some(lsp::CompletionOptions {
3201                    trigger_characters: Some(vec![".".to_string()]),
3202                    ..Default::default()
3203                }),
3204                ..Default::default()
3205            },
3206            ..Default::default()
3207        });
3208        lang_registry.add(Arc::new(language));
3209
3210        // Connect to a server as 2 clients.
3211        let mut server = TestServer::start(cx_a.foreground(), cx_a.background()).await;
3212        let client_a = server.create_client(cx_a, "user_a").await;
3213        let mut client_b = server.create_client(cx_b, "user_b").await;
3214        server
3215            .make_contacts(vec![(&client_a, cx_a), (&client_b, cx_b)])
3216            .await;
3217
3218        // Share a project as client A
3219        fs.insert_tree(
3220            "/a",
3221            json!({
3222                "main.rs": "fn main() { a }",
3223                "other.rs": "",
3224            }),
3225        )
3226        .await;
3227        let project_a = cx_a.update(|cx| {
3228            Project::local(
3229                client_a.clone(),
3230                client_a.user_store.clone(),
3231                lang_registry.clone(),
3232                fs.clone(),
3233                cx,
3234            )
3235        });
3236        let (worktree_a, _) = project_a
3237            .update(cx_a, |p, cx| {
3238                p.find_or_create_local_worktree("/a", true, cx)
3239            })
3240            .await
3241            .unwrap();
3242        worktree_a
3243            .read_with(cx_a, |tree, _| tree.as_local().unwrap().scan_complete())
3244            .await;
3245        let worktree_id = worktree_a.read_with(cx_a, |tree, _| tree.id());
3246
3247        // Join the worktree as client B.
3248        let project_b = client_b.build_remote_project(&project_a, cx_a, cx_b).await;
3249
3250        // Open a file in an editor as the guest.
3251        let buffer_b = project_b
3252            .update(cx_b, |p, cx| p.open_buffer((worktree_id, "main.rs"), cx))
3253            .await
3254            .unwrap();
3255        let (window_b, _) = cx_b.add_window(|_| EmptyView);
3256        let editor_b = cx_b.add_view(window_b, |cx| {
3257            Editor::for_buffer(buffer_b.clone(), Some(project_b.clone()), cx)
3258        });
3259
3260        let fake_language_server = fake_language_servers.next().await.unwrap();
3261        buffer_b
3262            .condition(&cx_b, |buffer, _| !buffer.completion_triggers().is_empty())
3263            .await;
3264
3265        // Type a completion trigger character as the guest.
3266        editor_b.update(cx_b, |editor, cx| {
3267            editor.change_selections(None, cx, |s| s.select_ranges([13..13]));
3268            editor.handle_input(&Input(".".into()), cx);
3269            cx.focus(&editor_b);
3270        });
3271
3272        // Receive a completion request as the host's language server.
3273        // Return some completions from the host's language server.
3274        cx_a.foreground().start_waiting();
3275        fake_language_server
3276            .handle_request::<lsp::request::Completion, _, _>(|params, _| async move {
3277                assert_eq!(
3278                    params.text_document_position.text_document.uri,
3279                    lsp::Url::from_file_path("/a/main.rs").unwrap(),
3280                );
3281                assert_eq!(
3282                    params.text_document_position.position,
3283                    lsp::Position::new(0, 14),
3284                );
3285
3286                Ok(Some(lsp::CompletionResponse::Array(vec![
3287                    lsp::CompletionItem {
3288                        label: "first_method(…)".into(),
3289                        detail: Some("fn(&mut self, B) -> C".into()),
3290                        text_edit: Some(lsp::CompletionTextEdit::Edit(lsp::TextEdit {
3291                            new_text: "first_method($1)".to_string(),
3292                            range: lsp::Range::new(
3293                                lsp::Position::new(0, 14),
3294                                lsp::Position::new(0, 14),
3295                            ),
3296                        })),
3297                        insert_text_format: Some(lsp::InsertTextFormat::SNIPPET),
3298                        ..Default::default()
3299                    },
3300                    lsp::CompletionItem {
3301                        label: "second_method(…)".into(),
3302                        detail: Some("fn(&mut self, C) -> D<E>".into()),
3303                        text_edit: Some(lsp::CompletionTextEdit::Edit(lsp::TextEdit {
3304                            new_text: "second_method()".to_string(),
3305                            range: lsp::Range::new(
3306                                lsp::Position::new(0, 14),
3307                                lsp::Position::new(0, 14),
3308                            ),
3309                        })),
3310                        insert_text_format: Some(lsp::InsertTextFormat::SNIPPET),
3311                        ..Default::default()
3312                    },
3313                ])))
3314            })
3315            .next()
3316            .await
3317            .unwrap();
3318        cx_a.foreground().finish_waiting();
3319
3320        // Open the buffer on the host.
3321        let buffer_a = project_a
3322            .update(cx_a, |p, cx| p.open_buffer((worktree_id, "main.rs"), cx))
3323            .await
3324            .unwrap();
3325        buffer_a
3326            .condition(&cx_a, |buffer, _| buffer.text() == "fn main() { a. }")
3327            .await;
3328
3329        // Confirm a completion on the guest.
3330        editor_b
3331            .condition(&cx_b, |editor, _| editor.context_menu_visible())
3332            .await;
3333        editor_b.update(cx_b, |editor, cx| {
3334            editor.confirm_completion(&ConfirmCompletion { item_ix: Some(0) }, cx);
3335            assert_eq!(editor.text(cx), "fn main() { a.first_method() }");
3336        });
3337
3338        // Return a resolved completion from the host's language server.
3339        // The resolved completion has an additional text edit.
3340        fake_language_server.handle_request::<lsp::request::ResolveCompletionItem, _, _>(
3341            |params, _| async move {
3342                assert_eq!(params.label, "first_method(…)");
3343                Ok(lsp::CompletionItem {
3344                    label: "first_method(…)".into(),
3345                    detail: Some("fn(&mut self, B) -> C".into()),
3346                    text_edit: Some(lsp::CompletionTextEdit::Edit(lsp::TextEdit {
3347                        new_text: "first_method($1)".to_string(),
3348                        range: lsp::Range::new(
3349                            lsp::Position::new(0, 14),
3350                            lsp::Position::new(0, 14),
3351                        ),
3352                    })),
3353                    additional_text_edits: Some(vec![lsp::TextEdit {
3354                        new_text: "use d::SomeTrait;\n".to_string(),
3355                        range: lsp::Range::new(lsp::Position::new(0, 0), lsp::Position::new(0, 0)),
3356                    }]),
3357                    insert_text_format: Some(lsp::InsertTextFormat::SNIPPET),
3358                    ..Default::default()
3359                })
3360            },
3361        );
3362
3363        // The additional edit is applied.
3364        buffer_a
3365            .condition(&cx_a, |buffer, _| {
3366                buffer.text() == "use d::SomeTrait;\nfn main() { a.first_method() }"
3367            })
3368            .await;
3369        buffer_b
3370            .condition(&cx_b, |buffer, _| {
3371                buffer.text() == "use d::SomeTrait;\nfn main() { a.first_method() }"
3372            })
3373            .await;
3374    }
3375
3376    #[gpui::test(iterations = 10)]
3377    async fn test_reloading_buffer_manually(cx_a: &mut TestAppContext, cx_b: &mut TestAppContext) {
3378        cx_a.foreground().forbid_parking();
3379        let lang_registry = Arc::new(LanguageRegistry::test());
3380        let fs = FakeFs::new(cx_a.background());
3381
3382        // Connect to a server as 2 clients.
3383        let mut server = TestServer::start(cx_a.foreground(), cx_a.background()).await;
3384        let client_a = server.create_client(cx_a, "user_a").await;
3385        let mut client_b = server.create_client(cx_b, "user_b").await;
3386        server
3387            .make_contacts(vec![(&client_a, cx_a), (&client_b, cx_b)])
3388            .await;
3389
3390        // Share a project as client A
3391        fs.insert_tree(
3392            "/a",
3393            json!({
3394                "a.rs": "let one = 1;",
3395            }),
3396        )
3397        .await;
3398        let project_a = cx_a.update(|cx| {
3399            Project::local(
3400                client_a.clone(),
3401                client_a.user_store.clone(),
3402                lang_registry.clone(),
3403                fs.clone(),
3404                cx,
3405            )
3406        });
3407        let (worktree_a, _) = project_a
3408            .update(cx_a, |p, cx| {
3409                p.find_or_create_local_worktree("/a", true, cx)
3410            })
3411            .await
3412            .unwrap();
3413        worktree_a
3414            .read_with(cx_a, |tree, _| tree.as_local().unwrap().scan_complete())
3415            .await;
3416        let worktree_id = worktree_a.read_with(cx_a, |tree, _| tree.id());
3417        let buffer_a = project_a
3418            .update(cx_a, |p, cx| p.open_buffer((worktree_id, "a.rs"), cx))
3419            .await
3420            .unwrap();
3421
3422        // Join the worktree as client B.
3423        let project_b = client_b.build_remote_project(&project_a, cx_a, cx_b).await;
3424
3425        let buffer_b = cx_b
3426            .background()
3427            .spawn(project_b.update(cx_b, |p, cx| p.open_buffer((worktree_id, "a.rs"), cx)))
3428            .await
3429            .unwrap();
3430        buffer_b.update(cx_b, |buffer, cx| {
3431            buffer.edit([(4..7, "six")], cx);
3432            buffer.edit([(10..11, "6")], cx);
3433            assert_eq!(buffer.text(), "let six = 6;");
3434            assert!(buffer.is_dirty());
3435            assert!(!buffer.has_conflict());
3436        });
3437        buffer_a
3438            .condition(cx_a, |buffer, _| buffer.text() == "let six = 6;")
3439            .await;
3440
3441        fs.save(Path::new("/a/a.rs"), &Rope::from("let seven = 7;"))
3442            .await
3443            .unwrap();
3444        buffer_a
3445            .condition(cx_a, |buffer, _| buffer.has_conflict())
3446            .await;
3447        buffer_b
3448            .condition(cx_b, |buffer, _| buffer.has_conflict())
3449            .await;
3450
3451        project_b
3452            .update(cx_b, |project, cx| {
3453                project.reload_buffers(HashSet::from_iter([buffer_b.clone()]), true, cx)
3454            })
3455            .await
3456            .unwrap();
3457        buffer_a.read_with(cx_a, |buffer, _| {
3458            assert_eq!(buffer.text(), "let seven = 7;");
3459            assert!(!buffer.is_dirty());
3460            assert!(!buffer.has_conflict());
3461        });
3462        buffer_b.read_with(cx_b, |buffer, _| {
3463            assert_eq!(buffer.text(), "let seven = 7;");
3464            assert!(!buffer.is_dirty());
3465            assert!(!buffer.has_conflict());
3466        });
3467
3468        buffer_a.update(cx_a, |buffer, cx| {
3469            // Undoing on the host is a no-op when the reload was initiated by the guest.
3470            buffer.undo(cx);
3471            assert_eq!(buffer.text(), "let seven = 7;");
3472            assert!(!buffer.is_dirty());
3473            assert!(!buffer.has_conflict());
3474        });
3475        buffer_b.update(cx_b, |buffer, cx| {
3476            // Undoing on the guest rolls back the buffer to before it was reloaded but the conflict gets cleared.
3477            buffer.undo(cx);
3478            assert_eq!(buffer.text(), "let six = 6;");
3479            assert!(buffer.is_dirty());
3480            assert!(!buffer.has_conflict());
3481        });
3482    }
3483
3484    #[gpui::test(iterations = 10)]
3485    async fn test_formatting_buffer(cx_a: &mut TestAppContext, cx_b: &mut TestAppContext) {
3486        cx_a.foreground().forbid_parking();
3487        let lang_registry = Arc::new(LanguageRegistry::test());
3488        let fs = FakeFs::new(cx_a.background());
3489
3490        // Set up a fake language server.
3491        let mut language = Language::new(
3492            LanguageConfig {
3493                name: "Rust".into(),
3494                path_suffixes: vec!["rs".to_string()],
3495                ..Default::default()
3496            },
3497            Some(tree_sitter_rust::language()),
3498        );
3499        let mut fake_language_servers = language.set_fake_lsp_adapter(Default::default());
3500        lang_registry.add(Arc::new(language));
3501
3502        // Connect to a server as 2 clients.
3503        let mut server = TestServer::start(cx_a.foreground(), cx_a.background()).await;
3504        let client_a = server.create_client(cx_a, "user_a").await;
3505        let mut client_b = server.create_client(cx_b, "user_b").await;
3506        server
3507            .make_contacts(vec![(&client_a, cx_a), (&client_b, cx_b)])
3508            .await;
3509
3510        // Share a project as client A
3511        fs.insert_tree(
3512            "/a",
3513            json!({
3514                "a.rs": "let one = two",
3515            }),
3516        )
3517        .await;
3518        let project_a = cx_a.update(|cx| {
3519            Project::local(
3520                client_a.clone(),
3521                client_a.user_store.clone(),
3522                lang_registry.clone(),
3523                fs.clone(),
3524                cx,
3525            )
3526        });
3527        let (worktree_a, _) = project_a
3528            .update(cx_a, |p, cx| {
3529                p.find_or_create_local_worktree("/a", true, cx)
3530            })
3531            .await
3532            .unwrap();
3533        worktree_a
3534            .read_with(cx_a, |tree, _| tree.as_local().unwrap().scan_complete())
3535            .await;
3536        let worktree_id = worktree_a.read_with(cx_a, |tree, _| tree.id());
3537
3538        // Join the project as client B.
3539        let project_b = client_b.build_remote_project(&project_a, cx_a, cx_b).await;
3540
3541        let buffer_b = cx_b
3542            .background()
3543            .spawn(project_b.update(cx_b, |p, cx| p.open_buffer((worktree_id, "a.rs"), cx)))
3544            .await
3545            .unwrap();
3546
3547        let fake_language_server = fake_language_servers.next().await.unwrap();
3548        fake_language_server.handle_request::<lsp::request::Formatting, _, _>(|_, _| async move {
3549            Ok(Some(vec![
3550                lsp::TextEdit {
3551                    range: lsp::Range::new(lsp::Position::new(0, 4), lsp::Position::new(0, 4)),
3552                    new_text: "h".to_string(),
3553                },
3554                lsp::TextEdit {
3555                    range: lsp::Range::new(lsp::Position::new(0, 7), lsp::Position::new(0, 7)),
3556                    new_text: "y".to_string(),
3557                },
3558            ]))
3559        });
3560
3561        project_b
3562            .update(cx_b, |project, cx| {
3563                project.format(HashSet::from_iter([buffer_b.clone()]), true, cx)
3564            })
3565            .await
3566            .unwrap();
3567        assert_eq!(
3568            buffer_b.read_with(cx_b, |buffer, _| buffer.text()),
3569            "let honey = two"
3570        );
3571    }
3572
3573    #[gpui::test(iterations = 10)]
3574    async fn test_definition(cx_a: &mut TestAppContext, cx_b: &mut TestAppContext) {
3575        cx_a.foreground().forbid_parking();
3576        let lang_registry = Arc::new(LanguageRegistry::test());
3577        let fs = FakeFs::new(cx_a.background());
3578        fs.insert_tree(
3579            "/root-1",
3580            json!({
3581                "a.rs": "const ONE: usize = b::TWO + b::THREE;",
3582            }),
3583        )
3584        .await;
3585        fs.insert_tree(
3586            "/root-2",
3587            json!({
3588                "b.rs": "const TWO: usize = 2;\nconst THREE: usize = 3;",
3589            }),
3590        )
3591        .await;
3592
3593        // Set up a fake language server.
3594        let mut language = Language::new(
3595            LanguageConfig {
3596                name: "Rust".into(),
3597                path_suffixes: vec!["rs".to_string()],
3598                ..Default::default()
3599            },
3600            Some(tree_sitter_rust::language()),
3601        );
3602        let mut fake_language_servers = language.set_fake_lsp_adapter(Default::default());
3603        lang_registry.add(Arc::new(language));
3604
3605        // Connect to a server as 2 clients.
3606        let mut server = TestServer::start(cx_a.foreground(), cx_a.background()).await;
3607        let client_a = server.create_client(cx_a, "user_a").await;
3608        let mut client_b = server.create_client(cx_b, "user_b").await;
3609        server
3610            .make_contacts(vec![(&client_a, cx_a), (&client_b, cx_b)])
3611            .await;
3612
3613        // Share a project as client A
3614        let project_a = cx_a.update(|cx| {
3615            Project::local(
3616                client_a.clone(),
3617                client_a.user_store.clone(),
3618                lang_registry.clone(),
3619                fs.clone(),
3620                cx,
3621            )
3622        });
3623        let (worktree_a, _) = project_a
3624            .update(cx_a, |p, cx| {
3625                p.find_or_create_local_worktree("/root-1", true, cx)
3626            })
3627            .await
3628            .unwrap();
3629        worktree_a
3630            .read_with(cx_a, |tree, _| tree.as_local().unwrap().scan_complete())
3631            .await;
3632        let worktree_id = worktree_a.read_with(cx_a, |tree, _| tree.id());
3633
3634        // Join the project as client B.
3635        let project_b = client_b.build_remote_project(&project_a, cx_a, cx_b).await;
3636
3637        // Open the file on client B.
3638        let buffer_b = cx_b
3639            .background()
3640            .spawn(project_b.update(cx_b, |p, cx| p.open_buffer((worktree_id, "a.rs"), cx)))
3641            .await
3642            .unwrap();
3643
3644        // Request the definition of a symbol as the guest.
3645        let fake_language_server = fake_language_servers.next().await.unwrap();
3646        fake_language_server.handle_request::<lsp::request::GotoDefinition, _, _>(
3647            |_, _| async move {
3648                Ok(Some(lsp::GotoDefinitionResponse::Scalar(
3649                    lsp::Location::new(
3650                        lsp::Url::from_file_path("/root-2/b.rs").unwrap(),
3651                        lsp::Range::new(lsp::Position::new(0, 6), lsp::Position::new(0, 9)),
3652                    ),
3653                )))
3654            },
3655        );
3656
3657        let definitions_1 = project_b
3658            .update(cx_b, |p, cx| p.definition(&buffer_b, 23, cx))
3659            .await
3660            .unwrap();
3661        cx_b.read(|cx| {
3662            assert_eq!(definitions_1.len(), 1);
3663            assert_eq!(project_b.read(cx).worktrees(cx).count(), 2);
3664            let target_buffer = definitions_1[0].buffer.read(cx);
3665            assert_eq!(
3666                target_buffer.text(),
3667                "const TWO: usize = 2;\nconst THREE: usize = 3;"
3668            );
3669            assert_eq!(
3670                definitions_1[0].range.to_point(target_buffer),
3671                Point::new(0, 6)..Point::new(0, 9)
3672            );
3673        });
3674
3675        // Try getting more definitions for the same buffer, ensuring the buffer gets reused from
3676        // the previous call to `definition`.
3677        fake_language_server.handle_request::<lsp::request::GotoDefinition, _, _>(
3678            |_, _| async move {
3679                Ok(Some(lsp::GotoDefinitionResponse::Scalar(
3680                    lsp::Location::new(
3681                        lsp::Url::from_file_path("/root-2/b.rs").unwrap(),
3682                        lsp::Range::new(lsp::Position::new(1, 6), lsp::Position::new(1, 11)),
3683                    ),
3684                )))
3685            },
3686        );
3687
3688        let definitions_2 = project_b
3689            .update(cx_b, |p, cx| p.definition(&buffer_b, 33, cx))
3690            .await
3691            .unwrap();
3692        cx_b.read(|cx| {
3693            assert_eq!(definitions_2.len(), 1);
3694            assert_eq!(project_b.read(cx).worktrees(cx).count(), 2);
3695            let target_buffer = definitions_2[0].buffer.read(cx);
3696            assert_eq!(
3697                target_buffer.text(),
3698                "const TWO: usize = 2;\nconst THREE: usize = 3;"
3699            );
3700            assert_eq!(
3701                definitions_2[0].range.to_point(target_buffer),
3702                Point::new(1, 6)..Point::new(1, 11)
3703            );
3704        });
3705        assert_eq!(definitions_1[0].buffer, definitions_2[0].buffer);
3706    }
3707
3708    #[gpui::test(iterations = 10)]
3709    async fn test_references(cx_a: &mut TestAppContext, cx_b: &mut TestAppContext) {
3710        cx_a.foreground().forbid_parking();
3711        let lang_registry = Arc::new(LanguageRegistry::test());
3712        let fs = FakeFs::new(cx_a.background());
3713        fs.insert_tree(
3714            "/root-1",
3715            json!({
3716                "one.rs": "const ONE: usize = 1;",
3717                "two.rs": "const TWO: usize = one::ONE + one::ONE;",
3718            }),
3719        )
3720        .await;
3721        fs.insert_tree(
3722            "/root-2",
3723            json!({
3724                "three.rs": "const THREE: usize = two::TWO + one::ONE;",
3725            }),
3726        )
3727        .await;
3728
3729        // Set up a fake language server.
3730        let mut language = Language::new(
3731            LanguageConfig {
3732                name: "Rust".into(),
3733                path_suffixes: vec!["rs".to_string()],
3734                ..Default::default()
3735            },
3736            Some(tree_sitter_rust::language()),
3737        );
3738        let mut fake_language_servers = language.set_fake_lsp_adapter(Default::default());
3739        lang_registry.add(Arc::new(language));
3740
3741        // Connect to a server as 2 clients.
3742        let mut server = TestServer::start(cx_a.foreground(), cx_a.background()).await;
3743        let client_a = server.create_client(cx_a, "user_a").await;
3744        let mut client_b = server.create_client(cx_b, "user_b").await;
3745        server
3746            .make_contacts(vec![(&client_a, cx_a), (&client_b, cx_b)])
3747            .await;
3748
3749        // Share a project as client A
3750        let project_a = cx_a.update(|cx| {
3751            Project::local(
3752                client_a.clone(),
3753                client_a.user_store.clone(),
3754                lang_registry.clone(),
3755                fs.clone(),
3756                cx,
3757            )
3758        });
3759        let (worktree_a, _) = project_a
3760            .update(cx_a, |p, cx| {
3761                p.find_or_create_local_worktree("/root-1", true, cx)
3762            })
3763            .await
3764            .unwrap();
3765        worktree_a
3766            .read_with(cx_a, |tree, _| tree.as_local().unwrap().scan_complete())
3767            .await;
3768        let worktree_id = worktree_a.read_with(cx_a, |tree, _| tree.id());
3769
3770        // Join the worktree as client B.
3771        let project_b = client_b.build_remote_project(&project_a, cx_a, cx_b).await;
3772
3773        // Open the file on client B.
3774        let buffer_b = cx_b
3775            .background()
3776            .spawn(project_b.update(cx_b, |p, cx| p.open_buffer((worktree_id, "one.rs"), cx)))
3777            .await
3778            .unwrap();
3779
3780        // Request references to a symbol as the guest.
3781        let fake_language_server = fake_language_servers.next().await.unwrap();
3782        fake_language_server.handle_request::<lsp::request::References, _, _>(
3783            |params, _| async move {
3784                assert_eq!(
3785                    params.text_document_position.text_document.uri.as_str(),
3786                    "file:///root-1/one.rs"
3787                );
3788                Ok(Some(vec![
3789                    lsp::Location {
3790                        uri: lsp::Url::from_file_path("/root-1/two.rs").unwrap(),
3791                        range: lsp::Range::new(
3792                            lsp::Position::new(0, 24),
3793                            lsp::Position::new(0, 27),
3794                        ),
3795                    },
3796                    lsp::Location {
3797                        uri: lsp::Url::from_file_path("/root-1/two.rs").unwrap(),
3798                        range: lsp::Range::new(
3799                            lsp::Position::new(0, 35),
3800                            lsp::Position::new(0, 38),
3801                        ),
3802                    },
3803                    lsp::Location {
3804                        uri: lsp::Url::from_file_path("/root-2/three.rs").unwrap(),
3805                        range: lsp::Range::new(
3806                            lsp::Position::new(0, 37),
3807                            lsp::Position::new(0, 40),
3808                        ),
3809                    },
3810                ]))
3811            },
3812        );
3813
3814        let references = project_b
3815            .update(cx_b, |p, cx| p.references(&buffer_b, 7, cx))
3816            .await
3817            .unwrap();
3818        cx_b.read(|cx| {
3819            assert_eq!(references.len(), 3);
3820            assert_eq!(project_b.read(cx).worktrees(cx).count(), 2);
3821
3822            let two_buffer = references[0].buffer.read(cx);
3823            let three_buffer = references[2].buffer.read(cx);
3824            assert_eq!(
3825                two_buffer.file().unwrap().path().as_ref(),
3826                Path::new("two.rs")
3827            );
3828            assert_eq!(references[1].buffer, references[0].buffer);
3829            assert_eq!(
3830                three_buffer.file().unwrap().full_path(cx),
3831                Path::new("three.rs")
3832            );
3833
3834            assert_eq!(references[0].range.to_offset(&two_buffer), 24..27);
3835            assert_eq!(references[1].range.to_offset(&two_buffer), 35..38);
3836            assert_eq!(references[2].range.to_offset(&three_buffer), 37..40);
3837        });
3838    }
3839
3840    #[gpui::test(iterations = 10)]
3841    async fn test_project_search(cx_a: &mut TestAppContext, cx_b: &mut TestAppContext) {
3842        cx_a.foreground().forbid_parking();
3843        let lang_registry = Arc::new(LanguageRegistry::test());
3844        let fs = FakeFs::new(cx_a.background());
3845        fs.insert_tree(
3846            "/root-1",
3847            json!({
3848                "a": "hello world",
3849                "b": "goodnight moon",
3850                "c": "a world of goo",
3851                "d": "world champion of clown world",
3852            }),
3853        )
3854        .await;
3855        fs.insert_tree(
3856            "/root-2",
3857            json!({
3858                "e": "disney world is fun",
3859            }),
3860        )
3861        .await;
3862
3863        // Connect to a server as 2 clients.
3864        let mut server = TestServer::start(cx_a.foreground(), cx_a.background()).await;
3865        let client_a = server.create_client(cx_a, "user_a").await;
3866        let mut client_b = server.create_client(cx_b, "user_b").await;
3867        server
3868            .make_contacts(vec![(&client_a, cx_a), (&client_b, cx_b)])
3869            .await;
3870
3871        // Share a project as client A
3872        let project_a = cx_a.update(|cx| {
3873            Project::local(
3874                client_a.clone(),
3875                client_a.user_store.clone(),
3876                lang_registry.clone(),
3877                fs.clone(),
3878                cx,
3879            )
3880        });
3881
3882        let (worktree_1, _) = project_a
3883            .update(cx_a, |p, cx| {
3884                p.find_or_create_local_worktree("/root-1", true, cx)
3885            })
3886            .await
3887            .unwrap();
3888        worktree_1
3889            .read_with(cx_a, |tree, _| tree.as_local().unwrap().scan_complete())
3890            .await;
3891        let (worktree_2, _) = project_a
3892            .update(cx_a, |p, cx| {
3893                p.find_or_create_local_worktree("/root-2", true, cx)
3894            })
3895            .await
3896            .unwrap();
3897        worktree_2
3898            .read_with(cx_a, |tree, _| tree.as_local().unwrap().scan_complete())
3899            .await;
3900
3901        // Join the worktree as client B.
3902        let project_b = client_b.build_remote_project(&project_a, cx_a, cx_b).await;
3903        let results = project_b
3904            .update(cx_b, |project, cx| {
3905                project.search(SearchQuery::text("world", false, false), cx)
3906            })
3907            .await
3908            .unwrap();
3909
3910        let mut ranges_by_path = results
3911            .into_iter()
3912            .map(|(buffer, ranges)| {
3913                buffer.read_with(cx_b, |buffer, cx| {
3914                    let path = buffer.file().unwrap().full_path(cx);
3915                    let offset_ranges = ranges
3916                        .into_iter()
3917                        .map(|range| range.to_offset(buffer))
3918                        .collect::<Vec<_>>();
3919                    (path, offset_ranges)
3920                })
3921            })
3922            .collect::<Vec<_>>();
3923        ranges_by_path.sort_by_key(|(path, _)| path.clone());
3924
3925        assert_eq!(
3926            ranges_by_path,
3927            &[
3928                (PathBuf::from("root-1/a"), vec![6..11]),
3929                (PathBuf::from("root-1/c"), vec![2..7]),
3930                (PathBuf::from("root-1/d"), vec![0..5, 24..29]),
3931                (PathBuf::from("root-2/e"), vec![7..12]),
3932            ]
3933        );
3934    }
3935
3936    #[gpui::test(iterations = 10)]
3937    async fn test_document_highlights(cx_a: &mut TestAppContext, cx_b: &mut TestAppContext) {
3938        cx_a.foreground().forbid_parking();
3939        let lang_registry = Arc::new(LanguageRegistry::test());
3940        let fs = FakeFs::new(cx_a.background());
3941        fs.insert_tree(
3942            "/root-1",
3943            json!({
3944                "main.rs": "fn double(number: i32) -> i32 { number + number }",
3945            }),
3946        )
3947        .await;
3948
3949        // Set up a fake language server.
3950        let mut language = Language::new(
3951            LanguageConfig {
3952                name: "Rust".into(),
3953                path_suffixes: vec!["rs".to_string()],
3954                ..Default::default()
3955            },
3956            Some(tree_sitter_rust::language()),
3957        );
3958        let mut fake_language_servers = language.set_fake_lsp_adapter(Default::default());
3959        lang_registry.add(Arc::new(language));
3960
3961        // Connect to a server as 2 clients.
3962        let mut server = TestServer::start(cx_a.foreground(), cx_a.background()).await;
3963        let client_a = server.create_client(cx_a, "user_a").await;
3964        let mut client_b = server.create_client(cx_b, "user_b").await;
3965        server
3966            .make_contacts(vec![(&client_a, cx_a), (&client_b, cx_b)])
3967            .await;
3968
3969        // Share a project as client A
3970        let project_a = cx_a.update(|cx| {
3971            Project::local(
3972                client_a.clone(),
3973                client_a.user_store.clone(),
3974                lang_registry.clone(),
3975                fs.clone(),
3976                cx,
3977            )
3978        });
3979        let (worktree_a, _) = project_a
3980            .update(cx_a, |p, cx| {
3981                p.find_or_create_local_worktree("/root-1", true, cx)
3982            })
3983            .await
3984            .unwrap();
3985        worktree_a
3986            .read_with(cx_a, |tree, _| tree.as_local().unwrap().scan_complete())
3987            .await;
3988        let worktree_id = worktree_a.read_with(cx_a, |tree, _| tree.id());
3989
3990        // Join the worktree as client B.
3991        let project_b = client_b.build_remote_project(&project_a, cx_a, cx_b).await;
3992
3993        // Open the file on client B.
3994        let buffer_b = cx_b
3995            .background()
3996            .spawn(project_b.update(cx_b, |p, cx| p.open_buffer((worktree_id, "main.rs"), cx)))
3997            .await
3998            .unwrap();
3999
4000        // Request document highlights as the guest.
4001        let fake_language_server = fake_language_servers.next().await.unwrap();
4002        fake_language_server.handle_request::<lsp::request::DocumentHighlightRequest, _, _>(
4003            |params, _| async move {
4004                assert_eq!(
4005                    params
4006                        .text_document_position_params
4007                        .text_document
4008                        .uri
4009                        .as_str(),
4010                    "file:///root-1/main.rs"
4011                );
4012                assert_eq!(
4013                    params.text_document_position_params.position,
4014                    lsp::Position::new(0, 34)
4015                );
4016                Ok(Some(vec![
4017                    lsp::DocumentHighlight {
4018                        kind: Some(lsp::DocumentHighlightKind::WRITE),
4019                        range: lsp::Range::new(
4020                            lsp::Position::new(0, 10),
4021                            lsp::Position::new(0, 16),
4022                        ),
4023                    },
4024                    lsp::DocumentHighlight {
4025                        kind: Some(lsp::DocumentHighlightKind::READ),
4026                        range: lsp::Range::new(
4027                            lsp::Position::new(0, 32),
4028                            lsp::Position::new(0, 38),
4029                        ),
4030                    },
4031                    lsp::DocumentHighlight {
4032                        kind: Some(lsp::DocumentHighlightKind::READ),
4033                        range: lsp::Range::new(
4034                            lsp::Position::new(0, 41),
4035                            lsp::Position::new(0, 47),
4036                        ),
4037                    },
4038                ]))
4039            },
4040        );
4041
4042        let highlights = project_b
4043            .update(cx_b, |p, cx| p.document_highlights(&buffer_b, 34, cx))
4044            .await
4045            .unwrap();
4046        buffer_b.read_with(cx_b, |buffer, _| {
4047            let snapshot = buffer.snapshot();
4048
4049            let highlights = highlights
4050                .into_iter()
4051                .map(|highlight| (highlight.kind, highlight.range.to_offset(&snapshot)))
4052                .collect::<Vec<_>>();
4053            assert_eq!(
4054                highlights,
4055                &[
4056                    (lsp::DocumentHighlightKind::WRITE, 10..16),
4057                    (lsp::DocumentHighlightKind::READ, 32..38),
4058                    (lsp::DocumentHighlightKind::READ, 41..47)
4059                ]
4060            )
4061        });
4062    }
4063
4064    #[gpui::test(iterations = 10)]
        // Exercises project-wide symbol search from a guest: client A shares a project,
        // client B queries symbols through it, opens the buffer that a returned symbol
        // points at, and is then expected to be refused when it submits a symbol whose
        // path it has altered.
4065    async fn test_project_symbols(cx_a: &mut TestAppContext, cx_b: &mut TestAppContext) {
4066        cx_a.foreground().forbid_parking();
4067        let lang_registry = Arc::new(LanguageRegistry::test());
4068        let fs = FakeFs::new(cx_a.background());
            // Three sibling directories live under /code; only crate-1 is added to the
            // project as a worktree further down.
4069        fs.insert_tree(
4070            "/code",
4071            json!({
4072                "crate-1": {
4073                    "one.rs": "const ONE: usize = 1;",
4074                },
4075                "crate-2": {
4076                    "two.rs": "const TWO: usize = 2; const THREE: usize = 3;",
4077                },
4078                "private": {
4079                    "passwords.txt": "the-password",
4080                }
4081            }),
4082        )
4083        .await;
4084
4085        // Set up a fake language server.
4086        let mut language = Language::new(
4087            LanguageConfig {
4088                name: "Rust".into(),
4089                path_suffixes: vec!["rs".to_string()],
4090                ..Default::default()
4091            },
4092            Some(tree_sitter_rust::language()),
4093        );
4094        let mut fake_language_servers = language.set_fake_lsp_adapter(Default::default());
4095        lang_registry.add(Arc::new(language));
4096
4097        // Connect to a server as 2 clients.
4098        let mut server = TestServer::start(cx_a.foreground(), cx_a.background()).await;
4099        let client_a = server.create_client(cx_a, "user_a").await;
4100        let mut client_b = server.create_client(cx_b, "user_b").await;
4101        server
4102            .make_contacts(vec![(&client_a, cx_a), (&client_b, cx_b)])
4103            .await;
4104
4105        // Share a project as client A
4106        let project_a = cx_a.update(|cx| {
4107            Project::local(
4108                client_a.clone(),
4109                client_a.user_store.clone(),
4110                lang_registry.clone(),
4111                fs.clone(),
4112                cx,
4113            )
4114        });
4115        let (worktree_a, _) = project_a
4116            .update(cx_a, |p, cx| {
4117                p.find_or_create_local_worktree("/code/crate-1", true, cx)
4118            })
4119            .await
4120            .unwrap();
4121        worktree_a
4122            .read_with(cx_a, |tree, _| tree.as_local().unwrap().scan_complete())
4123            .await;
4124        let worktree_id = worktree_a.read_with(cx_a, |tree, _| tree.id());
4125
4126        // Join the project as client B.
4127        let project_b = client_b.build_remote_project(&project_a, cx_a, cx_b).await;
4128
4129        // Cause the language server to start.
            // NOTE(review): presumably opening a `.rs` buffer is what makes the fake server
            // appear on `fake_language_servers` below — confirm.
4130        let _buffer = cx_b
4131            .background()
4132            .spawn(project_b.update(cx_b, |p, cx| p.open_buffer((worktree_id, "one.rs"), cx)))
4133            .await
4134            .unwrap();
4135
            // The fake server ignores the query it receives and always answers with a single
            // symbol, `TWO`, located in crate-2/two.rs — a file outside the crate-1 worktree.
            // The range (0, 6)..(0, 9) covers the identifier `TWO` in that file's first line.
4136        let fake_language_server = fake_language_servers.next().await.unwrap();
4137        fake_language_server.handle_request::<lsp::request::WorkspaceSymbol, _, _>(
4138            |_, _| async move {
                    // NOTE(review): presumably needed because the `deprecated` field set
                    // below is itself marked deprecated in the `lsp` types — confirm.
4139                #[allow(deprecated)]
4140                Ok(Some(vec![lsp::SymbolInformation {
4141                    name: "TWO".into(),
4142                    location: lsp::Location {
4143                        uri: lsp::Url::from_file_path("/code/crate-2/two.rs").unwrap(),
4144                        range: lsp::Range::new(lsp::Position::new(0, 6), lsp::Position::new(0, 9)),
4145                    },
4146                    kind: lsp::SymbolKind::CONSTANT,
4147                    tags: None,
4148                    container_name: None,
4149                    deprecated: None,
4150                }]))
4151            },
4152        );
4153
4154        // Request the symbols matching "two" as the guest.
4155        let symbols = project_b
4156            .update(cx_b, |p, cx| p.symbols("two", cx))
4157            .await
4158            .unwrap();
4159        assert_eq!(symbols.len(), 1);
4160        assert_eq!(symbols[0].name, "TWO");
4161
4162        // Open one of the returned symbols.
4163        let buffer_b_2 = project_b
4164            .update(cx_b, |project, cx| {
4165                project.open_buffer_for_symbol(&symbols[0], cx)
4166            })
4167            .await
4168            .unwrap();
            // The expected path is /code/crate-2/two.rs expressed relative to the
            // /code/crate-1 worktree root, hence the leading `..`.
4169        buffer_b_2.read_with(cx_b, |buffer, _| {
4170            assert_eq!(
4171                buffer.file().unwrap().path().as_ref(),
4172                Path::new("../crate-2/two.rs")
4173            );
4174        });
4175
4176        // Attempt to craft a symbol and violate host's privacy by opening an arbitrary file.
            // Only `path` is overwritten; every other field is kept from the genuine symbol.
            // Note that "/code/secrets" was never inserted into the fake filesystem above, so
            // the test only inspects the error message returned for the tampered symbol.
4177        let mut fake_symbol = symbols[0].clone();
4178        fake_symbol.path = Path::new("/code/secrets").into();
4179        let error = project_b
4180            .update(cx_b, |project, cx| {
4181                project.open_buffer_for_symbol(&fake_symbol, cx)
4182            })
4183            .await
4184            .unwrap_err();
4185        assert!(error.to_string().contains("invalid symbol signature"));
4186    }
4187
4188    #[gpui::test(iterations = 10)]
        // Issues two guest requests that both target b.rs — a go-to-definition from a.rs
        // and a direct `open_buffer` of b.rs — in an order chosen by `rng`, and asserts
        // that the definition's buffer and the directly opened buffer compare equal.
4189    async fn test_open_buffer_while_getting_definition_pointing_to_it(
4190        cx_a: &mut TestAppContext,
4191        cx_b: &mut TestAppContext,
4192        mut rng: StdRng,
4193    ) {
4194        cx_a.foreground().forbid_parking();
4195        let lang_registry = Arc::new(LanguageRegistry::test());
4196        let fs = FakeFs::new(cx_a.background());
4197        fs.insert_tree(
4198            "/root",
4199            json!({
4200                "a.rs": "const ONE: usize = b::TWO;",
4201                "b.rs": "const TWO: usize = 2",
4202            }),
4203        )
4204        .await;
4205
4206        // Set up a fake language server.
4207        let mut language = Language::new(
4208            LanguageConfig {
4209                name: "Rust".into(),
4210                path_suffixes: vec!["rs".to_string()],
4211                ..Default::default()
4212            },
4213            Some(tree_sitter_rust::language()),
4214        );
4215        let mut fake_language_servers = language.set_fake_lsp_adapter(Default::default());
4216        lang_registry.add(Arc::new(language));
4217
4218        // Connect to a server as 2 clients.
4219        let mut server = TestServer::start(cx_a.foreground(), cx_a.background()).await;
4220        let client_a = server.create_client(cx_a, "user_a").await;
4221        let mut client_b = server.create_client(cx_b, "user_b").await;
4222        server
4223            .make_contacts(vec![(&client_a, cx_a), (&client_b, cx_b)])
4224            .await;
4225
4226        // Share a project as client A
4227        let project_a = cx_a.update(|cx| {
4228            Project::local(
4229                client_a.clone(),
4230                client_a.user_store.clone(),
4231                lang_registry.clone(),
4232                fs.clone(),
4233                cx,
4234            )
4235        });
4236
4237        let (worktree_a, _) = project_a
4238            .update(cx_a, |p, cx| {
4239                p.find_or_create_local_worktree("/root", true, cx)
4240            })
4241            .await
4242            .unwrap();
4243        worktree_a
4244            .read_with(cx_a, |tree, _| tree.as_local().unwrap().scan_complete())
4245            .await;
4246        let worktree_id = worktree_a.read_with(cx_a, |tree, _| tree.id());
4247
4248        // Join the project as client B.
4249        let project_b = client_b.build_remote_project(&project_a, cx_a, cx_b).await;
4250
            // Open a.rs as the guest; this is the buffer the definition request starts from.
4251        let buffer_b1 = cx_b
4252            .background()
4253            .spawn(project_b.update(cx_b, |p, cx| p.open_buffer((worktree_id, "a.rs"), cx)))
4254            .await
4255            .unwrap();
4256
            // The fake server ignores the request parameters and always points at b.rs,
            // range (0, 6)..(0, 9), which covers the identifier `TWO` in that file.
4257        let fake_language_server = fake_language_servers.next().await.unwrap();
4258        fake_language_server.handle_request::<lsp::request::GotoDefinition, _, _>(
4259            |_, _| async move {
4260                Ok(Some(lsp::GotoDefinitionResponse::Scalar(
4261                    lsp::Location::new(
4262                        lsp::Url::from_file_path("/root/b.rs").unwrap(),
4263                        lsp::Range::new(lsp::Position::new(0, 6), lsp::Position::new(0, 9)),
4264                    ),
4265                )))
4266            },
4267        );
4268
            // Create both requests before awaiting either one; `rng` decides which is
            // created first. Offset 23 in a.rs falls inside the `TWO` of `b::TWO`.
4269        let definitions;
4270        let buffer_b2;
4271        if rng.gen() {
4272            definitions = project_b.update(cx_b, |p, cx| p.definition(&buffer_b1, 23, cx));
4273            buffer_b2 = project_b.update(cx_b, |p, cx| p.open_buffer((worktree_id, "b.rs"), cx));
4274        } else {
4275            buffer_b2 = project_b.update(cx_b, |p, cx| p.open_buffer((worktree_id, "b.rs"), cx));
4276            definitions = project_b.update(cx_b, |p, cx| p.definition(&buffer_b1, 23, cx));
4277        }
4278
            // Whichever order was used, the single definition must refer to the same
            // buffer that the explicit open returned.
4279        let buffer_b2 = buffer_b2.await.unwrap();
4280        let definitions = definitions.await.unwrap();
4281        assert_eq!(definitions.len(), 1);
4282        assert_eq!(definitions[0].buffer, buffer_b2);
4283    }
4284
4285    #[gpui::test(iterations = 10)]
        // Drives the code-action flow from a guest's editor: client B opens main.rs from
        // client A's shared project, moves the cursor to a spot where the fake language
        // server offers an action, opens the code-actions menu, confirms the first entry,
        // and checks that the resulting multi-file edit can be undone and redone.
4286    async fn test_collaborating_with_code_actions(
4287        cx_a: &mut TestAppContext,
4288        cx_b: &mut TestAppContext,
4289    ) {
4290        cx_a.foreground().forbid_parking();
4291        let lang_registry = Arc::new(LanguageRegistry::test());
4292        let fs = FakeFs::new(cx_a.background());
4293        cx_b.update(|cx| editor::init(cx));
4294
4295        // Set up a fake language server.
4296        let mut language = Language::new(
4297            LanguageConfig {
4298                name: "Rust".into(),
4299                path_suffixes: vec!["rs".to_string()],
4300                ..Default::default()
4301            },
4302            Some(tree_sitter_rust::language()),
4303        );
4304        let mut fake_language_servers = language.set_fake_lsp_adapter(Default::default());
4305        lang_registry.add(Arc::new(language));
4306
4307        // Connect to a server as 2 clients.
4308        let mut server = TestServer::start(cx_a.foreground(), cx_a.background()).await;
4309        let client_a = server.create_client(cx_a, "user_a").await;
4310        let mut client_b = server.create_client(cx_b, "user_b").await;
4311        server
4312            .make_contacts(vec![(&client_a, cx_a), (&client_b, cx_b)])
4313            .await;
4314
4315        // Share a project as client A
4316        fs.insert_tree(
4317            "/a",
4318            json!({
4319                "main.rs": "mod other;\nfn main() { let foo = other::foo(); }",
4320                "other.rs": "pub fn foo() -> usize { 4 }",
4321            }),
4322        )
4323        .await;
4324        let project_a = cx_a.update(|cx| {
4325            Project::local(
4326                client_a.clone(),
4327                client_a.user_store.clone(),
4328                lang_registry.clone(),
4329                fs.clone(),
4330                cx,
4331            )
4332        });
4333        let (worktree_a, _) = project_a
4334            .update(cx_a, |p, cx| {
4335                p.find_or_create_local_worktree("/a", true, cx)
4336            })
4337            .await
4338            .unwrap();
4339        worktree_a
4340            .read_with(cx_a, |tree, _| tree.as_local().unwrap().scan_complete())
4341            .await;
4342        let worktree_id = worktree_a.read_with(cx_a, |tree, _| tree.id());
4343
4344        // Join the project as client B.
4345        let project_b = client_b.build_remote_project(&project_a, cx_a, cx_b).await;
4346        let (_window_b, workspace_b) = cx_b.add_window(|cx| Workspace::new(project_b.clone(), cx));
4347        let editor_b = workspace_b
4348            .update(cx_b, |workspace, cx| {
4349                workspace.open_path((worktree_id, "main.rs"), true, cx)
4350            })
4351            .await
4352            .unwrap()
4353            .downcast::<Editor>()
4354            .unwrap();
4355
            // First code-action request: expected for the empty range at (0, 0), answered
            // with no actions.
            // NOTE(review): `.next().await` presumably waits until the handler has served
            // one request — confirm.
4356        let mut fake_language_server = fake_language_servers.next().await.unwrap();
4357        fake_language_server
4358            .handle_request::<lsp::request::CodeActionRequest, _, _>(|params, _| async move {
4359                assert_eq!(
4360                    params.text_document.uri,
4361                    lsp::Url::from_file_path("/a/main.rs").unwrap(),
4362                );
4363                assert_eq!(params.range.start, lsp::Position::new(0, 0));
4364                assert_eq!(params.range.end, lsp::Position::new(0, 0));
4365                Ok(None)
4366            })
4367            .next()
4368            .await;
4369
4370        // Move cursor to a location that contains code actions.
            // Row 1, column 31 lies inside the `foo` of `other::foo()` in main.rs.
4371        editor_b.update(cx_b, |editor, cx| {
4372            editor.change_selections(None, cx, |s| {
4373                s.select_ranges([Point::new(1, 31)..Point::new(1, 31)])
4374            });
4375            cx.focus(&editor_b);
4376        });
4377
            // Second code-action request: expected for the new cursor position. The reply
            // offers one action whose edit replaces `other::foo()` (row 1, columns 22..34
            // of main.rs) with "4" and deletes columns 0..27 of other.rs, which is that
            // file's whole content.
4378        fake_language_server
4379            .handle_request::<lsp::request::CodeActionRequest, _, _>(|params, _| async move {
4380                assert_eq!(
4381                    params.text_document.uri,
4382                    lsp::Url::from_file_path("/a/main.rs").unwrap(),
4383                );
4384                assert_eq!(params.range.start, lsp::Position::new(1, 31));
4385                assert_eq!(params.range.end, lsp::Position::new(1, 31));
4386
4387                Ok(Some(vec![lsp::CodeActionOrCommand::CodeAction(
4388                    lsp::CodeAction {
4389                        title: "Inline into all callers".to_string(),
4390                        edit: Some(lsp::WorkspaceEdit {
4391                            changes: Some(
4392                                [
4393                                    (
4394                                        lsp::Url::from_file_path("/a/main.rs").unwrap(),
4395                                        vec![lsp::TextEdit::new(
4396                                            lsp::Range::new(
4397                                                lsp::Position::new(1, 22),
4398                                                lsp::Position::new(1, 34),
4399                                            ),
4400                                            "4".to_string(),
4401                                        )],
4402                                    ),
4403                                    (
4404                                        lsp::Url::from_file_path("/a/other.rs").unwrap(),
4405                                        vec![lsp::TextEdit::new(
4406                                            lsp::Range::new(
4407                                                lsp::Position::new(0, 0),
4408                                                lsp::Position::new(0, 27),
4409                                            ),
4410                                            "".to_string(),
4411                                        )],
4412                                    ),
4413                                ]
4414                                .into_iter()
4415                                .collect(),
4416                            ),
4417                            ..Default::default()
4418                        }),
                            // Extra payload attached to the action; the resolve handler
                            // installed further down returns an action that does not set
                            // `data`.
4419                        data: Some(json!({
4420                            "codeActionParams": {
4421                                "range": {
4422                                    "start": {"line": 1, "column": 31},
4423                                    "end": {"line": 1, "column": 31},
4424                                }
4425                            }
4426                        })),
4427                        ..Default::default()
4428                    },
4429                )]))
4430            })
4431            .next()
4432            .await;
4433
4434        // Toggle code actions and wait for them to display.
4435        editor_b.update(cx_b, |editor, cx| {
4436            editor.toggle_code_actions(
4437                &ToggleCodeActions {
4438                    deployed_from_indicator: false,
4439                },
4440                cx,
4441            );
4442        });
4443        editor_b
4444            .condition(&cx_b, |editor, _| editor.context_menu_visible())
4445            .await;
4446
            // Drop the code-action handler (and its position assertions) now that the menu
            // is showing.
4447        fake_language_server.remove_request_handler::<lsp::request::CodeActionRequest>();
4448
4449        // Confirming the code action will trigger a resolve request.
            // The confirmation is created first and only awaited after the resolve handler
            // below has been installed.
4450        let confirm_action = workspace_b
4451            .update(cx_b, |workspace, cx| {
4452                Editor::confirm_code_action(workspace, &ConfirmCodeAction { item_ix: Some(0) }, cx)
4453            })
4454            .unwrap();
            // The resolved action carries the same title and the same two-file edit as the
            // action offered above.
4455        fake_language_server.handle_request::<lsp::request::CodeActionResolveRequest, _, _>(
4456            |_, _| async move {
4457                Ok(lsp::CodeAction {
4458                    title: "Inline into all callers".to_string(),
4459                    edit: Some(lsp::WorkspaceEdit {
4460                        changes: Some(
4461                            [
4462                                (
4463                                    lsp::Url::from_file_path("/a/main.rs").unwrap(),
4464                                    vec![lsp::TextEdit::new(
4465                                        lsp::Range::new(
4466                                            lsp::Position::new(1, 22),
4467                                            lsp::Position::new(1, 34),
4468                                        ),
4469                                        "4".to_string(),
4470                                    )],
4471                                ),
4472                                (
4473                                    lsp::Url::from_file_path("/a/other.rs").unwrap(),
4474                                    vec![lsp::TextEdit::new(
4475                                        lsp::Range::new(
4476                                            lsp::Position::new(0, 0),
4477                                            lsp::Position::new(0, 27),
4478                                        ),
4479                                        "".to_string(),
4480                                    )],
4481                                ),
4482                            ]
4483                            .into_iter()
4484                            .collect(),
4485                        ),
4486                        ..Default::default()
4487                    }),
4488                    ..Default::default()
4489                })
4490            },
4491        );
4492
4493        // After the action is confirmed, an editor containing both modified files is opened.
4494        confirm_action.await.unwrap();
4495        let code_action_editor = workspace_b.read_with(cx_b, |workspace, cx| {
4496            workspace
4497                .active_item(cx)
4498                .unwrap()
4499                .downcast::<Editor>()
4500                .unwrap()
4501        });
            // The editor's text is main.rs followed by a newline and then other.rs; after
            // the edit other.rs is empty, so the text ends with that newline. A single undo
            // restores both files and a single redo re-applies both edits.
4502        code_action_editor.update(cx_b, |editor, cx| {
4503            assert_eq!(editor.text(cx), "mod other;\nfn main() { let foo = 4; }\n");
4504            editor.undo(&Undo, cx);
4505            assert_eq!(
4506                editor.text(cx),
4507                "mod other;\nfn main() { let foo = other::foo(); }\npub fn foo() -> usize { 4 }"
4508            );
4509            editor.redo(&Redo, cx);
4510            assert_eq!(editor.text(cx), "mod other;\nfn main() { let foo = 4; }\n");
4511        });
4512    }
4513
4514    #[gpui::test(iterations = 10)]
        // Drives the rename flow from a guest's editor: client B opens one.rs from client
        // A's shared project, starts a rename on `ONE`, types the new name `THREE`,
        // confirms it, and checks the resulting two-file edit together with its undo/redo
        // behaviour in both the multi-file editor and the original editor.
4515    async fn test_collaborating_with_renames(cx_a: &mut TestAppContext, cx_b: &mut TestAppContext) {
4516        cx_a.foreground().forbid_parking();
4517        let lang_registry = Arc::new(LanguageRegistry::test());
4518        let fs = FakeFs::new(cx_a.background());
4519        cx_b.update(|cx| editor::init(cx));
4520
4521        // Set up a fake language server.
4522        let mut language = Language::new(
4523            LanguageConfig {
4524                name: "Rust".into(),
4525                path_suffixes: vec!["rs".to_string()],
4526                ..Default::default()
4527            },
4528            Some(tree_sitter_rust::language()),
4529        );
            // Unlike the default fake adapter, this one advertises rename support with
            // `prepare_provider` enabled.
4530        let mut fake_language_servers = language.set_fake_lsp_adapter(FakeLspAdapter {
4531            capabilities: lsp::ServerCapabilities {
4532                rename_provider: Some(lsp::OneOf::Right(lsp::RenameOptions {
4533                    prepare_provider: Some(true),
4534                    work_done_progress_options: Default::default(),
4535                })),
4536                ..Default::default()
4537            },
4538            ..Default::default()
4539        });
4540        lang_registry.add(Arc::new(language));
4541
4542        // Connect to a server as 2 clients.
4543        let mut server = TestServer::start(cx_a.foreground(), cx_a.background()).await;
4544        let client_a = server.create_client(cx_a, "user_a").await;
4545        let mut client_b = server.create_client(cx_b, "user_b").await;
4546        server
4547            .make_contacts(vec![(&client_a, cx_a), (&client_b, cx_b)])
4548            .await;
4549
4550        // Share a project as client A
4551        fs.insert_tree(
4552            "/dir",
4553            json!({
4554                "one.rs": "const ONE: usize = 1;",
4555                "two.rs": "const TWO: usize = one::ONE + one::ONE;"
4556            }),
4557        )
4558        .await;
4559        let project_a = cx_a.update(|cx| {
4560            Project::local(
4561                client_a.clone(),
4562                client_a.user_store.clone(),
4563                lang_registry.clone(),
4564                fs.clone(),
4565                cx,
4566            )
4567        });
4568        let (worktree_a, _) = project_a
4569            .update(cx_a, |p, cx| {
4570                p.find_or_create_local_worktree("/dir", true, cx)
4571            })
4572            .await
4573            .unwrap();
4574        worktree_a
4575            .read_with(cx_a, |tree, _| tree.as_local().unwrap().scan_complete())
4576            .await;
4577        let worktree_id = worktree_a.read_with(cx_a, |tree, _| tree.id());
4578
4579        // Join the project as client B.
4580        let project_b = client_b.build_remote_project(&project_a, cx_a, cx_b).await;
4581        let (_window_b, workspace_b) = cx_b.add_window(|cx| Workspace::new(project_b.clone(), cx));
4582        let editor_b = workspace_b
4583            .update(cx_b, |workspace, cx| {
4584                workspace.open_path((worktree_id, "one.rs"), true, cx)
4585            })
4586            .await
4587            .unwrap()
4588            .downcast::<Editor>()
4589            .unwrap();
4590        let fake_language_server = fake_language_servers.next().await.unwrap();
4591
4592        // Move cursor to a location that can be renamed.
            // Offset 7 is inside the identifier `ONE` (offsets 6..9) of one.rs. The rename
            // is started here and awaited only after the prepare-rename handler below has
            // served its request.
4593        let prepare_rename = editor_b.update(cx_b, |editor, cx| {
4594            editor.change_selections(None, cx, |s| s.select_ranges([7..7]));
4595            editor.rename(&Rename, cx).unwrap()
4596        });
4597
            // The fake server checks the cursor position it was sent and replies with the
            // range of the whole identifier.
4598        fake_language_server
4599            .handle_request::<lsp::request::PrepareRenameRequest, _, _>(|params, _| async move {
4600                assert_eq!(params.text_document.uri.as_str(), "file:///dir/one.rs");
4601                assert_eq!(params.position, lsp::Position::new(0, 7));
4602                Ok(Some(lsp::PrepareRenameResponse::Range(lsp::Range::new(
4603                    lsp::Position::new(0, 6),
4604                    lsp::Position::new(0, 9),
4605                ))))
4606            })
4607            .next()
4608            .await
4609            .unwrap();
4610        prepare_rename.await.unwrap();
            // The pending rename must span the range the server reported. The first three
            // characters of the rename editor's buffer are then replaced with "THREE".
            // NOTE(review): presumably that buffer initially holds the old name `ONE` —
            // confirm.
4611        editor_b.update(cx_b, |editor, cx| {
4612            let rename = editor.pending_rename().unwrap();
4613            let buffer = editor.buffer().read(cx).snapshot(cx);
4614            assert_eq!(
4615                rename.range.start.to_offset(&buffer)..rename.range.end.to_offset(&buffer),
4616                6..9
4617            );
4618            rename.editor.update(cx, |rename_editor, cx| {
4619                rename_editor.buffer().update(cx, |rename_buffer, cx| {
4620                    rename_buffer.edit([(0..3, "THREE")], cx);
4621                });
4622            });
4623        });
4624
            // Kick off the confirmation, then serve the rename request. The handler expects
            // the position at the start of the identifier, (0, 6), and the new name, and
            // answers with edits for the definition in one.rs and both uses in two.rs
            // (columns 24..27 and 35..38).
4625        let confirm_rename = workspace_b.update(cx_b, |workspace, cx| {
4626            Editor::confirm_rename(workspace, &ConfirmRename, cx).unwrap()
4627        });
4628        fake_language_server
4629            .handle_request::<lsp::request::Rename, _, _>(|params, _| async move {
4630                assert_eq!(
4631                    params.text_document_position.text_document.uri.as_str(),
4632                    "file:///dir/one.rs"
4633                );
4634                assert_eq!(
4635                    params.text_document_position.position,
4636                    lsp::Position::new(0, 6)
4637                );
4638                assert_eq!(params.new_name, "THREE");
4639                Ok(Some(lsp::WorkspaceEdit {
4640                    changes: Some(
4641                        [
4642                            (
4643                                lsp::Url::from_file_path("/dir/one.rs").unwrap(),
4644                                vec![lsp::TextEdit::new(
4645                                    lsp::Range::new(
4646                                        lsp::Position::new(0, 6),
4647                                        lsp::Position::new(0, 9),
4648                                    ),
4649                                    "THREE".to_string(),
4650                                )],
4651                            ),
4652                            (
4653                                lsp::Url::from_file_path("/dir/two.rs").unwrap(),
4654                                vec![
4655                                    lsp::TextEdit::new(
4656                                        lsp::Range::new(
4657                                            lsp::Position::new(0, 24),
4658                                            lsp::Position::new(0, 27),
4659                                        ),
4660                                        "THREE".to_string(),
4661                                    ),
4662                                    lsp::TextEdit::new(
4663                                        lsp::Range::new(
4664                                            lsp::Position::new(0, 35),
4665                                            lsp::Position::new(0, 38),
4666                                        ),
4667                                        "THREE".to_string(),
4668                                    ),
4669                                ],
4670                            ),
4671                        ]
4672                        .into_iter()
4673                        .collect(),
4674                    ),
4675                    ..Default::default()
4676                }))
4677            })
4678            .next()
4679            .await
4680            .unwrap();
4681        confirm_rename.await.unwrap();
4682
            // The workspace's active item is now an editor whose text is one.rs followed by
            // two.rs; a single undo/redo there reverts/re-applies the rename in both files.
4683        let rename_editor = workspace_b.read_with(cx_b, |workspace, cx| {
4684            workspace
4685                .active_item(cx)
4686                .unwrap()
4687                .downcast::<Editor>()
4688                .unwrap()
4689        });
4690        rename_editor.update(cx_b, |editor, cx| {
4691            assert_eq!(
4692                editor.text(cx),
4693                "const THREE: usize = 1;\nconst TWO: usize = one::THREE + one::THREE;"
4694            );
4695            editor.undo(&Undo, cx);
4696            assert_eq!(
4697                editor.text(cx),
4698                "const ONE: usize = 1;\nconst TWO: usize = one::ONE + one::ONE;"
4699            );
4700            editor.redo(&Redo, cx);
4701            assert_eq!(
4702                editor.text(cx),
4703                "const THREE: usize = 1;\nconst TWO: usize = one::THREE + one::THREE;"
4704            );
4705        });
4706
4707        // Ensure temporary rename edits cannot be undone/redone.
            // This uses the original one.rs editor: the first undo reverts the rename, the
            // second undo changes nothing further, and one redo brings the rename back.
4708        editor_b.update(cx_b, |editor, cx| {
4709            editor.undo(&Undo, cx);
4710            assert_eq!(editor.text(cx), "const ONE: usize = 1;");
4711            editor.undo(&Undo, cx);
4712            assert_eq!(editor.text(cx), "const ONE: usize = 1;");
4713            editor.redo(&Redo, cx);
4714            assert_eq!(editor.text(cx), "const THREE: usize = 1;");
4715        })
4716    }
4717
4718    #[gpui::test(iterations = 10)]
4719    async fn test_basic_chat(cx_a: &mut TestAppContext, cx_b: &mut TestAppContext) {
4720        cx_a.foreground().forbid_parking();
4721
4722        // Connect to a server as 2 clients.
4723        let mut server = TestServer::start(cx_a.foreground(), cx_a.background()).await;
4724        let client_a = server.create_client(cx_a, "user_a").await;
4725        let client_b = server.create_client(cx_b, "user_b").await;
4726
4727        // Create an org that includes these 2 users.
4728        let db = &server.app_state.db;
4729        let org_id = db.create_org("Test Org", "test-org").await.unwrap();
4730        db.add_org_member(org_id, client_a.current_user_id(&cx_a), false)
4731            .await
4732            .unwrap();
4733        db.add_org_member(org_id, client_b.current_user_id(&cx_b), false)
4734            .await
4735            .unwrap();
4736
4737        // Create a channel that includes all the users.
4738        let channel_id = db.create_org_channel(org_id, "test-channel").await.unwrap();
4739        db.add_channel_member(channel_id, client_a.current_user_id(&cx_a), false)
4740            .await
4741            .unwrap();
4742        db.add_channel_member(channel_id, client_b.current_user_id(&cx_b), false)
4743            .await
4744            .unwrap();
4745        db.create_channel_message(
4746            channel_id,
4747            client_b.current_user_id(&cx_b),
4748            "hello A, it's B.",
4749            OffsetDateTime::now_utc(),
4750            1,
4751        )
4752        .await
4753        .unwrap();
4754
4755        let channels_a = cx_a
4756            .add_model(|cx| ChannelList::new(client_a.user_store.clone(), client_a.clone(), cx));
4757        channels_a
4758            .condition(cx_a, |list, _| list.available_channels().is_some())
4759            .await;
4760        channels_a.read_with(cx_a, |list, _| {
4761            assert_eq!(
4762                list.available_channels().unwrap(),
4763                &[ChannelDetails {
4764                    id: channel_id.to_proto(),
4765                    name: "test-channel".to_string()
4766                }]
4767            )
4768        });
4769        let channel_a = channels_a.update(cx_a, |this, cx| {
4770            this.get_channel(channel_id.to_proto(), cx).unwrap()
4771        });
4772        channel_a.read_with(cx_a, |channel, _| assert!(channel.messages().is_empty()));
4773        channel_a
4774            .condition(&cx_a, |channel, _| {
4775                channel_messages(channel)
4776                    == [("user_b".to_string(), "hello A, it's B.".to_string(), false)]
4777            })
4778            .await;
4779
4780        let channels_b = cx_b
4781            .add_model(|cx| ChannelList::new(client_b.user_store.clone(), client_b.clone(), cx));
4782        channels_b
4783            .condition(cx_b, |list, _| list.available_channels().is_some())
4784            .await;
4785        channels_b.read_with(cx_b, |list, _| {
4786            assert_eq!(
4787                list.available_channels().unwrap(),
4788                &[ChannelDetails {
4789                    id: channel_id.to_proto(),
4790                    name: "test-channel".to_string()
4791                }]
4792            )
4793        });
4794
4795        let channel_b = channels_b.update(cx_b, |this, cx| {
4796            this.get_channel(channel_id.to_proto(), cx).unwrap()
4797        });
4798        channel_b.read_with(cx_b, |channel, _| assert!(channel.messages().is_empty()));
4799        channel_b
4800            .condition(&cx_b, |channel, _| {
4801                channel_messages(channel)
4802                    == [("user_b".to_string(), "hello A, it's B.".to_string(), false)]
4803            })
4804            .await;
4805
4806        channel_a
4807            .update(cx_a, |channel, cx| {
4808                channel
4809                    .send_message("oh, hi B.".to_string(), cx)
4810                    .unwrap()
4811                    .detach();
4812                let task = channel.send_message("sup".to_string(), cx).unwrap();
4813                assert_eq!(
4814                    channel_messages(channel),
4815                    &[
4816                        ("user_b".to_string(), "hello A, it's B.".to_string(), false),
4817                        ("user_a".to_string(), "oh, hi B.".to_string(), true),
4818                        ("user_a".to_string(), "sup".to_string(), true)
4819                    ]
4820                );
4821                task
4822            })
4823            .await
4824            .unwrap();
4825
4826        channel_b
4827            .condition(&cx_b, |channel, _| {
4828                channel_messages(channel)
4829                    == [
4830                        ("user_b".to_string(), "hello A, it's B.".to_string(), false),
4831                        ("user_a".to_string(), "oh, hi B.".to_string(), false),
4832                        ("user_a".to_string(), "sup".to_string(), false),
4833                    ]
4834            })
4835            .await;
4836
4837        assert_eq!(
4838            server
4839                .state()
4840                .await
4841                .channel(channel_id)
4842                .unwrap()
4843                .connection_ids
4844                .len(),
4845            2
4846        );
4847        cx_b.update(|_| drop(channel_b));
4848        server
4849            .condition(|state| state.channel(channel_id).unwrap().connection_ids.len() == 1)
4850            .await;
4851
4852        cx_a.update(|_| drop(channel_a));
4853        server
4854            .condition(|state| state.channel(channel_id).is_none())
4855            .await;
4856    }
4857
4858    #[gpui::test(iterations = 10)]
4859    async fn test_chat_message_validation(cx_a: &mut TestAppContext) {
4860        cx_a.foreground().forbid_parking();
4861
4862        let mut server = TestServer::start(cx_a.foreground(), cx_a.background()).await;
4863        let client_a = server.create_client(cx_a, "user_a").await;
4864
4865        let db = &server.app_state.db;
4866        let org_id = db.create_org("Test Org", "test-org").await.unwrap();
4867        let channel_id = db.create_org_channel(org_id, "test-channel").await.unwrap();
4868        db.add_org_member(org_id, client_a.current_user_id(&cx_a), false)
4869            .await
4870            .unwrap();
4871        db.add_channel_member(channel_id, client_a.current_user_id(&cx_a), false)
4872            .await
4873            .unwrap();
4874
4875        let channels_a = cx_a
4876            .add_model(|cx| ChannelList::new(client_a.user_store.clone(), client_a.clone(), cx));
4877        channels_a
4878            .condition(cx_a, |list, _| list.available_channels().is_some())
4879            .await;
4880        let channel_a = channels_a.update(cx_a, |this, cx| {
4881            this.get_channel(channel_id.to_proto(), cx).unwrap()
4882        });
4883
4884        // Messages aren't allowed to be too long.
4885        channel_a
4886            .update(cx_a, |channel, cx| {
4887                let long_body = "this is long.\n".repeat(1024);
4888                channel.send_message(long_body, cx).unwrap()
4889            })
4890            .await
4891            .unwrap_err();
4892
4893        // Messages aren't allowed to be blank.
4894        channel_a.update(cx_a, |channel, cx| {
4895            channel.send_message(String::new(), cx).unwrap_err()
4896        });
4897
4898        // Leading and trailing whitespace are trimmed.
4899        channel_a
4900            .update(cx_a, |channel, cx| {
4901                channel
4902                    .send_message("\n surrounded by whitespace  \n".to_string(), cx)
4903                    .unwrap()
4904            })
4905            .await
4906            .unwrap();
4907        assert_eq!(
4908            db.get_channel_messages(channel_id, 10, None)
4909                .await
4910                .unwrap()
4911                .iter()
4912                .map(|m| &m.body)
4913                .collect::<Vec<_>>(),
4914            &["surrounded by whitespace"]
4915        );
4916    }
4917
4918    #[gpui::test(iterations = 10)]
4919    async fn test_chat_reconnection(cx_a: &mut TestAppContext, cx_b: &mut TestAppContext) {
4920        cx_a.foreground().forbid_parking();
4921
4922        // Connect to a server as 2 clients.
4923        let mut server = TestServer::start(cx_a.foreground(), cx_a.background()).await;
4924        let client_a = server.create_client(cx_a, "user_a").await;
4925        let client_b = server.create_client(cx_b, "user_b").await;
4926        let mut status_b = client_b.status();
4927
4928        // Create an org that includes these 2 users.
4929        let db = &server.app_state.db;
4930        let org_id = db.create_org("Test Org", "test-org").await.unwrap();
4931        db.add_org_member(org_id, client_a.current_user_id(&cx_a), false)
4932            .await
4933            .unwrap();
4934        db.add_org_member(org_id, client_b.current_user_id(&cx_b), false)
4935            .await
4936            .unwrap();
4937
4938        // Create a channel that includes all the users.
4939        let channel_id = db.create_org_channel(org_id, "test-channel").await.unwrap();
4940        db.add_channel_member(channel_id, client_a.current_user_id(&cx_a), false)
4941            .await
4942            .unwrap();
4943        db.add_channel_member(channel_id, client_b.current_user_id(&cx_b), false)
4944            .await
4945            .unwrap();
4946        db.create_channel_message(
4947            channel_id,
4948            client_b.current_user_id(&cx_b),
4949            "hello A, it's B.",
4950            OffsetDateTime::now_utc(),
4951            2,
4952        )
4953        .await
4954        .unwrap();
4955
4956        let channels_a = cx_a
4957            .add_model(|cx| ChannelList::new(client_a.user_store.clone(), client_a.clone(), cx));
4958        channels_a
4959            .condition(cx_a, |list, _| list.available_channels().is_some())
4960            .await;
4961
4962        channels_a.read_with(cx_a, |list, _| {
4963            assert_eq!(
4964                list.available_channels().unwrap(),
4965                &[ChannelDetails {
4966                    id: channel_id.to_proto(),
4967                    name: "test-channel".to_string()
4968                }]
4969            )
4970        });
4971        let channel_a = channels_a.update(cx_a, |this, cx| {
4972            this.get_channel(channel_id.to_proto(), cx).unwrap()
4973        });
4974        channel_a.read_with(cx_a, |channel, _| assert!(channel.messages().is_empty()));
4975        channel_a
4976            .condition(&cx_a, |channel, _| {
4977                channel_messages(channel)
4978                    == [("user_b".to_string(), "hello A, it's B.".to_string(), false)]
4979            })
4980            .await;
4981
4982        let channels_b = cx_b
4983            .add_model(|cx| ChannelList::new(client_b.user_store.clone(), client_b.clone(), cx));
4984        channels_b
4985            .condition(cx_b, |list, _| list.available_channels().is_some())
4986            .await;
4987        channels_b.read_with(cx_b, |list, _| {
4988            assert_eq!(
4989                list.available_channels().unwrap(),
4990                &[ChannelDetails {
4991                    id: channel_id.to_proto(),
4992                    name: "test-channel".to_string()
4993                }]
4994            )
4995        });
4996
4997        let channel_b = channels_b.update(cx_b, |this, cx| {
4998            this.get_channel(channel_id.to_proto(), cx).unwrap()
4999        });
5000        channel_b.read_with(cx_b, |channel, _| assert!(channel.messages().is_empty()));
5001        channel_b
5002            .condition(&cx_b, |channel, _| {
5003                channel_messages(channel)
5004                    == [("user_b".to_string(), "hello A, it's B.".to_string(), false)]
5005            })
5006            .await;
5007
5008        // Disconnect client B, ensuring we can still access its cached channel data.
5009        server.forbid_connections();
5010        server.disconnect_client(client_b.current_user_id(&cx_b));
5011        cx_b.foreground().advance_clock(rpc::RECEIVE_TIMEOUT);
5012        while !matches!(
5013            status_b.next().await,
5014            Some(client::Status::ReconnectionError { .. })
5015        ) {}
5016
5017        channels_b.read_with(cx_b, |channels, _| {
5018            assert_eq!(
5019                channels.available_channels().unwrap(),
5020                [ChannelDetails {
5021                    id: channel_id.to_proto(),
5022                    name: "test-channel".to_string()
5023                }]
5024            )
5025        });
5026        channel_b.read_with(cx_b, |channel, _| {
5027            assert_eq!(
5028                channel_messages(channel),
5029                [("user_b".to_string(), "hello A, it's B.".to_string(), false)]
5030            )
5031        });
5032
5033        // Send a message from client B while it is disconnected.
5034        channel_b
5035            .update(cx_b, |channel, cx| {
5036                let task = channel
5037                    .send_message("can you see this?".to_string(), cx)
5038                    .unwrap();
5039                assert_eq!(
5040                    channel_messages(channel),
5041                    &[
5042                        ("user_b".to_string(), "hello A, it's B.".to_string(), false),
5043                        ("user_b".to_string(), "can you see this?".to_string(), true)
5044                    ]
5045                );
5046                task
5047            })
5048            .await
5049            .unwrap_err();
5050
5051        // Send a message from client A while B is disconnected.
5052        channel_a
5053            .update(cx_a, |channel, cx| {
5054                channel
5055                    .send_message("oh, hi B.".to_string(), cx)
5056                    .unwrap()
5057                    .detach();
5058                let task = channel.send_message("sup".to_string(), cx).unwrap();
5059                assert_eq!(
5060                    channel_messages(channel),
5061                    &[
5062                        ("user_b".to_string(), "hello A, it's B.".to_string(), false),
5063                        ("user_a".to_string(), "oh, hi B.".to_string(), true),
5064                        ("user_a".to_string(), "sup".to_string(), true)
5065                    ]
5066                );
5067                task
5068            })
5069            .await
5070            .unwrap();
5071
5072        // Give client B a chance to reconnect.
5073        server.allow_connections();
5074        cx_b.foreground().advance_clock(Duration::from_secs(10));
5075
5076        // Verify that B sees the new messages upon reconnection, as well as the message client B
5077        // sent while offline.
5078        channel_b
5079            .condition(&cx_b, |channel, _| {
5080                channel_messages(channel)
5081                    == [
5082                        ("user_b".to_string(), "hello A, it's B.".to_string(), false),
5083                        ("user_a".to_string(), "oh, hi B.".to_string(), false),
5084                        ("user_a".to_string(), "sup".to_string(), false),
5085                        ("user_b".to_string(), "can you see this?".to_string(), false),
5086                    ]
5087            })
5088            .await;
5089
5090        // Ensure client A and B can communicate normally after reconnection.
5091        channel_a
5092            .update(cx_a, |channel, cx| {
5093                channel.send_message("you online?".to_string(), cx).unwrap()
5094            })
5095            .await
5096            .unwrap();
5097        channel_b
5098            .condition(&cx_b, |channel, _| {
5099                channel_messages(channel)
5100                    == [
5101                        ("user_b".to_string(), "hello A, it's B.".to_string(), false),
5102                        ("user_a".to_string(), "oh, hi B.".to_string(), false),
5103                        ("user_a".to_string(), "sup".to_string(), false),
5104                        ("user_b".to_string(), "can you see this?".to_string(), false),
5105                        ("user_a".to_string(), "you online?".to_string(), false),
5106                    ]
5107            })
5108            .await;
5109
5110        channel_b
5111            .update(cx_b, |channel, cx| {
5112                channel.send_message("yep".to_string(), cx).unwrap()
5113            })
5114            .await
5115            .unwrap();
5116        channel_a
5117            .condition(&cx_a, |channel, _| {
5118                channel_messages(channel)
5119                    == [
5120                        ("user_b".to_string(), "hello A, it's B.".to_string(), false),
5121                        ("user_a".to_string(), "oh, hi B.".to_string(), false),
5122                        ("user_a".to_string(), "sup".to_string(), false),
5123                        ("user_b".to_string(), "can you see this?".to_string(), false),
5124                        ("user_a".to_string(), "you online?".to_string(), false),
5125                        ("user_b".to_string(), "yep".to_string(), false),
5126                    ]
5127            })
5128            .await;
5129    }
5130
5131    #[gpui::test(iterations = 10)]
5132    async fn test_contacts(
5133        deterministic: Arc<Deterministic>,
5134        cx_a: &mut TestAppContext,
5135        cx_b: &mut TestAppContext,
5136        cx_c: &mut TestAppContext,
5137    ) {
5138        cx_a.foreground().forbid_parking();
5139
5140        // Connect to a server as 3 clients.
5141        let mut server = TestServer::start(cx_a.foreground(), cx_a.background()).await;
5142        let mut client_a = server.create_client(cx_a, "user_a").await;
5143        let mut client_b = server.create_client(cx_b, "user_b").await;
5144        let client_c = server.create_client(cx_c, "user_c").await;
5145        server
5146            .make_contacts(vec![
5147                (&client_a, cx_a),
5148                (&client_b, cx_b),
5149                (&client_c, cx_c),
5150            ])
5151            .await;
5152
5153        deterministic.run_until_parked();
5154        for (client, cx) in [(&client_a, &cx_a), (&client_b, &cx_b), (&client_c, &cx_c)] {
5155            client.user_store.read_with(*cx, |store, _| {
5156                assert_eq!(
5157                    contacts(store),
5158                    [
5159                        ("user_a", true, vec![]),
5160                        ("user_b", true, vec![]),
5161                        ("user_c", true, vec![])
5162                    ],
5163                    "{} has the wrong contacts",
5164                    client.username
5165                )
5166            });
5167        }
5168
5169        // Share a project as client A.
5170        let fs = FakeFs::new(cx_a.background());
5171        fs.create_dir(Path::new("/a")).await.unwrap();
5172        let (project_a, _) = client_a.build_local_project(fs, "/a", cx_a).await;
5173
5174        deterministic.run_until_parked();
5175        for (client, cx) in [(&client_a, &cx_a), (&client_b, &cx_b), (&client_c, &cx_c)] {
5176            client.user_store.read_with(*cx, |store, _| {
5177                assert_eq!(
5178                    contacts(store),
5179                    [
5180                        ("user_a", true, vec![("a", vec![])]),
5181                        ("user_b", true, vec![]),
5182                        ("user_c", true, vec![])
5183                    ],
5184                    "{} has the wrong contacts",
5185                    client.username
5186                )
5187            });
5188        }
5189
5190        let _project_b = client_b.build_remote_project(&project_a, cx_a, cx_b).await;
5191
5192        deterministic.run_until_parked();
5193        for (client, cx) in [(&client_a, &cx_a), (&client_b, &cx_b), (&client_c, &cx_c)] {
5194            client.user_store.read_with(*cx, |store, _| {
5195                assert_eq!(
5196                    contacts(store),
5197                    [
5198                        ("user_a", true, vec![("a", vec!["user_b"])]),
5199                        ("user_b", true, vec![]),
5200                        ("user_c", true, vec![])
5201                    ],
5202                    "{} has the wrong contacts",
5203                    client.username
5204                )
5205            });
5206        }
5207
5208        // Add a local project as client B
5209        let fs = FakeFs::new(cx_b.background());
5210        fs.create_dir(Path::new("/b")).await.unwrap();
5211        let (_project_b, _) = client_b.build_local_project(fs, "/b", cx_a).await;
5212
5213        deterministic.run_until_parked();
5214        for (client, cx) in [(&client_a, &cx_a), (&client_b, &cx_b), (&client_c, &cx_c)] {
5215            client.user_store.read_with(*cx, |store, _| {
5216                assert_eq!(
5217                    contacts(store),
5218                    [
5219                        ("user_a", true, vec![("a", vec!["user_b"])]),
5220                        ("user_b", true, vec![("b", vec![])]),
5221                        ("user_c", true, vec![])
5222                    ],
5223                    "{} has the wrong contacts",
5224                    client.username
5225                )
5226            });
5227        }
5228
5229        project_a
5230            .condition(&cx_a, |project, _| {
5231                project.collaborators().contains_key(&client_b.peer_id)
5232            })
5233            .await;
5234
5235        client_a.project.take();
5236        cx_a.update(move |_| drop(project_a));
5237        deterministic.run_until_parked();
5238        for (client, cx) in [(&client_a, &cx_a), (&client_b, &cx_b), (&client_c, &cx_c)] {
5239            client.user_store.read_with(*cx, |store, _| {
5240                assert_eq!(
5241                    contacts(store),
5242                    [
5243                        ("user_a", true, vec![]),
5244                        ("user_b", true, vec![("b", vec![])]),
5245                        ("user_c", true, vec![])
5246                    ],
5247                    "{} has the wrong contacts",
5248                    client.username
5249                )
5250            });
5251        }
5252
5253        server.disconnect_client(client_c.current_user_id(cx_c));
5254        server.forbid_connections();
5255        deterministic.advance_clock(rpc::RECEIVE_TIMEOUT);
5256        for (client, cx) in [(&client_a, &cx_a), (&client_b, &cx_b)] {
5257            client.user_store.read_with(*cx, |store, _| {
5258                assert_eq!(
5259                    contacts(store),
5260                    [
5261                        ("user_a", true, vec![]),
5262                        ("user_b", true, vec![("b", vec![])]),
5263                        ("user_c", false, vec![])
5264                    ],
5265                    "{} has the wrong contacts",
5266                    client.username
5267                )
5268            });
5269        }
5270        client_c
5271            .user_store
5272            .read_with(cx_c, |store, _| assert_eq!(contacts(store), []));
5273
5274        server.allow_connections();
5275        client_c
5276            .authenticate_and_connect(false, &cx_c.to_async())
5277            .await
5278            .unwrap();
5279
5280        deterministic.run_until_parked();
5281        for (client, cx) in [(&client_a, &cx_a), (&client_b, &cx_b), (&client_c, &cx_c)] {
5282            client.user_store.read_with(*cx, |store, _| {
5283                assert_eq!(
5284                    contacts(store),
5285                    [
5286                        ("user_a", true, vec![]),
5287                        ("user_b", true, vec![("b", vec![])]),
5288                        ("user_c", true, vec![])
5289                    ],
5290                    "{} has the wrong contacts",
5291                    client.username
5292                )
5293            });
5294        }
5295
5296        fn contacts(user_store: &UserStore) -> Vec<(&str, bool, Vec<(&str, Vec<&str>)>)> {
5297            user_store
5298                .contacts()
5299                .iter()
5300                .map(|contact| {
5301                    let projects = contact
5302                        .projects
5303                        .iter()
5304                        .map(|p| {
5305                            (
5306                                p.worktree_root_names[0].as_str(),
5307                                p.guests.iter().map(|p| p.github_login.as_str()).collect(),
5308                            )
5309                        })
5310                        .collect();
5311                    (contact.user.github_login.as_str(), contact.online, projects)
5312                })
5313                .collect()
5314        }
5315    }
5316
5317    #[gpui::test(iterations = 10)]
5318    async fn test_contact_requests(
5319        executor: Arc<Deterministic>,
5320        cx_a: &mut TestAppContext,
5321        cx_a2: &mut TestAppContext,
5322        cx_b: &mut TestAppContext,
5323        cx_b2: &mut TestAppContext,
5324        cx_c: &mut TestAppContext,
5325        cx_c2: &mut TestAppContext,
5326    ) {
5327        cx_a.foreground().forbid_parking();
5328
5329        // Connect to a server as 3 clients.
5330        let mut server = TestServer::start(cx_a.foreground(), cx_a.background()).await;
5331        let client_a = server.create_client(cx_a, "user_a").await;
5332        let client_a2 = server.create_client(cx_a2, "user_a").await;
5333        let client_b = server.create_client(cx_b, "user_b").await;
5334        let client_b2 = server.create_client(cx_b2, "user_b").await;
5335        let client_c = server.create_client(cx_c, "user_c").await;
5336        let client_c2 = server.create_client(cx_c2, "user_c").await;
5337
5338        assert_eq!(client_a.user_id().unwrap(), client_a2.user_id().unwrap());
5339        assert_eq!(client_b.user_id().unwrap(), client_b2.user_id().unwrap());
5340        assert_eq!(client_c.user_id().unwrap(), client_c2.user_id().unwrap());
5341
5342        // User A and User C request that user B become their contact.
5343        client_a
5344            .user_store
5345            .update(cx_a, |store, cx| {
5346                store.request_contact(client_b.user_id().unwrap(), cx)
5347            })
5348            .await
5349            .unwrap();
5350        client_c
5351            .user_store
5352            .update(cx_c, |store, cx| {
5353                store.request_contact(client_b.user_id().unwrap(), cx)
5354            })
5355            .await
5356            .unwrap();
5357        executor.run_until_parked();
5358
5359        // All users see the pending request appear in all their clients.
5360        assert_eq!(
5361            client_a.summarize_contacts(&cx_a).outgoing_requests,
5362            &["user_b"]
5363        );
5364        assert_eq!(
5365            client_a2.summarize_contacts(&cx_a2).outgoing_requests,
5366            &["user_b"]
5367        );
5368        assert_eq!(
5369            client_b.summarize_contacts(&cx_b).incoming_requests,
5370            &["user_a", "user_c"]
5371        );
5372        assert_eq!(
5373            client_b2.summarize_contacts(&cx_b2).incoming_requests,
5374            &["user_a", "user_c"]
5375        );
5376        assert_eq!(
5377            client_c.summarize_contacts(&cx_c).outgoing_requests,
5378            &["user_b"]
5379        );
5380        assert_eq!(
5381            client_c2.summarize_contacts(&cx_c2).outgoing_requests,
5382            &["user_b"]
5383        );
5384
5385        // Contact requests are present upon connecting (tested here via disconnect/reconnect)
5386        disconnect_and_reconnect(&client_a, cx_a).await;
5387        disconnect_and_reconnect(&client_b, cx_b).await;
5388        disconnect_and_reconnect(&client_c, cx_c).await;
5389        executor.run_until_parked();
5390        assert_eq!(
5391            client_a.summarize_contacts(&cx_a).outgoing_requests,
5392            &["user_b"]
5393        );
5394        assert_eq!(
5395            client_b.summarize_contacts(&cx_b).incoming_requests,
5396            &["user_a", "user_c"]
5397        );
5398        assert_eq!(
5399            client_c.summarize_contacts(&cx_c).outgoing_requests,
5400            &["user_b"]
5401        );
5402
5403        // User B accepts the request from user A.
5404        client_b
5405            .user_store
5406            .update(cx_b, |store, cx| {
5407                store.respond_to_contact_request(client_a.user_id().unwrap(), true, cx)
5408            })
5409            .await
5410            .unwrap();
5411
5412        executor.run_until_parked();
5413
5414        // User B sees user A as their contact now in all client, and the incoming request from them is removed.
5415        let contacts_b = client_b.summarize_contacts(&cx_b);
5416        assert_eq!(contacts_b.current, &["user_a", "user_b"]);
5417        assert_eq!(contacts_b.incoming_requests, &["user_c"]);
5418        let contacts_b2 = client_b2.summarize_contacts(&cx_b2);
5419        assert_eq!(contacts_b2.current, &["user_a", "user_b"]);
5420        assert_eq!(contacts_b2.incoming_requests, &["user_c"]);
5421
5422        // User A sees user B as their contact now in all clients, and the outgoing request to them is removed.
5423        let contacts_a = client_a.summarize_contacts(&cx_a);
5424        assert_eq!(contacts_a.current, &["user_a", "user_b"]);
5425        assert!(contacts_a.outgoing_requests.is_empty());
5426        let contacts_a2 = client_a2.summarize_contacts(&cx_a2);
5427        assert_eq!(contacts_a2.current, &["user_a", "user_b"]);
5428        assert!(contacts_a2.outgoing_requests.is_empty());
5429
5430        // Contacts are present upon connecting (tested here via disconnect/reconnect)
5431        disconnect_and_reconnect(&client_a, cx_a).await;
5432        disconnect_and_reconnect(&client_b, cx_b).await;
5433        disconnect_and_reconnect(&client_c, cx_c).await;
5434        executor.run_until_parked();
5435        assert_eq!(
5436            client_a.summarize_contacts(&cx_a).current,
5437            &["user_a", "user_b"]
5438        );
5439        assert_eq!(
5440            client_b.summarize_contacts(&cx_b).current,
5441            &["user_a", "user_b"]
5442        );
5443        assert_eq!(
5444            client_b.summarize_contacts(&cx_b).incoming_requests,
5445            &["user_c"]
5446        );
5447        assert_eq!(client_c.summarize_contacts(&cx_c).current, &["user_c"]);
5448        assert_eq!(
5449            client_c.summarize_contacts(&cx_c).outgoing_requests,
5450            &["user_b"]
5451        );
5452
5453        // User B rejects the request from user C.
5454        client_b
5455            .user_store
5456            .update(cx_b, |store, cx| {
5457                store.respond_to_contact_request(client_c.user_id().unwrap(), false, cx)
5458            })
5459            .await
5460            .unwrap();
5461
5462        executor.run_until_parked();
5463
5464        // User B doesn't see user C as their contact, and the incoming request from them is removed.
5465        let contacts_b = client_b.summarize_contacts(&cx_b);
5466        assert_eq!(contacts_b.current, &["user_a", "user_b"]);
5467        assert!(contacts_b.incoming_requests.is_empty());
5468        let contacts_b2 = client_b2.summarize_contacts(&cx_b2);
5469        assert_eq!(contacts_b2.current, &["user_a", "user_b"]);
5470        assert!(contacts_b2.incoming_requests.is_empty());
5471
5472        // User C doesn't see user B as their contact, and the outgoing request to them is removed.
5473        let contacts_c = client_c.summarize_contacts(&cx_c);
5474        assert_eq!(contacts_c.current, &["user_c"]);
5475        assert!(contacts_c.outgoing_requests.is_empty());
5476        let contacts_c2 = client_c2.summarize_contacts(&cx_c2);
5477        assert_eq!(contacts_c2.current, &["user_c"]);
5478        assert!(contacts_c2.outgoing_requests.is_empty());
5479
5480        // Incoming/outgoing requests are not present upon connecting (tested here via disconnect/reconnect)
5481        disconnect_and_reconnect(&client_a, cx_a).await;
5482        disconnect_and_reconnect(&client_b, cx_b).await;
5483        disconnect_and_reconnect(&client_c, cx_c).await;
5484        executor.run_until_parked();
5485        assert_eq!(
5486            client_a.summarize_contacts(&cx_a).current,
5487            &["user_a", "user_b"]
5488        );
5489        assert_eq!(
5490            client_b.summarize_contacts(&cx_b).current,
5491            &["user_a", "user_b"]
5492        );
5493        assert!(client_b
5494            .summarize_contacts(&cx_b)
5495            .incoming_requests
5496            .is_empty());
5497        assert_eq!(client_c.summarize_contacts(&cx_c).current, &["user_c"]);
5498        assert!(client_c
5499            .summarize_contacts(&cx_c)
5500            .outgoing_requests
5501            .is_empty());
5502
5503        async fn disconnect_and_reconnect(client: &TestClient, cx: &mut TestAppContext) {
                // Test helper: disconnects `client`, calls `clear_contacts` on it, and then
                // runs `authenticate_and_connect` again. The two fallible steps panic via
                // `unwrap` on error, which fails the enclosing test.
5504            client.disconnect(&cx.to_async()).unwrap();
                // NOTE(review): presumably this wipes the locally cached contacts so that the
                // assertions made after reconnecting only observe state delivered over the new
                // connection — confirm against `TestClient::clear_contacts`.
5505            client.clear_contacts(cx).await;
5506            client
5507                .authenticate_and_connect(false, &cx.to_async())
5508                .await
5509                .unwrap();
5510        }
5511    }
5512
5513    #[gpui::test(iterations = 10)]
        // Two-client "following" scenario. Client A shares a project and opens 1.txt and 2.txt;
        // client B joins, opens 1.txt and follows A. The test then checks, in order, that:
        //   * B ends up on A's active item with A's selections replicated,
        //   * A's item activation, back/forward navigation, selection changes and text edits
        //     show up on B,
        //   * after B unfollows, A's item activation is no longer mirrored on B,
        //   * A can follow B, and that follow relationship is cleared once B disconnects.
5514    async fn test_following(cx_a: &mut TestAppContext, cx_b: &mut TestAppContext) {
            // NOTE(review): presumably makes the deterministic executor fail the test instead
            // of blocking if it would have to park — confirm against gpui's executor.
5515        cx_a.foreground().forbid_parking();
5516        let fs = FakeFs::new(cx_a.background());
5517
5518        // 2 clients connect to a server.
5519        let mut server = TestServer::start(cx_a.foreground(), cx_a.background()).await;
5520        let mut client_a = server.create_client(cx_a, "user_a").await;
5521        let mut client_b = server.create_client(cx_b, "user_b").await;
5522        server
5523            .make_contacts(vec![(&client_a, cx_a), (&client_b, cx_b)])
5524            .await;
5525        cx_a.update(editor::init);
5526        cx_b.update(editor::init);
5527
5528        // Client A shares a project.
5529        fs.insert_tree(
5530            "/a",
5531            json!({
5532                "1.txt": "one",
5533                "2.txt": "two",
5534                "3.txt": "three",
5535            }),
5536        )
5537        .await;
5538        let (project_a, worktree_id) = client_a.build_local_project(fs.clone(), "/a", cx_a).await;
5539
5540        // Client B joins the project.
5541        let project_b = client_b.build_remote_project(&project_a, cx_a, cx_b).await;
5542
5543        // Client A opens some editors.
5544        let workspace_a = client_a.build_workspace(&project_a, cx_a);
5545        let pane_a = workspace_a.read_with(cx_a, |workspace, _| workspace.active_pane().clone());
5546        let editor_a1 = workspace_a
5547            .update(cx_a, |workspace, cx| {
5548                workspace.open_path((worktree_id, "1.txt"), true, cx)
5549            })
5550            .await
5551            .unwrap()
5552            .downcast::<Editor>()
5553            .unwrap();
            // 2.txt is opened last, so it is the item the follow assertions below expect to be
            // active on A's side.
5554        let editor_a2 = workspace_a
5555            .update(cx_a, |workspace, cx| {
5556                workspace.open_path((worktree_id, "2.txt"), true, cx)
5557            })
5558            .await
5559            .unwrap()
5560            .downcast::<Editor>()
5561            .unwrap();
5562
5563        // Client B opens an editor.
5564        let workspace_b = client_b.build_workspace(&project_b, cx_b);
5565        let editor_b1 = workspace_b
5566            .update(cx_b, |workspace, cx| {
5567                workspace.open_path((worktree_id, "1.txt"), true, cx)
5568            })
5569            .await
5570            .unwrap()
5571            .downcast::<Editor>()
5572            .unwrap();
5573
            // Each side takes the peer id of the first collaborator it sees on its own project.
            // NOTE(review): with only two participants that is presumably the other client.
5574        let client_a_id = project_b.read_with(cx_b, |project, _| {
5575            project.collaborators().values().next().unwrap().peer_id
5576        });
5577        let client_b_id = project_a.read_with(cx_a, |project, _| {
5578            project.collaborators().values().next().unwrap().peer_id
5579        });
5580
5581        // When client B starts following client A, all visible view states are replicated to client B.
            // Distinct selections are set on A's two editors first so the replicated state can be
            // told apart in the assertions below (0..1 for 1.txt, 2..3 for 2.txt).
5582        editor_a1.update(cx_a, |editor, cx| {
5583            editor.change_selections(None, cx, |s| s.select_ranges([0..1]))
5584        });
5585        editor_a2.update(cx_a, |editor, cx| {
5586            editor.change_selections(None, cx, |s| s.select_ranges([2..3]))
5587        });
5588        workspace_b
5589            .update(cx_b, |workspace, cx| {
5590                workspace
5591                    .toggle_follow(&ToggleFollow(client_a_id), cx)
5592                    .unwrap()
5593            })
5594            .await
5595            .unwrap();
5596
            // `editor_b2` is whatever editor is active in B's workspace once the follow request
            // has completed; the assertions below require it to be focused and to show 2.txt.
5597        let editor_b2 = workspace_b.read_with(cx_b, |workspace, cx| {
5598            workspace
5599                .active_item(cx)
5600                .unwrap()
5601                .downcast::<Editor>()
5602                .unwrap()
5603        });
5604        assert!(cx_b.read(|cx| editor_b2.is_focused(cx)));
5605        assert_eq!(
5606            editor_b2.read_with(cx_b, |editor, cx| editor.project_path(cx)),
5607            Some((worktree_id, "2.txt").into())
5608        );
5609        assert_eq!(
5610            editor_b2.read_with(cx_b, |editor, cx| editor.selections.ranges(cx)),
5611            vec![2..3]
5612        );
            // B's own, previously opened 1.txt editor must carry the selection set on `editor_a1`.
5613        assert_eq!(
5614            editor_b1.read_with(cx_b, |editor, cx| editor.selections.ranges(cx)),
5615            vec![0..1]
5616        );
5617
5618        // When client A activates a different editor, client B does so as well.
5619        workspace_a.update(cx_a, |workspace, cx| {
5620            workspace.activate_item(&editor_a1, cx)
5621        });
5622        workspace_b
5623            .condition(cx_b, |workspace, cx| {
5624                workspace.active_item(cx).unwrap().id() == editor_b1.id()
5625            })
5626            .await;
5627
5628        // When client A navigates back and forth, client B does so as well.
5629        workspace_a
5630            .update(cx_a, |workspace, cx| {
5631                workspace::Pane::go_back(workspace, None, cx)
5632            })
5633            .await;
5634        workspace_b
5635            .condition(cx_b, |workspace, cx| {
5636                workspace.active_item(cx).unwrap().id() == editor_b2.id()
5637            })
5638            .await;
5639
5640        workspace_a
5641            .update(cx_a, |workspace, cx| {
5642                workspace::Pane::go_forward(workspace, None, cx)
5643            })
5644            .await;
5645        workspace_b
5646            .condition(cx_b, |workspace, cx| {
5647                workspace.active_item(cx).unwrap().id() == editor_b1.id()
5648            })
5649            .await;
5650
5651        // Changes to client A's editor are reflected on client B.
5652        editor_a1.update(cx_a, |editor, cx| {
5653            editor.change_selections(None, cx, |s| s.select_ranges([1..1, 2..2]));
5654        });
5655        editor_b1
5656            .condition(cx_b, |editor, cx| {
5657                editor.selections.ranges(cx) == vec![1..1, 2..2]
5658            })
5659            .await;
5660
5661        editor_a1.update(cx_a, |editor, cx| editor.set_text("TWO", cx));
5662        editor_b1
5663            .condition(cx_b, |editor, cx| editor.text(cx) == "TWO")
5664            .await;
5665
            // A changes both its selection and its scroll position here; only the selection is
            // waited on below — the scroll position on B is not asserted.
5666        editor_a1.update(cx_a, |editor, cx| {
5667            editor.change_selections(None, cx, |s| s.select_ranges([3..3]));
5668            editor.set_scroll_position(vec2f(0., 100.), cx);
5669        });
5670        editor_b1
5671            .condition(cx_b, |editor, cx| {
5672                editor.selections.ranges(cx) == vec![3..3]
5673            })
5674            .await;
5675
5676        // After unfollowing, client B stops receiving updates from client A.
5677        workspace_b.update(cx_b, |workspace, cx| {
5678            workspace.unfollow(&workspace.active_pane().clone(), cx)
5679        });
5680        workspace_a.update(cx_a, |workspace, cx| {
5681            workspace.activate_item(&editor_a2, cx)
5682        });
            // Negative check: run the executor until it parks, then verify B is *still* on
            // `editor_b1`, i.e. A's activation of `editor_a2` was not mirrored.
5683        cx_a.foreground().run_until_parked();
5684        assert_eq!(
5685            workspace_b.read_with(cx_b, |workspace, cx| workspace
5686                .active_item(cx)
5687                .unwrap()
5688                .id()),
5689            editor_b1.id()
5690        );
5691
5692        // Client A starts following client B.
5693        workspace_a
5694            .update(cx_a, |workspace, cx| {
5695                workspace
5696                    .toggle_follow(&ToggleFollow(client_b_id), cx)
5697                    .unwrap()
5698            })
5699            .await
5700            .unwrap();
5701        assert_eq!(
5702            workspace_a.read_with(cx_a, |workspace, _| workspace.leader_for_pane(&pane_a)),
5703            Some(client_b_id)
5704        );
            // A had `editor_a2` active before following; it is now expected to be back on
            // `editor_a1`, matching B's active 1.txt editor.
5705        assert_eq!(
5706            workspace_a.read_with(cx_a, |workspace, cx| workspace
5707                .active_item(cx)
5708                .unwrap()
5709                .id()),
5710            editor_a1.id()
5711        );
5712
5713        // Following is interrupted when client B disconnects.
5714        client_b.disconnect(&cx_b.to_async()).unwrap();
5715        cx_a.foreground().run_until_parked();
5716        assert_eq!(
5717            workspace_a.read_with(cx_a, |workspace, _| workspace.leader_for_pane(&pane_a)),
5718            None
5719        );
5720    }
5721
5722    #[gpui::test(iterations = 10)]
        // Mutual-follow scenario. Clients A and B each split their workspace and follow the other
        // peer in the newly created pane, then each goes back to its original pane and opens a
        // further file there. The test asserts that neither side's active pane is changed by the
        // leader updates it receives, and that each side's other pane shows the file the peer
        // opened (A: own 3.txt / peer's 4.txt, B: own 4.txt / peer's 3.txt).
5723    async fn test_peers_following_each_other(cx_a: &mut TestAppContext, cx_b: &mut TestAppContext) {
            // NOTE(review): presumably makes the deterministic executor fail the test instead
            // of blocking if it would have to park — confirm against gpui's executor.
5724        cx_a.foreground().forbid_parking();
5725        let fs = FakeFs::new(cx_a.background());
5726
5727        // 2 clients connect to a server.
5728        let mut server = TestServer::start(cx_a.foreground(), cx_a.background()).await;
5729        let mut client_a = server.create_client(cx_a, "user_a").await;
5730        let mut client_b = server.create_client(cx_b, "user_b").await;
5731        server
5732            .make_contacts(vec![(&client_a, cx_a), (&client_b, cx_b)])
5733            .await;
5734        cx_a.update(editor::init);
5735        cx_b.update(editor::init);
5736
5737        // Client A shares a project.
5738        fs.insert_tree(
5739            "/a",
5740            json!({
5741                "1.txt": "one",
5742                "2.txt": "two",
5743                "3.txt": "three",
5744                "4.txt": "four",
5745            }),
5746        )
5747        .await;
5748        let (project_a, worktree_id) = client_a.build_local_project(fs.clone(), "/a", cx_a).await;
5749
5750        // Client B joins the project.
5751        let project_b = client_b.build_remote_project(&project_a, cx_a, cx_b).await;
5752
5753        // Client A opens some editors.
5754        let workspace_a = client_a.build_workspace(&project_a, cx_a);
            // Remember each side's original pane; the later assertions compare against it.
5755        let pane_a1 = workspace_a.read_with(cx_a, |workspace, _| workspace.active_pane().clone());
5756        let _editor_a1 = workspace_a
5757            .update(cx_a, |workspace, cx| {
5758                workspace.open_path((worktree_id, "1.txt"), true, cx)
5759            })
5760            .await
5761            .unwrap()
5762            .downcast::<Editor>()
5763            .unwrap();
5764
5765        // Client B opens an editor.
5766        let workspace_b = client_b.build_workspace(&project_b, cx_b);
5767        let pane_b1 = workspace_b.read_with(cx_b, |workspace, _| workspace.active_pane().clone());
5768        let _editor_b1 = workspace_b
5769            .update(cx_b, |workspace, cx| {
5770                workspace.open_path((worktree_id, "2.txt"), true, cx)
5771            })
5772            .await
5773            .unwrap()
5774            .downcast::<Editor>()
5775            .unwrap();
5776
5777        // Clients A and B follow each other in split panes
            // On each side: split to the right, check that the active pane is no longer the
            // original one, then follow the first collaborator listed on that side's project.
            // NOTE(review): with only two participants that is presumably the other client.
5778        workspace_a
5779            .update(cx_a, |workspace, cx| {
5780                workspace.split_pane(workspace.active_pane().clone(), SplitDirection::Right, cx);
5781                assert_ne!(*workspace.active_pane(), pane_a1);
5782                let leader_id = *project_a.read(cx).collaborators().keys().next().unwrap();
5783                workspace
5784                    .toggle_follow(&workspace::ToggleFollow(leader_id), cx)
5785                    .unwrap()
5786            })
5787            .await
5788            .unwrap();
5789        workspace_b
5790            .update(cx_b, |workspace, cx| {
5791                workspace.split_pane(workspace.active_pane().clone(), SplitDirection::Right, cx);
5792                assert_ne!(*workspace.active_pane(), pane_b1);
5793                let leader_id = *project_b.read(cx).collaborators().keys().next().unwrap();
5794                workspace
5795                    .toggle_follow(&workspace::ToggleFollow(leader_id), cx)
5796                    .unwrap()
5797            })
5798            .await
5799            .unwrap();
5800
            // Each side switches back to its original pane (asserted) and opens another file
            // there: 3.txt on A, 4.txt on B.
5801        workspace_a
5802            .update(cx_a, |workspace, cx| {
5803                workspace.activate_next_pane(cx);
5804                assert_eq!(*workspace.active_pane(), pane_a1);
5805                workspace.open_path((worktree_id, "3.txt"), true, cx)
5806            })
5807            .await
5808            .unwrap();
5809        workspace_b
5810            .update(cx_b, |workspace, cx| {
5811                workspace.activate_next_pane(cx);
5812                assert_eq!(*workspace.active_pane(), pane_b1);
5813                workspace.open_path((worktree_id, "4.txt"), true, cx)
5814            })
5815            .await
5816            .unwrap();
            // Run the executor until it parks before making the assertions below.
5817        cx_a.foreground().run_until_parked();
5818
5819        // Ensure leader updates don't change the active pane of followers
5820        workspace_a.read_with(cx_a, |workspace, _| {
5821            assert_eq!(*workspace.active_pane(), pane_a1);
5822        });
5823        workspace_b.read_with(cx_b, |workspace, _| {
5824            assert_eq!(*workspace.active_pane(), pane_b1);
5825        });
5826
5827        // Ensure peers following each other doesn't cause an infinite loop.
            // Expected end state: each side's original pane shows the file it opened itself, and
            // switching to the next pane shows the file the peer opened.
5828        assert_eq!(
5829            workspace_a.read_with(cx_a, |workspace, cx| workspace
5830                .active_item(cx)
5831                .unwrap()
5832                .project_path(cx)),
5833            Some((worktree_id, "3.txt").into())
5834        );
5835        workspace_a.update(cx_a, |workspace, cx| {
5836            assert_eq!(
5837                workspace.active_item(cx).unwrap().project_path(cx),
5838                Some((worktree_id, "3.txt").into())
5839            );
5840            workspace.activate_next_pane(cx);
5841            assert_eq!(
5842                workspace.active_item(cx).unwrap().project_path(cx),
5843                Some((worktree_id, "4.txt").into())
5844            );
5845        });
5846        workspace_b.update(cx_b, |workspace, cx| {
5847            assert_eq!(
5848                workspace.active_item(cx).unwrap().project_path(cx),
5849                Some((worktree_id, "4.txt").into())
5850            );
5851            workspace.activate_next_pane(cx);
5852            assert_eq!(
5853                workspace.active_item(cx).unwrap().project_path(cx),
5854                Some((worktree_id, "3.txt").into())
5855            );
5856        });
5857    }
5858
5859    #[gpui::test(iterations = 10)]
        // Auto-unfollow scenario. Client B follows client A in `pane_b`; the test then performs a
        // series of local actions on B and checks `leader_for_pane(&pane_b)` after each one:
        //   * moving the cursor, editing, scrolling, and opening a different item each clear the
        //     leader (`None`),
        //   * splitting the pane and activating another pane keep it (`Some(leader_id)`).
        // Between the unfollowing steps B re-follows A and the test asserts the leader is set again.
        // Every leader check runs directly after the triggering update, with no executor drain in
        // between.
5860    async fn test_auto_unfollowing(cx_a: &mut TestAppContext, cx_b: &mut TestAppContext) {
            // NOTE(review): presumably makes the deterministic executor fail the test instead
            // of blocking if it would have to park — confirm against gpui's executor.
5861        cx_a.foreground().forbid_parking();
5862        let fs = FakeFs::new(cx_a.background());
5863
5864        // 2 clients connect to a server.
5865        let mut server = TestServer::start(cx_a.foreground(), cx_a.background()).await;
5866        let mut client_a = server.create_client(cx_a, "user_a").await;
5867        let mut client_b = server.create_client(cx_b, "user_b").await;
5868        server
5869            .make_contacts(vec![(&client_a, cx_a), (&client_b, cx_b)])
5870            .await;
5871        cx_a.update(editor::init);
5872        cx_b.update(editor::init);
5873
5874        // Client A shares a project.
5875        fs.insert_tree(
5876            "/a",
5877            json!({
5878                "1.txt": "one",
5879                "2.txt": "two",
5880                "3.txt": "three",
5881            }),
5882        )
5883        .await;
5884        let (project_a, worktree_id) = client_a.build_local_project(fs.clone(), "/a", cx_a).await;
5885
5886        // Client B joins the project.
5887        let project_b = client_b.build_remote_project(&project_a, cx_a, cx_b).await;
5888
5889        // Client A opens some editors.
5890        let workspace_a = client_a.build_workspace(&project_a, cx_a);
5891        let _editor_a1 = workspace_a
5892            .update(cx_a, |workspace, cx| {
5893                workspace.open_path((worktree_id, "1.txt"), true, cx)
5894            })
5895            .await
5896            .unwrap()
5897            .downcast::<Editor>()
5898            .unwrap();
5899
5900        // Client B starts following client A.
5901        let workspace_b = client_b.build_workspace(&project_b, cx_b);
            // `pane_b` is B's initial pane; every leader assertion in this test is made against it.
5902        let pane_b = workspace_b.read_with(cx_b, |workspace, _| workspace.active_pane().clone());
            // The leader is the first collaborator B sees on the project.
            // NOTE(review): with only two participants that is presumably client A.
5903        let leader_id = project_b.read_with(cx_b, |project, _| {
5904            project.collaborators().values().next().unwrap().peer_id
5905        });
5906        workspace_b
5907            .update(cx_b, |workspace, cx| {
5908                workspace
5909                    .toggle_follow(&ToggleFollow(leader_id), cx)
5910                    .unwrap()
5911            })
5912            .await
5913            .unwrap();
5914        assert_eq!(
5915            workspace_b.read_with(cx_b, |workspace, _| workspace.leader_for_pane(&pane_b)),
5916            Some(leader_id)
5917        );
            // B never opened an editor itself, so this active item was put there while following.
5918        let editor_b2 = workspace_b.read_with(cx_b, |workspace, cx| {
5919            workspace
5920                .active_item(cx)
5921                .unwrap()
5922                .downcast::<Editor>()
5923                .unwrap()
5924        });
5925
5926        // When client B moves, it automatically stops following client A.
5927        editor_b2.update(cx_b, |editor, cx| editor.move_right(&editor::MoveRight, cx));
5928        assert_eq!(
5929            workspace_b.read_with(cx_b, |workspace, _| workspace.leader_for_pane(&pane_b)),
5930            None
5931        );
5932
            // Re-follow so the next action starts from a "following" state again.
5933        workspace_b
5934            .update(cx_b, |workspace, cx| {
5935                workspace
5936                    .toggle_follow(&ToggleFollow(leader_id), cx)
5937                    .unwrap()
5938            })
5939            .await
5940            .unwrap();
5941        assert_eq!(
5942            workspace_b.read_with(cx_b, |workspace, _| workspace.leader_for_pane(&pane_b)),
5943            Some(leader_id)
5944        );
5945
5946        // When client B edits, it automatically stops following client A.
5947        editor_b2.update(cx_b, |editor, cx| editor.insert("X", cx));
5948        assert_eq!(
5949            workspace_b.read_with(cx_b, |workspace, _| workspace.leader_for_pane(&pane_b)),
5950            None
5951        );
5952
            // Re-follow so the next action starts from a "following" state again.
5953        workspace_b
5954            .update(cx_b, |workspace, cx| {
5955                workspace
5956                    .toggle_follow(&ToggleFollow(leader_id), cx)
5957                    .unwrap()
5958            })
5959            .await
5960            .unwrap();
5961        assert_eq!(
5962            workspace_b.read_with(cx_b, |workspace, _| workspace.leader_for_pane(&pane_b)),
5963            Some(leader_id)
5964        );
5965
5966        // When client B scrolls, it automatically stops following client A.
5967        editor_b2.update(cx_b, |editor, cx| {
5968            editor.set_scroll_position(vec2f(0., 3.), cx)
5969        });
5970        assert_eq!(
5971            workspace_b.read_with(cx_b, |workspace, _| workspace.leader_for_pane(&pane_b)),
5972            None
5973        );
5974
            // Re-follow so the next action starts from a "following" state again.
5975        workspace_b
5976            .update(cx_b, |workspace, cx| {
5977                workspace
5978                    .toggle_follow(&ToggleFollow(leader_id), cx)
5979                    .unwrap()
5980            })
5981            .await
5982            .unwrap();
5983        assert_eq!(
5984            workspace_b.read_with(cx_b, |workspace, _| workspace.leader_for_pane(&pane_b)),
5985            Some(leader_id)
5986        );
5987
5988        // When client B activates a different pane, it continues following client A in the original pane.
5989        workspace_b.update(cx_b, |workspace, cx| {
5990            workspace.split_pane(pane_b.clone(), SplitDirection::Right, cx)
5991        });
5992        assert_eq!(
5993            workspace_b.read_with(cx_b, |workspace, _| workspace.leader_for_pane(&pane_b)),
5994            Some(leader_id)
5995        );
5996
5997        workspace_b.update(cx_b, |workspace, cx| workspace.activate_next_pane(cx));
5998        assert_eq!(
5999            workspace_b.read_with(cx_b, |workspace, _| workspace.leader_for_pane(&pane_b)),
6000            Some(leader_id)
6001        );
6002
6003        // When client B activates a different item in the original pane, it automatically stops following client A.
            // NOTE(review): this relies on the active pane being `pane_b` again after the
            // split + `activate_next_pane` above, so that 2.txt opens in the followed pane; the
            // test does not assert which pane is active here — confirm.
6004        workspace_b
6005            .update(cx_b, |workspace, cx| {
6006                workspace.open_path((worktree_id, "2.txt"), true, cx)
6007            })
6008            .await
6009            .unwrap();
6010        assert_eq!(
6011            workspace_b.read_with(cx_b, |workspace, _| workspace.leader_for_pane(&pane_b)),
6012            None
6013        );
6014    }
6015
6016    #[gpui::test(iterations = 100)]
6017    async fn test_random_collaboration(
6018        cx: &mut TestAppContext,
6019        deterministic: Arc<Deterministic>,
6020        rng: StdRng,
6021    ) {
6022        cx.foreground().forbid_parking();
6023        let max_peers = env::var("MAX_PEERS")
6024            .map(|i| i.parse().expect("invalid `MAX_PEERS` variable"))
6025            .unwrap_or(5);
6026        assert!(max_peers <= 5);
6027
6028        let max_operations = env::var("OPERATIONS")
6029            .map(|i| i.parse().expect("invalid `OPERATIONS` variable"))
6030            .unwrap_or(10);
6031
6032        let rng = Arc::new(Mutex::new(rng));
6033
6034        let guest_lang_registry = Arc::new(LanguageRegistry::test());
6035        let host_language_registry = Arc::new(LanguageRegistry::test());
6036
6037        let fs = FakeFs::new(cx.background());
6038        fs.insert_tree("/_collab", json!({"init": ""})).await;
6039
6040        let mut server = TestServer::start(cx.foreground(), cx.background()).await;
6041        let db = server.app_state.db.clone();
6042        let host_user_id = db.create_user("host", false).await.unwrap();
6043        for username in ["guest-1", "guest-2", "guest-3", "guest-4"] {
6044            let guest_user_id = db.create_user(username, false).await.unwrap();
6045            server
6046                .app_state
6047                .db
6048                .send_contact_request(guest_user_id, host_user_id)
6049                .await
6050                .unwrap();
6051            server
6052                .app_state
6053                .db
6054                .respond_to_contact_request(host_user_id, guest_user_id, true)
6055                .await
6056                .unwrap();
6057        }
6058
6059        let mut clients = Vec::new();
6060        let mut user_ids = Vec::new();
6061        let mut op_start_signals = Vec::new();
6062
6063        let mut next_entity_id = 100000;
6064        let mut host_cx = TestAppContext::new(
6065            cx.foreground_platform(),
6066            cx.platform(),
6067            deterministic.build_foreground(next_entity_id),
6068            deterministic.build_background(),
6069            cx.font_cache(),
6070            cx.leak_detector(),
6071            next_entity_id,
6072        );
6073        let host = server.create_client(&mut host_cx, "host").await;
6074        let host_project = host_cx.update(|cx| {
6075            Project::local(
6076                host.client.clone(),
6077                host.user_store.clone(),
6078                host_language_registry.clone(),
6079                fs.clone(),
6080                cx,
6081            )
6082        });
6083        let host_project_id = host_project
6084            .update(&mut host_cx, |p, _| p.next_remote_id())
6085            .await;
6086
6087        let (collab_worktree, _) = host_project
6088            .update(&mut host_cx, |project, cx| {
6089                project.find_or_create_local_worktree("/_collab", true, cx)
6090            })
6091            .await
6092            .unwrap();
6093        collab_worktree
6094            .read_with(&host_cx, |tree, _| tree.as_local().unwrap().scan_complete())
6095            .await;
6096
6097        // Set up fake language servers.
6098        let mut language = Language::new(
6099            LanguageConfig {
6100                name: "Rust".into(),
6101                path_suffixes: vec!["rs".to_string()],
6102                ..Default::default()
6103            },
6104            None,
6105        );
6106        let _fake_servers = language.set_fake_lsp_adapter(FakeLspAdapter {
6107            name: "the-fake-language-server",
6108            capabilities: lsp::LanguageServer::full_capabilities(),
6109            initializer: Some(Box::new({
6110                let rng = rng.clone();
6111                let fs = fs.clone();
6112                let project = host_project.downgrade();
6113                move |fake_server: &mut FakeLanguageServer| {
6114                    fake_server.handle_request::<lsp::request::Completion, _, _>(
6115                        |_, _| async move {
6116                            Ok(Some(lsp::CompletionResponse::Array(vec![
6117                                lsp::CompletionItem {
6118                                    text_edit: Some(lsp::CompletionTextEdit::Edit(lsp::TextEdit {
6119                                        range: lsp::Range::new(
6120                                            lsp::Position::new(0, 0),
6121                                            lsp::Position::new(0, 0),
6122                                        ),
6123                                        new_text: "the-new-text".to_string(),
6124                                    })),
6125                                    ..Default::default()
6126                                },
6127                            ])))
6128                        },
6129                    );
6130
6131                    fake_server.handle_request::<lsp::request::CodeActionRequest, _, _>(
6132                        |_, _| async move {
6133                            Ok(Some(vec![lsp::CodeActionOrCommand::CodeAction(
6134                                lsp::CodeAction {
6135                                    title: "the-code-action".to_string(),
6136                                    ..Default::default()
6137                                },
6138                            )]))
6139                        },
6140                    );
6141
6142                    fake_server.handle_request::<lsp::request::PrepareRenameRequest, _, _>(
6143                        |params, _| async move {
6144                            Ok(Some(lsp::PrepareRenameResponse::Range(lsp::Range::new(
6145                                params.position,
6146                                params.position,
6147                            ))))
6148                        },
6149                    );
6150
6151                    fake_server.handle_request::<lsp::request::GotoDefinition, _, _>({
6152                        let fs = fs.clone();
6153                        let rng = rng.clone();
6154                        move |_, _| {
6155                            let fs = fs.clone();
6156                            let rng = rng.clone();
6157                            async move {
6158                                let files = fs.files().await;
6159                                let mut rng = rng.lock();
6160                                let count = rng.gen_range::<usize, _>(1..3);
6161                                let files = (0..count)
6162                                    .map(|_| files.choose(&mut *rng).unwrap())
6163                                    .collect::<Vec<_>>();
6164                                log::info!("LSP: Returning definitions in files {:?}", &files);
6165                                Ok(Some(lsp::GotoDefinitionResponse::Array(
6166                                    files
6167                                        .into_iter()
6168                                        .map(|file| lsp::Location {
6169                                            uri: lsp::Url::from_file_path(file).unwrap(),
6170                                            range: Default::default(),
6171                                        })
6172                                        .collect(),
6173                                )))
6174                            }
6175                        }
6176                    });
6177
6178                    fake_server.handle_request::<lsp::request::DocumentHighlightRequest, _, _>({
6179                        let rng = rng.clone();
6180                        let project = project.clone();
6181                        move |params, mut cx| {
6182                            let highlights = if let Some(project) = project.upgrade(&cx) {
6183                                project.update(&mut cx, |project, cx| {
6184                                    let path = params
6185                                        .text_document_position_params
6186                                        .text_document
6187                                        .uri
6188                                        .to_file_path()
6189                                        .unwrap();
6190                                    let (worktree, relative_path) =
6191                                        project.find_local_worktree(&path, cx)?;
6192                                    let project_path =
6193                                        ProjectPath::from((worktree.read(cx).id(), relative_path));
6194                                    let buffer =
6195                                        project.get_open_buffer(&project_path, cx)?.read(cx);
6196
6197                                    let mut highlights = Vec::new();
6198                                    let highlight_count = rng.lock().gen_range(1..=5);
6199                                    let mut prev_end = 0;
6200                                    for _ in 0..highlight_count {
6201                                        let range =
6202                                            buffer.random_byte_range(prev_end, &mut *rng.lock());
6203
6204                                        highlights.push(lsp::DocumentHighlight {
6205                                            range: range_to_lsp(range.to_point_utf16(buffer)),
6206                                            kind: Some(lsp::DocumentHighlightKind::READ),
6207                                        });
6208                                        prev_end = range.end;
6209                                    }
6210                                    Some(highlights)
6211                                })
6212                            } else {
6213                                None
6214                            };
6215                            async move { Ok(highlights) }
6216                        }
6217                    });
6218                }
6219            })),
6220            ..Default::default()
6221        });
6222        host_language_registry.add(Arc::new(language));
6223
6224        let op_start_signal = futures::channel::mpsc::unbounded();
6225        user_ids.push(host.current_user_id(&host_cx));
6226        op_start_signals.push(op_start_signal.0);
6227        clients.push(host_cx.foreground().spawn(host.simulate_host(
6228            host_project,
6229            op_start_signal.1,
6230            rng.clone(),
6231            host_cx,
6232        )));
6233
6234        let disconnect_host_at = if rng.lock().gen_bool(0.2) {
6235            rng.lock().gen_range(0..max_operations)
6236        } else {
6237            max_operations
6238        };
6239        let mut available_guests = vec![
6240            "guest-1".to_string(),
6241            "guest-2".to_string(),
6242            "guest-3".to_string(),
6243            "guest-4".to_string(),
6244        ];
6245        let mut operations = 0;
6246        while operations < max_operations {
6247            if operations == disconnect_host_at {
6248                server.disconnect_client(user_ids[0]);
6249                cx.foreground().advance_clock(RECEIVE_TIMEOUT);
6250                drop(op_start_signals);
6251                let mut clients = futures::future::join_all(clients).await;
6252                cx.foreground().run_until_parked();
6253
6254                let (host, mut host_cx, host_err) = clients.remove(0);
6255                if let Some(host_err) = host_err {
6256                    log::error!("host error - {:?}", host_err);
6257                }
6258                host.project
6259                    .as_ref()
6260                    .unwrap()
6261                    .read_with(&host_cx, |project, _| assert!(!project.is_shared()));
6262                for (guest, mut guest_cx, guest_err) in clients {
6263                    if let Some(guest_err) = guest_err {
6264                        log::error!("{} error - {:?}", guest.username, guest_err);
6265                    }
6266
6267                    let contacts = server
6268                        .app_state
6269                        .db
6270                        .get_contacts(guest.current_user_id(&guest_cx))
6271                        .await
6272                        .unwrap();
6273                    let contacts = server
6274                        .store
6275                        .read()
6276                        .await
6277                        .build_initial_contacts_update(contacts)
6278                        .contacts;
6279                    assert!(!contacts
6280                        .iter()
6281                        .flat_map(|contact| &contact.projects)
6282                        .any(|project| project.id == host_project_id));
6283                    guest
6284                        .project
6285                        .as_ref()
6286                        .unwrap()
6287                        .read_with(&guest_cx, |project, _| assert!(project.is_read_only()));
6288                    guest_cx.update(|_| drop(guest));
6289                }
6290                host_cx.update(|_| drop(host));
6291
6292                return;
6293            }
6294
6295            let distribution = rng.lock().gen_range(0..100);
6296            match distribution {
6297                0..=19 if !available_guests.is_empty() => {
6298                    let guest_ix = rng.lock().gen_range(0..available_guests.len());
6299                    let guest_username = available_guests.remove(guest_ix);
6300                    log::info!("Adding new connection for {}", guest_username);
6301                    next_entity_id += 100000;
6302                    let mut guest_cx = TestAppContext::new(
6303                        cx.foreground_platform(),
6304                        cx.platform(),
6305                        deterministic.build_foreground(next_entity_id),
6306                        deterministic.build_background(),
6307                        cx.font_cache(),
6308                        cx.leak_detector(),
6309                        next_entity_id,
6310                    );
6311                    let guest = server.create_client(&mut guest_cx, &guest_username).await;
6312                    let guest_project = Project::remote(
6313                        host_project_id,
6314                        guest.client.clone(),
6315                        guest.user_store.clone(),
6316                        guest_lang_registry.clone(),
6317                        FakeFs::new(cx.background()),
6318                        &mut guest_cx.to_async(),
6319                    )
6320                    .await
6321                    .unwrap();
6322                    let op_start_signal = futures::channel::mpsc::unbounded();
6323                    user_ids.push(guest.current_user_id(&guest_cx));
6324                    op_start_signals.push(op_start_signal.0);
6325                    clients.push(guest_cx.foreground().spawn(guest.simulate_guest(
6326                        guest_username.clone(),
6327                        guest_project,
6328                        op_start_signal.1,
6329                        rng.clone(),
6330                        guest_cx,
6331                    )));
6332
6333                    log::info!("Added connection for {}", guest_username);
6334                    operations += 1;
6335                }
6336                20..=29 if clients.len() > 1 => {
6337                    let guest_ix = rng.lock().gen_range(1..clients.len());
6338                    log::info!("Removing guest {}", user_ids[guest_ix]);
6339                    let removed_guest_id = user_ids.remove(guest_ix);
6340                    let guest = clients.remove(guest_ix);
6341                    op_start_signals.remove(guest_ix);
6342                    server.forbid_connections();
6343                    server.disconnect_client(removed_guest_id);
6344                    cx.foreground().advance_clock(RECEIVE_TIMEOUT);
6345                    let (guest, mut guest_cx, guest_err) = guest.await;
6346                    server.allow_connections();
6347
6348                    if let Some(guest_err) = guest_err {
6349                        log::error!("{} error - {:?}", guest.username, guest_err);
6350                    }
6351                    guest
6352                        .project
6353                        .as_ref()
6354                        .unwrap()
6355                        .read_with(&guest_cx, |project, _| assert!(project.is_read_only()));
6356                    for user_id in &user_ids {
6357                        let contacts = server.app_state.db.get_contacts(*user_id).await.unwrap();
6358                        let contacts = server
6359                            .store
6360                            .read()
6361                            .await
6362                            .build_initial_contacts_update(contacts)
6363                            .contacts;
6364                        for contact in contacts {
6365                            if contact.online {
6366                                assert_ne!(
6367                                    contact.user_id, removed_guest_id.0 as u64,
6368                                    "removed guest is still a contact of another peer"
6369                                );
6370                            }
6371                            for project in contact.projects {
6372                                for project_guest_id in project.guests {
6373                                    assert_ne!(
6374                                        project_guest_id, removed_guest_id.0 as u64,
6375                                        "removed guest appears as still participating on a project"
6376                                    );
6377                                }
6378                            }
6379                        }
6380                    }
6381
6382                    log::info!("{} removed", guest.username);
6383                    available_guests.push(guest.username.clone());
6384                    guest_cx.update(|_| drop(guest));
6385
6386                    operations += 1;
6387                }
6388                _ => {
6389                    while operations < max_operations && rng.lock().gen_bool(0.7) {
6390                        op_start_signals
6391                            .choose(&mut *rng.lock())
6392                            .unwrap()
6393                            .unbounded_send(())
6394                            .unwrap();
6395                        operations += 1;
6396                    }
6397
6398                    if rng.lock().gen_bool(0.8) {
6399                        cx.foreground().run_until_parked();
6400                    }
6401                }
6402            }
6403        }
6404
6405        drop(op_start_signals);
6406        let mut clients = futures::future::join_all(clients).await;
6407        cx.foreground().run_until_parked();
6408
6409        let (host_client, mut host_cx, host_err) = clients.remove(0);
6410        if let Some(host_err) = host_err {
6411            panic!("host error - {:?}", host_err);
6412        }
6413        let host_project = host_client.project.as_ref().unwrap();
6414        let host_worktree_snapshots = host_project.read_with(&host_cx, |project, cx| {
6415            project
6416                .worktrees(cx)
6417                .map(|worktree| {
6418                    let snapshot = worktree.read(cx).snapshot();
6419                    (snapshot.id(), snapshot)
6420                })
6421                .collect::<BTreeMap<_, _>>()
6422        });
6423
6424        host_client
6425            .project
6426            .as_ref()
6427            .unwrap()
6428            .read_with(&host_cx, |project, cx| project.check_invariants(cx));
6429
6430        for (guest_client, mut guest_cx, guest_err) in clients.into_iter() {
6431            if let Some(guest_err) = guest_err {
6432                panic!("{} error - {:?}", guest_client.username, guest_err);
6433            }
6434            let worktree_snapshots =
6435                guest_client
6436                    .project
6437                    .as_ref()
6438                    .unwrap()
6439                    .read_with(&guest_cx, |project, cx| {
6440                        project
6441                            .worktrees(cx)
6442                            .map(|worktree| {
6443                                let worktree = worktree.read(cx);
6444                                (worktree.id(), worktree.snapshot())
6445                            })
6446                            .collect::<BTreeMap<_, _>>()
6447                    });
6448
6449            assert_eq!(
6450                worktree_snapshots.keys().collect::<Vec<_>>(),
6451                host_worktree_snapshots.keys().collect::<Vec<_>>(),
6452                "{} has different worktrees than the host",
6453                guest_client.username
6454            );
6455            for (id, host_snapshot) in &host_worktree_snapshots {
6456                let guest_snapshot = &worktree_snapshots[id];
6457                assert_eq!(
6458                    guest_snapshot.root_name(),
6459                    host_snapshot.root_name(),
6460                    "{} has different root name than the host for worktree {}",
6461                    guest_client.username,
6462                    id
6463                );
6464                assert_eq!(
6465                    guest_snapshot.entries(false).collect::<Vec<_>>(),
6466                    host_snapshot.entries(false).collect::<Vec<_>>(),
6467                    "{} has different snapshot than the host for worktree {}",
6468                    guest_client.username,
6469                    id
6470                );
6471                assert_eq!(guest_snapshot.scan_id(), host_snapshot.scan_id());
6472            }
6473
6474            guest_client
6475                .project
6476                .as_ref()
6477                .unwrap()
6478                .read_with(&guest_cx, |project, cx| project.check_invariants(cx));
6479
6480            for guest_buffer in &guest_client.buffers {
6481                let buffer_id = guest_buffer.read_with(&guest_cx, |buffer, _| buffer.remote_id());
6482                let host_buffer = host_project.read_with(&host_cx, |project, cx| {
6483                    project.buffer_for_id(buffer_id, cx).expect(&format!(
6484                        "host does not have buffer for guest:{}, peer:{}, id:{}",
6485                        guest_client.username, guest_client.peer_id, buffer_id
6486                    ))
6487                });
6488                let path = host_buffer
6489                    .read_with(&host_cx, |buffer, cx| buffer.file().unwrap().full_path(cx));
6490
6491                assert_eq!(
6492                    guest_buffer.read_with(&guest_cx, |buffer, _| buffer.deferred_ops_len()),
6493                    0,
6494                    "{}, buffer {}, path {:?} has deferred operations",
6495                    guest_client.username,
6496                    buffer_id,
6497                    path,
6498                );
6499                assert_eq!(
6500                    guest_buffer.read_with(&guest_cx, |buffer, _| buffer.text()),
6501                    host_buffer.read_with(&host_cx, |buffer, _| buffer.text()),
6502                    "{}, buffer {}, path {:?}, differs from the host's buffer",
6503                    guest_client.username,
6504                    buffer_id,
6505                    path
6506                );
6507            }
6508
6509            guest_cx.update(|_| drop(guest_client));
6510        }
6511
6512        host_cx.update(|_| drop(host_client));
6513    }
6514
    /// In-process server harness used by the tests in this module.
    ///
    /// Derefs to the wrapped `Server`, and resets `peer` when dropped.
    struct TestServer {
        /// Peer created with `Peer::new()` in `start`; reset in `Drop`.
        peer: Arc<Peer>,
        /// Server-side application state, built from the test database.
        app_state: Arc<AppState>,
        /// The server under test.
        server: Arc<Server>,
        /// Foreground executor, consulted by `condition` while waiting.
        foreground: Rc<executor::Foreground>,
        /// Receiving half of the channel whose sender is handed to `Server::new`;
        /// `condition` awaits it between predicate checks.
        notifications: mpsc::UnboundedReceiver<()>,
        /// Per-user flags registered when a connection is established and set to
        /// `true` by `disconnect_client`.
        connection_killers: Arc<Mutex<HashMap<UserId, Arc<AtomicBool>>>>,
        /// While `true`, new connection attempts from test clients are rejected.
        forbid_connections: Arc<AtomicBool>,
        /// Never read; presumably held so the test database outlives the server.
        _test_db: TestDb,
    }
6525
6526    impl TestServer {
6527        async fn start(
6528            foreground: Rc<executor::Foreground>,
6529            background: Arc<executor::Background>,
6530        ) -> Self {
6531            let test_db = TestDb::fake(background);
6532            let app_state = Self::build_app_state(&test_db).await;
6533            let peer = Peer::new();
6534            let notifications = mpsc::unbounded();
6535            let server = Server::new(app_state.clone(), Some(notifications.0));
6536            Self {
6537                peer,
6538                app_state,
6539                server,
6540                foreground,
6541                notifications: notifications.1,
6542                connection_killers: Default::default(),
6543                forbid_connections: Default::default(),
6544                _test_db: test_db,
6545            }
6546        }
6547
        /// Creates a `TestClient` named `name` and connects it to this test
        /// server over an in-memory connection.
        ///
        /// The user is looked up by GitHub login and created when the lookup does
        /// not yield one. Both authentication and connection establishment are
        /// overridden on the client, so a fixed token is used and no real
        /// transport is involved. Panics if any setup step fails.
        async fn create_client(&mut self, cx: &mut TestAppContext, name: &str) -> TestClient {
            // Install test settings as a global before anything else runs in `cx`.
            cx.update(|cx| {
                let settings = Settings::test(cx);
                cx.set_global(settings);
            });

            let http = FakeHttpClient::with_404_response();
            // Reuse the existing user for this login, otherwise create one.
            let user_id =
                if let Ok(Some(user)) = self.app_state.db.get_user_by_github_login(name).await {
                    user.id
                } else {
                    self.app_state.db.create_user(name, false).await.unwrap()
                };
            let client_name = name.to_string();
            let mut client = Client::new(http.clone());
            // Handles that are moved into the connection override below.
            let server = self.server.clone();
            let db = self.app_state.db.clone();
            let connection_killers = self.connection_killers.clone();
            let forbid_connections = self.forbid_connections.clone();
            let (connection_id_tx, mut connection_id_rx) = mpsc::channel(16);

            // `Arc::get_mut` only succeeds while no other handle to the client
            // exists, so the overrides are installed before `client` is cloned.
            Arc::get_mut(&mut client)
                .unwrap()
                // Authentication always yields the same fixed token for this user.
                .override_authenticate(move |cx| {
                    cx.spawn(|_| async move {
                        let access_token = "the-token".to_string();
                        Ok(Credentials {
                            user_id: user_id.0 as u64,
                            access_token,
                        })
                    })
                })
                .override_establish_connection(move |credentials, cx| {
                    // The credentials must be the ones produced by the override above.
                    assert_eq!(credentials.user_id, user_id.0 as u64);
                    assert_eq!(credentials.access_token, "the-token");

                    // Fresh clones per attempt, so they can be moved into the task
                    // while the override keeps its own copies.
                    let server = server.clone();
                    let db = db.clone();
                    let connection_killers = connection_killers.clone();
                    let forbid_connections = forbid_connections.clone();
                    let client_name = client_name.clone();
                    let connection_id_tx = connection_id_tx.clone();
                    cx.spawn(move |cx| async move {
                        // Refuse the connection while `forbid_connections` is set.
                        if forbid_connections.load(SeqCst) {
                            Err(EstablishConnectionError::other(anyhow!(
                                "server is forbidding connections"
                            )))
                        } else {
                            let (client_conn, server_conn, killed) =
                                Connection::in_memory(cx.background());
                            // Register the flag that `disconnect_client` later sets
                            // for this user.
                            connection_killers.lock().insert(user_id, killed);
                            let user = db.get_user_by_id(user_id).await.unwrap().unwrap();
                            // Run the server side of the connection in the background;
                            // only the client end is returned to the caller.
                            cx.background()
                                .spawn(server.handle_connection(
                                    server_conn,
                                    client_name,
                                    user,
                                    Some(connection_id_tx),
                                    cx.background(),
                                ))
                                .detach();
                            Ok(client_conn)
                        }
                    })
                });

            let user_store = cx.add_model(|cx| UserStore::new(client.clone(), http, cx));
            // Minimal workspace state; `initialize_workspace` is left unimplemented.
            let app_state = Arc::new(workspace::AppState {
                client: client.clone(),
                user_store: user_store.clone(),
                languages: Arc::new(LanguageRegistry::new(Task::ready(()))),
                themes: ThemeRegistry::new((), cx.font_cache()),
                fs: FakeFs::new(cx.background()),
                build_window_options: || Default::default(),
                initialize_workspace: |_, _, _| unimplemented!(),
            });

            Channel::init(&client);
            Project::init(&client);
            cx.update(|cx| workspace::init(app_state.clone(), cx));

            client
                .authenticate_and_connect(false, &cx.to_async())
                .await
                .unwrap();
            // The first value received on the channel (presumably sent by
            // `handle_connection`) supplies the id wrapped as this client's `PeerId`.
            let peer_id = PeerId(connection_id_rx.next().await.unwrap().0);

            let client = TestClient {
                client,
                peer_id,
                username: name.to_string(),
                user_store,
                language_registry: Arc::new(LanguageRegistry::test()),
                project: Default::default(),
                buffers: Default::default(),
            };
            // Do not return until the user store reports a current user.
            client.wait_for_current_user(cx).await;
            client
        }
6647
6648        fn disconnect_client(&self, user_id: UserId) {
6649            self.connection_killers
6650                .lock()
6651                .remove(&user_id)
6652                .unwrap()
6653                .store(true, SeqCst);
6654        }
6655
6656        fn forbid_connections(&self) {
6657            self.forbid_connections.store(true, SeqCst);
6658        }
6659
6660        fn allow_connections(&self) {
6661            self.forbid_connections.store(false, SeqCst);
6662        }
6663
6664        async fn make_contacts(&self, mut clients: Vec<(&TestClient, &mut TestAppContext)>) {
6665            while let Some((client_a, cx_a)) = clients.pop() {
6666                for (client_b, cx_b) in &mut clients {
6667                    client_a
6668                        .user_store
6669                        .update(cx_a, |store, cx| {
6670                            store.request_contact(client_b.user_id().unwrap(), cx)
6671                        })
6672                        .await
6673                        .unwrap();
6674                    cx_a.foreground().run_until_parked();
6675                    client_b
6676                        .user_store
6677                        .update(*cx_b, |store, cx| {
6678                            store.respond_to_contact_request(client_a.user_id().unwrap(), true, cx)
6679                        })
6680                        .await
6681                        .unwrap();
6682                }
6683            }
6684        }
6685
6686        async fn build_app_state(test_db: &TestDb) -> Arc<AppState> {
6687            Arc::new(AppState {
6688                db: test_db.db().clone(),
6689                api_token: Default::default(),
6690            })
6691        }
6692
6693        async fn state<'a>(&'a self) -> RwLockReadGuard<'a, Store> {
6694            self.server.store.read().await
6695        }
6696
6697        async fn condition<F>(&mut self, mut predicate: F)
6698        where
6699            F: FnMut(&Store) -> bool,
6700        {
6701            assert!(
6702                self.foreground.parking_forbidden(),
6703                "you must call forbid_parking to use server conditions so we don't block indefinitely"
6704            );
6705            while !(predicate)(&*self.server.store.read().await) {
6706                self.foreground.start_waiting();
6707                self.notifications.next().await;
6708                self.foreground.finish_waiting();
6709            }
6710        }
6711    }
6712
6713    impl Deref for TestServer {
6714        type Target = Server;
6715
6716        fn deref(&self) -> &Self::Target {
6717            &self.server
6718        }
6719    }
6720
    // Resets the harness's `peer` when the test server goes out of scope.
    // NOTE(review): this `peer` is built with `Peer::new()` in `start` and is not
    // passed to `Server::new`, so it looks distinct from the server's own peer —
    // confirm that resetting it here has the intended effect.
    impl Drop for TestServer {
        fn drop(&mut self) {
            self.peer.reset();
        }
    }
6726
    /// A client connected to a `TestServer`, plus the state tests track for it.
    ///
    /// Derefs to the wrapped `Arc<Client>`.
    struct TestClient {
        /// The underlying client.
        client: Arc<Client>,
        /// Name the client was created with.
        username: String,
        /// Peer id taken from the connection id received in `create_client`.
        pub peer_id: PeerId,
        /// User store model built on top of `client`.
        pub user_store: ModelHandle<UserStore>,
        /// Language registry handed to projects built by `build_local_project`.
        language_registry: Arc<LanguageRegistry>,
        /// Most recent project built via `build_local_project` or
        /// `build_remote_project`, if any.
        project: Option<ModelHandle<Project>>,
        /// Buffers this client currently holds open.
        buffers: HashSet<ModelHandle<language::Buffer>>,
    }
6736
6737    impl Deref for TestClient {
6738        type Target = Arc<Client>;
6739
6740        fn deref(&self) -> &Self::Target {
6741            &self.client
6742        }
6743    }
6744
    /// GitHub logins from a client's user store, as collected by
    /// `TestClient::summarize_contacts`.
    struct ContactsSummary {
        /// Logins of the store's current contacts.
        pub current: Vec<String>,
        /// Logins from the store's outgoing contact requests.
        pub outgoing_requests: Vec<String>,
        /// Logins from the store's incoming contact requests.
        pub incoming_requests: Vec<String>,
    }
6750
6751    impl TestClient {
6752        pub fn current_user_id(&self, cx: &TestAppContext) -> UserId {
6753            UserId::from_proto(
6754                self.user_store
6755                    .read_with(cx, |user_store, _| user_store.current_user().unwrap().id),
6756            )
6757        }
6758
6759        async fn wait_for_current_user(&self, cx: &TestAppContext) {
6760            let mut authed_user = self
6761                .user_store
6762                .read_with(cx, |user_store, _| user_store.watch_current_user());
6763            while authed_user.next().await.unwrap().is_none() {}
6764        }
6765
6766        async fn clear_contacts(&self, cx: &mut TestAppContext) {
6767            self.user_store
6768                .update(cx, |store, _| store.clear_contacts())
6769                .await;
6770        }
6771
6772        fn summarize_contacts(&self, cx: &TestAppContext) -> ContactsSummary {
6773            self.user_store.read_with(cx, |store, _| ContactsSummary {
6774                current: store
6775                    .contacts()
6776                    .iter()
6777                    .map(|contact| contact.user.github_login.clone())
6778                    .collect(),
6779                outgoing_requests: store
6780                    .outgoing_contact_requests()
6781                    .iter()
6782                    .map(|user| user.github_login.clone())
6783                    .collect(),
6784                incoming_requests: store
6785                    .incoming_contact_requests()
6786                    .iter()
6787                    .map(|user| user.github_login.clone())
6788                    .collect(),
6789            })
6790        }
6791
6792        async fn build_local_project(
6793            &mut self,
6794            fs: Arc<FakeFs>,
6795            root_path: impl AsRef<Path>,
6796            cx: &mut TestAppContext,
6797        ) -> (ModelHandle<Project>, WorktreeId) {
6798            let project = cx.update(|cx| {
6799                Project::local(
6800                    self.client.clone(),
6801                    self.user_store.clone(),
6802                    self.language_registry.clone(),
6803                    fs,
6804                    cx,
6805                )
6806            });
6807            self.project = Some(project.clone());
6808            let (worktree, _) = project
6809                .update(cx, |p, cx| {
6810                    p.find_or_create_local_worktree(root_path, true, cx)
6811                })
6812                .await
6813                .unwrap();
6814            worktree
6815                .read_with(cx, |tree, _| tree.as_local().unwrap().scan_complete())
6816                .await;
6817            project
6818                .update(cx, |project, _| project.next_remote_id())
6819                .await;
6820            (project, worktree.read_with(cx, |tree, _| tree.id()))
6821        }
6822
6823        async fn build_remote_project(
6824            &mut self,
6825            host_project: &ModelHandle<Project>,
6826            host_cx: &mut TestAppContext,
6827            guest_cx: &mut TestAppContext,
6828        ) -> ModelHandle<Project> {
6829            let host_project_id = host_project
6830                .read_with(host_cx, |project, _| project.next_remote_id())
6831                .await;
6832            let guest_user_id = self.user_id().unwrap();
6833            let languages =
6834                host_project.read_with(host_cx, |project, _| project.languages().clone());
6835            let project_b = guest_cx.spawn(|mut cx| {
6836                let user_store = self.user_store.clone();
6837                let guest_client = self.client.clone();
6838                async move {
6839                    Project::remote(
6840                        host_project_id,
6841                        guest_client,
6842                        user_store.clone(),
6843                        languages,
6844                        FakeFs::new(cx.background()),
6845                        &mut cx,
6846                    )
6847                    .await
6848                    .unwrap()
6849                }
6850            });
6851            host_cx.foreground().run_until_parked();
6852            host_project.update(host_cx, |project, cx| {
6853                project.respond_to_join_request(guest_user_id, true, cx)
6854            });
6855            let project = project_b.await;
6856            self.project = Some(project.clone());
6857            project
6858        }
6859
6860        fn build_workspace(
6861            &self,
6862            project: &ModelHandle<Project>,
6863            cx: &mut TestAppContext,
6864        ) -> ViewHandle<Workspace> {
6865            let (window_id, _) = cx.add_window(|_| EmptyView);
6866            cx.add_view(window_id, |cx| Workspace::new(project.clone(), cx))
6867        }
6868
6869        async fn simulate_host(
6870            mut self,
6871            project: ModelHandle<Project>,
6872            op_start_signal: futures::channel::mpsc::UnboundedReceiver<()>,
6873            rng: Arc<Mutex<StdRng>>,
6874            mut cx: TestAppContext,
6875        ) -> (Self, TestAppContext, Option<anyhow::Error>) {
6876            async fn simulate_host_internal(
6877                client: &mut TestClient,
6878                project: ModelHandle<Project>,
6879                mut op_start_signal: futures::channel::mpsc::UnboundedReceiver<()>,
6880                rng: Arc<Mutex<StdRng>>,
6881                cx: &mut TestAppContext,
6882            ) -> anyhow::Result<()> {
6883                let fs = project.read_with(cx, |project, _| project.fs().clone());
6884
6885                cx.update(|cx| {
6886                    cx.subscribe(&project, move |project, event, cx| {
6887                        if let project::Event::ContactRequestedJoin(user) = event {
6888                            log::info!("Host: accepting join request from {}", user.github_login);
6889                            project.update(cx, |project, cx| {
6890                                project.respond_to_join_request(user.id, true, cx)
6891                            });
6892                        }
6893                    })
6894                    .detach();
6895                });
6896
6897                while op_start_signal.next().await.is_some() {
6898                    let distribution = rng.lock().gen_range::<usize, _>(0..100);
6899                    let files = fs.as_fake().files().await;
6900                    match distribution {
6901                        0..=19 if !files.is_empty() => {
6902                            let path = files.choose(&mut *rng.lock()).unwrap();
6903                            let mut path = path.as_path();
6904                            while let Some(parent_path) = path.parent() {
6905                                path = parent_path;
6906                                if rng.lock().gen() {
6907                                    break;
6908                                }
6909                            }
6910
6911                            log::info!("Host: find/create local worktree {:?}", path);
6912                            let find_or_create_worktree = project.update(cx, |project, cx| {
6913                                project.find_or_create_local_worktree(path, true, cx)
6914                            });
6915                            if rng.lock().gen() {
6916                                cx.background().spawn(find_or_create_worktree).detach();
6917                            } else {
6918                                find_or_create_worktree.await?;
6919                            }
6920                        }
6921                        20..=79 if !files.is_empty() => {
6922                            let buffer = if client.buffers.is_empty() || rng.lock().gen() {
6923                                let file = files.choose(&mut *rng.lock()).unwrap();
6924                                let (worktree, path) = project
6925                                    .update(cx, |project, cx| {
6926                                        project.find_or_create_local_worktree(
6927                                            file.clone(),
6928                                            true,
6929                                            cx,
6930                                        )
6931                                    })
6932                                    .await?;
6933                                let project_path =
6934                                    worktree.read_with(cx, |worktree, _| (worktree.id(), path));
6935                                log::info!(
6936                                    "Host: opening path {:?}, worktree {}, relative_path {:?}",
6937                                    file,
6938                                    project_path.0,
6939                                    project_path.1
6940                                );
6941                                let buffer = project
6942                                    .update(cx, |project, cx| project.open_buffer(project_path, cx))
6943                                    .await
6944                                    .unwrap();
6945                                client.buffers.insert(buffer.clone());
6946                                buffer
6947                            } else {
6948                                client
6949                                    .buffers
6950                                    .iter()
6951                                    .choose(&mut *rng.lock())
6952                                    .unwrap()
6953                                    .clone()
6954                            };
6955
6956                            if rng.lock().gen_bool(0.1) {
6957                                cx.update(|cx| {
6958                                    log::info!(
6959                                        "Host: dropping buffer {:?}",
6960                                        buffer.read(cx).file().unwrap().full_path(cx)
6961                                    );
6962                                    client.buffers.remove(&buffer);
6963                                    drop(buffer);
6964                                });
6965                            } else {
6966                                buffer.update(cx, |buffer, cx| {
6967                                    log::info!(
6968                                        "Host: updating buffer {:?} ({})",
6969                                        buffer.file().unwrap().full_path(cx),
6970                                        buffer.remote_id()
6971                                    );
6972
6973                                    if rng.lock().gen_bool(0.7) {
6974                                        buffer.randomly_edit(&mut *rng.lock(), 5, cx);
6975                                    } else {
6976                                        buffer.randomly_undo_redo(&mut *rng.lock(), cx);
6977                                    }
6978                                });
6979                            }
6980                        }
6981                        _ => loop {
6982                            let path_component_count = rng.lock().gen_range::<usize, _>(1..=5);
6983                            let mut path = PathBuf::new();
6984                            path.push("/");
6985                            for _ in 0..path_component_count {
6986                                let letter = rng.lock().gen_range(b'a'..=b'z');
6987                                path.push(std::str::from_utf8(&[letter]).unwrap());
6988                            }
6989                            path.set_extension("rs");
6990                            let parent_path = path.parent().unwrap();
6991
6992                            log::info!("Host: creating file {:?}", path,);
6993
6994                            if fs.create_dir(&parent_path).await.is_ok()
6995                                && fs.create_file(&path, Default::default()).await.is_ok()
6996                            {
6997                                break;
6998                            } else {
6999                                log::info!("Host: cannot create file");
7000                            }
7001                        },
7002                    }
7003
7004                    cx.background().simulate_random_delay().await;
7005                }
7006
7007                Ok(())
7008            }
7009
7010            let result =
7011                simulate_host_internal(&mut self, project.clone(), op_start_signal, rng, &mut cx)
7012                    .await;
7013            log::info!("Host done");
7014            self.project = Some(project);
7015            (self, cx, result.err())
7016        }
7017
7018        pub async fn simulate_guest(
7019            mut self,
7020            guest_username: String,
7021            project: ModelHandle<Project>,
7022            op_start_signal: futures::channel::mpsc::UnboundedReceiver<()>,
7023            rng: Arc<Mutex<StdRng>>,
7024            mut cx: TestAppContext,
7025        ) -> (Self, TestAppContext, Option<anyhow::Error>) {
7026            async fn simulate_guest_internal(
7027                client: &mut TestClient,
7028                guest_username: &str,
7029                project: ModelHandle<Project>,
7030                mut op_start_signal: futures::channel::mpsc::UnboundedReceiver<()>,
7031                rng: Arc<Mutex<StdRng>>,
7032                cx: &mut TestAppContext,
7033            ) -> anyhow::Result<()> {
7034                while op_start_signal.next().await.is_some() {
7035                    let buffer = if client.buffers.is_empty() || rng.lock().gen() {
7036                        let worktree = if let Some(worktree) =
7037                            project.read_with(cx, |project, cx| {
7038                                project
7039                                    .worktrees(&cx)
7040                                    .filter(|worktree| {
7041                                        let worktree = worktree.read(cx);
7042                                        worktree.is_visible()
7043                                            && worktree.entries(false).any(|e| e.is_file())
7044                                    })
7045                                    .choose(&mut *rng.lock())
7046                            }) {
7047                            worktree
7048                        } else {
7049                            cx.background().simulate_random_delay().await;
7050                            continue;
7051                        };
7052
7053                        let (worktree_root_name, project_path) =
7054                            worktree.read_with(cx, |worktree, _| {
7055                                let entry = worktree
7056                                    .entries(false)
7057                                    .filter(|e| e.is_file())
7058                                    .choose(&mut *rng.lock())
7059                                    .unwrap();
7060                                (
7061                                    worktree.root_name().to_string(),
7062                                    (worktree.id(), entry.path.clone()),
7063                                )
7064                            });
7065                        log::info!(
7066                            "{}: opening path {:?} in worktree {} ({})",
7067                            guest_username,
7068                            project_path.1,
7069                            project_path.0,
7070                            worktree_root_name,
7071                        );
7072                        let buffer = project
7073                            .update(cx, |project, cx| {
7074                                project.open_buffer(project_path.clone(), cx)
7075                            })
7076                            .await?;
7077                        log::info!(
7078                            "{}: opened path {:?} in worktree {} ({}) with buffer id {}",
7079                            guest_username,
7080                            project_path.1,
7081                            project_path.0,
7082                            worktree_root_name,
7083                            buffer.read_with(cx, |buffer, _| buffer.remote_id())
7084                        );
7085                        client.buffers.insert(buffer.clone());
7086                        buffer
7087                    } else {
7088                        client
7089                            .buffers
7090                            .iter()
7091                            .choose(&mut *rng.lock())
7092                            .unwrap()
7093                            .clone()
7094                    };
7095
7096                    let choice = rng.lock().gen_range(0..100);
7097                    match choice {
7098                        0..=9 => {
7099                            cx.update(|cx| {
7100                                log::info!(
7101                                    "{}: dropping buffer {:?}",
7102                                    guest_username,
7103                                    buffer.read(cx).file().unwrap().full_path(cx)
7104                                );
7105                                client.buffers.remove(&buffer);
7106                                drop(buffer);
7107                            });
7108                        }
7109                        10..=19 => {
7110                            let completions = project.update(cx, |project, cx| {
7111                                log::info!(
7112                                    "{}: requesting completions for buffer {} ({:?})",
7113                                    guest_username,
7114                                    buffer.read(cx).remote_id(),
7115                                    buffer.read(cx).file().unwrap().full_path(cx)
7116                                );
7117                                let offset = rng.lock().gen_range(0..=buffer.read(cx).len());
7118                                project.completions(&buffer, offset, cx)
7119                            });
7120                            let completions = cx.background().spawn(async move {
7121                                completions
7122                                    .await
7123                                    .map_err(|err| anyhow!("completions request failed: {:?}", err))
7124                            });
7125                            if rng.lock().gen_bool(0.3) {
7126                                log::info!("{}: detaching completions request", guest_username);
7127                                cx.update(|cx| completions.detach_and_log_err(cx));
7128                            } else {
7129                                completions.await?;
7130                            }
7131                        }
7132                        20..=29 => {
7133                            let code_actions = project.update(cx, |project, cx| {
7134                                log::info!(
7135                                    "{}: requesting code actions for buffer {} ({:?})",
7136                                    guest_username,
7137                                    buffer.read(cx).remote_id(),
7138                                    buffer.read(cx).file().unwrap().full_path(cx)
7139                                );
7140                                let range = buffer.read(cx).random_byte_range(0, &mut *rng.lock());
7141                                project.code_actions(&buffer, range, cx)
7142                            });
7143                            let code_actions = cx.background().spawn(async move {
7144                                code_actions.await.map_err(|err| {
7145                                    anyhow!("code actions request failed: {:?}", err)
7146                                })
7147                            });
7148                            if rng.lock().gen_bool(0.3) {
7149                                log::info!("{}: detaching code actions request", guest_username);
7150                                cx.update(|cx| code_actions.detach_and_log_err(cx));
7151                            } else {
7152                                code_actions.await?;
7153                            }
7154                        }
7155                        30..=39 if buffer.read_with(cx, |buffer, _| buffer.is_dirty()) => {
7156                            let (requested_version, save) = buffer.update(cx, |buffer, cx| {
7157                                log::info!(
7158                                    "{}: saving buffer {} ({:?})",
7159                                    guest_username,
7160                                    buffer.remote_id(),
7161                                    buffer.file().unwrap().full_path(cx)
7162                                );
7163                                (buffer.version(), buffer.save(cx))
7164                            });
7165                            let save = cx.background().spawn(async move {
7166                                let (saved_version, _) = save
7167                                    .await
7168                                    .map_err(|err| anyhow!("save request failed: {:?}", err))?;
7169                                assert!(saved_version.observed_all(&requested_version));
7170                                Ok::<_, anyhow::Error>(())
7171                            });
7172                            if rng.lock().gen_bool(0.3) {
7173                                log::info!("{}: detaching save request", guest_username);
7174                                cx.update(|cx| save.detach_and_log_err(cx));
7175                            } else {
7176                                save.await?;
7177                            }
7178                        }
7179                        40..=44 => {
7180                            let prepare_rename = project.update(cx, |project, cx| {
7181                                log::info!(
7182                                    "{}: preparing rename for buffer {} ({:?})",
7183                                    guest_username,
7184                                    buffer.read(cx).remote_id(),
7185                                    buffer.read(cx).file().unwrap().full_path(cx)
7186                                );
7187                                let offset = rng.lock().gen_range(0..=buffer.read(cx).len());
7188                                project.prepare_rename(buffer, offset, cx)
7189                            });
7190                            let prepare_rename = cx.background().spawn(async move {
7191                                prepare_rename.await.map_err(|err| {
7192                                    anyhow!("prepare rename request failed: {:?}", err)
7193                                })
7194                            });
7195                            if rng.lock().gen_bool(0.3) {
7196                                log::info!("{}: detaching prepare rename request", guest_username);
7197                                cx.update(|cx| prepare_rename.detach_and_log_err(cx));
7198                            } else {
7199                                prepare_rename.await?;
7200                            }
7201                        }
7202                        45..=49 => {
7203                            let definitions = project.update(cx, |project, cx| {
7204                                log::info!(
7205                                    "{}: requesting definitions for buffer {} ({:?})",
7206                                    guest_username,
7207                                    buffer.read(cx).remote_id(),
7208                                    buffer.read(cx).file().unwrap().full_path(cx)
7209                                );
7210                                let offset = rng.lock().gen_range(0..=buffer.read(cx).len());
7211                                project.definition(&buffer, offset, cx)
7212                            });
7213                            let definitions = cx.background().spawn(async move {
7214                                definitions
7215                                    .await
7216                                    .map_err(|err| anyhow!("definitions request failed: {:?}", err))
7217                            });
7218                            if rng.lock().gen_bool(0.3) {
7219                                log::info!("{}: detaching definitions request", guest_username);
7220                                cx.update(|cx| definitions.detach_and_log_err(cx));
7221                            } else {
7222                                client
7223                                    .buffers
7224                                    .extend(definitions.await?.into_iter().map(|loc| loc.buffer));
7225                            }
7226                        }
7227                        50..=54 => {
7228                            let highlights = project.update(cx, |project, cx| {
7229                                log::info!(
7230                                    "{}: requesting highlights for buffer {} ({:?})",
7231                                    guest_username,
7232                                    buffer.read(cx).remote_id(),
7233                                    buffer.read(cx).file().unwrap().full_path(cx)
7234                                );
7235                                let offset = rng.lock().gen_range(0..=buffer.read(cx).len());
7236                                project.document_highlights(&buffer, offset, cx)
7237                            });
7238                            let highlights = cx.background().spawn(async move {
7239                                highlights
7240                                    .await
7241                                    .map_err(|err| anyhow!("highlights request failed: {:?}", err))
7242                            });
7243                            if rng.lock().gen_bool(0.3) {
7244                                log::info!("{}: detaching highlights request", guest_username);
7245                                cx.update(|cx| highlights.detach_and_log_err(cx));
7246                            } else {
7247                                highlights.await?;
7248                            }
7249                        }
7250                        55..=59 => {
7251                            let search = project.update(cx, |project, cx| {
7252                                let query = rng.lock().gen_range('a'..='z');
7253                                log::info!("{}: project-wide search {:?}", guest_username, query);
7254                                project.search(SearchQuery::text(query, false, false), cx)
7255                            });
7256                            let search = cx.background().spawn(async move {
7257                                search
7258                                    .await
7259                                    .map_err(|err| anyhow!("search request failed: {:?}", err))
7260                            });
7261                            if rng.lock().gen_bool(0.3) {
7262                                log::info!("{}: detaching search request", guest_username);
7263                                cx.update(|cx| search.detach_and_log_err(cx));
7264                            } else {
7265                                client.buffers.extend(search.await?.into_keys());
7266                            }
7267                        }
7268                        60..=69 => {
7269                            let worktree = project
7270                                .read_with(cx, |project, cx| {
7271                                    project
7272                                        .worktrees(&cx)
7273                                        .filter(|worktree| {
7274                                            let worktree = worktree.read(cx);
7275                                            worktree.is_visible()
7276                                                && worktree.entries(false).any(|e| e.is_file())
7277                                                && worktree
7278                                                    .root_entry()
7279                                                    .map_or(false, |e| e.is_dir())
7280                                        })
7281                                        .choose(&mut *rng.lock())
7282                                })
7283                                .unwrap();
7284                            let (worktree_id, worktree_root_name) = worktree
7285                                .read_with(cx, |worktree, _| {
7286                                    (worktree.id(), worktree.root_name().to_string())
7287                                });
7288
7289                            let mut new_name = String::new();
7290                            for _ in 0..10 {
7291                                let letter = rng.lock().gen_range('a'..='z');
7292                                new_name.push(letter);
7293                            }
7294                            let mut new_path = PathBuf::new();
7295                            new_path.push(new_name);
7296                            new_path.set_extension("rs");
7297                            log::info!(
7298                                "{}: creating {:?} in worktree {} ({})",
7299                                guest_username,
7300                                new_path,
7301                                worktree_id,
7302                                worktree_root_name,
7303                            );
7304                            project
7305                                .update(cx, |project, cx| {
7306                                    project.create_entry((worktree_id, new_path), false, cx)
7307                                })
7308                                .unwrap()
7309                                .await?;
7310                        }
7311                        _ => {
7312                            buffer.update(cx, |buffer, cx| {
7313                                log::info!(
7314                                    "{}: updating buffer {} ({:?})",
7315                                    guest_username,
7316                                    buffer.remote_id(),
7317                                    buffer.file().unwrap().full_path(cx)
7318                                );
7319                                if rng.lock().gen_bool(0.7) {
7320                                    buffer.randomly_edit(&mut *rng.lock(), 5, cx);
7321                                } else {
7322                                    buffer.randomly_undo_redo(&mut *rng.lock(), cx);
7323                                }
7324                            });
7325                        }
7326                    }
7327                    cx.background().simulate_random_delay().await;
7328                }
7329                Ok(())
7330            }
7331
7332            let result = simulate_guest_internal(
7333                &mut self,
7334                &guest_username,
7335                project.clone(),
7336                op_start_signal,
7337                rng,
7338                &mut cx,
7339            )
7340            .await;
7341            log::info!("{}: done", guest_username);
7342
7343            self.project = Some(project);
7344            (self, cx, result.err())
7345        }
7346    }
7347
7348    impl Drop for TestClient {
7349        fn drop(&mut self) {
7350            self.client.tear_down();
7351        }
7352    }
7353
7354    impl Executor for Arc<gpui::executor::Background> {
7355        type Sleep = gpui::executor::Timer;
7356
7357        fn spawn_detached<F: 'static + Send + Future<Output = ()>>(&self, future: F) {
7358            self.spawn(future).detach();
7359        }
7360
7361        fn sleep(&self, duration: Duration) -> Self::Sleep {
7362            self.as_ref().timer(duration)
7363        }
7364    }
7365
7366    fn channel_messages(channel: &Channel) -> Vec<(String, String, bool)> {
7367        channel
7368            .messages()
7369            .cursor::<()>()
7370            .map(|m| {
7371                (
7372                    m.sender.github_login.clone(),
7373                    m.body.clone(),
7374                    m.is_pending(),
7375                )
7376            })
7377            .collect()
7378    }
7379
7380    struct EmptyView;
7381
7382    impl gpui::Entity for EmptyView {
7383        type Event = ();
7384    }
7385
7386    impl gpui::View for EmptyView {
7387        fn ui_name() -> &'static str {
7388            "empty view"
7389        }
7390
7391        fn render(&mut self, _: &mut gpui::RenderContext<Self>) -> gpui::ElementBox {
7392            gpui::Element::boxed(gpui::elements::Empty::new())
7393        }
7394    }
7395}