rpc.rs

   1mod store;
   2
   3use crate::{
   4    auth,
   5    db::{self, ChannelId, MessageId, User, UserId},
   6    AppState, Result,
   7};
   8use anyhow::anyhow;
   9use async_tungstenite::tungstenite::{
  10    protocol::CloseFrame as TungsteniteCloseFrame, Message as TungsteniteMessage,
  11};
  12use axum::{
  13    body::Body,
  14    extract::{
  15        ws::{CloseFrame as AxumCloseFrame, Message as AxumMessage},
  16        ConnectInfo, WebSocketUpgrade,
  17    },
  18    headers::{Header, HeaderName},
  19    http::StatusCode,
  20    middleware,
  21    response::IntoResponse,
  22    routing::get,
  23    Extension, Router, TypedHeader,
  24};
  25use collections::HashMap;
  26use futures::{
  27    channel::mpsc,
  28    future::{self, BoxFuture},
  29    FutureExt, SinkExt, StreamExt, TryStreamExt,
  30};
  31use lazy_static::lazy_static;
  32use rpc::{
  33    proto::{self, AnyTypedEnvelope, EntityMessage, EnvelopedMessage, RequestMessage},
  34    Connection, ConnectionId, Peer, Receipt, TypedEnvelope,
  35};
  36use std::{
  37    any::TypeId,
  38    future::Future,
  39    marker::PhantomData,
  40    net::SocketAddr,
  41    ops::{Deref, DerefMut},
  42    rc::Rc,
  43    sync::{
  44        atomic::{AtomicBool, Ordering::SeqCst},
  45        Arc,
  46    },
  47    time::Duration,
  48};
  49use store::{Store, Worktree};
  50use time::OffsetDateTime;
  51use tokio::{
  52    sync::{RwLock, RwLockReadGuard, RwLockWriteGuard},
  53    time::Sleep,
  54};
  55use tower::ServiceBuilder;
  56use tracing::{info_span, instrument, Instrument};
  57
  58type MessageHandler =
  59    Box<dyn Send + Sync + Fn(Arc<Server>, Box<dyn AnyTypedEnvelope>) -> BoxFuture<'static, ()>>;
  60
  61struct Response<R> {
  62    server: Arc<Server>,
  63    receipt: Receipt<R>,
  64    responded: Arc<AtomicBool>,
  65}
  66
  67impl<R: RequestMessage> Response<R> {
  68    fn send(self, payload: R::Response) -> Result<()> {
  69        self.responded.store(true, SeqCst);
  70        self.server.peer.respond(self.receipt, payload)?;
  71        Ok(())
  72    }
  73
  74    fn into_receipt(self) -> Receipt<R> {
  75        self.responded.store(true, SeqCst);
  76        self.receipt
  77    }
  78}
  79
/// The RPC server: owns the peer used to talk to clients, the in-memory store, and the table
/// of message handlers.
pub struct Server {
    /// Used to send, respond to and forward messages, and to add/disconnect connections.
    peer: Arc<Peer>,
    /// In-memory state guarded by an async read/write lock.
    store: RwLock<Store>,
    /// Shared application state; this section reads `db` and `invite_link_prefix` from it.
    app_state: Arc<AppState>,
    /// Handlers keyed by the `TypeId` of the message type they were registered for.
    handlers: HashMap<TypeId, MessageHandler>,
    /// Optional channel that `handle_connection` sends `()` on after each handled message.
    notifications: Option<mpsc::UnboundedSender<()>>,
}
  87
  88pub trait Executor: Send + Clone {
  89    type Sleep: Send + Future;
  90    fn spawn_detached<F: 'static + Send + Future<Output = ()>>(&self, future: F);
  91    fn sleep(&self, duration: Duration) -> Self::Sleep;
  92}
  93
/// Unit-struct executor.
// NOTE(review): its `Executor` implementation is not in this section; presumably it is the
// production executor backed by the async runtime — confirm.
#[derive(Clone)]
pub struct RealExecutor;
  96
// NOTE(review): neither constant is used in this section. Presumably these are the page size
// for channel message history and the maximum accepted channel message length — confirm at
// the use sites.
const MESSAGE_COUNT_PER_PAGE: usize = 100;
const MAX_MESSAGE_LEN: usize = 1024;
  99
/// Read guard over the server's `Store`.
struct StoreReadGuard<'a> {
    guard: RwLockReadGuard<'a, Store>,
    // `Rc` is not `Send`, so this marker makes the whole guard `!Send`.
    // NOTE(review): presumably this stops the guard from being held across an `.await` in a
    // future that must be `Send` — confirm intent.
    _not_send: PhantomData<Rc<()>>,
}
 104
/// Write guard over the server's `Store`.
struct StoreWriteGuard<'a> {
    guard: RwLockWriteGuard<'a, Store>,
    // `Rc` is not `Send`, so this marker makes the whole guard `!Send`.
    // NOTE(review): presumably this stops the guard from being held across an `.await` in a
    // future that must be `Send` — confirm intent.
    _not_send: PhantomData<Rc<()>>,
}
 109
 110impl Server {
 111    pub fn new(
 112        app_state: Arc<AppState>,
 113        notifications: Option<mpsc::UnboundedSender<()>>,
 114    ) -> Arc<Self> {
 115        let mut server = Self {
 116            peer: Peer::new(),
 117            app_state,
 118            store: Default::default(),
 119            handlers: Default::default(),
 120            notifications,
 121        };
 122
 123        server
 124            .add_request_handler(Server::ping)
 125            .add_request_handler(Server::register_project)
 126            .add_message_handler(Server::unregister_project)
 127            .add_request_handler(Server::join_project)
 128            .add_message_handler(Server::leave_project)
 129            .add_message_handler(Server::respond_to_join_project_request)
 130            .add_request_handler(Server::register_worktree)
 131            .add_message_handler(Server::unregister_worktree)
 132            .add_request_handler(Server::update_worktree)
 133            .add_message_handler(Server::start_language_server)
 134            .add_message_handler(Server::update_language_server)
 135            .add_message_handler(Server::update_diagnostic_summary)
 136            .add_request_handler(Server::forward_project_request::<proto::GetDefinition>)
 137            .add_request_handler(Server::forward_project_request::<proto::GetReferences>)
 138            .add_request_handler(Server::forward_project_request::<proto::SearchProject>)
 139            .add_request_handler(Server::forward_project_request::<proto::GetDocumentHighlights>)
 140            .add_request_handler(Server::forward_project_request::<proto::GetProjectSymbols>)
 141            .add_request_handler(Server::forward_project_request::<proto::OpenBufferForSymbol>)
 142            .add_request_handler(Server::forward_project_request::<proto::OpenBufferById>)
 143            .add_request_handler(Server::forward_project_request::<proto::OpenBufferByPath>)
 144            .add_request_handler(Server::forward_project_request::<proto::GetCompletions>)
 145            .add_request_handler(
 146                Server::forward_project_request::<proto::ApplyCompletionAdditionalEdits>,
 147            )
 148            .add_request_handler(Server::forward_project_request::<proto::GetCodeActions>)
 149            .add_request_handler(Server::forward_project_request::<proto::ApplyCodeAction>)
 150            .add_request_handler(Server::forward_project_request::<proto::PrepareRename>)
 151            .add_request_handler(Server::forward_project_request::<proto::PerformRename>)
 152            .add_request_handler(Server::forward_project_request::<proto::ReloadBuffers>)
 153            .add_request_handler(Server::forward_project_request::<proto::FormatBuffers>)
 154            .add_request_handler(Server::forward_project_request::<proto::CreateProjectEntry>)
 155            .add_request_handler(Server::forward_project_request::<proto::RenameProjectEntry>)
 156            .add_request_handler(Server::forward_project_request::<proto::DeleteProjectEntry>)
 157            .add_request_handler(Server::update_buffer)
 158            .add_message_handler(Server::update_buffer_file)
 159            .add_message_handler(Server::buffer_reloaded)
 160            .add_message_handler(Server::buffer_saved)
 161            .add_request_handler(Server::save_buffer)
 162            .add_request_handler(Server::get_channels)
 163            .add_request_handler(Server::get_users)
 164            .add_request_handler(Server::fuzzy_search_users)
 165            .add_request_handler(Server::request_contact)
 166            .add_request_handler(Server::remove_contact)
 167            .add_request_handler(Server::respond_to_contact_request)
 168            .add_request_handler(Server::join_channel)
 169            .add_message_handler(Server::leave_channel)
 170            .add_request_handler(Server::send_channel_message)
 171            .add_request_handler(Server::follow)
 172            .add_message_handler(Server::unfollow)
 173            .add_message_handler(Server::update_followers)
 174            .add_request_handler(Server::get_channel_messages);
 175
 176        Arc::new(server)
 177    }
 178
 179    fn add_message_handler<F, Fut, M>(&mut self, handler: F) -> &mut Self
 180    where
 181        F: 'static + Send + Sync + Fn(Arc<Self>, TypedEnvelope<M>) -> Fut,
 182        Fut: 'static + Send + Future<Output = Result<()>>,
 183        M: EnvelopedMessage,
 184    {
 185        let prev_handler = self.handlers.insert(
 186            TypeId::of::<M>(),
 187            Box::new(move |server, envelope| {
 188                let envelope = envelope.into_any().downcast::<TypedEnvelope<M>>().unwrap();
 189                let span = info_span!(
 190                    "handle message",
 191                    payload_type = envelope.payload_type_name(),
 192                    payload = format!("{:?}", envelope.payload).as_str(),
 193                );
 194                let future = (handler)(server, *envelope);
 195                async move {
 196                    if let Err(error) = future.await {
 197                        tracing::error!(%error, "error handling message");
 198                    }
 199                }
 200                .instrument(span)
 201                .boxed()
 202            }),
 203        );
 204        if prev_handler.is_some() {
 205            panic!("registered a handler for the same message twice");
 206        }
 207        self
 208    }
 209
 210    /// Handle a request while holding a lock to the store. This is useful when we're registering
 211    /// a connection but we want to respond on the connection before anybody else can send on it.
 212    fn add_request_handler<F, Fut, M>(&mut self, handler: F) -> &mut Self
 213    where
 214        F: 'static + Send + Sync + Fn(Arc<Self>, TypedEnvelope<M>, Response<M>) -> Fut,
 215        Fut: Send + Future<Output = Result<()>>,
 216        M: RequestMessage,
 217    {
 218        let handler = Arc::new(handler);
 219        self.add_message_handler(move |server, envelope| {
 220            let receipt = envelope.receipt();
 221            let handler = handler.clone();
 222            async move {
 223                let responded = Arc::new(AtomicBool::default());
 224                let response = Response {
 225                    server: server.clone(),
 226                    responded: responded.clone(),
 227                    receipt: envelope.receipt(),
 228                };
 229                match (handler)(server.clone(), envelope, response).await {
 230                    Ok(()) => {
 231                        if responded.load(std::sync::atomic::Ordering::SeqCst) {
 232                            Ok(())
 233                        } else {
 234                            Err(anyhow!("handler did not send a response"))?
 235                        }
 236                    }
 237                    Err(error) => {
 238                        server.peer.respond_with_error(
 239                            receipt,
 240                            proto::Error {
 241                                message: error.to_string(),
 242                            },
 243                        )?;
 244                        Err(error)
 245                    }
 246                }
 247            }
 248        })
 249    }
 250
 251    pub fn handle_connection<E: Executor>(
 252        self: &Arc<Self>,
 253        connection: Connection,
 254        address: String,
 255        user: User,
 256        mut send_connection_id: Option<mpsc::Sender<ConnectionId>>,
 257        executor: E,
 258    ) -> impl Future<Output = Result<()>> {
 259        let mut this = self.clone();
 260        let user_id = user.id;
 261        let login = user.github_login;
 262        let span = info_span!("handle connection", %user_id, %login, %address);
 263        async move {
 264            let (connection_id, handle_io, mut incoming_rx) = this
 265                .peer
 266                .add_connection(connection, {
 267                    let executor = executor.clone();
 268                    move |duration| {
 269                        let timer = executor.sleep(duration);
 270                        async move {
 271                            timer.await;
 272                        }
 273                    }
 274                })
 275                .await;
 276
 277            tracing::info!(%user_id, %login, %connection_id, %address, "connection opened");
 278
 279            if let Some(send_connection_id) = send_connection_id.as_mut() {
 280                let _ = send_connection_id.send(connection_id).await;
 281            }
 282
 283            let (contacts, invite_code) = future::try_join(
 284                this.app_state.db.get_contacts(user_id),
 285                this.app_state.db.get_invite_code_for_user(user_id)
 286            ).await?;
 287
 288            {
 289                let mut store = this.store_mut().await;
 290                store.add_connection(connection_id, user_id);
 291                this.peer.send(connection_id, store.build_initial_contacts_update(contacts))?;
 292                
 293                if let Some((code, count)) = invite_code {
 294                    this.peer.send(connection_id, proto::UpdateInviteInfo {
 295                        url: format!("{}{}", this.app_state.invite_link_prefix, code),
 296                        count,
 297                    })?;
 298                }
 299            }
 300            this.update_user_contacts(user_id).await?;
 301
 302            let handle_io = handle_io.fuse();
 303            futures::pin_mut!(handle_io);
 304            loop {
 305                let next_message = incoming_rx.next().fuse();
 306                futures::pin_mut!(next_message);
 307                futures::select_biased! {
 308                    result = handle_io => {
 309                        if let Err(error) = result {
 310                            tracing::error!(?error, %user_id, %login, %connection_id, %address, "error handling I/O");
 311                        }
 312                        break;
 313                    }
 314                    message = next_message => {
 315                        if let Some(message) = message {
 316                            let type_name = message.payload_type_name();
 317                            let span = tracing::info_span!("receive message", %user_id, %login, %connection_id, %address, type_name);
 318                            async {
 319                                if let Some(handler) = this.handlers.get(&message.payload_type_id()) {
 320                                    let notifications = this.notifications.clone();
 321                                    let is_background = message.is_background();
 322                                    let handle_message = (handler)(this.clone(), message);
 323                                    let handle_message = async move {
 324                                        handle_message.await;
 325                                        if let Some(mut notifications) = notifications {
 326                                            let _ = notifications.send(()).await;
 327                                        }
 328                                    };
 329                                    if is_background {
 330                                        executor.spawn_detached(handle_message);
 331                                    } else {
 332                                        handle_message.await;
 333                                    }
 334                                } else {
 335                                    tracing::error!(%user_id, %login, %connection_id, %address, "no message handler");
 336                                }
 337                            }.instrument(span).await;
 338                        } else {
 339                            tracing::info!(%user_id, %login, %connection_id, %address, "connection closed");
 340                            break;
 341                        }
 342                    }
 343                }
 344            }
 345
 346            tracing::info!(%user_id, %login, %connection_id, %address, "signing out");
 347            if let Err(error) = this.sign_out(connection_id).await {
 348                tracing::error!(%user_id, %login, %connection_id, %address, ?error, "error signing out");
 349            }
 350
 351            Ok(())
 352        }.instrument(span)
 353    }
 354
 355    #[instrument(skip(self), err)]
 356    async fn sign_out(self: &mut Arc<Self>, connection_id: ConnectionId) -> Result<()> {
 357        self.peer.disconnect(connection_id);
 358
 359        let removed_user_id = {
 360            let mut store = self.store_mut().await;
 361            let removed_connection = store.remove_connection(connection_id)?;
 362
 363            for (project_id, project) in removed_connection.hosted_projects {
 364                broadcast(connection_id, project.guests.keys().copied(), |conn_id| {
 365                    self.peer
 366                        .send(conn_id, proto::UnregisterProject { project_id })
 367                });
 368
 369                for (_, receipts) in project.join_requests {
 370                    for receipt in receipts {
 371                        self.peer.respond(
 372                            receipt,
 373                            proto::JoinProjectResponse {
 374                                variant: Some(proto::join_project_response::Variant::Decline(
 375                                    proto::join_project_response::Decline {
 376                                        reason: proto::join_project_response::decline::Reason::WentOffline as i32
 377                                    },
 378                                )),
 379                            },
 380                        )?;
 381                    }
 382                }
 383            }
 384
 385            for project_id in removed_connection.guest_project_ids {
 386                if let Some(project) = store.project(project_id).trace_err() {
 387                    broadcast(connection_id, project.connection_ids(), |conn_id| {
 388                        self.peer.send(
 389                            conn_id,
 390                            proto::RemoveProjectCollaborator {
 391                                project_id,
 392                                peer_id: connection_id.0,
 393                            },
 394                        )
 395                    });
 396                    if project.guests.is_empty() {
 397                        self.peer
 398                            .send(
 399                                project.host_connection_id,
 400                                proto::ProjectUnshared { project_id },
 401                            )
 402                            .trace_err();
 403                    }
 404                }
 405            }
 406
 407            removed_connection.user_id
 408        };
 409
 410        self.update_user_contacts(removed_user_id).await?;
 411
 412        Ok(())
 413    }
 414
 415    pub async fn invite_code_redeemed(self: &Arc<Self>, code: &str, invitee_id: UserId) -> Result<()> {
 416        let user = self.app_state.db.get_user_for_invite_code(code).await?;
 417        let store = self.store().await;
 418        let invitee_contact = store.contact_for_user(invitee_id, true);
 419        for connection_id in store.connection_ids_for_user(user.id) {
 420            self.peer.send(connection_id, proto::UpdateContacts {
 421                contacts: vec![invitee_contact.clone()],
 422                ..Default::default()
 423            })?;
 424            self.peer.send(connection_id, proto::UpdateInviteInfo {
 425                url: format!("{}{}", self.app_state.invite_link_prefix, code),
 426                count: user.invite_count as u32,
 427            })?;
 428        }
 429        Ok(())
 430    }
 431
 432    async fn ping(
 433        self: Arc<Server>,
 434        _: TypedEnvelope<proto::Ping>,
 435        response: Response<proto::Ping>,
 436    ) -> Result<()> {
 437        response.send(proto::Ack {})?;
 438        Ok(())
 439    }
 440
 441    async fn register_project(
 442        self: Arc<Server>,
 443        request: TypedEnvelope<proto::RegisterProject>,
 444        response: Response<proto::RegisterProject>,
 445    ) -> Result<()> {
 446        let user_id;
 447        let project_id;
 448        {
 449            let mut state = self.store_mut().await;
 450            user_id = state.user_id_for_connection(request.sender_id)?;
 451            project_id = state.register_project(request.sender_id, user_id);
 452        };
 453        self.update_user_contacts(user_id).await?;
 454        response.send(proto::RegisterProjectResponse { project_id })?;
 455        Ok(())
 456    }
 457
 458    async fn unregister_project(
 459        self: Arc<Server>,
 460        request: TypedEnvelope<proto::UnregisterProject>,
 461    ) -> Result<()> {
 462        let (user_id, project) = {
 463            let mut state = self.store_mut().await;
 464            let project =
 465                state.unregister_project(request.payload.project_id, request.sender_id)?;
 466            (state.user_id_for_connection(request.sender_id)?, project)
 467        };
 468        for (_, receipts) in project.join_requests {
 469            for receipt in receipts {
 470                self.peer.respond(
 471                    receipt,
 472                    proto::JoinProjectResponse {
 473                        variant: Some(proto::join_project_response::Variant::Decline(
 474                            proto::join_project_response::Decline {
 475                                reason: proto::join_project_response::decline::Reason::Closed
 476                                    as i32,
 477                            },
 478                        )),
 479                    },
 480                )?;
 481            }
 482        }
 483
 484        self.update_user_contacts(user_id).await?;
 485        Ok(())
 486    }
 487
 488    async fn update_user_contacts(self: &Arc<Server>, user_id: UserId) -> Result<()> {
 489        let contacts = self.app_state.db.get_contacts(user_id).await?;
 490        let store = self.store().await;
 491        let updated_contact = store.contact_for_user(user_id, false);
 492        for contact in contacts {
 493            if let db::Contact::Accepted {
 494                user_id: contact_user_id,
 495                ..
 496            } = contact
 497            {
 498                for contact_conn_id in store.connection_ids_for_user(contact_user_id) {
 499                    self.peer
 500                        .send(
 501                            contact_conn_id,
 502                            proto::UpdateContacts {
 503                                contacts: vec![updated_contact.clone()],
 504                                remove_contacts: Default::default(),
 505                                incoming_requests: Default::default(),
 506                                remove_incoming_requests: Default::default(),
 507                                outgoing_requests: Default::default(),
 508                                remove_outgoing_requests: Default::default(),
 509                            },
 510                        )
 511                        .trace_err();
 512                }
 513            }
 514        }
 515        Ok(())
 516    }
 517
 518    async fn join_project(
 519        self: Arc<Server>,
 520        request: TypedEnvelope<proto::JoinProject>,
 521        response: Response<proto::JoinProject>,
 522    ) -> Result<()> {
 523        let project_id = request.payload.project_id;
 524        let host_user_id;
 525        let guest_user_id;
 526        let host_connection_id;
 527        {
 528            let state = self.store().await;
 529            let project = state.project(project_id)?;
 530            host_user_id = project.host_user_id;
 531            host_connection_id = project.host_connection_id;
 532            guest_user_id = state.user_id_for_connection(request.sender_id)?;
 533        };
 534
 535        let has_contact = self
 536            .app_state
 537            .db
 538            .has_contact(guest_user_id, host_user_id)
 539            .await?;
 540        if !has_contact {
 541            return Err(anyhow!("no such project"))?;
 542        }
 543
 544        self.store_mut().await.request_join_project(
 545            guest_user_id,
 546            project_id,
 547            response.into_receipt(),
 548        )?;
 549        self.peer.send(
 550            host_connection_id,
 551            proto::RequestJoinProject {
 552                project_id,
 553                requester_id: guest_user_id.to_proto(),
 554            },
 555        )?;
 556        Ok(())
 557    }
 558
 559    async fn respond_to_join_project_request(
 560        self: Arc<Server>,
 561        request: TypedEnvelope<proto::RespondToJoinProjectRequest>,
 562    ) -> Result<()> {
 563        let host_user_id;
 564
 565        {
 566            let mut state = self.store_mut().await;
 567            let project_id = request.payload.project_id;
 568            let project = state.project(project_id)?;
 569            if project.host_connection_id != request.sender_id {
 570                Err(anyhow!("no such connection"))?;
 571            }
 572
 573            host_user_id = project.host_user_id;
 574            let guest_user_id = UserId::from_proto(request.payload.requester_id);
 575
 576            if !request.payload.allow {
 577                let receipts = state
 578                    .deny_join_project_request(request.sender_id, guest_user_id, project_id)
 579                    .ok_or_else(|| anyhow!("no such request"))?;
 580                for receipt in receipts {
 581                    self.peer.respond(
 582                        receipt,
 583                        proto::JoinProjectResponse {
 584                            variant: Some(proto::join_project_response::Variant::Decline(
 585                                proto::join_project_response::Decline {
 586                                    reason: proto::join_project_response::decline::Reason::Declined
 587                                        as i32,
 588                                },
 589                            )),
 590                        },
 591                    )?;
 592                }
 593                return Ok(());
 594            }
 595
 596            let (receipts_with_replica_ids, project) = state
 597                .accept_join_project_request(request.sender_id, guest_user_id, project_id)
 598                .ok_or_else(|| anyhow!("no such request"))?;
 599
 600            let peer_count = project.guests.len();
 601            let mut collaborators = Vec::with_capacity(peer_count);
 602            collaborators.push(proto::Collaborator {
 603                peer_id: project.host_connection_id.0,
 604                replica_id: 0,
 605                user_id: project.host_user_id.to_proto(),
 606            });
 607            let worktrees = project
 608                .worktrees
 609                .iter()
 610                .filter_map(|(id, shared_worktree)| {
 611                    let worktree = project.worktrees.get(&id)?;
 612                    Some(proto::Worktree {
 613                        id: *id,
 614                        root_name: worktree.root_name.clone(),
 615                        entries: shared_worktree.entries.values().cloned().collect(),
 616                        diagnostic_summaries: shared_worktree
 617                            .diagnostic_summaries
 618                            .values()
 619                            .cloned()
 620                            .collect(),
 621                        visible: worktree.visible,
 622                        scan_id: shared_worktree.scan_id,
 623                    })
 624                })
 625                .collect::<Vec<_>>();
 626
 627            // Add all guests other than the requesting user's own connections as collaborators
 628            for (peer_conn_id, (peer_replica_id, peer_user_id)) in &project.guests {
 629                if receipts_with_replica_ids
 630                    .iter()
 631                    .all(|(receipt, _)| receipt.sender_id != *peer_conn_id)
 632                {
 633                    collaborators.push(proto::Collaborator {
 634                        peer_id: peer_conn_id.0,
 635                        replica_id: *peer_replica_id as u32,
 636                        user_id: peer_user_id.to_proto(),
 637                    });
 638                }
 639            }
 640
 641            for conn_id in project.connection_ids() {
 642                for (receipt, replica_id) in &receipts_with_replica_ids {
 643                    if conn_id != receipt.sender_id {
 644                        self.peer.send(
 645                            conn_id,
 646                            proto::AddProjectCollaborator {
 647                                project_id,
 648                                collaborator: Some(proto::Collaborator {
 649                                    peer_id: receipt.sender_id.0,
 650                                    replica_id: *replica_id as u32,
 651                                    user_id: guest_user_id.to_proto(),
 652                                }),
 653                            },
 654                        )?;
 655                    }
 656                }
 657            }
 658
 659            for (receipt, replica_id) in receipts_with_replica_ids {
 660                self.peer.respond(
 661                    receipt,
 662                    proto::JoinProjectResponse {
 663                        variant: Some(proto::join_project_response::Variant::Accept(
 664                            proto::join_project_response::Accept {
 665                                worktrees: worktrees.clone(),
 666                                replica_id: replica_id as u32,
 667                                collaborators: collaborators.clone(),
 668                                language_servers: project.language_servers.clone(),
 669                            },
 670                        )),
 671                    },
 672                )?;
 673            }
 674        }
 675
 676        self.update_user_contacts(host_user_id).await?;
 677        Ok(())
 678    }
 679
 680    async fn leave_project(
 681        self: Arc<Server>,
 682        request: TypedEnvelope<proto::LeaveProject>,
 683    ) -> Result<()> {
 684        let sender_id = request.sender_id;
 685        let project_id = request.payload.project_id;
 686        let project;
 687        {
 688            let mut store = self.store_mut().await;
 689            project = store.leave_project(sender_id, project_id)?;
 690
 691            if project.remove_collaborator {
 692                broadcast(sender_id, project.connection_ids, |conn_id| {
 693                    self.peer.send(
 694                        conn_id,
 695                        proto::RemoveProjectCollaborator {
 696                            project_id,
 697                            peer_id: sender_id.0,
 698                        },
 699                    )
 700                });
 701            }
 702
 703            if let Some(requester_id) = project.cancel_request {
 704                self.peer.send(
 705                    project.host_connection_id,
 706                    proto::JoinProjectRequestCancelled {
 707                        project_id,
 708                        requester_id: requester_id.to_proto(),
 709                    },
 710                )?;
 711            }
 712
 713            if project.unshare {
 714                self.peer.send(
 715                    project.host_connection_id,
 716                    proto::ProjectUnshared { project_id },
 717                )?;
 718            }
 719        }
 720        self.update_user_contacts(project.host_user_id).await?;
 721        Ok(())
 722    }
 723
 724    async fn register_worktree(
 725        self: Arc<Server>,
 726        request: TypedEnvelope<proto::RegisterWorktree>,
 727        response: Response<proto::RegisterWorktree>,
 728    ) -> Result<()> {
 729        let host_user_id;
 730        {
 731            let mut state = self.store_mut().await;
 732            host_user_id = state.user_id_for_connection(request.sender_id)?;
 733
 734            let guest_connection_ids = state
 735                .read_project(request.payload.project_id, request.sender_id)?
 736                .guest_connection_ids();
 737            state.register_worktree(
 738                request.payload.project_id,
 739                request.payload.worktree_id,
 740                request.sender_id,
 741                Worktree {
 742                    root_name: request.payload.root_name.clone(),
 743                    visible: request.payload.visible,
 744                    ..Default::default()
 745                },
 746            )?;
 747
 748            broadcast(request.sender_id, guest_connection_ids, |connection_id| {
 749                self.peer
 750                    .forward_send(request.sender_id, connection_id, request.payload.clone())
 751            });
 752        }
 753        self.update_user_contacts(host_user_id).await?;
 754        response.send(proto::Ack {})?;
 755        Ok(())
 756    }
 757
 758    async fn unregister_worktree(
 759        self: Arc<Server>,
 760        request: TypedEnvelope<proto::UnregisterWorktree>,
 761    ) -> Result<()> {
 762        let host_user_id;
 763        let project_id = request.payload.project_id;
 764        let worktree_id = request.payload.worktree_id;
 765        {
 766            let mut state = self.store_mut().await;
 767            let (_, guest_connection_ids) =
 768                state.unregister_worktree(project_id, worktree_id, request.sender_id)?;
 769            host_user_id = state.user_id_for_connection(request.sender_id)?;
 770            broadcast(request.sender_id, guest_connection_ids, |conn_id| {
 771                self.peer.send(
 772                    conn_id,
 773                    proto::UnregisterWorktree {
 774                        project_id,
 775                        worktree_id,
 776                    },
 777                )
 778            });
 779        }
 780        self.update_user_contacts(host_user_id).await?;
 781        Ok(())
 782    }
 783
 784    async fn update_worktree(
 785        self: Arc<Server>,
 786        request: TypedEnvelope<proto::UpdateWorktree>,
 787        response: Response<proto::UpdateWorktree>,
 788    ) -> Result<()> {
 789        let connection_ids = self.store_mut().await.update_worktree(
 790            request.sender_id,
 791            request.payload.project_id,
 792            request.payload.worktree_id,
 793            &request.payload.removed_entries,
 794            &request.payload.updated_entries,
 795            request.payload.scan_id,
 796        )?;
 797
 798        broadcast(request.sender_id, connection_ids, |connection_id| {
 799            self.peer
 800                .forward_send(request.sender_id, connection_id, request.payload.clone())
 801        });
 802        response.send(proto::Ack {})?;
 803        Ok(())
 804    }
 805
 806    async fn update_diagnostic_summary(
 807        self: Arc<Server>,
 808        request: TypedEnvelope<proto::UpdateDiagnosticSummary>,
 809    ) -> Result<()> {
 810        let summary = request
 811            .payload
 812            .summary
 813            .clone()
 814            .ok_or_else(|| anyhow!("invalid summary"))?;
 815        let receiver_ids = self.store_mut().await.update_diagnostic_summary(
 816            request.payload.project_id,
 817            request.payload.worktree_id,
 818            request.sender_id,
 819            summary,
 820        )?;
 821
 822        broadcast(request.sender_id, receiver_ids, |connection_id| {
 823            self.peer
 824                .forward_send(request.sender_id, connection_id, request.payload.clone())
 825        });
 826        Ok(())
 827    }
 828
 829    async fn start_language_server(
 830        self: Arc<Server>,
 831        request: TypedEnvelope<proto::StartLanguageServer>,
 832    ) -> Result<()> {
 833        let receiver_ids = self.store_mut().await.start_language_server(
 834            request.payload.project_id,
 835            request.sender_id,
 836            request
 837                .payload
 838                .server
 839                .clone()
 840                .ok_or_else(|| anyhow!("invalid language server"))?,
 841        )?;
 842        broadcast(request.sender_id, receiver_ids, |connection_id| {
 843            self.peer
 844                .forward_send(request.sender_id, connection_id, request.payload.clone())
 845        });
 846        Ok(())
 847    }
 848
 849    async fn update_language_server(
 850        self: Arc<Server>,
 851        request: TypedEnvelope<proto::UpdateLanguageServer>,
 852    ) -> Result<()> {
 853        let receiver_ids = self
 854            .store()
 855            .await
 856            .project_connection_ids(request.payload.project_id, request.sender_id)?;
 857        broadcast(request.sender_id, receiver_ids, |connection_id| {
 858            self.peer
 859                .forward_send(request.sender_id, connection_id, request.payload.clone())
 860        });
 861        Ok(())
 862    }
 863
 864    async fn forward_project_request<T>(
 865        self: Arc<Server>,
 866        request: TypedEnvelope<T>,
 867        response: Response<T>,
 868    ) -> Result<()>
 869    where
 870        T: EntityMessage + RequestMessage,
 871    {
 872        let host_connection_id = self
 873            .store()
 874            .await
 875            .read_project(request.payload.remote_entity_id(), request.sender_id)?
 876            .host_connection_id;
 877
 878        response.send(
 879            self.peer
 880                .forward_request(request.sender_id, host_connection_id, request.payload)
 881                .await?,
 882        )?;
 883        Ok(())
 884    }
 885
 886    async fn save_buffer(
 887        self: Arc<Server>,
 888        request: TypedEnvelope<proto::SaveBuffer>,
 889        response: Response<proto::SaveBuffer>,
 890    ) -> Result<()> {
 891        let host = self
 892            .store()
 893            .await
 894            .read_project(request.payload.project_id, request.sender_id)?
 895            .host_connection_id;
 896        let response_payload = self
 897            .peer
 898            .forward_request(request.sender_id, host, request.payload.clone())
 899            .await?;
 900
 901        let mut guests = self
 902            .store()
 903            .await
 904            .read_project(request.payload.project_id, request.sender_id)?
 905            .connection_ids();
 906        guests.retain(|guest_connection_id| *guest_connection_id != request.sender_id);
 907        broadcast(host, guests, |conn_id| {
 908            self.peer
 909                .forward_send(host, conn_id, response_payload.clone())
 910        });
 911        response.send(response_payload)?;
 912        Ok(())
 913    }
 914
 915    async fn update_buffer(
 916        self: Arc<Server>,
 917        request: TypedEnvelope<proto::UpdateBuffer>,
 918        response: Response<proto::UpdateBuffer>,
 919    ) -> Result<()> {
 920        let receiver_ids = self
 921            .store()
 922            .await
 923            .project_connection_ids(request.payload.project_id, request.sender_id)?;
 924        broadcast(request.sender_id, receiver_ids, |connection_id| {
 925            self.peer
 926                .forward_send(request.sender_id, connection_id, request.payload.clone())
 927        });
 928        response.send(proto::Ack {})?;
 929        Ok(())
 930    }
 931
 932    async fn update_buffer_file(
 933        self: Arc<Server>,
 934        request: TypedEnvelope<proto::UpdateBufferFile>,
 935    ) -> Result<()> {
 936        let receiver_ids = self
 937            .store()
 938            .await
 939            .project_connection_ids(request.payload.project_id, request.sender_id)?;
 940        broadcast(request.sender_id, receiver_ids, |connection_id| {
 941            self.peer
 942                .forward_send(request.sender_id, connection_id, request.payload.clone())
 943        });
 944        Ok(())
 945    }
 946
 947    async fn buffer_reloaded(
 948        self: Arc<Server>,
 949        request: TypedEnvelope<proto::BufferReloaded>,
 950    ) -> Result<()> {
 951        let receiver_ids = self
 952            .store()
 953            .await
 954            .project_connection_ids(request.payload.project_id, request.sender_id)?;
 955        broadcast(request.sender_id, receiver_ids, |connection_id| {
 956            self.peer
 957                .forward_send(request.sender_id, connection_id, request.payload.clone())
 958        });
 959        Ok(())
 960    }
 961
 962    async fn buffer_saved(
 963        self: Arc<Server>,
 964        request: TypedEnvelope<proto::BufferSaved>,
 965    ) -> Result<()> {
 966        let receiver_ids = self
 967            .store()
 968            .await
 969            .project_connection_ids(request.payload.project_id, request.sender_id)?;
 970        broadcast(request.sender_id, receiver_ids, |connection_id| {
 971            self.peer
 972                .forward_send(request.sender_id, connection_id, request.payload.clone())
 973        });
 974        Ok(())
 975    }
 976
 977    async fn follow(
 978        self: Arc<Self>,
 979        request: TypedEnvelope<proto::Follow>,
 980        response: Response<proto::Follow>,
 981    ) -> Result<()> {
 982        let leader_id = ConnectionId(request.payload.leader_id);
 983        let follower_id = request.sender_id;
 984        if !self
 985            .store()
 986            .await
 987            .project_connection_ids(request.payload.project_id, follower_id)?
 988            .contains(&leader_id)
 989        {
 990            Err(anyhow!("no such peer"))?;
 991        }
 992        let mut response_payload = self
 993            .peer
 994            .forward_request(request.sender_id, leader_id, request.payload)
 995            .await?;
 996        response_payload
 997            .views
 998            .retain(|view| view.leader_id != Some(follower_id.0));
 999        response.send(response_payload)?;
1000        Ok(())
1001    }
1002
1003    async fn unfollow(self: Arc<Self>, request: TypedEnvelope<proto::Unfollow>) -> Result<()> {
1004        let leader_id = ConnectionId(request.payload.leader_id);
1005        if !self
1006            .store()
1007            .await
1008            .project_connection_ids(request.payload.project_id, request.sender_id)?
1009            .contains(&leader_id)
1010        {
1011            Err(anyhow!("no such peer"))?;
1012        }
1013        self.peer
1014            .forward_send(request.sender_id, leader_id, request.payload)?;
1015        Ok(())
1016    }
1017
1018    async fn update_followers(
1019        self: Arc<Self>,
1020        request: TypedEnvelope<proto::UpdateFollowers>,
1021    ) -> Result<()> {
1022        let connection_ids = self
1023            .store()
1024            .await
1025            .project_connection_ids(request.payload.project_id, request.sender_id)?;
1026        let leader_id = request
1027            .payload
1028            .variant
1029            .as_ref()
1030            .and_then(|variant| match variant {
1031                proto::update_followers::Variant::CreateView(payload) => payload.leader_id,
1032                proto::update_followers::Variant::UpdateView(payload) => payload.leader_id,
1033                proto::update_followers::Variant::UpdateActiveView(payload) => payload.leader_id,
1034            });
1035        for follower_id in &request.payload.follower_ids {
1036            let follower_id = ConnectionId(*follower_id);
1037            if connection_ids.contains(&follower_id) && Some(follower_id.0) != leader_id {
1038                self.peer
1039                    .forward_send(request.sender_id, follower_id, request.payload.clone())?;
1040            }
1041        }
1042        Ok(())
1043    }
1044
1045    async fn get_channels(
1046        self: Arc<Server>,
1047        request: TypedEnvelope<proto::GetChannels>,
1048        response: Response<proto::GetChannels>,
1049    ) -> Result<()> {
1050        let user_id = self
1051            .store()
1052            .await
1053            .user_id_for_connection(request.sender_id)?;
1054        let channels = self.app_state.db.get_accessible_channels(user_id).await?;
1055        response.send(proto::GetChannelsResponse {
1056            channels: channels
1057                .into_iter()
1058                .map(|chan| proto::Channel {
1059                    id: chan.id.to_proto(),
1060                    name: chan.name,
1061                })
1062                .collect(),
1063        })?;
1064        Ok(())
1065    }
1066
1067    async fn get_users(
1068        self: Arc<Server>,
1069        request: TypedEnvelope<proto::GetUsers>,
1070        response: Response<proto::GetUsers>,
1071    ) -> Result<()> {
1072        let user_ids = request
1073            .payload
1074            .user_ids
1075            .into_iter()
1076            .map(UserId::from_proto)
1077            .collect();
1078        let users = self
1079            .app_state
1080            .db
1081            .get_users_by_ids(user_ids)
1082            .await?
1083            .into_iter()
1084            .map(|user| proto::User {
1085                id: user.id.to_proto(),
1086                avatar_url: format!("https://github.com/{}.png?size=128", user.github_login),
1087                github_login: user.github_login,
1088            })
1089            .collect();
1090        response.send(proto::UsersResponse { users })?;
1091        Ok(())
1092    }
1093
1094    async fn fuzzy_search_users(
1095        self: Arc<Server>,
1096        request: TypedEnvelope<proto::FuzzySearchUsers>,
1097        response: Response<proto::FuzzySearchUsers>,
1098    ) -> Result<()> {
1099        let user_id = self
1100            .store()
1101            .await
1102            .user_id_for_connection(request.sender_id)?;
1103        let query = request.payload.query;
1104        let db = &self.app_state.db;
1105        let users = match query.len() {
1106            0 => vec![],
1107            1 | 2 => db
1108                .get_user_by_github_login(&query)
1109                .await?
1110                .into_iter()
1111                .collect(),
1112            _ => db.fuzzy_search_users(&query, 10).await?,
1113        };
1114        let users = users
1115            .into_iter()
1116            .filter(|user| user.id != user_id)
1117            .map(|user| proto::User {
1118                id: user.id.to_proto(),
1119                avatar_url: format!("https://github.com/{}.png?size=128", user.github_login),
1120                github_login: user.github_login,
1121            })
1122            .collect();
1123        response.send(proto::UsersResponse { users })?;
1124        Ok(())
1125    }
1126
1127    async fn request_contact(
1128        self: Arc<Server>,
1129        request: TypedEnvelope<proto::RequestContact>,
1130        response: Response<proto::RequestContact>,
1131    ) -> Result<()> {
1132        let requester_id = self
1133            .store()
1134            .await
1135            .user_id_for_connection(request.sender_id)?;
1136        let responder_id = UserId::from_proto(request.payload.responder_id);
1137        if requester_id == responder_id {
1138            return Err(anyhow!("cannot add yourself as a contact"))?;
1139        }
1140
1141        self.app_state
1142            .db
1143            .send_contact_request(requester_id, responder_id)
1144            .await?;
1145
1146        // Update outgoing contact requests of requester
1147        let mut update = proto::UpdateContacts::default();
1148        update.outgoing_requests.push(responder_id.to_proto());
1149        for connection_id in self.store().await.connection_ids_for_user(requester_id) {
1150            self.peer.send(connection_id, update.clone())?;
1151        }
1152
1153        // Update incoming contact requests of responder
1154        let mut update = proto::UpdateContacts::default();
1155        update
1156            .incoming_requests
1157            .push(proto::IncomingContactRequest {
1158                requester_id: requester_id.to_proto(),
1159                should_notify: true,
1160            });
1161        for connection_id in self.store().await.connection_ids_for_user(responder_id) {
1162            self.peer.send(connection_id, update.clone())?;
1163        }
1164
1165        response.send(proto::Ack {})?;
1166        Ok(())
1167    }
1168
1169    async fn respond_to_contact_request(
1170        self: Arc<Server>,
1171        request: TypedEnvelope<proto::RespondToContactRequest>,
1172        response: Response<proto::RespondToContactRequest>,
1173    ) -> Result<()> {
1174        let responder_id = self
1175            .store()
1176            .await
1177            .user_id_for_connection(request.sender_id)?;
1178        let requester_id = UserId::from_proto(request.payload.requester_id);
1179        if request.payload.response == proto::ContactRequestResponse::Dismiss as i32 {
1180            self.app_state
1181                .db
1182                .dismiss_contact_notification(responder_id, requester_id)
1183                .await?;
1184        } else {
1185            let accept = request.payload.response == proto::ContactRequestResponse::Accept as i32;
1186            self.app_state
1187                .db
1188                .respond_to_contact_request(responder_id, requester_id, accept)
1189                .await?;
1190
1191            let store = self.store().await;
1192            // Update responder with new contact
1193            let mut update = proto::UpdateContacts::default();
1194            if accept {
1195                update
1196                    .contacts
1197                    .push(store.contact_for_user(requester_id, false));
1198            }
1199            update
1200                .remove_incoming_requests
1201                .push(requester_id.to_proto());
1202            for connection_id in store.connection_ids_for_user(responder_id) {
1203                self.peer.send(connection_id, update.clone())?;
1204            }
1205
1206            // Update requester with new contact
1207            let mut update = proto::UpdateContacts::default();
1208            if accept {
1209                update
1210                    .contacts
1211                    .push(store.contact_for_user(responder_id, true));
1212            }
1213            update
1214                .remove_outgoing_requests
1215                .push(responder_id.to_proto());
1216            for connection_id in store.connection_ids_for_user(requester_id) {
1217                self.peer.send(connection_id, update.clone())?;
1218            }
1219        }
1220
1221        response.send(proto::Ack {})?;
1222        Ok(())
1223    }
1224
1225    async fn remove_contact(
1226        self: Arc<Server>,
1227        request: TypedEnvelope<proto::RemoveContact>,
1228        response: Response<proto::RemoveContact>,
1229    ) -> Result<()> {
1230        let requester_id = self
1231            .store()
1232            .await
1233            .user_id_for_connection(request.sender_id)?;
1234        let responder_id = UserId::from_proto(request.payload.user_id);
1235        self.app_state
1236            .db
1237            .remove_contact(requester_id, responder_id)
1238            .await?;
1239
1240        // Update outgoing contact requests of requester
1241        let mut update = proto::UpdateContacts::default();
1242        update
1243            .remove_outgoing_requests
1244            .push(responder_id.to_proto());
1245        for connection_id in self.store().await.connection_ids_for_user(requester_id) {
1246            self.peer.send(connection_id, update.clone())?;
1247        }
1248
1249        // Update incoming contact requests of responder
1250        let mut update = proto::UpdateContacts::default();
1251        update
1252            .remove_incoming_requests
1253            .push(requester_id.to_proto());
1254        for connection_id in self.store().await.connection_ids_for_user(responder_id) {
1255            self.peer.send(connection_id, update.clone())?;
1256        }
1257
1258        response.send(proto::Ack {})?;
1259        Ok(())
1260    }
1261
1262    async fn join_channel(
1263        self: Arc<Self>,
1264        request: TypedEnvelope<proto::JoinChannel>,
1265        response: Response<proto::JoinChannel>,
1266    ) -> Result<()> {
1267        let user_id = self
1268            .store()
1269            .await
1270            .user_id_for_connection(request.sender_id)?;
1271        let channel_id = ChannelId::from_proto(request.payload.channel_id);
1272        if !self
1273            .app_state
1274            .db
1275            .can_user_access_channel(user_id, channel_id)
1276            .await?
1277        {
1278            Err(anyhow!("access denied"))?;
1279        }
1280
1281        self.store_mut()
1282            .await
1283            .join_channel(request.sender_id, channel_id);
1284        let messages = self
1285            .app_state
1286            .db
1287            .get_channel_messages(channel_id, MESSAGE_COUNT_PER_PAGE, None)
1288            .await?
1289            .into_iter()
1290            .map(|msg| proto::ChannelMessage {
1291                id: msg.id.to_proto(),
1292                body: msg.body,
1293                timestamp: msg.sent_at.unix_timestamp() as u64,
1294                sender_id: msg.sender_id.to_proto(),
1295                nonce: Some(msg.nonce.as_u128().into()),
1296            })
1297            .collect::<Vec<_>>();
1298        response.send(proto::JoinChannelResponse {
1299            done: messages.len() < MESSAGE_COUNT_PER_PAGE,
1300            messages,
1301        })?;
1302        Ok(())
1303    }
1304
1305    async fn leave_channel(
1306        self: Arc<Self>,
1307        request: TypedEnvelope<proto::LeaveChannel>,
1308    ) -> Result<()> {
1309        let user_id = self
1310            .store()
1311            .await
1312            .user_id_for_connection(request.sender_id)?;
1313        let channel_id = ChannelId::from_proto(request.payload.channel_id);
1314        if !self
1315            .app_state
1316            .db
1317            .can_user_access_channel(user_id, channel_id)
1318            .await?
1319        {
1320            Err(anyhow!("access denied"))?;
1321        }
1322
1323        self.store_mut()
1324            .await
1325            .leave_channel(request.sender_id, channel_id);
1326
1327        Ok(())
1328    }
1329
1330    async fn send_channel_message(
1331        self: Arc<Self>,
1332        request: TypedEnvelope<proto::SendChannelMessage>,
1333        response: Response<proto::SendChannelMessage>,
1334    ) -> Result<()> {
1335        let channel_id = ChannelId::from_proto(request.payload.channel_id);
1336        let user_id;
1337        let connection_ids;
1338        {
1339            let state = self.store().await;
1340            user_id = state.user_id_for_connection(request.sender_id)?;
1341            connection_ids = state.channel_connection_ids(channel_id)?;
1342        }
1343
1344        // Validate the message body.
1345        let body = request.payload.body.trim().to_string();
1346        if body.len() > MAX_MESSAGE_LEN {
1347            return Err(anyhow!("message is too long"))?;
1348        }
1349        if body.is_empty() {
1350            return Err(anyhow!("message can't be blank"))?;
1351        }
1352
1353        let timestamp = OffsetDateTime::now_utc();
1354        let nonce = request
1355            .payload
1356            .nonce
1357            .ok_or_else(|| anyhow!("nonce can't be blank"))?;
1358
1359        let message_id = self
1360            .app_state
1361            .db
1362            .create_channel_message(channel_id, user_id, &body, timestamp, nonce.clone().into())
1363            .await?
1364            .to_proto();
1365        let message = proto::ChannelMessage {
1366            sender_id: user_id.to_proto(),
1367            id: message_id,
1368            body,
1369            timestamp: timestamp.unix_timestamp() as u64,
1370            nonce: Some(nonce),
1371        };
1372        broadcast(request.sender_id, connection_ids, |conn_id| {
1373            self.peer.send(
1374                conn_id,
1375                proto::ChannelMessageSent {
1376                    channel_id: channel_id.to_proto(),
1377                    message: Some(message.clone()),
1378                },
1379            )
1380        });
1381        response.send(proto::SendChannelMessageResponse {
1382            message: Some(message),
1383        })?;
1384        Ok(())
1385    }
1386
1387    async fn get_channel_messages(
1388        self: Arc<Self>,
1389        request: TypedEnvelope<proto::GetChannelMessages>,
1390        response: Response<proto::GetChannelMessages>,
1391    ) -> Result<()> {
1392        let user_id = self
1393            .store()
1394            .await
1395            .user_id_for_connection(request.sender_id)?;
1396        let channel_id = ChannelId::from_proto(request.payload.channel_id);
1397        if !self
1398            .app_state
1399            .db
1400            .can_user_access_channel(user_id, channel_id)
1401            .await?
1402        {
1403            Err(anyhow!("access denied"))?;
1404        }
1405
1406        let messages = self
1407            .app_state
1408            .db
1409            .get_channel_messages(
1410                channel_id,
1411                MESSAGE_COUNT_PER_PAGE,
1412                Some(MessageId::from_proto(request.payload.before_message_id)),
1413            )
1414            .await?
1415            .into_iter()
1416            .map(|msg| proto::ChannelMessage {
1417                id: msg.id.to_proto(),
1418                body: msg.body,
1419                timestamp: msg.sent_at.unix_timestamp() as u64,
1420                sender_id: msg.sender_id.to_proto(),
1421                nonce: Some(msg.nonce.as_u128().into()),
1422            })
1423            .collect::<Vec<_>>();
1424        response.send(proto::GetChannelMessagesResponse {
1425            done: messages.len() < MESSAGE_COUNT_PER_PAGE,
1426            messages,
1427        })?;
1428        Ok(())
1429    }
1430
1431    async fn store<'a>(self: &'a Arc<Self>) -> StoreReadGuard<'a> {
1432        #[cfg(test)]
1433        tokio::task::yield_now().await;
1434        let guard = self.store.read().await;
1435        #[cfg(test)]
1436        tokio::task::yield_now().await;
1437        StoreReadGuard {
1438            guard,
1439            _not_send: PhantomData,
1440        }
1441    }
1442
1443    async fn store_mut<'a>(self: &'a Arc<Self>) -> StoreWriteGuard<'a> {
1444        #[cfg(test)]
1445        tokio::task::yield_now().await;
1446        let guard = self.store.write().await;
1447        #[cfg(test)]
1448        tokio::task::yield_now().await;
1449        StoreWriteGuard {
1450            guard,
1451            _not_send: PhantomData,
1452        }
1453    }
1454}
1455
1456impl<'a> Deref for StoreReadGuard<'a> {
1457    type Target = Store;
1458
1459    fn deref(&self) -> &Self::Target {
1460        &*self.guard
1461    }
1462}
1463
1464impl<'a> Deref for StoreWriteGuard<'a> {
1465    type Target = Store;
1466
1467    fn deref(&self) -> &Self::Target {
1468        &*self.guard
1469    }
1470}
1471
1472impl<'a> DerefMut for StoreWriteGuard<'a> {
1473    fn deref_mut(&mut self) -> &mut Self::Target {
1474        &mut *self.guard
1475    }
1476}
1477
1478impl<'a> Drop for StoreWriteGuard<'a> {
1479    fn drop(&mut self) {
1480        #[cfg(test)]
1481        self.check_invariants();
1482
1483        let metrics = self.metrics();
1484        tracing::info!(
1485            connections = metrics.connections,
1486            registered_projects = metrics.registered_projects,
1487            shared_projects = metrics.shared_projects,
1488            "metrics"
1489        );
1490    }
1491}
1492
1493impl Executor for RealExecutor {
1494    type Sleep = Sleep;
1495
1496    fn spawn_detached<F: 'static + Send + Future<Output = ()>>(&self, future: F) {
1497        tokio::task::spawn(future);
1498    }
1499
1500    fn sleep(&self, duration: Duration) -> Self::Sleep {
1501        tokio::time::sleep(duration)
1502    }
1503}
1504
1505fn broadcast<F>(
1506    sender_id: ConnectionId,
1507    receiver_ids: impl IntoIterator<Item = ConnectionId>,
1508    mut f: F,
1509) where
1510    F: FnMut(ConnectionId) -> anyhow::Result<()>,
1511{
1512    for receiver_id in receiver_ids {
1513        if receiver_id != sender_id {
1514            f(receiver_id).trace_err();
1515        }
1516    }
1517}
1518
// Name of the HTTP header that carries the client's protocol version. It is the header
// name returned by `ProtocolVersion`'s `Header::name` implementation.
lazy_static! {
    static ref ZED_PROTOCOL_VERSION: HeaderName = HeaderName::from_static("x-zed-protocol-version");
}
1522
/// Protocol version number carried by the `x-zed-protocol-version` header.
///
/// A thin newtype over `u32`; it derives the usual value-type traits so it can be
/// logged, copied and compared.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub struct ProtocolVersion(u32);
1524
1525impl Header for ProtocolVersion {
1526    fn name() -> &'static HeaderName {
1527        &ZED_PROTOCOL_VERSION
1528    }
1529
1530    fn decode<'i, I>(values: &mut I) -> Result<Self, axum::headers::Error>
1531    where
1532        Self: Sized,
1533        I: Iterator<Item = &'i axum::http::HeaderValue>,
1534    {
1535        let version = values
1536            .next()
1537            .ok_or_else(|| axum::headers::Error::invalid())?
1538            .to_str()
1539            .map_err(|_| axum::headers::Error::invalid())?
1540            .parse()
1541            .map_err(|_| axum::headers::Error::invalid())?;
1542        Ok(Self(version))
1543    }
1544
1545    fn encode<E: Extend<axum::http::HeaderValue>>(&self, values: &mut E) {
1546        values.extend([self.0.to_string().parse().unwrap()]);
1547    }
1548}
1549
1550pub fn routes(server: Arc<Server>) -> Router<Body> {
1551    Router::new()
1552        .route("/rpc", get(handle_websocket_request))
1553        .layer(
1554            ServiceBuilder::new()
1555                .layer(Extension(server.app_state.clone()))
1556                .layer(middleware::from_fn(auth::validate_header))
1557                .layer(Extension(server)),
1558        )
1559}
1560
1561pub async fn handle_websocket_request(
1562    TypedHeader(ProtocolVersion(protocol_version)): TypedHeader<ProtocolVersion>,
1563    ConnectInfo(socket_address): ConnectInfo<SocketAddr>,
1564    Extension(server): Extension<Arc<Server>>,
1565    Extension(user): Extension<User>,
1566    ws: WebSocketUpgrade,
1567) -> axum::response::Response {
1568    if protocol_version != rpc::PROTOCOL_VERSION {
1569        return (
1570            StatusCode::UPGRADE_REQUIRED,
1571            "client must be upgraded".to_string(),
1572        )
1573            .into_response();
1574    }
1575    let socket_address = socket_address.to_string();
1576    ws.on_upgrade(move |socket| {
1577        use util::ResultExt;
1578        let socket = socket
1579            .map_ok(to_tungstenite_message)
1580            .err_into()
1581            .with(|message| async move { Ok(to_axum_message(message)) });
1582        let connection = Connection::new(Box::pin(socket));
1583        async move {
1584            server
1585                .handle_connection(connection, socket_address, user, None, RealExecutor)
1586                .await
1587                .log_err();
1588        }
1589    })
1590}
1591
1592fn to_axum_message(message: TungsteniteMessage) -> AxumMessage {
1593    match message {
1594        TungsteniteMessage::Text(payload) => AxumMessage::Text(payload),
1595        TungsteniteMessage::Binary(payload) => AxumMessage::Binary(payload),
1596        TungsteniteMessage::Ping(payload) => AxumMessage::Ping(payload),
1597        TungsteniteMessage::Pong(payload) => AxumMessage::Pong(payload),
1598        TungsteniteMessage::Close(frame) => AxumMessage::Close(frame.map(|frame| AxumCloseFrame {
1599            code: frame.code.into(),
1600            reason: frame.reason,
1601        })),
1602    }
1603}
1604
1605fn to_tungstenite_message(message: AxumMessage) -> TungsteniteMessage {
1606    match message {
1607        AxumMessage::Text(payload) => TungsteniteMessage::Text(payload),
1608        AxumMessage::Binary(payload) => TungsteniteMessage::Binary(payload),
1609        AxumMessage::Ping(payload) => TungsteniteMessage::Ping(payload),
1610        AxumMessage::Pong(payload) => TungsteniteMessage::Pong(payload),
1611        AxumMessage::Close(frame) => {
1612            TungsteniteMessage::Close(frame.map(|frame| TungsteniteCloseFrame {
1613                code: frame.code.into(),
1614                reason: frame.reason,
1615            }))
1616        }
1617    }
1618}
1619
/// Extension trait for converting a fallible value into an `Option` instead of
/// propagating its error.
pub trait ResultExt {
    /// The success type yielded when no error occurred.
    type Ok;

    /// Consumes `self`, returning `Some` with the success value or `None` on failure.
    ///
    /// The implementation for `Result` in this file reports the error with
    /// `tracing::error!` before returning `None`.
    fn trace_err(self) -> Option<Self::Ok>;
}
1625
1626impl<T, E> ResultExt for Result<T, E>
1627where
1628    E: std::fmt::Debug,
1629{
1630    type Ok = T;
1631
1632    fn trace_err(self) -> Option<T> {
1633        match self {
1634            Ok(value) => Some(value),
1635            Err(error) => {
1636                tracing::error!("{:?}", error);
1637                None
1638            }
1639        }
1640    }
1641}
1642
1643#[cfg(test)]
1644mod tests {
1645    use super::*;
1646    use crate::{
1647        db::{tests::TestDb, UserId},
1648        AppState,
1649    };
1650    use ::rpc::Peer;
1651    use client::{
1652        self, test::FakeHttpClient, Channel, ChannelDetails, ChannelList, Client, Credentials,
1653        EstablishConnectionError, UserStore, RECEIVE_TIMEOUT,
1654    };
1655    use collections::{BTreeMap, HashSet};
1656    use editor::{
1657        self, ConfirmCodeAction, ConfirmCompletion, ConfirmRename, Editor, Input, Redo, Rename,
1658        ToOffset, ToggleCodeActions, Undo,
1659    };
1660    use gpui::{
1661        executor::{self, Deterministic},
1662        geometry::vector::vec2f,
1663        ModelHandle, TestAppContext, ViewHandle,
1664    };
1665    use language::{
1666        range_to_lsp, tree_sitter_rust, Diagnostic, DiagnosticEntry, FakeLspAdapter, Language,
1667        LanguageConfig, LanguageRegistry, OffsetRangeExt, Point, Rope,
1668    };
1669    use lsp::{self, FakeLanguageServer};
1670    use parking_lot::Mutex;
1671    use project::{
1672        fs::{FakeFs, Fs as _},
1673        search::SearchQuery,
1674        worktree::WorktreeHandle,
1675        DiagnosticSummary, Project, ProjectPath, WorktreeId,
1676    };
1677    use rand::prelude::*;
1678    use rpc::PeerId;
1679    use serde_json::json;
1680    use settings::Settings;
1681    use sqlx::types::time::OffsetDateTime;
1682    use std::{
1683        cell::RefCell,
1684        env,
1685        ops::Deref,
1686        path::{Path, PathBuf},
1687        rc::Rc,
1688        sync::{
1689            atomic::{AtomicBool, Ordering::SeqCst},
1690            Arc,
1691        },
1692        time::Duration,
1693    };
1694    use theme::ThemeRegistry;
1695    use workspace::{Item, SplitDirection, ToggleFollow, Workspace, WorkspaceParams};
1696
1697    #[cfg(test)]
1698    #[ctor::ctor]
1699    fn init_logger() {
1700        if std::env::var("RUST_LOG").is_ok() {
1701            env_logger::init();
1702        }
1703    }
1704
    // End-to-end check of project sharing between a host (client A) and a guest (client B):
    // collaborator lists, buffer contents and edits must replicate, and a second connection
    // for the same user (`client_b2`) must be able to join the shared project. Finally,
    // dropping B's first project must reduce the collaborator counts seen by A and B2.
    #[gpui::test(iterations = 10)]
    async fn test_share_project(
        deterministic: Arc<Deterministic>,
        cx_a: &mut TestAppContext,
        cx_b: &mut TestAppContext,
        cx_b2: &mut TestAppContext,
    ) {
        let (window_b, _) = cx_b.add_window(|_| EmptyView);
        let lang_registry = Arc::new(LanguageRegistry::test());
        let fs = FakeFs::new(cx_a.background());
        cx_a.foreground().forbid_parking();

        // Connect to a server as 2 clients.
        let mut server = TestServer::start(cx_a.foreground(), cx_a.background()).await;
        let client_a = server.create_client(cx_a, "user_a").await;
        let mut client_b = server.create_client(cx_b, "user_b").await;
        server
            .make_contacts(vec![(&client_a, cx_a), (&client_b, cx_b)])
            .await;

        // Share a project as client A
        fs.insert_tree(
            "/a",
            json!({
                "a.txt": "a-contents",
                "b.txt": "b-contents",
            }),
        )
        .await;
        let project_a = cx_a.update(|cx| {
            Project::local(
                client_a.clone(),
                client_a.user_store.clone(),
                lang_registry.clone(),
                fs.clone(),
                cx,
            )
        });
        // The remote id is needed later to join the project directly via `Project::remote`.
        let project_id = project_a
            .read_with(cx_a, |project, _| project.next_remote_id())
            .await;
        let (worktree_a, _) = project_a
            .update(cx_a, |p, cx| {
                p.find_or_create_local_worktree("/a", true, cx)
            })
            .await
            .unwrap();
        let worktree_id = worktree_a.read_with(cx_a, |tree, _| tree.id());
        worktree_a
            .read_with(cx_a, |tree, _| tree.as_local().unwrap().scan_complete())
            .await;

        // Join that project as client B
        // Copy B's peer id into a local for use in the host-side collaborator check below.
        let client_b_peer_id = client_b.peer_id;
        let project_b = client_b.build_remote_project(&project_a, cx_a, cx_b).await;

        // The guest must see the host as a collaborator with the expected login.
        let replica_id_b = project_b.read_with(cx_b, |project, _| {
            assert_eq!(
                project
                    .collaborators()
                    .get(&client_a.peer_id)
                    .unwrap()
                    .user
                    .github_login,
                "user_a"
            );
            project.replica_id()
        });
        // Wait until the host sees the guest with a matching replica id and login.
        project_a
            .condition(&cx_a, |tree, _| {
                tree.collaborators()
                    .get(&client_b_peer_id)
                    .map_or(false, |collaborator| {
                        collaborator.replica_id == replica_id_b
                            && collaborator.user.github_login == "user_b"
                    })
            })
            .await;

        // Open the same file as client B and client A.
        let buffer_b = project_b
            .update(cx_b, |p, cx| p.open_buffer((worktree_id, "b.txt"), cx))
            .await
            .unwrap();
        buffer_b.read_with(cx_b, |buf, _| assert_eq!(buf.text(), "b-contents"));
        // Before A opens the file itself, the host project is expected to already report
        // the buffer as open.
        project_a.read_with(cx_a, |project, cx| {
            assert!(project.has_open_buffer((worktree_id, "b.txt"), cx))
        });
        let buffer_a = project_a
            .update(cx_a, |p, cx| p.open_buffer((worktree_id, "b.txt"), cx))
            .await
            .unwrap();

        let editor_b = cx_b.add_view(window_b, |cx| Editor::for_buffer(buffer_b, None, cx));

        // TODO
        // // Create a selection set as client B and see that selection set as client A.
        // buffer_a
        //     .condition(&cx_a, |buffer, _| buffer.selection_sets().count() == 1)
        //     .await;

        // Edit the buffer as client B and see that edit as client A.
        editor_b.update(cx_b, |editor, cx| {
            editor.handle_input(&Input("ok, ".into()), cx)
        });
        buffer_a
            .condition(&cx_a, |buffer, _| buffer.text() == "ok, b-contents")
            .await;

        // TODO
        // // Remove the selection set as client B, see those selections disappear as client A.
        cx_b.update(move |_| drop(editor_b));
        // buffer_a
        //     .condition(&cx_a, |buffer, _| buffer.selection_sets().count() == 0)
        //     .await;

        // Client B can join again on a different window because they are already a participant.
        let client_b2 = server.create_client(cx_b2, "user_b").await;
        let project_b2 = Project::remote(
            project_id,
            client_b2.client.clone(),
            client_b2.user_store.clone(),
            lang_registry.clone(),
            FakeFs::new(cx_b2.background()),
            &mut cx_b2.to_async(),
        )
        .await
        .unwrap();
        deterministic.run_until_parked();
        // With three participants, each project is expected to list the other two.
        project_a.read_with(cx_a, |project, _| {
            assert_eq!(project.collaborators().len(), 2);
        });
        project_b.read_with(cx_b, |project, _| {
            assert_eq!(project.collaborators().len(), 2);
        });
        project_b2.read_with(cx_b2, |project, _| {
            assert_eq!(project.collaborators().len(), 2);
        });

        // Dropping client B's first project removes only that from client A's collaborators.
        cx_b.update(move |_| {
            drop(client_b.project.take());
            drop(project_b);
        });
        deterministic.run_until_parked();
        project_a.read_with(cx_a, |project, _| {
            assert_eq!(project.collaborators().len(), 1);
        });
        project_b2.read_with(cx_b2, |project, _| {
            assert_eq!(project.collaborators().len(), 1);
        });
    }
1857
    // Checks that the host's worktree stops being shared once its only guest leaves the
    // project, and that it is shared again (and usable) when the guest rejoins.
    #[gpui::test(iterations = 10)]
    async fn test_unshare_project(
        deterministic: Arc<Deterministic>,
        cx_a: &mut TestAppContext,
        cx_b: &mut TestAppContext,
    ) {
        let lang_registry = Arc::new(LanguageRegistry::test());
        let fs = FakeFs::new(cx_a.background());
        cx_a.foreground().forbid_parking();

        // Connect to a server as 2 clients.
        let mut server = TestServer::start(cx_a.foreground(), cx_a.background()).await;
        let client_a = server.create_client(cx_a, "user_a").await;
        let mut client_b = server.create_client(cx_b, "user_b").await;
        server
            .make_contacts(vec![(&client_a, cx_a), (&client_b, cx_b)])
            .await;

        // Share a project as client A
        fs.insert_tree(
            "/a",
            json!({
                "a.txt": "a-contents",
                "b.txt": "b-contents",
            }),
        )
        .await;
        let project_a = cx_a.update(|cx| {
            Project::local(
                client_a.clone(),
                client_a.user_store.clone(),
                lang_registry.clone(),
                fs.clone(),
                cx,
            )
        });
        let (worktree_a, _) = project_a
            .update(cx_a, |p, cx| {
                p.find_or_create_local_worktree("/a", true, cx)
            })
            .await
            .unwrap();
        worktree_a
            .read_with(cx_a, |tree, _| tree.as_local().unwrap().scan_complete())
            .await;
        let worktree_id = worktree_a.read_with(cx_a, |tree, _| tree.id());

        // Join that project as client B
        let project_b = client_b.build_remote_project(&project_a, cx_a, cx_b).await;
        assert!(worktree_a.read_with(cx_a, |tree, _| tree.as_local().unwrap().is_shared()));
        // Opening a buffer as the guest must succeed while the project is shared.
        project_b
            .update(cx_b, |p, cx| p.open_buffer((worktree_id, "a.txt"), cx))
            .await
            .unwrap();

        // When client B leaves the project, it gets automatically unshared.
        cx_b.update(|_| {
            drop(client_b.project.take());
            drop(project_b);
        });
        deterministic.run_until_parked();
        assert!(worktree_a.read_with(cx_a, |tree, _| !tree.as_local().unwrap().is_shared()));

        // When client B joins again, the project gets re-shared.
        let project_b2 = client_b.build_remote_project(&project_a, cx_a, cx_b).await;
        assert!(worktree_a.read_with(cx_a, |tree, _| tree.as_local().unwrap().is_shared()));
        project_b2
            .update(cx_b, |p, cx| p.open_buffer((worktree_id, "a.txt"), cx))
            .await
            .unwrap();
    }
1929
    // Checks the effects of the host's connection dropping: the host loses its
    // collaborators and the project is no longer shared, an existing guest's project
    // becomes read-only, a pending join request fails with `HostWentOffline`, and a guest
    // can still join the project afterwards.
    #[gpui::test(iterations = 10)]
    async fn test_host_disconnect(
        deterministic: Arc<Deterministic>,
        cx_a: &mut TestAppContext,
        cx_b: &mut TestAppContext,
        cx_c: &mut TestAppContext,
    ) {
        let lang_registry = Arc::new(LanguageRegistry::test());
        let fs = FakeFs::new(cx_a.background());
        cx_a.foreground().forbid_parking();

        // Connect to a server as 3 clients.
        let mut server = TestServer::start(cx_a.foreground(), cx_a.background()).await;
        let client_a = server.create_client(cx_a, "user_a").await;
        let mut client_b = server.create_client(cx_b, "user_b").await;
        let client_c = server.create_client(cx_c, "user_c").await;
        server
            .make_contacts(vec![
                (&client_a, cx_a),
                (&client_b, cx_b),
                (&client_c, cx_c),
            ])
            .await;

        // Share a project as client A
        fs.insert_tree(
            "/a",
            json!({
                "a.txt": "a-contents",
                "b.txt": "b-contents",
            }),
        )
        .await;
        let project_a = cx_a.update(|cx| {
            Project::local(
                client_a.clone(),
                client_a.user_store.clone(),
                lang_registry.clone(),
                fs.clone(),
                cx,
            )
        });
        let project_id = project_a
            .read_with(cx_a, |project, _| project.next_remote_id())
            .await;
        let (worktree_a, _) = project_a
            .update(cx_a, |p, cx| {
                p.find_or_create_local_worktree("/a", true, cx)
            })
            .await
            .unwrap();
        worktree_a
            .read_with(cx_a, |tree, _| tree.as_local().unwrap().scan_complete())
            .await;
        let worktree_id = worktree_a.read_with(cx_a, |tree, _| tree.id());

        // Join that project as client B
        let project_b = client_b.build_remote_project(&project_a, cx_a, cx_b).await;
        assert!(worktree_a.read_with(cx_a, |tree, _| tree.as_local().unwrap().is_shared()));
        project_b
            .update(cx_b, |p, cx| p.open_buffer((worktree_id, "a.txt"), cx))
            .await
            .unwrap();

        // Request to join that project as client C
        // The join is spawned rather than awaited here; its result is only awaited after
        // the host has been disconnected.
        let project_c = cx_c.spawn(|mut cx| {
            let client = client_c.client.clone();
            let user_store = client_c.user_store.clone();
            let lang_registry = lang_registry.clone();
            async move {
                Project::remote(
                    project_id,
                    client,
                    user_store,
                    lang_registry.clone(),
                    FakeFs::new(cx.background()),
                    &mut cx,
                )
                .await
            }
        });
        deterministic.run_until_parked();

        // Drop client A's connection. Collaborators should disappear and the project should not be shown as shared.
        server.disconnect_client(client_a.current_user_id(cx_a));
        // Advance the fake clock by the RPC receive timeout; presumably this is what lets
        // the dropped connection be noticed (confirm against the client implementation).
        cx_a.foreground().advance_clock(rpc::RECEIVE_TIMEOUT);
        project_a
            .condition(cx_a, |project, _| project.collaborators().is_empty())
            .await;
        project_a.read_with(cx_a, |project, _| assert!(!project.is_shared()));
        project_b
            .condition(cx_b, |project, _| project.is_read_only())
            .await;
        assert!(worktree_a.read_with(cx_a, |tree, _| !tree.as_local().unwrap().is_shared()));
        cx_b.update(|_| {
            drop(project_b);
        });
        // Client C's pending join must resolve to the "host went offline" error.
        assert!(matches!(
            project_c.await.unwrap_err(),
            project::JoinProjectError::HostWentOffline
        ));

        // Ensure guests can still join.
        let project_b2 = client_b.build_remote_project(&project_a, cx_a, cx_b).await;
        assert!(worktree_a.read_with(cx_a, |tree, _| tree.as_local().unwrap().is_shared()));
        project_b2
            .update(cx_b, |p, cx| p.open_buffer((worktree_id, "a.txt"), cx))
            .await
            .unwrap();
    }
2040
    // Checks the errors a prospective guest receives when its join request does not
    // succeed: `HostDeclined` when the host rejects the request, and `HostClosedProject`
    // when the host drops the project while a request is pending.
    #[gpui::test(iterations = 10)]
    async fn test_decline_join_request(
        deterministic: Arc<Deterministic>,
        cx_a: &mut TestAppContext,
        cx_b: &mut TestAppContext,
    ) {
        let lang_registry = Arc::new(LanguageRegistry::test());
        let fs = FakeFs::new(cx_a.background());
        cx_a.foreground().forbid_parking();

        // Connect to a server as 2 clients.
        let mut server = TestServer::start(cx_a.foreground(), cx_a.background()).await;
        let client_a = server.create_client(cx_a, "user_a").await;
        let client_b = server.create_client(cx_b, "user_b").await;
        server
            .make_contacts(vec![(&client_a, cx_a), (&client_b, cx_b)])
            .await;

        // Share a project as client A
        fs.insert_tree("/a", json!({})).await;
        let project_a = cx_a.update(|cx| {
            Project::local(
                client_a.clone(),
                client_a.user_store.clone(),
                lang_registry.clone(),
                fs.clone(),
                cx,
            )
        });
        let project_id = project_a
            .read_with(cx_a, |project, _| project.next_remote_id())
            .await;
        let (worktree_a, _) = project_a
            .update(cx_a, |p, cx| {
                p.find_or_create_local_worktree("/a", true, cx)
            })
            .await
            .unwrap();
        worktree_a
            .read_with(cx_a, |tree, _| tree.as_local().unwrap().scan_complete())
            .await;

        // Request to join that project as client B
        // Spawned so the request stays pending while the host responds below.
        let project_b = cx_b.spawn(|mut cx| {
            let client = client_b.client.clone();
            let user_store = client_b.user_store.clone();
            let lang_registry = lang_registry.clone();
            async move {
                Project::remote(
                    project_id,
                    client,
                    user_store,
                    lang_registry.clone(),
                    FakeFs::new(cx.background()),
                    &mut cx,
                )
                .await
            }
        });
        deterministic.run_until_parked();
        // The host answers B's request with `false`, i.e. declines it.
        project_a.update(cx_a, |project, cx| {
            project.respond_to_join_request(client_b.user_id().unwrap(), false, cx)
        });
        assert!(matches!(
            project_b.await.unwrap_err(),
            project::JoinProjectError::HostDeclined
        ));

        // Request to join the project again as client B
        let project_b = cx_b.spawn(|mut cx| {
            let client = client_b.client.clone();
            let user_store = client_b.user_store.clone();
            let lang_registry = lang_registry.clone();
            async move {
                Project::remote(
                    project_id,
                    client,
                    user_store,
                    lang_registry.clone(),
                    FakeFs::new(cx.background()),
                    &mut cx,
                )
                .await
            }
        });

        // Close the project on the host
        deterministic.run_until_parked();
        cx_a.update(|_| drop(project_a));
        deterministic.run_until_parked();
        assert!(matches!(
            project_b.await.unwrap_err(),
            project::JoinProjectError::HostClosedProject
        ));
    }
2136
    // Checks that the host project emits `ContactRequestedJoin` when a contact asks to
    // join, and `ContactCancelledJoinRequest` when that contact leaves the project before
    // the request has been answered.
    #[gpui::test(iterations = 10)]
    async fn test_cancel_join_request(
        deterministic: Arc<Deterministic>,
        cx_a: &mut TestAppContext,
        cx_b: &mut TestAppContext,
    ) {
        let lang_registry = Arc::new(LanguageRegistry::test());
        let fs = FakeFs::new(cx_a.background());
        cx_a.foreground().forbid_parking();

        // Connect to a server as 2 clients.
        let mut server = TestServer::start(cx_a.foreground(), cx_a.background()).await;
        let client_a = server.create_client(cx_a, "user_a").await;
        let client_b = server.create_client(cx_b, "user_b").await;
        server
            .make_contacts(vec![(&client_a, cx_a), (&client_b, cx_b)])
            .await;

        // Share a project as client A
        fs.insert_tree("/a", json!({})).await;
        let project_a = cx_a.update(|cx| {
            Project::local(
                client_a.clone(),
                client_a.user_store.clone(),
                lang_registry.clone(),
                fs.clone(),
                cx,
            )
        });
        let project_id = project_a
            .read_with(cx_a, |project, _| project.next_remote_id())
            .await;

        // Record every event emitted by project A so the assertions below can inspect them.
        let project_a_events = Rc::new(RefCell::new(Vec::new()));
        // Fetch user B through A's user store; it is the payload expected in the events.
        let user_b = client_a
            .user_store
            .update(cx_a, |store, cx| {
                store.fetch_user(client_b.user_id().unwrap(), cx)
            })
            .await
            .unwrap();
        project_a.update(cx_a, {
            let project_a_events = project_a_events.clone();
            move |_, cx| {
                cx.subscribe(&cx.handle(), move |_, _, event, _| {
                    project_a_events.borrow_mut().push(event.clone());
                })
                .detach();
            }
        });

        let (worktree_a, _) = project_a
            .update(cx_a, |p, cx| {
                p.find_or_create_local_worktree("/a", true, cx)
            })
            .await
            .unwrap();
        worktree_a
            .read_with(cx_a, |tree, _| tree.as_local().unwrap().scan_complete())
            .await;

        // Request to join that project as client B
        let project_b = cx_b.spawn(|mut cx| {
            let client = client_b.client.clone();
            let user_store = client_b.user_store.clone();
            let lang_registry = lang_registry.clone();
            async move {
                Project::remote(
                    project_id,
                    client,
                    user_store,
                    lang_registry.clone(),
                    FakeFs::new(cx.background()),
                    &mut cx,
                )
                .await
            }
        });
        deterministic.run_until_parked();
        assert_eq!(
            &*project_a_events.borrow(),
            &[project::Event::ContactRequestedJoin(user_b.clone())]
        );
        // Reset the log so the next assertion only sees events caused by the cancellation.
        project_a_events.borrow_mut().clear();

        // Cancel the join request by leaving the project
        client_b
            .client
            .send(proto::LeaveProject { project_id })
            .unwrap();
        drop(project_b);

        deterministic.run_until_parked();
        assert_eq!(
            &*project_a_events.borrow(),
            &[project::Event::ContactCancelledJoinRequest(user_b.clone())]
        );
    }
2235
    // Checks that buffer edits and saves propagate between a host and two guests, and that
    // renames and new files on the host's file system show up in the guests' worktrees and
    // in the file paths of already-open buffers.
    #[gpui::test(iterations = 10)]
    async fn test_propagate_saves_and_fs_changes(
        cx_a: &mut TestAppContext,
        cx_b: &mut TestAppContext,
        cx_c: &mut TestAppContext,
    ) {
        let lang_registry = Arc::new(LanguageRegistry::test());
        let fs = FakeFs::new(cx_a.background());
        cx_a.foreground().forbid_parking();

        // Connect to a server as 3 clients.
        let mut server = TestServer::start(cx_a.foreground(), cx_a.background()).await;
        let client_a = server.create_client(cx_a, "user_a").await;
        let mut client_b = server.create_client(cx_b, "user_b").await;
        let mut client_c = server.create_client(cx_c, "user_c").await;
        server
            .make_contacts(vec![
                (&client_a, cx_a),
                (&client_b, cx_b),
                (&client_c, cx_c),
            ])
            .await;

        // Share a worktree as client A.
        fs.insert_tree(
            "/a",
            json!({
                "file1": "",
                "file2": ""
            }),
        )
        .await;
        let project_a = cx_a.update(|cx| {
            Project::local(
                client_a.clone(),
                client_a.user_store.clone(),
                lang_registry.clone(),
                fs.clone(),
                cx,
            )
        });
        let (worktree_a, _) = project_a
            .update(cx_a, |p, cx| {
                p.find_or_create_local_worktree("/a", true, cx)
            })
            .await
            .unwrap();
        worktree_a
            .read_with(cx_a, |tree, _| tree.as_local().unwrap().scan_complete())
            .await;
        let worktree_id = worktree_a.read_with(cx_a, |tree, _| tree.id());

        // Join that worktree as clients B and C.
        let project_b = client_b.build_remote_project(&project_a, cx_a, cx_b).await;
        let project_c = client_c.build_remote_project(&project_a, cx_a, cx_c).await;
        let worktree_b = project_b.read_with(cx_b, |p, cx| p.worktrees(cx).next().unwrap());
        let worktree_c = project_c.read_with(cx_c, |p, cx| p.worktrees(cx).next().unwrap());

        // Open and edit a buffer as both guests B and C.
        let buffer_b = project_b
            .update(cx_b, |p, cx| p.open_buffer((worktree_id, "file1"), cx))
            .await
            .unwrap();
        let buffer_c = project_c
            .update(cx_c, |p, cx| p.open_buffer((worktree_id, "file1"), cx))
            .await
            .unwrap();
        // Both guests insert at offset 0 of the same (initially empty) file.
        buffer_b.update(cx_b, |buf, cx| buf.edit([(0..0, "i-am-b, ")], cx));
        buffer_c.update(cx_c, |buf, cx| buf.edit([(0..0, "i-am-c, ")], cx));

        // Open and edit that buffer as the host.
        let buffer_a = project_a
            .update(cx_a, |p, cx| p.open_buffer((worktree_id, "file1"), cx))
            .await
            .unwrap();

        // Wait for both guest edits to reach the host before appending the host's own text.
        buffer_a
            .condition(cx_a, |buf, _| buf.text() == "i-am-c, i-am-b, ")
            .await;
        buffer_a.update(cx_a, |buf, cx| {
            buf.edit([(buf.len()..buf.len(), "i-am-a")], cx)
        });

        // Wait for edits to propagate
        buffer_a
            .condition(cx_a, |buf, _| buf.text() == "i-am-c, i-am-b, i-am-a")
            .await;
        buffer_b
            .condition(cx_b, |buf, _| buf.text() == "i-am-c, i-am-b, i-am-a")
            .await;
        buffer_c
            .condition(cx_c, |buf, _| buf.text() == "i-am-c, i-am-b, i-am-a")
            .await;

        // Edit the buffer as the host and concurrently save as guest B.
        // The save is started but not awaited until after the host's edit has been issued.
        let save_b = buffer_b.update(cx_b, |buf, cx| buf.save(cx));
        buffer_a.update(cx_a, |buf, cx| buf.edit([(0..0, "hi-a, ")], cx));
        save_b.await.unwrap();
        // The file written to the fake file system is expected to include the host's edit.
        assert_eq!(
            fs.load("/a/file1".as_ref()).await.unwrap(),
            "hi-a, i-am-c, i-am-b, i-am-a"
        );
        buffer_a.read_with(cx_a, |buf, _| assert!(!buf.is_dirty()));
        buffer_b.read_with(cx_b, |buf, _| assert!(!buf.is_dirty()));
        buffer_c.condition(cx_c, |buf, _| !buf.is_dirty()).await;

        worktree_a.flush_fs_events(cx_a).await;

        // Make changes on host's file system, see those changes on guest worktrees.
        fs.rename(
            "/a/file1".as_ref(),
            "/a/file1-renamed".as_ref(),
            Default::default(),
        )
        .await
        .unwrap();

        fs.rename("/a/file2".as_ref(), "/a/file3".as_ref(), Default::default())
            .await
            .unwrap();
        fs.insert_file(Path::new("/a/file4"), "4".into()).await;

        // Host and both guests must converge on the same list of worktree paths.
        worktree_a
            .condition(&cx_a, |tree, _| {
                tree.paths()
                    .map(|p| p.to_string_lossy())
                    .collect::<Vec<_>>()
                    == ["file1-renamed", "file3", "file4"]
            })
            .await;
        worktree_b
            .condition(&cx_b, |tree, _| {
                tree.paths()
                    .map(|p| p.to_string_lossy())
                    .collect::<Vec<_>>()
                    == ["file1-renamed", "file3", "file4"]
            })
            .await;
        worktree_c
            .condition(&cx_c, |tree, _| {
                tree.paths()
                    .map(|p| p.to_string_lossy())
                    .collect::<Vec<_>>()
                    == ["file1-renamed", "file3", "file4"]
            })
            .await;

        // Ensure buffer files are updated as well.
        buffer_a
            .condition(&cx_a, |buf, _| {
                buf.file().unwrap().path().to_str() == Some("file1-renamed")
            })
            .await;
        buffer_b
            .condition(&cx_b, |buf, _| {
                buf.file().unwrap().path().to_str() == Some("file1-renamed")
            })
            .await;
        buffer_c
            .condition(&cx_c, |buf, _| {
                buf.file().unwrap().path().to_str() == Some("file1-renamed")
            })
            .await;
    }
2400
2401    #[gpui::test(iterations = 10)]
2402    async fn test_fs_operations(
2403        executor: Arc<Deterministic>,
2404        cx_a: &mut TestAppContext,
2405        cx_b: &mut TestAppContext,
2406    ) {
2407        executor.forbid_parking();
2408        let fs = FakeFs::new(cx_a.background());
2409
2410        // Connect to a server as 2 clients.
2411        let mut server = TestServer::start(cx_a.foreground(), cx_a.background()).await;
2412        let mut client_a = server.create_client(cx_a, "user_a").await;
2413        let mut client_b = server.create_client(cx_b, "user_b").await;
2414        server
2415            .make_contacts(vec![(&client_a, cx_a), (&client_b, cx_b)])
2416            .await;
2417
2418        // Share a project as client A
2419        fs.insert_tree(
2420            "/dir",
2421            json!({
2422                "a.txt": "a-contents",
2423                "b.txt": "b-contents",
2424            }),
2425        )
2426        .await;
2427
2428        let (project_a, worktree_id) = client_a.build_local_project(fs, "/dir", cx_a).await;
2429        let project_b = client_b.build_remote_project(&project_a, cx_a, cx_b).await;
2430
2431        let worktree_a =
2432            project_a.read_with(cx_a, |project, cx| project.worktrees(cx).next().unwrap());
2433        let worktree_b =
2434            project_b.read_with(cx_b, |project, cx| project.worktrees(cx).next().unwrap());
2435
2436        let entry = project_b
2437            .update(cx_b, |project, cx| {
2438                project
2439                    .create_entry((worktree_id, "c.txt"), false, cx)
2440                    .unwrap()
2441            })
2442            .await
2443            .unwrap();
2444        worktree_a.read_with(cx_a, |worktree, _| {
2445            assert_eq!(
2446                worktree
2447                    .paths()
2448                    .map(|p| p.to_string_lossy())
2449                    .collect::<Vec<_>>(),
2450                ["a.txt", "b.txt", "c.txt"]
2451            );
2452        });
2453        worktree_b.read_with(cx_b, |worktree, _| {
2454            assert_eq!(
2455                worktree
2456                    .paths()
2457                    .map(|p| p.to_string_lossy())
2458                    .collect::<Vec<_>>(),
2459                ["a.txt", "b.txt", "c.txt"]
2460            );
2461        });
2462
2463        project_b
2464            .update(cx_b, |project, cx| {
2465                project.rename_entry(entry.id, Path::new("d.txt"), cx)
2466            })
2467            .unwrap()
2468            .await
2469            .unwrap();
2470        worktree_a.read_with(cx_a, |worktree, _| {
2471            assert_eq!(
2472                worktree
2473                    .paths()
2474                    .map(|p| p.to_string_lossy())
2475                    .collect::<Vec<_>>(),
2476                ["a.txt", "b.txt", "d.txt"]
2477            );
2478        });
2479        worktree_b.read_with(cx_b, |worktree, _| {
2480            assert_eq!(
2481                worktree
2482                    .paths()
2483                    .map(|p| p.to_string_lossy())
2484                    .collect::<Vec<_>>(),
2485                ["a.txt", "b.txt", "d.txt"]
2486            );
2487        });
2488
2489        let dir_entry = project_b
2490            .update(cx_b, |project, cx| {
2491                project
2492                    .create_entry((worktree_id, "DIR"), true, cx)
2493                    .unwrap()
2494            })
2495            .await
2496            .unwrap();
2497        worktree_a.read_with(cx_a, |worktree, _| {
2498            assert_eq!(
2499                worktree
2500                    .paths()
2501                    .map(|p| p.to_string_lossy())
2502                    .collect::<Vec<_>>(),
2503                ["DIR", "a.txt", "b.txt", "d.txt"]
2504            );
2505        });
2506        worktree_b.read_with(cx_b, |worktree, _| {
2507            assert_eq!(
2508                worktree
2509                    .paths()
2510                    .map(|p| p.to_string_lossy())
2511                    .collect::<Vec<_>>(),
2512                ["DIR", "a.txt", "b.txt", "d.txt"]
2513            );
2514        });
2515
2516        project_b
2517            .update(cx_b, |project, cx| {
2518                project.delete_entry(dir_entry.id, cx).unwrap()
2519            })
2520            .await
2521            .unwrap();
2522        worktree_a.read_with(cx_a, |worktree, _| {
2523            assert_eq!(
2524                worktree
2525                    .paths()
2526                    .map(|p| p.to_string_lossy())
2527                    .collect::<Vec<_>>(),
2528                ["a.txt", "b.txt", "d.txt"]
2529            );
2530        });
2531        worktree_b.read_with(cx_b, |worktree, _| {
2532            assert_eq!(
2533                worktree
2534                    .paths()
2535                    .map(|p| p.to_string_lossy())
2536                    .collect::<Vec<_>>(),
2537                ["a.txt", "b.txt", "d.txt"]
2538            );
2539        });
2540
2541        project_b
2542            .update(cx_b, |project, cx| {
2543                project.delete_entry(entry.id, cx).unwrap()
2544            })
2545            .await
2546            .unwrap();
2547        worktree_a.read_with(cx_a, |worktree, _| {
2548            assert_eq!(
2549                worktree
2550                    .paths()
2551                    .map(|p| p.to_string_lossy())
2552                    .collect::<Vec<_>>(),
2553                ["a.txt", "b.txt"]
2554            );
2555        });
2556        worktree_b.read_with(cx_b, |worktree, _| {
2557            assert_eq!(
2558                worktree
2559                    .paths()
2560                    .map(|p| p.to_string_lossy())
2561                    .collect::<Vec<_>>(),
2562                ["a.txt", "b.txt"]
2563            );
2564        });
2565    }
2566
2567    #[gpui::test(iterations = 10)]
2568    async fn test_buffer_conflict_after_save(cx_a: &mut TestAppContext, cx_b: &mut TestAppContext) {
2569        cx_a.foreground().forbid_parking();
2570        let lang_registry = Arc::new(LanguageRegistry::test());
2571        let fs = FakeFs::new(cx_a.background());
2572
2573        // Connect to a server as 2 clients.
2574        let mut server = TestServer::start(cx_a.foreground(), cx_a.background()).await;
2575        let client_a = server.create_client(cx_a, "user_a").await;
2576        let mut client_b = server.create_client(cx_b, "user_b").await;
2577        server
2578            .make_contacts(vec![(&client_a, cx_a), (&client_b, cx_b)])
2579            .await;
2580
2581        // Share a project as client A
2582        fs.insert_tree(
2583            "/dir",
2584            json!({
2585                "a.txt": "a-contents",
2586            }),
2587        )
2588        .await;
2589
2590        let project_a = cx_a.update(|cx| {
2591            Project::local(
2592                client_a.clone(),
2593                client_a.user_store.clone(),
2594                lang_registry.clone(),
2595                fs.clone(),
2596                cx,
2597            )
2598        });
2599        let (worktree_a, _) = project_a
2600            .update(cx_a, |p, cx| {
2601                p.find_or_create_local_worktree("/dir", true, cx)
2602            })
2603            .await
2604            .unwrap();
2605        worktree_a
2606            .read_with(cx_a, |tree, _| tree.as_local().unwrap().scan_complete())
2607            .await;
2608        let worktree_id = worktree_a.read_with(cx_a, |tree, _| tree.id());
2609
2610        // Join that project as client B
2611        let project_b = client_b.build_remote_project(&project_a, cx_a, cx_b).await;
2612
2613        // Open a buffer as client B
2614        let buffer_b = project_b
2615            .update(cx_b, |p, cx| p.open_buffer((worktree_id, "a.txt"), cx))
2616            .await
2617            .unwrap();
2618
2619        buffer_b.update(cx_b, |buf, cx| buf.edit([(0..0, "world ")], cx));
2620        buffer_b.read_with(cx_b, |buf, _| {
2621            assert!(buf.is_dirty());
2622            assert!(!buf.has_conflict());
2623        });
2624
2625        buffer_b.update(cx_b, |buf, cx| buf.save(cx)).await.unwrap();
2626        buffer_b
2627            .condition(&cx_b, |buffer_b, _| !buffer_b.is_dirty())
2628            .await;
2629        buffer_b.read_with(cx_b, |buf, _| {
2630            assert!(!buf.has_conflict());
2631        });
2632
2633        buffer_b.update(cx_b, |buf, cx| buf.edit([(0..0, "hello ")], cx));
2634        buffer_b.read_with(cx_b, |buf, _| {
2635            assert!(buf.is_dirty());
2636            assert!(!buf.has_conflict());
2637        });
2638    }
2639
2640    #[gpui::test(iterations = 10)]
2641    async fn test_buffer_reloading(cx_a: &mut TestAppContext, cx_b: &mut TestAppContext) {
2642        cx_a.foreground().forbid_parking();
2643        let lang_registry = Arc::new(LanguageRegistry::test());
2644        let fs = FakeFs::new(cx_a.background());
2645
2646        // Connect to a server as 2 clients.
2647        let mut server = TestServer::start(cx_a.foreground(), cx_a.background()).await;
2648        let client_a = server.create_client(cx_a, "user_a").await;
2649        let mut client_b = server.create_client(cx_b, "user_b").await;
2650        server
2651            .make_contacts(vec![(&client_a, cx_a), (&client_b, cx_b)])
2652            .await;
2653
2654        // Share a project as client A
2655        fs.insert_tree(
2656            "/dir",
2657            json!({
2658                "a.txt": "a-contents",
2659            }),
2660        )
2661        .await;
2662
2663        let project_a = cx_a.update(|cx| {
2664            Project::local(
2665                client_a.clone(),
2666                client_a.user_store.clone(),
2667                lang_registry.clone(),
2668                fs.clone(),
2669                cx,
2670            )
2671        });
2672        let (worktree_a, _) = project_a
2673            .update(cx_a, |p, cx| {
2674                p.find_or_create_local_worktree("/dir", true, cx)
2675            })
2676            .await
2677            .unwrap();
2678        worktree_a
2679            .read_with(cx_a, |tree, _| tree.as_local().unwrap().scan_complete())
2680            .await;
2681        let worktree_id = worktree_a.read_with(cx_a, |tree, _| tree.id());
2682
2683        // Join that project as client B
2684        let project_b = client_b.build_remote_project(&project_a, cx_a, cx_b).await;
2685        let _worktree_b = project_b.update(cx_b, |p, cx| p.worktrees(cx).next().unwrap());
2686
2687        // Open a buffer as client B
2688        let buffer_b = project_b
2689            .update(cx_b, |p, cx| p.open_buffer((worktree_id, "a.txt"), cx))
2690            .await
2691            .unwrap();
2692        buffer_b.read_with(cx_b, |buf, _| {
2693            assert!(!buf.is_dirty());
2694            assert!(!buf.has_conflict());
2695        });
2696
2697        fs.save(Path::new("/dir/a.txt"), &"new contents".into())
2698            .await
2699            .unwrap();
2700        buffer_b
2701            .condition(&cx_b, |buf, _| {
2702                buf.text() == "new contents" && !buf.is_dirty()
2703            })
2704            .await;
2705        buffer_b.read_with(cx_b, |buf, _| {
2706            assert!(!buf.has_conflict());
2707        });
2708    }
2709
2710    #[gpui::test(iterations = 10)]
2711    async fn test_editing_while_guest_opens_buffer(
2712        cx_a: &mut TestAppContext,
2713        cx_b: &mut TestAppContext,
2714    ) {
2715        cx_a.foreground().forbid_parking();
2716        let lang_registry = Arc::new(LanguageRegistry::test());
2717        let fs = FakeFs::new(cx_a.background());
2718
2719        // Connect to a server as 2 clients.
2720        let mut server = TestServer::start(cx_a.foreground(), cx_a.background()).await;
2721        let client_a = server.create_client(cx_a, "user_a").await;
2722        let mut client_b = server.create_client(cx_b, "user_b").await;
2723        server
2724            .make_contacts(vec![(&client_a, cx_a), (&client_b, cx_b)])
2725            .await;
2726
2727        // Share a project as client A
2728        fs.insert_tree(
2729            "/dir",
2730            json!({
2731                "a.txt": "a-contents",
2732            }),
2733        )
2734        .await;
2735        let project_a = cx_a.update(|cx| {
2736            Project::local(
2737                client_a.clone(),
2738                client_a.user_store.clone(),
2739                lang_registry.clone(),
2740                fs.clone(),
2741                cx,
2742            )
2743        });
2744        let (worktree_a, _) = project_a
2745            .update(cx_a, |p, cx| {
2746                p.find_or_create_local_worktree("/dir", true, cx)
2747            })
2748            .await
2749            .unwrap();
2750        worktree_a
2751            .read_with(cx_a, |tree, _| tree.as_local().unwrap().scan_complete())
2752            .await;
2753        let worktree_id = worktree_a.read_with(cx_a, |tree, _| tree.id());
2754
2755        // Join that project as client B
2756        let project_b = client_b.build_remote_project(&project_a, cx_a, cx_b).await;
2757
2758        // Open a buffer as client A
2759        let buffer_a = project_a
2760            .update(cx_a, |p, cx| p.open_buffer((worktree_id, "a.txt"), cx))
2761            .await
2762            .unwrap();
2763
2764        // Start opening the same buffer as client B
2765        let buffer_b = cx_b
2766            .background()
2767            .spawn(project_b.update(cx_b, |p, cx| p.open_buffer((worktree_id, "a.txt"), cx)));
2768
2769        // Edit the buffer as client A while client B is still opening it.
2770        cx_b.background().simulate_random_delay().await;
2771        buffer_a.update(cx_a, |buf, cx| buf.edit([(0..0, "X")], cx));
2772        cx_b.background().simulate_random_delay().await;
2773        buffer_a.update(cx_a, |buf, cx| buf.edit([(1..1, "Y")], cx));
2774
2775        let text = buffer_a.read_with(cx_a, |buf, _| buf.text());
2776        let buffer_b = buffer_b.await.unwrap();
2777        buffer_b.condition(&cx_b, |buf, _| buf.text() == text).await;
2778    }
2779
2780    #[gpui::test(iterations = 10)]
2781    async fn test_leaving_worktree_while_opening_buffer(
2782        cx_a: &mut TestAppContext,
2783        cx_b: &mut TestAppContext,
2784    ) {
2785        cx_a.foreground().forbid_parking();
2786        let lang_registry = Arc::new(LanguageRegistry::test());
2787        let fs = FakeFs::new(cx_a.background());
2788
2789        // Connect to a server as 2 clients.
2790        let mut server = TestServer::start(cx_a.foreground(), cx_a.background()).await;
2791        let client_a = server.create_client(cx_a, "user_a").await;
2792        let mut client_b = server.create_client(cx_b, "user_b").await;
2793        server
2794            .make_contacts(vec![(&client_a, cx_a), (&client_b, cx_b)])
2795            .await;
2796
2797        // Share a project as client A
2798        fs.insert_tree(
2799            "/dir",
2800            json!({
2801                "a.txt": "a-contents",
2802            }),
2803        )
2804        .await;
2805        let project_a = cx_a.update(|cx| {
2806            Project::local(
2807                client_a.clone(),
2808                client_a.user_store.clone(),
2809                lang_registry.clone(),
2810                fs.clone(),
2811                cx,
2812            )
2813        });
2814        let (worktree_a, _) = project_a
2815            .update(cx_a, |p, cx| {
2816                p.find_or_create_local_worktree("/dir", true, cx)
2817            })
2818            .await
2819            .unwrap();
2820        worktree_a
2821            .read_with(cx_a, |tree, _| tree.as_local().unwrap().scan_complete())
2822            .await;
2823        let worktree_id = worktree_a.read_with(cx_a, |tree, _| tree.id());
2824
2825        // Join that project as client B
2826        let project_b = client_b.build_remote_project(&project_a, cx_a, cx_b).await;
2827
2828        // See that a guest has joined as client A.
2829        project_a
2830            .condition(&cx_a, |p, _| p.collaborators().len() == 1)
2831            .await;
2832
2833        // Begin opening a buffer as client B, but leave the project before the open completes.
2834        let buffer_b = cx_b
2835            .background()
2836            .spawn(project_b.update(cx_b, |p, cx| p.open_buffer((worktree_id, "a.txt"), cx)));
2837        cx_b.update(|_| {
2838            drop(client_b.project.take());
2839            drop(project_b);
2840        });
2841        drop(buffer_b);
2842
2843        // See that the guest has left.
2844        project_a
2845            .condition(&cx_a, |p, _| p.collaborators().len() == 0)
2846            .await;
2847    }
2848
2849    #[gpui::test(iterations = 10)]
2850    async fn test_leaving_project(cx_a: &mut TestAppContext, cx_b: &mut TestAppContext) {
2851        cx_a.foreground().forbid_parking();
2852        let lang_registry = Arc::new(LanguageRegistry::test());
2853        let fs = FakeFs::new(cx_a.background());
2854
2855        // Connect to a server as 2 clients.
2856        let mut server = TestServer::start(cx_a.foreground(), cx_a.background()).await;
2857        let client_a = server.create_client(cx_a, "user_a").await;
2858        let mut client_b = server.create_client(cx_b, "user_b").await;
2859        server
2860            .make_contacts(vec![(&client_a, cx_a), (&client_b, cx_b)])
2861            .await;
2862
2863        // Share a project as client A
2864        fs.insert_tree(
2865            "/a",
2866            json!({
2867                "a.txt": "a-contents",
2868                "b.txt": "b-contents",
2869            }),
2870        )
2871        .await;
2872        let project_a = cx_a.update(|cx| {
2873            Project::local(
2874                client_a.clone(),
2875                client_a.user_store.clone(),
2876                lang_registry.clone(),
2877                fs.clone(),
2878                cx,
2879            )
2880        });
2881        let (worktree_a, _) = project_a
2882            .update(cx_a, |p, cx| {
2883                p.find_or_create_local_worktree("/a", true, cx)
2884            })
2885            .await
2886            .unwrap();
2887        worktree_a
2888            .read_with(cx_a, |tree, _| tree.as_local().unwrap().scan_complete())
2889            .await;
2890
2891        // Join that project as client B
2892        let _project_b = client_b.build_remote_project(&project_a, cx_a, cx_b).await;
2893
2894        // Client A sees that a guest has joined.
2895        project_a
2896            .condition(cx_a, |p, _| p.collaborators().len() == 1)
2897            .await;
2898
2899        // Drop client B's connection and ensure client A observes client B leaving the project.
2900        client_b.disconnect(&cx_b.to_async()).unwrap();
2901        project_a
2902            .condition(cx_a, |p, _| p.collaborators().len() == 0)
2903            .await;
2904
2905        // Rejoin the project as client B
2906        let _project_b = client_b.build_remote_project(&project_a, cx_a, cx_b).await;
2907
2908        // Client A sees that a guest has re-joined.
2909        project_a
2910            .condition(cx_a, |p, _| p.collaborators().len() == 1)
2911            .await;
2912
2913        // Simulate connection loss for client B and ensure client A observes client B leaving the project.
2914        client_b.wait_for_current_user(cx_b).await;
2915        server.disconnect_client(client_b.current_user_id(cx_b));
2916        cx_a.foreground().advance_clock(rpc::RECEIVE_TIMEOUT);
2917        project_a
2918            .condition(cx_a, |p, _| p.collaborators().len() == 0)
2919            .await;
2920    }
2921
2922    #[gpui::test(iterations = 10)]
2923    async fn test_collaborating_with_diagnostics(
2924        deterministic: Arc<Deterministic>,
2925        cx_a: &mut TestAppContext,
2926        cx_b: &mut TestAppContext,
2927        cx_c: &mut TestAppContext,
2928    ) {
2929        deterministic.forbid_parking();
2930        let lang_registry = Arc::new(LanguageRegistry::test());
2931        let fs = FakeFs::new(cx_a.background());
2932
2933        // Set up a fake language server.
2934        let mut language = Language::new(
2935            LanguageConfig {
2936                name: "Rust".into(),
2937                path_suffixes: vec!["rs".to_string()],
2938                ..Default::default()
2939            },
2940            Some(tree_sitter_rust::language()),
2941        );
2942        let mut fake_language_servers = language.set_fake_lsp_adapter(Default::default());
2943        lang_registry.add(Arc::new(language));
2944
2945        // Connect to a server as 2 clients.
2946        let mut server = TestServer::start(cx_a.foreground(), cx_a.background()).await;
2947        let client_a = server.create_client(cx_a, "user_a").await;
2948        let mut client_b = server.create_client(cx_b, "user_b").await;
2949        let mut client_c = server.create_client(cx_c, "user_c").await;
2950        server
2951            .make_contacts(vec![
2952                (&client_a, cx_a),
2953                (&client_b, cx_b),
2954                (&client_c, cx_c),
2955            ])
2956            .await;
2957
2958        // Share a project as client A
2959        fs.insert_tree(
2960            "/a",
2961            json!({
2962                "a.rs": "let one = two",
2963                "other.rs": "",
2964            }),
2965        )
2966        .await;
2967        let project_a = cx_a.update(|cx| {
2968            Project::local(
2969                client_a.clone(),
2970                client_a.user_store.clone(),
2971                lang_registry.clone(),
2972                fs.clone(),
2973                cx,
2974            )
2975        });
2976        let (worktree_a, _) = project_a
2977            .update(cx_a, |p, cx| {
2978                p.find_or_create_local_worktree("/a", true, cx)
2979            })
2980            .await
2981            .unwrap();
2982        worktree_a
2983            .read_with(cx_a, |tree, _| tree.as_local().unwrap().scan_complete())
2984            .await;
2985        let project_id = project_a.update(cx_a, |p, _| p.next_remote_id()).await;
2986        let worktree_id = worktree_a.read_with(cx_a, |tree, _| tree.id());
2987
2988        // Cause the language server to start.
2989        let _buffer = cx_a
2990            .background()
2991            .spawn(project_a.update(cx_a, |project, cx| {
2992                project.open_buffer(
2993                    ProjectPath {
2994                        worktree_id,
2995                        path: Path::new("other.rs").into(),
2996                    },
2997                    cx,
2998                )
2999            }))
3000            .await
3001            .unwrap();
3002
3003        // Join the worktree as client B.
3004        let project_b = client_b.build_remote_project(&project_a, cx_a, cx_b).await;
3005
3006        // Simulate a language server reporting errors for a file.
3007        let mut fake_language_server = fake_language_servers.next().await.unwrap();
3008        fake_language_server
3009            .receive_notification::<lsp::notification::DidOpenTextDocument>()
3010            .await;
3011        fake_language_server.notify::<lsp::notification::PublishDiagnostics>(
3012            lsp::PublishDiagnosticsParams {
3013                uri: lsp::Url::from_file_path("/a/a.rs").unwrap(),
3014                version: None,
3015                diagnostics: vec![lsp::Diagnostic {
3016                    severity: Some(lsp::DiagnosticSeverity::ERROR),
3017                    range: lsp::Range::new(lsp::Position::new(0, 4), lsp::Position::new(0, 7)),
3018                    message: "message 1".to_string(),
3019                    ..Default::default()
3020                }],
3021            },
3022        );
3023
3024        // Wait for server to see the diagnostics update.
3025        deterministic.run_until_parked();
3026        {
3027            let store = server.store.read().await;
3028            let project = store.project(project_id).unwrap();
3029            let worktree = project.worktrees.get(&worktree_id.to_proto()).unwrap();
3030            assert!(!worktree.diagnostic_summaries.is_empty());
3031        }
3032
3033        // Ensure client B observes the new diagnostics.
3034        project_b.read_with(cx_b, |project, cx| {
3035            assert_eq!(
3036                project.diagnostic_summaries(cx).collect::<Vec<_>>(),
3037                &[(
3038                    ProjectPath {
3039                        worktree_id,
3040                        path: Arc::from(Path::new("a.rs")),
3041                    },
3042                    DiagnosticSummary {
3043                        error_count: 1,
3044                        warning_count: 0,
3045                        ..Default::default()
3046                    },
3047                )]
3048            )
3049        });
3050
3051        // Join project as client C and observe the diagnostics.
3052        let project_c = client_c.build_remote_project(&project_a, cx_a, cx_c).await;
3053        project_c.read_with(cx_c, |project, cx| {
3054            assert_eq!(
3055                project.diagnostic_summaries(cx).collect::<Vec<_>>(),
3056                &[(
3057                    ProjectPath {
3058                        worktree_id,
3059                        path: Arc::from(Path::new("a.rs")),
3060                    },
3061                    DiagnosticSummary {
3062                        error_count: 1,
3063                        warning_count: 0,
3064                        ..Default::default()
3065                    },
3066                )]
3067            )
3068        });
3069
3070        // Simulate a language server reporting more errors for a file.
3071        fake_language_server.notify::<lsp::notification::PublishDiagnostics>(
3072            lsp::PublishDiagnosticsParams {
3073                uri: lsp::Url::from_file_path("/a/a.rs").unwrap(),
3074                version: None,
3075                diagnostics: vec![
3076                    lsp::Diagnostic {
3077                        severity: Some(lsp::DiagnosticSeverity::ERROR),
3078                        range: lsp::Range::new(lsp::Position::new(0, 4), lsp::Position::new(0, 7)),
3079                        message: "message 1".to_string(),
3080                        ..Default::default()
3081                    },
3082                    lsp::Diagnostic {
3083                        severity: Some(lsp::DiagnosticSeverity::WARNING),
3084                        range: lsp::Range::new(
3085                            lsp::Position::new(0, 10),
3086                            lsp::Position::new(0, 13),
3087                        ),
3088                        message: "message 2".to_string(),
3089                        ..Default::default()
3090                    },
3091                ],
3092            },
3093        );
3094
3095        // Clients B and C get the updated summaries
3096        deterministic.run_until_parked();
3097        project_b.read_with(cx_b, |project, cx| {
3098            assert_eq!(
3099                project.diagnostic_summaries(cx).collect::<Vec<_>>(),
3100                [(
3101                    ProjectPath {
3102                        worktree_id,
3103                        path: Arc::from(Path::new("a.rs")),
3104                    },
3105                    DiagnosticSummary {
3106                        error_count: 1,
3107                        warning_count: 1,
3108                        ..Default::default()
3109                    },
3110                )]
3111            );
3112        });
3113        project_c.read_with(cx_c, |project, cx| {
3114            assert_eq!(
3115                project.diagnostic_summaries(cx).collect::<Vec<_>>(),
3116                [(
3117                    ProjectPath {
3118                        worktree_id,
3119                        path: Arc::from(Path::new("a.rs")),
3120                    },
3121                    DiagnosticSummary {
3122                        error_count: 1,
3123                        warning_count: 1,
3124                        ..Default::default()
3125                    },
3126                )]
3127            );
3128        });
3129
3130        // Open the file with the errors on client B. They should be present.
3131        let buffer_b = cx_b
3132            .background()
3133            .spawn(project_b.update(cx_b, |p, cx| p.open_buffer((worktree_id, "a.rs"), cx)))
3134            .await
3135            .unwrap();
3136
3137        buffer_b.read_with(cx_b, |buffer, _| {
3138            assert_eq!(
3139                buffer
3140                    .snapshot()
3141                    .diagnostics_in_range::<_, Point>(0..buffer.len(), false)
3142                    .map(|entry| entry)
3143                    .collect::<Vec<_>>(),
3144                &[
3145                    DiagnosticEntry {
3146                        range: Point::new(0, 4)..Point::new(0, 7),
3147                        diagnostic: Diagnostic {
3148                            group_id: 0,
3149                            message: "message 1".to_string(),
3150                            severity: lsp::DiagnosticSeverity::ERROR,
3151                            is_primary: true,
3152                            ..Default::default()
3153                        }
3154                    },
3155                    DiagnosticEntry {
3156                        range: Point::new(0, 10)..Point::new(0, 13),
3157                        diagnostic: Diagnostic {
3158                            group_id: 1,
3159                            severity: lsp::DiagnosticSeverity::WARNING,
3160                            message: "message 2".to_string(),
3161                            is_primary: true,
3162                            ..Default::default()
3163                        }
3164                    }
3165                ]
3166            );
3167        });
3168
3169        // Simulate a language server reporting no errors for a file.
3170        fake_language_server.notify::<lsp::notification::PublishDiagnostics>(
3171            lsp::PublishDiagnosticsParams {
3172                uri: lsp::Url::from_file_path("/a/a.rs").unwrap(),
3173                version: None,
3174                diagnostics: vec![],
3175            },
3176        );
3177        deterministic.run_until_parked();
3178        project_a.read_with(cx_a, |project, cx| {
3179            assert_eq!(project.diagnostic_summaries(cx).collect::<Vec<_>>(), [])
3180        });
3181        project_b.read_with(cx_b, |project, cx| {
3182            assert_eq!(project.diagnostic_summaries(cx).collect::<Vec<_>>(), [])
3183        });
3184        project_c.read_with(cx_c, |project, cx| {
3185            assert_eq!(project.diagnostic_summaries(cx).collect::<Vec<_>>(), [])
3186        });
3187    }
3188
3189    #[gpui::test(iterations = 10)]
        // Scenario: a guest (client B) triggers a completion in a buffer of a project
        // shared by the host (client A). The completion request and the follow-up
        // resolve request are both answered by the fake language server registered
        // below, and the test checks that the chosen completion plus its additional
        // text edit end up in both the host's and the guest's copy of the buffer.
3190    async fn test_collaborating_with_completion(
3191        cx_a: &mut TestAppContext,
3192        cx_b: &mut TestAppContext,
3193    ) {
            // NOTE(review): presumably makes the test fail instead of blocking when the
            // foreground executor has nothing left to run -- confirm against gpui.
3194        cx_a.foreground().forbid_parking();
3195        let lang_registry = Arc::new(LanguageRegistry::test());
3196        let fs = FakeFs::new(cx_a.background());
3197
3198        // Set up a fake language server.
            // It advertises `.` as a completion trigger character; the test later waits
            // for the guest's buffer to report a non-empty trigger list.
3199        let mut language = Language::new(
3200            LanguageConfig {
3201                name: "Rust".into(),
3202                path_suffixes: vec!["rs".to_string()],
3203                ..Default::default()
3204            },
3205            Some(tree_sitter_rust::language()),
3206        );
3207        let mut fake_language_servers = language.set_fake_lsp_adapter(FakeLspAdapter {
3208            capabilities: lsp::ServerCapabilities {
3209                completion_provider: Some(lsp::CompletionOptions {
3210                    trigger_characters: Some(vec![".".to_string()]),
3211                    ..Default::default()
3212                }),
3213                ..Default::default()
3214            },
3215            ..Default::default()
3216        });
3217        lang_registry.add(Arc::new(language));
3218
3219        // Connect to a server as 2 clients.
3220        let mut server = TestServer::start(cx_a.foreground(), cx_a.background()).await;
3221        let client_a = server.create_client(cx_a, "user_a").await;
3222        let mut client_b = server.create_client(cx_b, "user_b").await;
3223        server
3224            .make_contacts(vec![(&client_a, cx_a), (&client_b, cx_b)])
3225            .await;
3226
3227        // Share a project as client A
3228        fs.insert_tree(
3229            "/a",
3230            json!({
3231                "main.rs": "fn main() { a }",
3232                "other.rs": "",
3233            }),
3234        )
3235        .await;
3236        let project_a = cx_a.update(|cx| {
3237            Project::local(
3238                client_a.clone(),
3239                client_a.user_store.clone(),
3240                lang_registry.clone(),
3241                fs.clone(),
3242                cx,
3243            )
3244        });
3245        let (worktree_a, _) = project_a
3246            .update(cx_a, |p, cx| {
3247                p.find_or_create_local_worktree("/a", true, cx)
3248            })
3249            .await
3250            .unwrap();
            // Wait for the host's initial scan of "/a" before reading the worktree id.
3251        worktree_a
3252            .read_with(cx_a, |tree, _| tree.as_local().unwrap().scan_complete())
3253            .await;
3254        let worktree_id = worktree_a.read_with(cx_a, |tree, _| tree.id());
3255
3256        // Join the shared project as client B.
3257        let project_b = client_b.build_remote_project(&project_a, cx_a, cx_b).await;
3258
3259        // Open a file in an editor as the guest.
3260        let buffer_b = project_b
3261            .update(cx_b, |p, cx| p.open_buffer((worktree_id, "main.rs"), cx))
3262            .await
3263            .unwrap();
3264        let (window_b, _) = cx_b.add_window(|_| EmptyView);
3265        let editor_b = cx_b.add_view(window_b, |cx| {
3266            Editor::for_buffer(buffer_b.clone(), Some(project_b.clone()), cx)
3267        });
3268
            // Take the fake server instance from the stream, then wait until the
            // guest's buffer reports at least one completion trigger.
3269        let fake_language_server = fake_language_servers.next().await.unwrap();
3270        buffer_b
3271            .condition(&cx_b, |buffer, _| !buffer.completion_triggers().is_empty())
3272            .await;
3273
3274        // Type a completion trigger character as the guest.
            // Offset 13 is directly after the `a` in "fn main() { a }", so the text
            // becomes "fn main() { a. }" with the cursor at column 14.
3275        editor_b.update(cx_b, |editor, cx| {
3276            editor.change_selections(None, cx, |s| s.select_ranges([13..13]));
3277            editor.handle_input(&Input(".".into()), cx);
3278            cx.focus(&editor_b);
3279        });
3280
3281        // Receive a completion request as the host's language server.
3282        // Return some completions from the host's language server.
            // NOTE(review): `start_waiting`/`finish_waiting` appear to bracket a span in
            // which the executor is expected to wait on this request -- confirm in gpui.
3283        cx_a.foreground().start_waiting();
3284        fake_language_server
3285            .handle_request::<lsp::request::Completion, _, _>(|params, _| async move {
                    // The request must target the host's path for the file and the
                    // position just after the typed `.`.
3286                assert_eq!(
3287                    params.text_document_position.text_document.uri,
3288                    lsp::Url::from_file_path("/a/main.rs").unwrap(),
3289                );
3290                assert_eq!(
3291                    params.text_document_position.position,
3292                    lsp::Position::new(0, 14),
3293                );
3294
                    // Both items are empty-range edits at (0, 14), i.e. pure insertions.
3295                Ok(Some(lsp::CompletionResponse::Array(vec![
3296                    lsp::CompletionItem {
3297                        label: "first_method(…)".into(),
3298                        detail: Some("fn(&mut self, B) -> C".into()),
3299                        text_edit: Some(lsp::CompletionTextEdit::Edit(lsp::TextEdit {
3300                            new_text: "first_method($1)".to_string(),
3301                            range: lsp::Range::new(
3302                                lsp::Position::new(0, 14),
3303                                lsp::Position::new(0, 14),
3304                            ),
3305                        })),
3306                        insert_text_format: Some(lsp::InsertTextFormat::SNIPPET),
3307                        ..Default::default()
3308                    },
3309                    lsp::CompletionItem {
3310                        label: "second_method(…)".into(),
3311                        detail: Some("fn(&mut self, C) -> D<E>".into()),
3312                        text_edit: Some(lsp::CompletionTextEdit::Edit(lsp::TextEdit {
3313                            new_text: "second_method()".to_string(),
3314                            range: lsp::Range::new(
3315                                lsp::Position::new(0, 14),
3316                                lsp::Position::new(0, 14),
3317                            ),
3318                        })),
3319                        insert_text_format: Some(lsp::InsertTextFormat::SNIPPET),
3320                        ..Default::default()
3321                    },
3322                ])))
3323            })
3324            .next()
3325            .await
3326            .unwrap();
3327        cx_a.foreground().finish_waiting();
3328
3329        // Open the buffer on the host.
            // The guest's typed `.` must have been replicated to the host's copy.
3330        let buffer_a = project_a
3331            .update(cx_a, |p, cx| p.open_buffer((worktree_id, "main.rs"), cx))
3332            .await
3333            .unwrap();
3334        buffer_a
3335            .condition(&cx_a, |buffer, _| buffer.text() == "fn main() { a. }")
3336            .await;
3337
3338        // Confirm a completion on the guest.
            // `item_ix: Some(0)` selects the first entry; the snippet placeholder `$1`
            // does not appear in the inserted text, as the assertion below shows.
3339        editor_b
3340            .condition(&cx_b, |editor, _| editor.context_menu_visible())
3341            .await;
3342        editor_b.update(cx_b, |editor, cx| {
3343            editor.confirm_completion(&ConfirmCompletion { item_ix: Some(0) }, cx);
3344            assert_eq!(editor.text(cx), "fn main() { a.first_method() }");
3345        });
3346
3347        // Return a resolved completion from the host's language server.
3348        // The resolved completion has an additional text edit.
            // The additional edit inserts a `use` line at the very start of the file.
3349        fake_language_server.handle_request::<lsp::request::ResolveCompletionItem, _, _>(
3350            |params, _| async move {
3351                assert_eq!(params.label, "first_method(…)");
3352                Ok(lsp::CompletionItem {
3353                    label: "first_method(…)".into(),
3354                    detail: Some("fn(&mut self, B) -> C".into()),
3355                    text_edit: Some(lsp::CompletionTextEdit::Edit(lsp::TextEdit {
3356                        new_text: "first_method($1)".to_string(),
3357                        range: lsp::Range::new(
3358                            lsp::Position::new(0, 14),
3359                            lsp::Position::new(0, 14),
3360                        ),
3361                    })),
3362                    additional_text_edits: Some(vec![lsp::TextEdit {
3363                        new_text: "use d::SomeTrait;\n".to_string(),
3364                        range: lsp::Range::new(lsp::Position::new(0, 0), lsp::Position::new(0, 0)),
3365                    }]),
3366                    insert_text_format: Some(lsp::InsertTextFormat::SNIPPET),
3367                    ..Default::default()
3368                })
3369            },
3370        );
3371
3372        // The additional edit is applied.
            // Both the host's and the guest's buffers must converge on the same text.
3373        buffer_a
3374            .condition(&cx_a, |buffer, _| {
3375                buffer.text() == "use d::SomeTrait;\nfn main() { a.first_method() }"
3376            })
3377            .await;
3378        buffer_b
3379            .condition(&cx_b, |buffer, _| {
3380                buffer.text() == "use d::SomeTrait;\nfn main() { a.first_method() }"
3381            })
3382            .await;
3383    }
3384
3385    #[gpui::test(iterations = 10)]
        // Scenario: a guest edits a shared buffer, the file then changes on the host's
        // (fake) disk so both copies report a conflict, and the guest reloads the
        // buffer through its project. The test checks the post-reload state on both
        // sides and how `undo` behaves on the host versus the guest afterwards.
3386    async fn test_reloading_buffer_manually(cx_a: &mut TestAppContext, cx_b: &mut TestAppContext) {
3387        cx_a.foreground().forbid_parking();
3388        let lang_registry = Arc::new(LanguageRegistry::test());
3389        let fs = FakeFs::new(cx_a.background());
3390
3391        // Connect to a server as 2 clients.
3392        let mut server = TestServer::start(cx_a.foreground(), cx_a.background()).await;
3393        let client_a = server.create_client(cx_a, "user_a").await;
3394        let mut client_b = server.create_client(cx_b, "user_b").await;
3395        server
3396            .make_contacts(vec![(&client_a, cx_a), (&client_b, cx_b)])
3397            .await;
3398
3399        // Share a project as client A
3400        fs.insert_tree(
3401            "/a",
3402            json!({
3403                "a.rs": "let one = 1;",
3404            }),
3405        )
3406        .await;
3407        let project_a = cx_a.update(|cx| {
3408            Project::local(
3409                client_a.clone(),
3410                client_a.user_store.clone(),
3411                lang_registry.clone(),
3412                fs.clone(),
3413                cx,
3414            )
3415        });
3416        let (worktree_a, _) = project_a
3417            .update(cx_a, |p, cx| {
3418                p.find_or_create_local_worktree("/a", true, cx)
3419            })
3420            .await
3421            .unwrap();
3422        worktree_a
3423            .read_with(cx_a, |tree, _| tree.as_local().unwrap().scan_complete())
3424            .await;
3425        let worktree_id = worktree_a.read_with(cx_a, |tree, _| tree.id());
            // The host opens the buffer before the guest joins.
3426        let buffer_a = project_a
3427            .update(cx_a, |p, cx| p.open_buffer((worktree_id, "a.rs"), cx))
3428            .await
3429            .unwrap();
3430
3431        // Join the shared project as client B.
3432        let project_b = client_b.build_remote_project(&project_a, cx_a, cx_b).await;
3433
3434        let buffer_b = cx_b
3435            .background()
3436            .spawn(project_b.update(cx_b, |p, cx| p.open_buffer((worktree_id, "a.rs"), cx)))
3437            .await
3438            .unwrap();
            // Edit as the guest: 4..7 replaces `one` with `six`, 10..11 replaces `1`
            // with `6`. The buffer is now dirty but not yet in conflict.
3439        buffer_b.update(cx_b, |buffer, cx| {
3440            buffer.edit([(4..7, "six")], cx);
3441            buffer.edit([(10..11, "6")], cx);
3442            assert_eq!(buffer.text(), "let six = 6;");
3443            assert!(buffer.is_dirty());
3444            assert!(!buffer.has_conflict());
3445        });
            // Wait for the guest's edits to reach the host's copy.
3446        buffer_a
3447            .condition(cx_a, |buffer, _| buffer.text() == "let six = 6;")
3448            .await;
3449
            // Change the file through the fake file system, bypassing both buffers,
            // then wait until both the host and the guest report a conflict.
3450        fs.save(Path::new("/a/a.rs"), &Rope::from("let seven = 7;"))
3451            .await
3452            .unwrap();
3453        buffer_a
3454            .condition(cx_a, |buffer, _| buffer.has_conflict())
3455            .await;
3456        buffer_b
3457            .condition(cx_b, |buffer, _| buffer.has_conflict())
3458            .await;
3459
            // Reload the buffer from the guest's project.
            // NOTE(review): the meaning of the `true` argument is not visible in this
            // file -- confirm against `Project::reload_buffers`.
3460        project_b
3461            .update(cx_b, |project, cx| {
3462                project.reload_buffers(HashSet::from_iter([buffer_b.clone()]), true, cx)
3463            })
3464            .await
3465            .unwrap();
            // After the reload both copies hold the on-disk text and are clean.
3466        buffer_a.read_with(cx_a, |buffer, _| {
3467            assert_eq!(buffer.text(), "let seven = 7;");
3468            assert!(!buffer.is_dirty());
3469            assert!(!buffer.has_conflict());
3470        });
3471        buffer_b.read_with(cx_b, |buffer, _| {
3472            assert_eq!(buffer.text(), "let seven = 7;");
3473            assert!(!buffer.is_dirty());
3474            assert!(!buffer.has_conflict());
3475        });
3476
3477        buffer_a.update(cx_a, |buffer, cx| {
3478            // Undoing on the host is a no-op when the reload was initiated by the guest.
3479            buffer.undo(cx);
3480            assert_eq!(buffer.text(), "let seven = 7;");
3481            assert!(!buffer.is_dirty());
3482            assert!(!buffer.has_conflict());
3483        });
3484        buffer_b.update(cx_b, |buffer, cx| {
3485            // Undoing on the guest rolls back the buffer to before it was reloaded but the conflict gets cleared.
3486            buffer.undo(cx);
3487            assert_eq!(buffer.text(), "let six = 6;");
3488            assert!(buffer.is_dirty());
3489            assert!(!buffer.has_conflict());
3490        });
3491    }
3492
3493    #[gpui::test(iterations = 10)]
        // Scenario: a guest asks its remote project to format a buffer. The fake
        // language server registered below answers the formatting request with two
        // insertions, and the test checks that the guest's buffer contains the
        // formatted text once the `format` future resolves.
3494    async fn test_formatting_buffer(cx_a: &mut TestAppContext, cx_b: &mut TestAppContext) {
3495        cx_a.foreground().forbid_parking();
3496        let lang_registry = Arc::new(LanguageRegistry::test());
3497        let fs = FakeFs::new(cx_a.background());
3498
3499        // Set up a fake language server.
3500        let mut language = Language::new(
3501            LanguageConfig {
3502                name: "Rust".into(),
3503                path_suffixes: vec!["rs".to_string()],
3504                ..Default::default()
3505            },
3506            Some(tree_sitter_rust::language()),
3507        );
3508        let mut fake_language_servers = language.set_fake_lsp_adapter(Default::default());
3509        lang_registry.add(Arc::new(language));
3510
3511        // Connect to a server as 2 clients.
3512        let mut server = TestServer::start(cx_a.foreground(), cx_a.background()).await;
3513        let client_a = server.create_client(cx_a, "user_a").await;
3514        let mut client_b = server.create_client(cx_b, "user_b").await;
3515        server
3516            .make_contacts(vec![(&client_a, cx_a), (&client_b, cx_b)])
3517            .await;
3518
3519        // Share a project as client A
3520        fs.insert_tree(
3521            "/a",
3522            json!({
3523                "a.rs": "let one = two",
3524            }),
3525        )
3526        .await;
3527        let project_a = cx_a.update(|cx| {
3528            Project::local(
3529                client_a.clone(),
3530                client_a.user_store.clone(),
3531                lang_registry.clone(),
3532                fs.clone(),
3533                cx,
3534            )
3535        });
3536        let (worktree_a, _) = project_a
3537            .update(cx_a, |p, cx| {
3538                p.find_or_create_local_worktree("/a", true, cx)
3539            })
3540            .await
3541            .unwrap();
3542        worktree_a
3543            .read_with(cx_a, |tree, _| tree.as_local().unwrap().scan_complete())
3544            .await;
3545        let worktree_id = worktree_a.read_with(cx_a, |tree, _| tree.id());
3546
3547        // Join the project as client B.
3548        let project_b = client_b.build_remote_project(&project_a, cx_a, cx_b).await;
3549
3550        let buffer_b = cx_b
3551            .background()
3552            .spawn(project_b.update(cx_b, |p, cx| p.open_buffer((worktree_id, "a.rs"), cx)))
3553            .await
3554            .unwrap();
3555
            // Answer formatting requests with two empty-range edits on line 0:
            // "h" inserted at column 4 and "y" at column 7, which turns
            // "let one = two" into "let honey = two".
3556        let fake_language_server = fake_language_servers.next().await.unwrap();
3557        fake_language_server.handle_request::<lsp::request::Formatting, _, _>(|_, _| async move {
3558            Ok(Some(vec![
3559                lsp::TextEdit {
3560                    range: lsp::Range::new(lsp::Position::new(0, 4), lsp::Position::new(0, 4)),
3561                    new_text: "h".to_string(),
3562                },
3563                lsp::TextEdit {
3564                    range: lsp::Range::new(lsp::Position::new(0, 7), lsp::Position::new(0, 7)),
3565                    new_text: "y".to_string(),
3566                },
3567            ]))
3568        });
3569
            // Format from the guest's project.
            // NOTE(review): the meaning of the `true` argument is not visible in this
            // file -- confirm against `Project::format`.
3570        project_b
3571            .update(cx_b, |project, cx| {
3572                project.format(HashSet::from_iter([buffer_b.clone()]), true, cx)
3573            })
3574            .await
3575            .unwrap();
3576        assert_eq!(
3577            buffer_b.read_with(cx_b, |buffer, _| buffer.text()),
3578            "let honey = two"
3579        );
3580    }
3581
3582    #[gpui::test(iterations = 10)]
        // Scenario: a guest requests go-to-definition twice for symbols in a shared
        // buffer. The fake language server answers with locations in "/root-2/b.rs",
        // a file outside the shared "/root-1" worktree. The test checks the returned
        // buffer text and ranges, that the guest's project reports two worktrees
        // after each call, and that both calls return the same target buffer.
3583    async fn test_definition(cx_a: &mut TestAppContext, cx_b: &mut TestAppContext) {
3584        cx_a.foreground().forbid_parking();
3585        let lang_registry = Arc::new(LanguageRegistry::test());
3586        let fs = FakeFs::new(cx_a.background());
            // "/root-1" is added to the host's project below; "/root-2" exists only on
            // the fake file system and is referenced solely by the definition results.
3587        fs.insert_tree(
3588            "/root-1",
3589            json!({
3590                "a.rs": "const ONE: usize = b::TWO + b::THREE;",
3591            }),
3592        )
3593        .await;
3594        fs.insert_tree(
3595            "/root-2",
3596            json!({
3597                "b.rs": "const TWO: usize = 2;\nconst THREE: usize = 3;",
3598            }),
3599        )
3600        .await;
3601
3602        // Set up a fake language server.
3603        let mut language = Language::new(
3604            LanguageConfig {
3605                name: "Rust".into(),
3606                path_suffixes: vec!["rs".to_string()],
3607                ..Default::default()
3608            },
3609            Some(tree_sitter_rust::language()),
3610        );
3611        let mut fake_language_servers = language.set_fake_lsp_adapter(Default::default());
3612        lang_registry.add(Arc::new(language));
3613
3614        // Connect to a server as 2 clients.
3615        let mut server = TestServer::start(cx_a.foreground(), cx_a.background()).await;
3616        let client_a = server.create_client(cx_a, "user_a").await;
3617        let mut client_b = server.create_client(cx_b, "user_b").await;
3618        server
3619            .make_contacts(vec![(&client_a, cx_a), (&client_b, cx_b)])
3620            .await;
3621
3622        // Share a project as client A
3623        let project_a = cx_a.update(|cx| {
3624            Project::local(
3625                client_a.clone(),
3626                client_a.user_store.clone(),
3627                lang_registry.clone(),
3628                fs.clone(),
3629                cx,
3630            )
3631        });
3632        let (worktree_a, _) = project_a
3633            .update(cx_a, |p, cx| {
3634                p.find_or_create_local_worktree("/root-1", true, cx)
3635            })
3636            .await
3637            .unwrap();
3638        worktree_a
3639            .read_with(cx_a, |tree, _| tree.as_local().unwrap().scan_complete())
3640            .await;
3641        let worktree_id = worktree_a.read_with(cx_a, |tree, _| tree.id());
3642
3643        // Join the project as client B.
3644        let project_b = client_b.build_remote_project(&project_a, cx_a, cx_b).await;
3645
3646        // Open the file on client B.
3647        let buffer_b = cx_b
3648            .background()
3649            .spawn(project_b.update(cx_b, |p, cx| p.open_buffer((worktree_id, "a.rs"), cx)))
3650            .await
3651            .unwrap();
3652
3653        // Request the definition of a symbol as the guest.
            // The first response points at columns 6..9 of line 0 in b.rs (`TWO`).
3654        let fake_language_server = fake_language_servers.next().await.unwrap();
3655        fake_language_server.handle_request::<lsp::request::GotoDefinition, _, _>(
3656            |_, _| async move {
3657                Ok(Some(lsp::GotoDefinitionResponse::Scalar(
3658                    lsp::Location::new(
3659                        lsp::Url::from_file_path("/root-2/b.rs").unwrap(),
3660                        lsp::Range::new(lsp::Position::new(0, 6), lsp::Position::new(0, 9)),
3661                    ),
3662                )))
3663            },
3664        );
3665
            // Offset 23 falls inside `TWO` in "const ONE: usize = b::TWO + b::THREE;".
3666        let definitions_1 = project_b
3667            .update(cx_b, |p, cx| p.definition(&buffer_b, 23, cx))
3668            .await
3669            .unwrap();
3670        cx_b.read(|cx| {
3671            assert_eq!(definitions_1.len(), 1);
                // One worktree was shared; the guest now sees a second one.
3672            assert_eq!(project_b.read(cx).worktrees(cx).count(), 2);
3673            let target_buffer = definitions_1[0].buffer.read(cx);
3674            assert_eq!(
3675                target_buffer.text(),
3676                "const TWO: usize = 2;\nconst THREE: usize = 3;"
3677            );
3678            assert_eq!(
3679                definitions_1[0].range.to_point(target_buffer),
3680                Point::new(0, 6)..Point::new(0, 9)
3681            );
3682        });
3683
3684        // Try getting more definitions for the same buffer, ensuring the buffer gets reused from
3685        // the previous call to `definition`.
            // The handler is replaced so the second response points at columns 6..11
            // of line 1 in b.rs (`THREE`).
3686        fake_language_server.handle_request::<lsp::request::GotoDefinition, _, _>(
3687            |_, _| async move {
3688                Ok(Some(lsp::GotoDefinitionResponse::Scalar(
3689                    lsp::Location::new(
3690                        lsp::Url::from_file_path("/root-2/b.rs").unwrap(),
3691                        lsp::Range::new(lsp::Position::new(1, 6), lsp::Position::new(1, 11)),
3692                    ),
3693                )))
3694            },
3695        );
3696
            // Offset 33 falls inside `THREE` in the same source line.
3697        let definitions_2 = project_b
3698            .update(cx_b, |p, cx| p.definition(&buffer_b, 33, cx))
3699            .await
3700            .unwrap();
3701        cx_b.read(|cx| {
3702            assert_eq!(definitions_2.len(), 1);
                // The worktree count stays at 2: no additional worktree for the same file.
3703            assert_eq!(project_b.read(cx).worktrees(cx).count(), 2);
3704            let target_buffer = definitions_2[0].buffer.read(cx);
3705            assert_eq!(
3706                target_buffer.text(),
3707                "const TWO: usize = 2;\nconst THREE: usize = 3;"
3708            );
3709            assert_eq!(
3710                definitions_2[0].range.to_point(target_buffer),
3711                Point::new(1, 6)..Point::new(1, 11)
3712            );
3713        });
            // Both lookups must resolve to the same buffer handle.
3714        assert_eq!(definitions_1[0].buffer, definitions_2[0].buffer);
3715    }
3716
3717    #[gpui::test(iterations = 10)]
        // Scenario: a guest requests all references to a symbol in a shared buffer.
        // The fake language server answers with two locations in "/root-1/two.rs"
        // (inside the shared worktree) and one in "/root-2/three.rs" (outside it).
        // The test checks the number of results, the buffers they map to, their
        // offset ranges, and that the guest's project ends up with two worktrees.
3718    async fn test_references(cx_a: &mut TestAppContext, cx_b: &mut TestAppContext) {
3719        cx_a.foreground().forbid_parking();
3720        let lang_registry = Arc::new(LanguageRegistry::test());
3721        let fs = FakeFs::new(cx_a.background());
3722        fs.insert_tree(
3723            "/root-1",
3724            json!({
3725                "one.rs": "const ONE: usize = 1;",
3726                "two.rs": "const TWO: usize = one::ONE + one::ONE;",
3727            }),
3728        )
3729        .await;
3730        fs.insert_tree(
3731            "/root-2",
3732            json!({
3733                "three.rs": "const THREE: usize = two::TWO + one::ONE;",
3734            }),
3735        )
3736        .await;
3737
3738        // Set up a fake language server.
3739        let mut language = Language::new(
3740            LanguageConfig {
3741                name: "Rust".into(),
3742                path_suffixes: vec!["rs".to_string()],
3743                ..Default::default()
3744            },
3745            Some(tree_sitter_rust::language()),
3746        );
3747        let mut fake_language_servers = language.set_fake_lsp_adapter(Default::default());
3748        lang_registry.add(Arc::new(language));
3749
3750        // Connect to a server as 2 clients.
3751        let mut server = TestServer::start(cx_a.foreground(), cx_a.background()).await;
3752        let client_a = server.create_client(cx_a, "user_a").await;
3753        let mut client_b = server.create_client(cx_b, "user_b").await;
3754        server
3755            .make_contacts(vec![(&client_a, cx_a), (&client_b, cx_b)])
3756            .await;
3757
3758        // Share a project as client A
            // Only "/root-1" is added as a worktree by the host.
3759        let project_a = cx_a.update(|cx| {
3760            Project::local(
3761                client_a.clone(),
3762                client_a.user_store.clone(),
3763                lang_registry.clone(),
3764                fs.clone(),
3765                cx,
3766            )
3767        });
3768        let (worktree_a, _) = project_a
3769            .update(cx_a, |p, cx| {
3770                p.find_or_create_local_worktree("/root-1", true, cx)
3771            })
3772            .await
3773            .unwrap();
3774        worktree_a
3775            .read_with(cx_a, |tree, _| tree.as_local().unwrap().scan_complete())
3776            .await;
3777        let worktree_id = worktree_a.read_with(cx_a, |tree, _| tree.id());
3778
3779        // Join the shared project as client B.
3780        let project_b = client_b.build_remote_project(&project_a, cx_a, cx_b).await;
3781
3782        // Open the file on client B.
3783        let buffer_b = cx_b
3784            .background()
3785            .spawn(project_b.update(cx_b, |p, cx| p.open_buffer((worktree_id, "one.rs"), cx)))
3786            .await
3787            .unwrap();
3788
3789        // Request references to a symbol as the guest.
3790        let fake_language_server = fake_language_servers.next().await.unwrap();
3791        fake_language_server.handle_request::<lsp::request::References, _, _>(
3792            |params, _| async move {
                    // The request must be expressed in terms of the host's file path.
3793                assert_eq!(
3794                    params.text_document_position.text_document.uri.as_str(),
3795                    "file:///root-1/one.rs"
3796                );
3797                Ok(Some(vec![
3798                    lsp::Location {
3799                        uri: lsp::Url::from_file_path("/root-1/two.rs").unwrap(),
3800                        range: lsp::Range::new(
3801                            lsp::Position::new(0, 24),
3802                            lsp::Position::new(0, 27),
3803                        ),
3804                    },
3805                    lsp::Location {
3806                        uri: lsp::Url::from_file_path("/root-1/two.rs").unwrap(),
3807                        range: lsp::Range::new(
3808                            lsp::Position::new(0, 35),
3809                            lsp::Position::new(0, 38),
3810                        ),
3811                    },
3812                    lsp::Location {
3813                        uri: lsp::Url::from_file_path("/root-2/three.rs").unwrap(),
3814                        range: lsp::Range::new(
3815                            lsp::Position::new(0, 37),
3816                            lsp::Position::new(0, 40),
3817                        ),
3818                    },
3819                ]))
3820            },
3821        );
3822
            // Offset 7 falls inside `ONE` in "const ONE: usize = 1;".
3823        let references = project_b
3824            .update(cx_b, |p, cx| p.references(&buffer_b, 7, cx))
3825            .await
3826            .unwrap();
3827        cx_b.read(|cx| {
3828            assert_eq!(references.len(), 3);
                // One worktree was shared; the guest now sees a second one.
3829            assert_eq!(project_b.read(cx).worktrees(cx).count(), 2);
3830
                // Results 0 and 1 are both in two.rs and must share a buffer; result 2
                // is in three.rs.
3831            let two_buffer = references[0].buffer.read(cx);
3832            let three_buffer = references[2].buffer.read(cx);
3833            assert_eq!(
3834                two_buffer.file().unwrap().path().as_ref(),
3835                Path::new("two.rs")
3836            );
3837            assert_eq!(references[1].buffer, references[0].buffer);
                // Note that this assertion uses `full_path`, unlike the `path` check
                // for two.rs above, and still expects the bare file name.
3838            assert_eq!(
3839                three_buffer.file().unwrap().full_path(cx),
3840                Path::new("three.rs")
3841            );
3842
                // All files are single-line, so the LSP columns equal buffer offsets.
3843            assert_eq!(references[0].range.to_offset(&two_buffer), 24..27);
3844            assert_eq!(references[1].range.to_offset(&two_buffer), 35..38);
3845            assert_eq!(references[2].range.to_offset(&three_buffer), 37..40);
3846        });
3847    }
3848
3849    #[gpui::test(iterations = 10)]
        // Scenario: the host shares a project with two worktrees and a guest runs a
        // text search for "world" through its remote project. The test checks that
        // the guest receives every match, keyed by full path, with the expected
        // offset ranges. No language server is involved.
3850    async fn test_project_search(cx_a: &mut TestAppContext, cx_b: &mut TestAppContext) {
3851        cx_a.foreground().forbid_parking();
3852        let lang_registry = Arc::new(LanguageRegistry::test());
3853        let fs = FakeFs::new(cx_a.background());
            // File "b" contains no occurrence of "world" and must not show up in the
            // expected results below.
3854        fs.insert_tree(
3855            "/root-1",
3856            json!({
3857                "a": "hello world",
3858                "b": "goodnight moon",
3859                "c": "a world of goo",
3860                "d": "world champion of clown world",
3861            }),
3862        )
3863        .await;
3864        fs.insert_tree(
3865            "/root-2",
3866            json!({
3867                "e": "disney world is fun",
3868            }),
3869        )
3870        .await;
3871
3872        // Connect to a server as 2 clients.
3873        let mut server = TestServer::start(cx_a.foreground(), cx_a.background()).await;
3874        let client_a = server.create_client(cx_a, "user_a").await;
3875        let mut client_b = server.create_client(cx_b, "user_b").await;
3876        server
3877            .make_contacts(vec![(&client_a, cx_a), (&client_b, cx_b)])
3878            .await;
3879
3880        // Share a project as client A
3881        let project_a = cx_a.update(|cx| {
3882            Project::local(
3883                client_a.clone(),
3884                client_a.user_store.clone(),
3885                lang_registry.clone(),
3886                fs.clone(),
3887                cx,
3888            )
3889        });
3890
            // Add both roots to the host's project and wait for each scan to finish.
3891        let (worktree_1, _) = project_a
3892            .update(cx_a, |p, cx| {
3893                p.find_or_create_local_worktree("/root-1", true, cx)
3894            })
3895            .await
3896            .unwrap();
3897        worktree_1
3898            .read_with(cx_a, |tree, _| tree.as_local().unwrap().scan_complete())
3899            .await;
3900        let (worktree_2, _) = project_a
3901            .update(cx_a, |p, cx| {
3902                p.find_or_create_local_worktree("/root-2", true, cx)
3903            })
3904            .await
3905            .unwrap();
3906        worktree_2
3907            .read_with(cx_a, |tree, _| tree.as_local().unwrap().scan_complete())
3908            .await;
3909
3910        // Join the shared project as client B.
            // NOTE(review): the two `false` flags passed to `SearchQuery::text` are not
            // explained in this file -- confirm their meaning against `SearchQuery`.
3911        let project_b = client_b.build_remote_project(&project_a, cx_a, cx_b).await;
3912        let results = project_b
3913            .update(cx_b, |project, cx| {
3914                project.search(SearchQuery::text("world", false, false), cx)
3915            })
3916            .await
3917            .unwrap();
3918
            // Convert each (buffer, anchor ranges) result into (full path, offset
            // ranges) and sort by path so the comparison below does not depend on the
            // order in which results were returned.
3919        let mut ranges_by_path = results
3920            .into_iter()
3921            .map(|(buffer, ranges)| {
3922                buffer.read_with(cx_b, |buffer, cx| {
3923                    let path = buffer.file().unwrap().full_path(cx);
3924                    let offset_ranges = ranges
3925                        .into_iter()
3926                        .map(|range| range.to_offset(buffer))
3927                        .collect::<Vec<_>>();
3928                    (path, offset_ranges)
3929                })
3930            })
3931            .collect::<Vec<_>>();
3932        ranges_by_path.sort_by_key(|(path, _)| path.clone());
3933
            // Full paths are prefixed with the worktree root name; "d" matches twice.
3934        assert_eq!(
3935            ranges_by_path,
3936            &[
3937                (PathBuf::from("root-1/a"), vec![6..11]),
3938                (PathBuf::from("root-1/c"), vec![2..7]),
3939                (PathBuf::from("root-1/d"), vec![0..5, 24..29]),
3940                (PathBuf::from("root-2/e"), vec![7..12]),
3941            ]
3942        );
3943    }
3944
3945    #[gpui::test(iterations = 10)]
3946    async fn test_document_highlights(cx_a: &mut TestAppContext, cx_b: &mut TestAppContext) {
3947        cx_a.foreground().forbid_parking();
3948        let lang_registry = Arc::new(LanguageRegistry::test());
3949        let fs = FakeFs::new(cx_a.background());
3950        fs.insert_tree(
3951            "/root-1",
3952            json!({
3953                "main.rs": "fn double(number: i32) -> i32 { number + number }",
3954            }),
3955        )
3956        .await;
3957
3958        // Set up a fake language server.
3959        let mut language = Language::new(
3960            LanguageConfig {
3961                name: "Rust".into(),
3962                path_suffixes: vec!["rs".to_string()],
3963                ..Default::default()
3964            },
3965            Some(tree_sitter_rust::language()),
3966        );
3967        let mut fake_language_servers = language.set_fake_lsp_adapter(Default::default());
3968        lang_registry.add(Arc::new(language));
3969
3970        // Connect to a server as 2 clients.
3971        let mut server = TestServer::start(cx_a.foreground(), cx_a.background()).await;
3972        let client_a = server.create_client(cx_a, "user_a").await;
3973        let mut client_b = server.create_client(cx_b, "user_b").await;
3974        server
3975            .make_contacts(vec![(&client_a, cx_a), (&client_b, cx_b)])
3976            .await;
3977
3978        // Share a project as client A
3979        let project_a = cx_a.update(|cx| {
3980            Project::local(
3981                client_a.clone(),
3982                client_a.user_store.clone(),
3983                lang_registry.clone(),
3984                fs.clone(),
3985                cx,
3986            )
3987        });
3988        let (worktree_a, _) = project_a
3989            .update(cx_a, |p, cx| {
3990                p.find_or_create_local_worktree("/root-1", true, cx)
3991            })
3992            .await
3993            .unwrap();
3994        worktree_a
3995            .read_with(cx_a, |tree, _| tree.as_local().unwrap().scan_complete())
3996            .await;
3997        let worktree_id = worktree_a.read_with(cx_a, |tree, _| tree.id());
3998
3999        // Join the worktree as client B.
4000        let project_b = client_b.build_remote_project(&project_a, cx_a, cx_b).await;
4001
4002        // Open the file on client B.
4003        let buffer_b = cx_b
4004            .background()
4005            .spawn(project_b.update(cx_b, |p, cx| p.open_buffer((worktree_id, "main.rs"), cx)))
4006            .await
4007            .unwrap();
4008
4009        // Request document highlights as the guest.
4010        let fake_language_server = fake_language_servers.next().await.unwrap();
4011        fake_language_server.handle_request::<lsp::request::DocumentHighlightRequest, _, _>(
4012            |params, _| async move {
4013                assert_eq!(
4014                    params
4015                        .text_document_position_params
4016                        .text_document
4017                        .uri
4018                        .as_str(),
4019                    "file:///root-1/main.rs"
4020                );
4021                assert_eq!(
4022                    params.text_document_position_params.position,
4023                    lsp::Position::new(0, 34)
4024                );
4025                Ok(Some(vec![
4026                    lsp::DocumentHighlight {
4027                        kind: Some(lsp::DocumentHighlightKind::WRITE),
4028                        range: lsp::Range::new(
4029                            lsp::Position::new(0, 10),
4030                            lsp::Position::new(0, 16),
4031                        ),
4032                    },
4033                    lsp::DocumentHighlight {
4034                        kind: Some(lsp::DocumentHighlightKind::READ),
4035                        range: lsp::Range::new(
4036                            lsp::Position::new(0, 32),
4037                            lsp::Position::new(0, 38),
4038                        ),
4039                    },
4040                    lsp::DocumentHighlight {
4041                        kind: Some(lsp::DocumentHighlightKind::READ),
4042                        range: lsp::Range::new(
4043                            lsp::Position::new(0, 41),
4044                            lsp::Position::new(0, 47),
4045                        ),
4046                    },
4047                ]))
4048            },
4049        );
4050
4051        let highlights = project_b
4052            .update(cx_b, |p, cx| p.document_highlights(&buffer_b, 34, cx))
4053            .await
4054            .unwrap();
4055        buffer_b.read_with(cx_b, |buffer, _| {
4056            let snapshot = buffer.snapshot();
4057
4058            let highlights = highlights
4059                .into_iter()
4060                .map(|highlight| (highlight.kind, highlight.range.to_offset(&snapshot)))
4061                .collect::<Vec<_>>();
4062            assert_eq!(
4063                highlights,
4064                &[
4065                    (lsp::DocumentHighlightKind::WRITE, 10..16),
4066                    (lsp::DocumentHighlightKind::READ, 32..38),
4067                    (lsp::DocumentHighlightKind::READ, 41..47)
4068                ]
4069            )
4070        });
4071    }
4072
4073    #[gpui::test(iterations = 10)]
4074    async fn test_project_symbols(cx_a: &mut TestAppContext, cx_b: &mut TestAppContext) {
4075        cx_a.foreground().forbid_parking();
4076        let lang_registry = Arc::new(LanguageRegistry::test());
4077        let fs = FakeFs::new(cx_a.background());
4078        fs.insert_tree(
4079            "/code",
4080            json!({
4081                "crate-1": {
4082                    "one.rs": "const ONE: usize = 1;",
4083                },
4084                "crate-2": {
4085                    "two.rs": "const TWO: usize = 2; const THREE: usize = 3;",
4086                },
4087                "private": {
4088                    "passwords.txt": "the-password",
4089                }
4090            }),
4091        )
4092        .await;
4093
4094        // Set up a fake language server.
4095        let mut language = Language::new(
4096            LanguageConfig {
4097                name: "Rust".into(),
4098                path_suffixes: vec!["rs".to_string()],
4099                ..Default::default()
4100            },
4101            Some(tree_sitter_rust::language()),
4102        );
4103        let mut fake_language_servers = language.set_fake_lsp_adapter(Default::default());
4104        lang_registry.add(Arc::new(language));
4105
4106        // Connect to a server as 2 clients.
4107        let mut server = TestServer::start(cx_a.foreground(), cx_a.background()).await;
4108        let client_a = server.create_client(cx_a, "user_a").await;
4109        let mut client_b = server.create_client(cx_b, "user_b").await;
4110        server
4111            .make_contacts(vec![(&client_a, cx_a), (&client_b, cx_b)])
4112            .await;
4113
4114        // Share a project as client A
4115        let project_a = cx_a.update(|cx| {
4116            Project::local(
4117                client_a.clone(),
4118                client_a.user_store.clone(),
4119                lang_registry.clone(),
4120                fs.clone(),
4121                cx,
4122            )
4123        });
4124        let (worktree_a, _) = project_a
4125            .update(cx_a, |p, cx| {
4126                p.find_or_create_local_worktree("/code/crate-1", true, cx)
4127            })
4128            .await
4129            .unwrap();
4130        worktree_a
4131            .read_with(cx_a, |tree, _| tree.as_local().unwrap().scan_complete())
4132            .await;
4133        let worktree_id = worktree_a.read_with(cx_a, |tree, _| tree.id());
4134
4135        // Join the worktree as client B.
4136        let project_b = client_b.build_remote_project(&project_a, cx_a, cx_b).await;
4137
4138        // Cause the language server to start.
4139        let _buffer = cx_b
4140            .background()
4141            .spawn(project_b.update(cx_b, |p, cx| p.open_buffer((worktree_id, "one.rs"), cx)))
4142            .await
4143            .unwrap();
4144
4145        let fake_language_server = fake_language_servers.next().await.unwrap();
4146        fake_language_server.handle_request::<lsp::request::WorkspaceSymbol, _, _>(
4147            |_, _| async move {
4148                #[allow(deprecated)]
4149                Ok(Some(vec![lsp::SymbolInformation {
4150                    name: "TWO".into(),
4151                    location: lsp::Location {
4152                        uri: lsp::Url::from_file_path("/code/crate-2/two.rs").unwrap(),
4153                        range: lsp::Range::new(lsp::Position::new(0, 6), lsp::Position::new(0, 9)),
4154                    },
4155                    kind: lsp::SymbolKind::CONSTANT,
4156                    tags: None,
4157                    container_name: None,
4158                    deprecated: None,
4159                }]))
4160            },
4161        );
4162
4163        // Request the definition of a symbol as the guest.
4164        let symbols = project_b
4165            .update(cx_b, |p, cx| p.symbols("two", cx))
4166            .await
4167            .unwrap();
4168        assert_eq!(symbols.len(), 1);
4169        assert_eq!(symbols[0].name, "TWO");
4170
4171        // Open one of the returned symbols.
4172        let buffer_b_2 = project_b
4173            .update(cx_b, |project, cx| {
4174                project.open_buffer_for_symbol(&symbols[0], cx)
4175            })
4176            .await
4177            .unwrap();
4178        buffer_b_2.read_with(cx_b, |buffer, _| {
4179            assert_eq!(
4180                buffer.file().unwrap().path().as_ref(),
4181                Path::new("../crate-2/two.rs")
4182            );
4183        });
4184
4185        // Attempt to craft a symbol and violate host's privacy by opening an arbitrary file.
4186        let mut fake_symbol = symbols[0].clone();
4187        fake_symbol.path = Path::new("/code/secrets").into();
4188        let error = project_b
4189            .update(cx_b, |project, cx| {
4190                project.open_buffer_for_symbol(&fake_symbol, cx)
4191            })
4192            .await
4193            .unwrap_err();
4194        assert!(error.to_string().contains("invalid symbol signature"));
4195    }
4196
4197    #[gpui::test(iterations = 10)]
4198    async fn test_open_buffer_while_getting_definition_pointing_to_it(
4199        cx_a: &mut TestAppContext,
4200        cx_b: &mut TestAppContext,
4201        mut rng: StdRng,
4202    ) {
4203        cx_a.foreground().forbid_parking();
4204        let lang_registry = Arc::new(LanguageRegistry::test());
4205        let fs = FakeFs::new(cx_a.background());
4206        fs.insert_tree(
4207            "/root",
4208            json!({
4209                "a.rs": "const ONE: usize = b::TWO;",
4210                "b.rs": "const TWO: usize = 2",
4211            }),
4212        )
4213        .await;
4214
4215        // Set up a fake language server.
4216        let mut language = Language::new(
4217            LanguageConfig {
4218                name: "Rust".into(),
4219                path_suffixes: vec!["rs".to_string()],
4220                ..Default::default()
4221            },
4222            Some(tree_sitter_rust::language()),
4223        );
4224        let mut fake_language_servers = language.set_fake_lsp_adapter(Default::default());
4225        lang_registry.add(Arc::new(language));
4226
4227        // Connect to a server as 2 clients.
4228        let mut server = TestServer::start(cx_a.foreground(), cx_a.background()).await;
4229        let client_a = server.create_client(cx_a, "user_a").await;
4230        let mut client_b = server.create_client(cx_b, "user_b").await;
4231        server
4232            .make_contacts(vec![(&client_a, cx_a), (&client_b, cx_b)])
4233            .await;
4234
4235        // Share a project as client A
4236        let project_a = cx_a.update(|cx| {
4237            Project::local(
4238                client_a.clone(),
4239                client_a.user_store.clone(),
4240                lang_registry.clone(),
4241                fs.clone(),
4242                cx,
4243            )
4244        });
4245
4246        let (worktree_a, _) = project_a
4247            .update(cx_a, |p, cx| {
4248                p.find_or_create_local_worktree("/root", true, cx)
4249            })
4250            .await
4251            .unwrap();
4252        worktree_a
4253            .read_with(cx_a, |tree, _| tree.as_local().unwrap().scan_complete())
4254            .await;
4255        let worktree_id = worktree_a.read_with(cx_a, |tree, _| tree.id());
4256
4257        // Join the project as client B.
4258        let project_b = client_b.build_remote_project(&project_a, cx_a, cx_b).await;
4259
4260        let buffer_b1 = cx_b
4261            .background()
4262            .spawn(project_b.update(cx_b, |p, cx| p.open_buffer((worktree_id, "a.rs"), cx)))
4263            .await
4264            .unwrap();
4265
4266        let fake_language_server = fake_language_servers.next().await.unwrap();
4267        fake_language_server.handle_request::<lsp::request::GotoDefinition, _, _>(
4268            |_, _| async move {
4269                Ok(Some(lsp::GotoDefinitionResponse::Scalar(
4270                    lsp::Location::new(
4271                        lsp::Url::from_file_path("/root/b.rs").unwrap(),
4272                        lsp::Range::new(lsp::Position::new(0, 6), lsp::Position::new(0, 9)),
4273                    ),
4274                )))
4275            },
4276        );
4277
4278        let definitions;
4279        let buffer_b2;
4280        if rng.gen() {
4281            definitions = project_b.update(cx_b, |p, cx| p.definition(&buffer_b1, 23, cx));
4282            buffer_b2 = project_b.update(cx_b, |p, cx| p.open_buffer((worktree_id, "b.rs"), cx));
4283        } else {
4284            buffer_b2 = project_b.update(cx_b, |p, cx| p.open_buffer((worktree_id, "b.rs"), cx));
4285            definitions = project_b.update(cx_b, |p, cx| p.definition(&buffer_b1, 23, cx));
4286        }
4287
4288        let buffer_b2 = buffer_b2.await.unwrap();
4289        let definitions = definitions.await.unwrap();
4290        assert_eq!(definitions.len(), 1);
4291        assert_eq!(definitions[0].buffer, buffer_b2);
4292    }
4293
4294    #[gpui::test(iterations = 10)]
4295    async fn test_collaborating_with_code_actions(
4296        cx_a: &mut TestAppContext,
4297        cx_b: &mut TestAppContext,
4298    ) {
4299        cx_a.foreground().forbid_parking();
4300        let lang_registry = Arc::new(LanguageRegistry::test());
4301        let fs = FakeFs::new(cx_a.background());
4302        cx_b.update(|cx| editor::init(cx));
4303
4304        // Set up a fake language server.
4305        let mut language = Language::new(
4306            LanguageConfig {
4307                name: "Rust".into(),
4308                path_suffixes: vec!["rs".to_string()],
4309                ..Default::default()
4310            },
4311            Some(tree_sitter_rust::language()),
4312        );
4313        let mut fake_language_servers = language.set_fake_lsp_adapter(Default::default());
4314        lang_registry.add(Arc::new(language));
4315
4316        // Connect to a server as 2 clients.
4317        let mut server = TestServer::start(cx_a.foreground(), cx_a.background()).await;
4318        let client_a = server.create_client(cx_a, "user_a").await;
4319        let mut client_b = server.create_client(cx_b, "user_b").await;
4320        server
4321            .make_contacts(vec![(&client_a, cx_a), (&client_b, cx_b)])
4322            .await;
4323
4324        // Share a project as client A
4325        fs.insert_tree(
4326            "/a",
4327            json!({
4328                "main.rs": "mod other;\nfn main() { let foo = other::foo(); }",
4329                "other.rs": "pub fn foo() -> usize { 4 }",
4330            }),
4331        )
4332        .await;
4333        let project_a = cx_a.update(|cx| {
4334            Project::local(
4335                client_a.clone(),
4336                client_a.user_store.clone(),
4337                lang_registry.clone(),
4338                fs.clone(),
4339                cx,
4340            )
4341        });
4342        let (worktree_a, _) = project_a
4343            .update(cx_a, |p, cx| {
4344                p.find_or_create_local_worktree("/a", true, cx)
4345            })
4346            .await
4347            .unwrap();
4348        worktree_a
4349            .read_with(cx_a, |tree, _| tree.as_local().unwrap().scan_complete())
4350            .await;
4351        let worktree_id = worktree_a.read_with(cx_a, |tree, _| tree.id());
4352
4353        // Join the project as client B.
4354        let project_b = client_b.build_remote_project(&project_a, cx_a, cx_b).await;
4355        let mut params = cx_b.update(WorkspaceParams::test);
4356        params.languages = lang_registry.clone();
4357        params.project = project_b.clone();
4358        params.client = client_b.client.clone();
4359        params.user_store = client_b.user_store.clone();
4360
4361        let (_window_b, workspace_b) = cx_b.add_window(|cx| Workspace::new(&params, cx));
4362        let editor_b = workspace_b
4363            .update(cx_b, |workspace, cx| {
4364                workspace.open_path((worktree_id, "main.rs"), true, cx)
4365            })
4366            .await
4367            .unwrap()
4368            .downcast::<Editor>()
4369            .unwrap();
4370
4371        let mut fake_language_server = fake_language_servers.next().await.unwrap();
4372        fake_language_server
4373            .handle_request::<lsp::request::CodeActionRequest, _, _>(|params, _| async move {
4374                assert_eq!(
4375                    params.text_document.uri,
4376                    lsp::Url::from_file_path("/a/main.rs").unwrap(),
4377                );
4378                assert_eq!(params.range.start, lsp::Position::new(0, 0));
4379                assert_eq!(params.range.end, lsp::Position::new(0, 0));
4380                Ok(None)
4381            })
4382            .next()
4383            .await;
4384
4385        // Move cursor to a location that contains code actions.
4386        editor_b.update(cx_b, |editor, cx| {
4387            editor.change_selections(None, cx, |s| {
4388                s.select_ranges([Point::new(1, 31)..Point::new(1, 31)])
4389            });
4390            cx.focus(&editor_b);
4391        });
4392
4393        fake_language_server
4394            .handle_request::<lsp::request::CodeActionRequest, _, _>(|params, _| async move {
4395                assert_eq!(
4396                    params.text_document.uri,
4397                    lsp::Url::from_file_path("/a/main.rs").unwrap(),
4398                );
4399                assert_eq!(params.range.start, lsp::Position::new(1, 31));
4400                assert_eq!(params.range.end, lsp::Position::new(1, 31));
4401
4402                Ok(Some(vec![lsp::CodeActionOrCommand::CodeAction(
4403                    lsp::CodeAction {
4404                        title: "Inline into all callers".to_string(),
4405                        edit: Some(lsp::WorkspaceEdit {
4406                            changes: Some(
4407                                [
4408                                    (
4409                                        lsp::Url::from_file_path("/a/main.rs").unwrap(),
4410                                        vec![lsp::TextEdit::new(
4411                                            lsp::Range::new(
4412                                                lsp::Position::new(1, 22),
4413                                                lsp::Position::new(1, 34),
4414                                            ),
4415                                            "4".to_string(),
4416                                        )],
4417                                    ),
4418                                    (
4419                                        lsp::Url::from_file_path("/a/other.rs").unwrap(),
4420                                        vec![lsp::TextEdit::new(
4421                                            lsp::Range::new(
4422                                                lsp::Position::new(0, 0),
4423                                                lsp::Position::new(0, 27),
4424                                            ),
4425                                            "".to_string(),
4426                                        )],
4427                                    ),
4428                                ]
4429                                .into_iter()
4430                                .collect(),
4431                            ),
4432                            ..Default::default()
4433                        }),
4434                        data: Some(json!({
4435                            "codeActionParams": {
4436                                "range": {
4437                                    "start": {"line": 1, "column": 31},
4438                                    "end": {"line": 1, "column": 31},
4439                                }
4440                            }
4441                        })),
4442                        ..Default::default()
4443                    },
4444                )]))
4445            })
4446            .next()
4447            .await;
4448
4449        // Toggle code actions and wait for them to display.
4450        editor_b.update(cx_b, |editor, cx| {
4451            editor.toggle_code_actions(
4452                &ToggleCodeActions {
4453                    deployed_from_indicator: false,
4454                },
4455                cx,
4456            );
4457        });
4458        editor_b
4459            .condition(&cx_b, |editor, _| editor.context_menu_visible())
4460            .await;
4461
4462        fake_language_server.remove_request_handler::<lsp::request::CodeActionRequest>();
4463
4464        // Confirming the code action will trigger a resolve request.
4465        let confirm_action = workspace_b
4466            .update(cx_b, |workspace, cx| {
4467                Editor::confirm_code_action(workspace, &ConfirmCodeAction { item_ix: Some(0) }, cx)
4468            })
4469            .unwrap();
4470        fake_language_server.handle_request::<lsp::request::CodeActionResolveRequest, _, _>(
4471            |_, _| async move {
4472                Ok(lsp::CodeAction {
4473                    title: "Inline into all callers".to_string(),
4474                    edit: Some(lsp::WorkspaceEdit {
4475                        changes: Some(
4476                            [
4477                                (
4478                                    lsp::Url::from_file_path("/a/main.rs").unwrap(),
4479                                    vec![lsp::TextEdit::new(
4480                                        lsp::Range::new(
4481                                            lsp::Position::new(1, 22),
4482                                            lsp::Position::new(1, 34),
4483                                        ),
4484                                        "4".to_string(),
4485                                    )],
4486                                ),
4487                                (
4488                                    lsp::Url::from_file_path("/a/other.rs").unwrap(),
4489                                    vec![lsp::TextEdit::new(
4490                                        lsp::Range::new(
4491                                            lsp::Position::new(0, 0),
4492                                            lsp::Position::new(0, 27),
4493                                        ),
4494                                        "".to_string(),
4495                                    )],
4496                                ),
4497                            ]
4498                            .into_iter()
4499                            .collect(),
4500                        ),
4501                        ..Default::default()
4502                    }),
4503                    ..Default::default()
4504                })
4505            },
4506        );
4507
4508        // After the action is confirmed, an editor containing both modified files is opened.
4509        confirm_action.await.unwrap();
4510        let code_action_editor = workspace_b.read_with(cx_b, |workspace, cx| {
4511            workspace
4512                .active_item(cx)
4513                .unwrap()
4514                .downcast::<Editor>()
4515                .unwrap()
4516        });
4517        code_action_editor.update(cx_b, |editor, cx| {
4518            assert_eq!(editor.text(cx), "mod other;\nfn main() { let foo = 4; }\n");
4519            editor.undo(&Undo, cx);
4520            assert_eq!(
4521                editor.text(cx),
4522                "mod other;\nfn main() { let foo = other::foo(); }\npub fn foo() -> usize { 4 }"
4523            );
4524            editor.redo(&Redo, cx);
4525            assert_eq!(editor.text(cx), "mod other;\nfn main() { let foo = 4; }\n");
4526        });
4527    }
4528
4529    #[gpui::test(iterations = 10)]
4530    async fn test_collaborating_with_renames(cx_a: &mut TestAppContext, cx_b: &mut TestAppContext) {
4531        cx_a.foreground().forbid_parking();
4532        let lang_registry = Arc::new(LanguageRegistry::test());
4533        let fs = FakeFs::new(cx_a.background());
4534        cx_b.update(|cx| editor::init(cx));
4535
4536        // Set up a fake language server.
4537        let mut language = Language::new(
4538            LanguageConfig {
4539                name: "Rust".into(),
4540                path_suffixes: vec!["rs".to_string()],
4541                ..Default::default()
4542            },
4543            Some(tree_sitter_rust::language()),
4544        );
4545        let mut fake_language_servers = language.set_fake_lsp_adapter(FakeLspAdapter {
4546            capabilities: lsp::ServerCapabilities {
4547                rename_provider: Some(lsp::OneOf::Right(lsp::RenameOptions {
4548                    prepare_provider: Some(true),
4549                    work_done_progress_options: Default::default(),
4550                })),
4551                ..Default::default()
4552            },
4553            ..Default::default()
4554        });
4555        lang_registry.add(Arc::new(language));
4556
4557        // Connect to a server as 2 clients.
4558        let mut server = TestServer::start(cx_a.foreground(), cx_a.background()).await;
4559        let client_a = server.create_client(cx_a, "user_a").await;
4560        let mut client_b = server.create_client(cx_b, "user_b").await;
4561        server
4562            .make_contacts(vec![(&client_a, cx_a), (&client_b, cx_b)])
4563            .await;
4564
4565        // Share a project as client A
4566        fs.insert_tree(
4567            "/dir",
4568            json!({
4569                "one.rs": "const ONE: usize = 1;",
4570                "two.rs": "const TWO: usize = one::ONE + one::ONE;"
4571            }),
4572        )
4573        .await;
4574        let project_a = cx_a.update(|cx| {
4575            Project::local(
4576                client_a.clone(),
4577                client_a.user_store.clone(),
4578                lang_registry.clone(),
4579                fs.clone(),
4580                cx,
4581            )
4582        });
4583        let (worktree_a, _) = project_a
4584            .update(cx_a, |p, cx| {
4585                p.find_or_create_local_worktree("/dir", true, cx)
4586            })
4587            .await
4588            .unwrap();
4589        worktree_a
4590            .read_with(cx_a, |tree, _| tree.as_local().unwrap().scan_complete())
4591            .await;
4592        let worktree_id = worktree_a.read_with(cx_a, |tree, _| tree.id());
4593
4594        // Join the worktree as client B.
4595        let project_b = client_b.build_remote_project(&project_a, cx_a, cx_b).await;
4596        let mut params = cx_b.update(WorkspaceParams::test);
4597        params.languages = lang_registry.clone();
4598        params.project = project_b.clone();
4599        params.client = client_b.client.clone();
4600        params.user_store = client_b.user_store.clone();
4601
4602        let (_window_b, workspace_b) = cx_b.add_window(|cx| Workspace::new(&params, cx));
4603        let editor_b = workspace_b
4604            .update(cx_b, |workspace, cx| {
4605                workspace.open_path((worktree_id, "one.rs"), true, cx)
4606            })
4607            .await
4608            .unwrap()
4609            .downcast::<Editor>()
4610            .unwrap();
4611        let fake_language_server = fake_language_servers.next().await.unwrap();
4612
4613        // Move cursor to a location that can be renamed.
4614        let prepare_rename = editor_b.update(cx_b, |editor, cx| {
4615            editor.change_selections(None, cx, |s| s.select_ranges([7..7]));
4616            editor.rename(&Rename, cx).unwrap()
4617        });
4618
4619        fake_language_server
4620            .handle_request::<lsp::request::PrepareRenameRequest, _, _>(|params, _| async move {
4621                assert_eq!(params.text_document.uri.as_str(), "file:///dir/one.rs");
4622                assert_eq!(params.position, lsp::Position::new(0, 7));
4623                Ok(Some(lsp::PrepareRenameResponse::Range(lsp::Range::new(
4624                    lsp::Position::new(0, 6),
4625                    lsp::Position::new(0, 9),
4626                ))))
4627            })
4628            .next()
4629            .await
4630            .unwrap();
4631        prepare_rename.await.unwrap();
4632        editor_b.update(cx_b, |editor, cx| {
4633            let rename = editor.pending_rename().unwrap();
4634            let buffer = editor.buffer().read(cx).snapshot(cx);
4635            assert_eq!(
4636                rename.range.start.to_offset(&buffer)..rename.range.end.to_offset(&buffer),
4637                6..9
4638            );
4639            rename.editor.update(cx, |rename_editor, cx| {
4640                rename_editor.buffer().update(cx, |rename_buffer, cx| {
4641                    rename_buffer.edit([(0..3, "THREE")], cx);
4642                });
4643            });
4644        });
4645
4646        let confirm_rename = workspace_b.update(cx_b, |workspace, cx| {
4647            Editor::confirm_rename(workspace, &ConfirmRename, cx).unwrap()
4648        });
4649        fake_language_server
4650            .handle_request::<lsp::request::Rename, _, _>(|params, _| async move {
4651                assert_eq!(
4652                    params.text_document_position.text_document.uri.as_str(),
4653                    "file:///dir/one.rs"
4654                );
4655                assert_eq!(
4656                    params.text_document_position.position,
4657                    lsp::Position::new(0, 6)
4658                );
4659                assert_eq!(params.new_name, "THREE");
4660                Ok(Some(lsp::WorkspaceEdit {
4661                    changes: Some(
4662                        [
4663                            (
4664                                lsp::Url::from_file_path("/dir/one.rs").unwrap(),
4665                                vec![lsp::TextEdit::new(
4666                                    lsp::Range::new(
4667                                        lsp::Position::new(0, 6),
4668                                        lsp::Position::new(0, 9),
4669                                    ),
4670                                    "THREE".to_string(),
4671                                )],
4672                            ),
4673                            (
4674                                lsp::Url::from_file_path("/dir/two.rs").unwrap(),
4675                                vec![
4676                                    lsp::TextEdit::new(
4677                                        lsp::Range::new(
4678                                            lsp::Position::new(0, 24),
4679                                            lsp::Position::new(0, 27),
4680                                        ),
4681                                        "THREE".to_string(),
4682                                    ),
4683                                    lsp::TextEdit::new(
4684                                        lsp::Range::new(
4685                                            lsp::Position::new(0, 35),
4686                                            lsp::Position::new(0, 38),
4687                                        ),
4688                                        "THREE".to_string(),
4689                                    ),
4690                                ],
4691                            ),
4692                        ]
4693                        .into_iter()
4694                        .collect(),
4695                    ),
4696                    ..Default::default()
4697                }))
4698            })
4699            .next()
4700            .await
4701            .unwrap();
4702        confirm_rename.await.unwrap();
4703
4704        let rename_editor = workspace_b.read_with(cx_b, |workspace, cx| {
4705            workspace
4706                .active_item(cx)
4707                .unwrap()
4708                .downcast::<Editor>()
4709                .unwrap()
4710        });
4711        rename_editor.update(cx_b, |editor, cx| {
4712            assert_eq!(
4713                editor.text(cx),
4714                "const THREE: usize = 1;\nconst TWO: usize = one::THREE + one::THREE;"
4715            );
4716            editor.undo(&Undo, cx);
4717            assert_eq!(
4718                editor.text(cx),
4719                "const ONE: usize = 1;\nconst TWO: usize = one::ONE + one::ONE;"
4720            );
4721            editor.redo(&Redo, cx);
4722            assert_eq!(
4723                editor.text(cx),
4724                "const THREE: usize = 1;\nconst TWO: usize = one::THREE + one::THREE;"
4725            );
4726        });
4727
4728        // Ensure temporary rename edits cannot be undone/redone.
4729        editor_b.update(cx_b, |editor, cx| {
4730            editor.undo(&Undo, cx);
4731            assert_eq!(editor.text(cx), "const ONE: usize = 1;");
4732            editor.undo(&Undo, cx);
4733            assert_eq!(editor.text(cx), "const ONE: usize = 1;");
4734            editor.redo(&Redo, cx);
4735            assert_eq!(editor.text(cx), "const THREE: usize = 1;");
4736        })
4737    }
4738
4739    #[gpui::test(iterations = 10)]
4740    async fn test_basic_chat(cx_a: &mut TestAppContext, cx_b: &mut TestAppContext) {
4741        cx_a.foreground().forbid_parking();
4742
4743        // Connect to a server as 2 clients.
4744        let mut server = TestServer::start(cx_a.foreground(), cx_a.background()).await;
4745        let client_a = server.create_client(cx_a, "user_a").await;
4746        let client_b = server.create_client(cx_b, "user_b").await;
4747
4748        // Create an org that includes these 2 users.
4749        let db = &server.app_state.db;
4750        let org_id = db.create_org("Test Org", "test-org").await.unwrap();
4751        db.add_org_member(org_id, client_a.current_user_id(&cx_a), false)
4752            .await
4753            .unwrap();
4754        db.add_org_member(org_id, client_b.current_user_id(&cx_b), false)
4755            .await
4756            .unwrap();
4757
4758        // Create a channel that includes all the users.
4759        let channel_id = db.create_org_channel(org_id, "test-channel").await.unwrap();
4760        db.add_channel_member(channel_id, client_a.current_user_id(&cx_a), false)
4761            .await
4762            .unwrap();
4763        db.add_channel_member(channel_id, client_b.current_user_id(&cx_b), false)
4764            .await
4765            .unwrap();
4766        db.create_channel_message(
4767            channel_id,
4768            client_b.current_user_id(&cx_b),
4769            "hello A, it's B.",
4770            OffsetDateTime::now_utc(),
4771            1,
4772        )
4773        .await
4774        .unwrap();
4775
4776        let channels_a = cx_a
4777            .add_model(|cx| ChannelList::new(client_a.user_store.clone(), client_a.clone(), cx));
4778        channels_a
4779            .condition(cx_a, |list, _| list.available_channels().is_some())
4780            .await;
4781        channels_a.read_with(cx_a, |list, _| {
4782            assert_eq!(
4783                list.available_channels().unwrap(),
4784                &[ChannelDetails {
4785                    id: channel_id.to_proto(),
4786                    name: "test-channel".to_string()
4787                }]
4788            )
4789        });
4790        let channel_a = channels_a.update(cx_a, |this, cx| {
4791            this.get_channel(channel_id.to_proto(), cx).unwrap()
4792        });
4793        channel_a.read_with(cx_a, |channel, _| assert!(channel.messages().is_empty()));
4794        channel_a
4795            .condition(&cx_a, |channel, _| {
4796                channel_messages(channel)
4797                    == [("user_b".to_string(), "hello A, it's B.".to_string(), false)]
4798            })
4799            .await;
4800
4801        let channels_b = cx_b
4802            .add_model(|cx| ChannelList::new(client_b.user_store.clone(), client_b.clone(), cx));
4803        channels_b
4804            .condition(cx_b, |list, _| list.available_channels().is_some())
4805            .await;
4806        channels_b.read_with(cx_b, |list, _| {
4807            assert_eq!(
4808                list.available_channels().unwrap(),
4809                &[ChannelDetails {
4810                    id: channel_id.to_proto(),
4811                    name: "test-channel".to_string()
4812                }]
4813            )
4814        });
4815
4816        let channel_b = channels_b.update(cx_b, |this, cx| {
4817            this.get_channel(channel_id.to_proto(), cx).unwrap()
4818        });
4819        channel_b.read_with(cx_b, |channel, _| assert!(channel.messages().is_empty()));
4820        channel_b
4821            .condition(&cx_b, |channel, _| {
4822                channel_messages(channel)
4823                    == [("user_b".to_string(), "hello A, it's B.".to_string(), false)]
4824            })
4825            .await;
4826
4827        channel_a
4828            .update(cx_a, |channel, cx| {
4829                channel
4830                    .send_message("oh, hi B.".to_string(), cx)
4831                    .unwrap()
4832                    .detach();
4833                let task = channel.send_message("sup".to_string(), cx).unwrap();
4834                assert_eq!(
4835                    channel_messages(channel),
4836                    &[
4837                        ("user_b".to_string(), "hello A, it's B.".to_string(), false),
4838                        ("user_a".to_string(), "oh, hi B.".to_string(), true),
4839                        ("user_a".to_string(), "sup".to_string(), true)
4840                    ]
4841                );
4842                task
4843            })
4844            .await
4845            .unwrap();
4846
4847        channel_b
4848            .condition(&cx_b, |channel, _| {
4849                channel_messages(channel)
4850                    == [
4851                        ("user_b".to_string(), "hello A, it's B.".to_string(), false),
4852                        ("user_a".to_string(), "oh, hi B.".to_string(), false),
4853                        ("user_a".to_string(), "sup".to_string(), false),
4854                    ]
4855            })
4856            .await;
4857
4858        assert_eq!(
4859            server
4860                .state()
4861                .await
4862                .channel(channel_id)
4863                .unwrap()
4864                .connection_ids
4865                .len(),
4866            2
4867        );
4868        cx_b.update(|_| drop(channel_b));
4869        server
4870            .condition(|state| state.channel(channel_id).unwrap().connection_ids.len() == 1)
4871            .await;
4872
4873        cx_a.update(|_| drop(channel_a));
4874        server
4875            .condition(|state| state.channel(channel_id).is_none())
4876            .await;
4877    }
4878
4879    #[gpui::test(iterations = 10)]
4880    async fn test_chat_message_validation(cx_a: &mut TestAppContext) {
4881        cx_a.foreground().forbid_parking();
4882
4883        let mut server = TestServer::start(cx_a.foreground(), cx_a.background()).await;
4884        let client_a = server.create_client(cx_a, "user_a").await;
4885
4886        let db = &server.app_state.db;
4887        let org_id = db.create_org("Test Org", "test-org").await.unwrap();
4888        let channel_id = db.create_org_channel(org_id, "test-channel").await.unwrap();
4889        db.add_org_member(org_id, client_a.current_user_id(&cx_a), false)
4890            .await
4891            .unwrap();
4892        db.add_channel_member(channel_id, client_a.current_user_id(&cx_a), false)
4893            .await
4894            .unwrap();
4895
4896        let channels_a = cx_a
4897            .add_model(|cx| ChannelList::new(client_a.user_store.clone(), client_a.clone(), cx));
4898        channels_a
4899            .condition(cx_a, |list, _| list.available_channels().is_some())
4900            .await;
4901        let channel_a = channels_a.update(cx_a, |this, cx| {
4902            this.get_channel(channel_id.to_proto(), cx).unwrap()
4903        });
4904
4905        // Messages aren't allowed to be too long.
4906        channel_a
4907            .update(cx_a, |channel, cx| {
4908                let long_body = "this is long.\n".repeat(1024);
4909                channel.send_message(long_body, cx).unwrap()
4910            })
4911            .await
4912            .unwrap_err();
4913
4914        // Messages aren't allowed to be blank.
4915        channel_a.update(cx_a, |channel, cx| {
4916            channel.send_message(String::new(), cx).unwrap_err()
4917        });
4918
4919        // Leading and trailing whitespace are trimmed.
4920        channel_a
4921            .update(cx_a, |channel, cx| {
4922                channel
4923                    .send_message("\n surrounded by whitespace  \n".to_string(), cx)
4924                    .unwrap()
4925            })
4926            .await
4927            .unwrap();
4928        assert_eq!(
4929            db.get_channel_messages(channel_id, 10, None)
4930                .await
4931                .unwrap()
4932                .iter()
4933                .map(|m| &m.body)
4934                .collect::<Vec<_>>(),
4935            &["surrounded by whitespace"]
4936        );
4937    }
4938
4939    #[gpui::test(iterations = 10)]
4940    async fn test_chat_reconnection(cx_a: &mut TestAppContext, cx_b: &mut TestAppContext) {
4941        cx_a.foreground().forbid_parking();
4942
4943        // Connect to a server as 2 clients.
4944        let mut server = TestServer::start(cx_a.foreground(), cx_a.background()).await;
4945        let client_a = server.create_client(cx_a, "user_a").await;
4946        let client_b = server.create_client(cx_b, "user_b").await;
4947        let mut status_b = client_b.status();
4948
4949        // Create an org that includes these 2 users.
4950        let db = &server.app_state.db;
4951        let org_id = db.create_org("Test Org", "test-org").await.unwrap();
4952        db.add_org_member(org_id, client_a.current_user_id(&cx_a), false)
4953            .await
4954            .unwrap();
4955        db.add_org_member(org_id, client_b.current_user_id(&cx_b), false)
4956            .await
4957            .unwrap();
4958
4959        // Create a channel that includes all the users.
4960        let channel_id = db.create_org_channel(org_id, "test-channel").await.unwrap();
4961        db.add_channel_member(channel_id, client_a.current_user_id(&cx_a), false)
4962            .await
4963            .unwrap();
4964        db.add_channel_member(channel_id, client_b.current_user_id(&cx_b), false)
4965            .await
4966            .unwrap();
4967        db.create_channel_message(
4968            channel_id,
4969            client_b.current_user_id(&cx_b),
4970            "hello A, it's B.",
4971            OffsetDateTime::now_utc(),
4972            2,
4973        )
4974        .await
4975        .unwrap();
4976
4977        let channels_a = cx_a
4978            .add_model(|cx| ChannelList::new(client_a.user_store.clone(), client_a.clone(), cx));
4979        channels_a
4980            .condition(cx_a, |list, _| list.available_channels().is_some())
4981            .await;
4982
4983        channels_a.read_with(cx_a, |list, _| {
4984            assert_eq!(
4985                list.available_channels().unwrap(),
4986                &[ChannelDetails {
4987                    id: channel_id.to_proto(),
4988                    name: "test-channel".to_string()
4989                }]
4990            )
4991        });
4992        let channel_a = channels_a.update(cx_a, |this, cx| {
4993            this.get_channel(channel_id.to_proto(), cx).unwrap()
4994        });
4995        channel_a.read_with(cx_a, |channel, _| assert!(channel.messages().is_empty()));
4996        channel_a
4997            .condition(&cx_a, |channel, _| {
4998                channel_messages(channel)
4999                    == [("user_b".to_string(), "hello A, it's B.".to_string(), false)]
5000            })
5001            .await;
5002
5003        let channels_b = cx_b
5004            .add_model(|cx| ChannelList::new(client_b.user_store.clone(), client_b.clone(), cx));
5005        channels_b
5006            .condition(cx_b, |list, _| list.available_channels().is_some())
5007            .await;
5008        channels_b.read_with(cx_b, |list, _| {
5009            assert_eq!(
5010                list.available_channels().unwrap(),
5011                &[ChannelDetails {
5012                    id: channel_id.to_proto(),
5013                    name: "test-channel".to_string()
5014                }]
5015            )
5016        });
5017
5018        let channel_b = channels_b.update(cx_b, |this, cx| {
5019            this.get_channel(channel_id.to_proto(), cx).unwrap()
5020        });
5021        channel_b.read_with(cx_b, |channel, _| assert!(channel.messages().is_empty()));
5022        channel_b
5023            .condition(&cx_b, |channel, _| {
5024                channel_messages(channel)
5025                    == [("user_b".to_string(), "hello A, it's B.".to_string(), false)]
5026            })
5027            .await;
5028
5029        // Disconnect client B, ensuring we can still access its cached channel data.
5030        server.forbid_connections();
5031        server.disconnect_client(client_b.current_user_id(&cx_b));
5032        cx_b.foreground().advance_clock(rpc::RECEIVE_TIMEOUT);
5033        while !matches!(
5034            status_b.next().await,
5035            Some(client::Status::ReconnectionError { .. })
5036        ) {}
5037
5038        channels_b.read_with(cx_b, |channels, _| {
5039            assert_eq!(
5040                channels.available_channels().unwrap(),
5041                [ChannelDetails {
5042                    id: channel_id.to_proto(),
5043                    name: "test-channel".to_string()
5044                }]
5045            )
5046        });
5047        channel_b.read_with(cx_b, |channel, _| {
5048            assert_eq!(
5049                channel_messages(channel),
5050                [("user_b".to_string(), "hello A, it's B.".to_string(), false)]
5051            )
5052        });
5053
5054        // Send a message from client B while it is disconnected.
5055        channel_b
5056            .update(cx_b, |channel, cx| {
5057                let task = channel
5058                    .send_message("can you see this?".to_string(), cx)
5059                    .unwrap();
5060                assert_eq!(
5061                    channel_messages(channel),
5062                    &[
5063                        ("user_b".to_string(), "hello A, it's B.".to_string(), false),
5064                        ("user_b".to_string(), "can you see this?".to_string(), true)
5065                    ]
5066                );
5067                task
5068            })
5069            .await
5070            .unwrap_err();
5071
5072        // Send a message from client A while B is disconnected.
5073        channel_a
5074            .update(cx_a, |channel, cx| {
5075                channel
5076                    .send_message("oh, hi B.".to_string(), cx)
5077                    .unwrap()
5078                    .detach();
5079                let task = channel.send_message("sup".to_string(), cx).unwrap();
5080                assert_eq!(
5081                    channel_messages(channel),
5082                    &[
5083                        ("user_b".to_string(), "hello A, it's B.".to_string(), false),
5084                        ("user_a".to_string(), "oh, hi B.".to_string(), true),
5085                        ("user_a".to_string(), "sup".to_string(), true)
5086                    ]
5087                );
5088                task
5089            })
5090            .await
5091            .unwrap();
5092
5093        // Give client B a chance to reconnect.
5094        server.allow_connections();
5095        cx_b.foreground().advance_clock(Duration::from_secs(10));
5096
5097        // Verify that B sees the new messages upon reconnection, as well as the message client B
5098        // sent while offline.
5099        channel_b
5100            .condition(&cx_b, |channel, _| {
5101                channel_messages(channel)
5102                    == [
5103                        ("user_b".to_string(), "hello A, it's B.".to_string(), false),
5104                        ("user_a".to_string(), "oh, hi B.".to_string(), false),
5105                        ("user_a".to_string(), "sup".to_string(), false),
5106                        ("user_b".to_string(), "can you see this?".to_string(), false),
5107                    ]
5108            })
5109            .await;
5110
5111        // Ensure client A and B can communicate normally after reconnection.
5112        channel_a
5113            .update(cx_a, |channel, cx| {
5114                channel.send_message("you online?".to_string(), cx).unwrap()
5115            })
5116            .await
5117            .unwrap();
5118        channel_b
5119            .condition(&cx_b, |channel, _| {
5120                channel_messages(channel)
5121                    == [
5122                        ("user_b".to_string(), "hello A, it's B.".to_string(), false),
5123                        ("user_a".to_string(), "oh, hi B.".to_string(), false),
5124                        ("user_a".to_string(), "sup".to_string(), false),
5125                        ("user_b".to_string(), "can you see this?".to_string(), false),
5126                        ("user_a".to_string(), "you online?".to_string(), false),
5127                    ]
5128            })
5129            .await;
5130
5131        channel_b
5132            .update(cx_b, |channel, cx| {
5133                channel.send_message("yep".to_string(), cx).unwrap()
5134            })
5135            .await
5136            .unwrap();
5137        channel_a
5138            .condition(&cx_a, |channel, _| {
5139                channel_messages(channel)
5140                    == [
5141                        ("user_b".to_string(), "hello A, it's B.".to_string(), false),
5142                        ("user_a".to_string(), "oh, hi B.".to_string(), false),
5143                        ("user_a".to_string(), "sup".to_string(), false),
5144                        ("user_b".to_string(), "can you see this?".to_string(), false),
5145                        ("user_a".to_string(), "you online?".to_string(), false),
5146                        ("user_b".to_string(), "yep".to_string(), false),
5147                    ]
5148            })
5149            .await;
5150    }
5151
5152    #[gpui::test(iterations = 10)]
5153    async fn test_contacts(
5154        deterministic: Arc<Deterministic>,
5155        cx_a: &mut TestAppContext,
5156        cx_b: &mut TestAppContext,
5157        cx_c: &mut TestAppContext,
5158    ) {
5159        cx_a.foreground().forbid_parking();
5160
5161        // Connect to a server as 3 clients.
5162        let mut server = TestServer::start(cx_a.foreground(), cx_a.background()).await;
5163        let mut client_a = server.create_client(cx_a, "user_a").await;
5164        let mut client_b = server.create_client(cx_b, "user_b").await;
5165        let client_c = server.create_client(cx_c, "user_c").await;
5166        server
5167            .make_contacts(vec![
5168                (&client_a, cx_a),
5169                (&client_b, cx_b),
5170                (&client_c, cx_c),
5171            ])
5172            .await;
5173
5174        deterministic.run_until_parked();
5175        for (client, cx) in [(&client_a, &cx_a), (&client_b, &cx_b), (&client_c, &cx_c)] {
5176            client.user_store.read_with(*cx, |store, _| {
5177                assert_eq!(
5178                    contacts(store),
5179                    [
5180                        ("user_a", true, vec![]),
5181                        ("user_b", true, vec![]),
5182                        ("user_c", true, vec![])
5183                    ],
5184                    "{} has the wrong contacts",
5185                    client.username
5186                )
5187            });
5188        }
5189
5190        // Share a project as client A.
5191        let fs = FakeFs::new(cx_a.background());
5192        fs.create_dir(Path::new("/a")).await.unwrap();
5193        let (project_a, _) = client_a.build_local_project(fs, "/a", cx_a).await;
5194
5195        deterministic.run_until_parked();
5196        for (client, cx) in [(&client_a, &cx_a), (&client_b, &cx_b), (&client_c, &cx_c)] {
5197            client.user_store.read_with(*cx, |store, _| {
5198                assert_eq!(
5199                    contacts(store),
5200                    [
5201                        ("user_a", true, vec![("a", vec![])]),
5202                        ("user_b", true, vec![]),
5203                        ("user_c", true, vec![])
5204                    ],
5205                    "{} has the wrong contacts",
5206                    client.username
5207                )
5208            });
5209        }
5210
5211        let _project_b = client_b.build_remote_project(&project_a, cx_a, cx_b).await;
5212
5213        deterministic.run_until_parked();
5214        for (client, cx) in [(&client_a, &cx_a), (&client_b, &cx_b), (&client_c, &cx_c)] {
5215            client.user_store.read_with(*cx, |store, _| {
5216                assert_eq!(
5217                    contacts(store),
5218                    [
5219                        ("user_a", true, vec![("a", vec!["user_b"])]),
5220                        ("user_b", true, vec![]),
5221                        ("user_c", true, vec![])
5222                    ],
5223                    "{} has the wrong contacts",
5224                    client.username
5225                )
5226            });
5227        }
5228
5229        // Add a local project as client B
5230        let fs = FakeFs::new(cx_b.background());
5231        fs.create_dir(Path::new("/b")).await.unwrap();
5232        let (_project_b, _) = client_b.build_local_project(fs, "/b", cx_a).await;
5233
5234        deterministic.run_until_parked();
5235        for (client, cx) in [(&client_a, &cx_a), (&client_b, &cx_b), (&client_c, &cx_c)] {
5236            client.user_store.read_with(*cx, |store, _| {
5237                assert_eq!(
5238                    contacts(store),
5239                    [
5240                        ("user_a", true, vec![("a", vec!["user_b"])]),
5241                        ("user_b", true, vec![("b", vec![])]),
5242                        ("user_c", true, vec![])
5243                    ],
5244                    "{} has the wrong contacts",
5245                    client.username
5246                )
5247            });
5248        }
5249
5250        project_a
5251            .condition(&cx_a, |project, _| {
5252                project.collaborators().contains_key(&client_b.peer_id)
5253            })
5254            .await;
5255
5256        client_a.project.take();
5257        cx_a.update(move |_| drop(project_a));
5258        deterministic.run_until_parked();
5259        for (client, cx) in [(&client_a, &cx_a), (&client_b, &cx_b), (&client_c, &cx_c)] {
5260            client.user_store.read_with(*cx, |store, _| {
5261                assert_eq!(
5262                    contacts(store),
5263                    [
5264                        ("user_a", true, vec![]),
5265                        ("user_b", true, vec![("b", vec![])]),
5266                        ("user_c", true, vec![])
5267                    ],
5268                    "{} has the wrong contacts",
5269                    client.username
5270                )
5271            });
5272        }
5273
5274        server.disconnect_client(client_c.current_user_id(cx_c));
5275        server.forbid_connections();
5276        deterministic.advance_clock(rpc::RECEIVE_TIMEOUT);
5277        for (client, cx) in [(&client_a, &cx_a), (&client_b, &cx_b)] {
5278            client.user_store.read_with(*cx, |store, _| {
5279                assert_eq!(
5280                    contacts(store),
5281                    [
5282                        ("user_a", true, vec![]),
5283                        ("user_b", true, vec![("b", vec![])]),
5284                        ("user_c", false, vec![])
5285                    ],
5286                    "{} has the wrong contacts",
5287                    client.username
5288                )
5289            });
5290        }
5291        client_c
5292            .user_store
5293            .read_with(cx_c, |store, _| assert_eq!(contacts(store), []));
5294
5295        server.allow_connections();
5296        client_c
5297            .authenticate_and_connect(false, &cx_c.to_async())
5298            .await
5299            .unwrap();
5300
5301        deterministic.run_until_parked();
5302        for (client, cx) in [(&client_a, &cx_a), (&client_b, &cx_b), (&client_c, &cx_c)] {
5303            client.user_store.read_with(*cx, |store, _| {
5304                assert_eq!(
5305                    contacts(store),
5306                    [
5307                        ("user_a", true, vec![]),
5308                        ("user_b", true, vec![("b", vec![])]),
5309                        ("user_c", true, vec![])
5310                    ],
5311                    "{} has the wrong contacts",
5312                    client.username
5313                )
5314            });
5315        }
5316
5317        fn contacts(user_store: &UserStore) -> Vec<(&str, bool, Vec<(&str, Vec<&str>)>)> {
5318            user_store
5319                .contacts()
5320                .iter()
5321                .map(|contact| {
5322                    let projects = contact
5323                        .projects
5324                        .iter()
5325                        .map(|p| {
5326                            (
5327                                p.worktree_root_names[0].as_str(),
5328                                p.guests.iter().map(|p| p.github_login.as_str()).collect(),
5329                            )
5330                        })
5331                        .collect();
5332                    (contact.user.github_login.as_str(), contact.online, projects)
5333                })
5334                .collect()
5335        }
5336    }
5337
5338    #[gpui::test(iterations = 10)]
5339    async fn test_contact_requests(
5340        executor: Arc<Deterministic>,
5341        cx_a: &mut TestAppContext,
5342        cx_a2: &mut TestAppContext,
5343        cx_b: &mut TestAppContext,
5344        cx_b2: &mut TestAppContext,
5345        cx_c: &mut TestAppContext,
5346        cx_c2: &mut TestAppContext,
5347    ) {
5348        cx_a.foreground().forbid_parking();
5349
5350        // Connect to a server as 3 clients.
5351        let mut server = TestServer::start(cx_a.foreground(), cx_a.background()).await;
5352        let client_a = server.create_client(cx_a, "user_a").await;
5353        let client_a2 = server.create_client(cx_a2, "user_a").await;
5354        let client_b = server.create_client(cx_b, "user_b").await;
5355        let client_b2 = server.create_client(cx_b2, "user_b").await;
5356        let client_c = server.create_client(cx_c, "user_c").await;
5357        let client_c2 = server.create_client(cx_c2, "user_c").await;
5358
5359        assert_eq!(client_a.user_id().unwrap(), client_a2.user_id().unwrap());
5360        assert_eq!(client_b.user_id().unwrap(), client_b2.user_id().unwrap());
5361        assert_eq!(client_c.user_id().unwrap(), client_c2.user_id().unwrap());
5362
5363        // User A and User C request that user B become their contact.
5364        client_a
5365            .user_store
5366            .update(cx_a, |store, cx| {
5367                store.request_contact(client_b.user_id().unwrap(), cx)
5368            })
5369            .await
5370            .unwrap();
5371        client_c
5372            .user_store
5373            .update(cx_c, |store, cx| {
5374                store.request_contact(client_b.user_id().unwrap(), cx)
5375            })
5376            .await
5377            .unwrap();
5378        executor.run_until_parked();
5379
5380        // All users see the pending request appear in all their clients.
5381        assert_eq!(
5382            client_a.summarize_contacts(&cx_a).outgoing_requests,
5383            &["user_b"]
5384        );
5385        assert_eq!(
5386            client_a2.summarize_contacts(&cx_a2).outgoing_requests,
5387            &["user_b"]
5388        );
5389        assert_eq!(
5390            client_b.summarize_contacts(&cx_b).incoming_requests,
5391            &["user_a", "user_c"]
5392        );
5393        assert_eq!(
5394            client_b2.summarize_contacts(&cx_b2).incoming_requests,
5395            &["user_a", "user_c"]
5396        );
5397        assert_eq!(
5398            client_c.summarize_contacts(&cx_c).outgoing_requests,
5399            &["user_b"]
5400        );
5401        assert_eq!(
5402            client_c2.summarize_contacts(&cx_c2).outgoing_requests,
5403            &["user_b"]
5404        );
5405
5406        // Contact requests are present upon connecting (tested here via disconnect/reconnect)
5407        disconnect_and_reconnect(&client_a, cx_a).await;
5408        disconnect_and_reconnect(&client_b, cx_b).await;
5409        disconnect_and_reconnect(&client_c, cx_c).await;
5410        executor.run_until_parked();
5411        assert_eq!(
5412            client_a.summarize_contacts(&cx_a).outgoing_requests,
5413            &["user_b"]
5414        );
5415        assert_eq!(
5416            client_b.summarize_contacts(&cx_b).incoming_requests,
5417            &["user_a", "user_c"]
5418        );
5419        assert_eq!(
5420            client_c.summarize_contacts(&cx_c).outgoing_requests,
5421            &["user_b"]
5422        );
5423
5424        // User B accepts the request from user A.
5425        client_b
5426            .user_store
5427            .update(cx_b, |store, cx| {
5428                store.respond_to_contact_request(client_a.user_id().unwrap(), true, cx)
5429            })
5430            .await
5431            .unwrap();
5432
5433        executor.run_until_parked();
5434
5435        // User B sees user A as their contact now in all client, and the incoming request from them is removed.
5436        let contacts_b = client_b.summarize_contacts(&cx_b);
5437        assert_eq!(contacts_b.current, &["user_a", "user_b"]);
5438        assert_eq!(contacts_b.incoming_requests, &["user_c"]);
5439        let contacts_b2 = client_b2.summarize_contacts(&cx_b2);
5440        assert_eq!(contacts_b2.current, &["user_a", "user_b"]);
5441        assert_eq!(contacts_b2.incoming_requests, &["user_c"]);
5442
5443        // User A sees user B as their contact now in all clients, and the outgoing request to them is removed.
5444        let contacts_a = client_a.summarize_contacts(&cx_a);
5445        assert_eq!(contacts_a.current, &["user_a", "user_b"]);
5446        assert!(contacts_a.outgoing_requests.is_empty());
5447        let contacts_a2 = client_a2.summarize_contacts(&cx_a2);
5448        assert_eq!(contacts_a2.current, &["user_a", "user_b"]);
5449        assert!(contacts_a2.outgoing_requests.is_empty());
5450
5451        // Contacts are present upon connecting (tested here via disconnect/reconnect)
5452        disconnect_and_reconnect(&client_a, cx_a).await;
5453        disconnect_and_reconnect(&client_b, cx_b).await;
5454        disconnect_and_reconnect(&client_c, cx_c).await;
5455        executor.run_until_parked();
5456        assert_eq!(
5457            client_a.summarize_contacts(&cx_a).current,
5458            &["user_a", "user_b"]
5459        );
5460        assert_eq!(
5461            client_b.summarize_contacts(&cx_b).current,
5462            &["user_a", "user_b"]
5463        );
5464        assert_eq!(
5465            client_b.summarize_contacts(&cx_b).incoming_requests,
5466            &["user_c"]
5467        );
5468        assert_eq!(client_c.summarize_contacts(&cx_c).current, &["user_c"]);
5469        assert_eq!(
5470            client_c.summarize_contacts(&cx_c).outgoing_requests,
5471            &["user_b"]
5472        );
5473
5474        // User B rejects the request from user C.
5475        client_b
5476            .user_store
5477            .update(cx_b, |store, cx| {
5478                store.respond_to_contact_request(client_c.user_id().unwrap(), false, cx)
5479            })
5480            .await
5481            .unwrap();
5482
5483        executor.run_until_parked();
5484
5485        // User B doesn't see user C as their contact, and the incoming request from them is removed.
5486        let contacts_b = client_b.summarize_contacts(&cx_b);
5487        assert_eq!(contacts_b.current, &["user_a", "user_b"]);
5488        assert!(contacts_b.incoming_requests.is_empty());
5489        let contacts_b2 = client_b2.summarize_contacts(&cx_b2);
5490        assert_eq!(contacts_b2.current, &["user_a", "user_b"]);
5491        assert!(contacts_b2.incoming_requests.is_empty());
5492
5493        // User C doesn't see user B as their contact, and the outgoing request to them is removed.
5494        let contacts_c = client_c.summarize_contacts(&cx_c);
5495        assert_eq!(contacts_c.current, &["user_c"]);
5496        assert!(contacts_c.outgoing_requests.is_empty());
5497        let contacts_c2 = client_c2.summarize_contacts(&cx_c2);
5498        assert_eq!(contacts_c2.current, &["user_c"]);
5499        assert!(contacts_c2.outgoing_requests.is_empty());
5500
5501        // Incoming/outgoing requests are not present upon connecting (tested here via disconnect/reconnect)
5502        disconnect_and_reconnect(&client_a, cx_a).await;
5503        disconnect_and_reconnect(&client_b, cx_b).await;
5504        disconnect_and_reconnect(&client_c, cx_c).await;
5505        executor.run_until_parked();
5506        assert_eq!(
5507            client_a.summarize_contacts(&cx_a).current,
5508            &["user_a", "user_b"]
5509        );
5510        assert_eq!(
5511            client_b.summarize_contacts(&cx_b).current,
5512            &["user_a", "user_b"]
5513        );
5514        assert!(client_b
5515            .summarize_contacts(&cx_b)
5516            .incoming_requests
5517            .is_empty());
5518        assert_eq!(client_c.summarize_contacts(&cx_c).current, &["user_c"]);
5519        assert!(client_c
5520            .summarize_contacts(&cx_c)
5521            .outgoing_requests
5522            .is_empty());
5523
5524        async fn disconnect_and_reconnect(client: &TestClient, cx: &mut TestAppContext) {
5525            client.disconnect(&cx.to_async()).unwrap();
5526            client.clear_contacts(cx).await;
5527            client
5528                .authenticate_and_connect(false, &cx.to_async())
5529                .await
5530                .unwrap();
5531        }
5532    }
5533
5534    #[gpui::test(iterations = 10)]
5535    async fn test_following(cx_a: &mut TestAppContext, cx_b: &mut TestAppContext) {
5536        cx_a.foreground().forbid_parking();
5537        let fs = FakeFs::new(cx_a.background());
5538
5539        // 2 clients connect to a server.
5540        let mut server = TestServer::start(cx_a.foreground(), cx_a.background()).await;
5541        let mut client_a = server.create_client(cx_a, "user_a").await;
5542        let mut client_b = server.create_client(cx_b, "user_b").await;
5543        server
5544            .make_contacts(vec![(&client_a, cx_a), (&client_b, cx_b)])
5545            .await;
5546        cx_a.update(editor::init);
5547        cx_b.update(editor::init);
5548
5549        // Client A shares a project.
5550        fs.insert_tree(
5551            "/a",
5552            json!({
5553                "1.txt": "one",
5554                "2.txt": "two",
5555                "3.txt": "three",
5556            }),
5557        )
5558        .await;
5559        let (project_a, worktree_id) = client_a.build_local_project(fs.clone(), "/a", cx_a).await;
5560
5561        // Client B joins the project.
5562        let project_b = client_b.build_remote_project(&project_a, cx_a, cx_b).await;
5563
5564        // Client A opens some editors.
5565        let workspace_a = client_a.build_workspace(&project_a, cx_a);
5566        let pane_a = workspace_a.read_with(cx_a, |workspace, _| workspace.active_pane().clone());
5567        let editor_a1 = workspace_a
5568            .update(cx_a, |workspace, cx| {
5569                workspace.open_path((worktree_id, "1.txt"), true, cx)
5570            })
5571            .await
5572            .unwrap()
5573            .downcast::<Editor>()
5574            .unwrap();
5575        let editor_a2 = workspace_a
5576            .update(cx_a, |workspace, cx| {
5577                workspace.open_path((worktree_id, "2.txt"), true, cx)
5578            })
5579            .await
5580            .unwrap()
5581            .downcast::<Editor>()
5582            .unwrap();
5583
5584        // Client B opens an editor.
5585        let workspace_b = client_b.build_workspace(&project_b, cx_b);
5586        let editor_b1 = workspace_b
5587            .update(cx_b, |workspace, cx| {
5588                workspace.open_path((worktree_id, "1.txt"), true, cx)
5589            })
5590            .await
5591            .unwrap()
5592            .downcast::<Editor>()
5593            .unwrap();
5594
5595        let client_a_id = project_b.read_with(cx_b, |project, _| {
5596            project.collaborators().values().next().unwrap().peer_id
5597        });
5598        let client_b_id = project_a.read_with(cx_a, |project, _| {
5599            project.collaborators().values().next().unwrap().peer_id
5600        });
5601
5602        // When client B starts following client A, all visible view states are replicated to client B.
5603        editor_a1.update(cx_a, |editor, cx| {
5604            editor.change_selections(None, cx, |s| s.select_ranges([0..1]))
5605        });
5606        editor_a2.update(cx_a, |editor, cx| {
5607            editor.change_selections(None, cx, |s| s.select_ranges([2..3]))
5608        });
5609        workspace_b
5610            .update(cx_b, |workspace, cx| {
5611                workspace
5612                    .toggle_follow(&ToggleFollow(client_a_id), cx)
5613                    .unwrap()
5614            })
5615            .await
5616            .unwrap();
5617
5618        let editor_b2 = workspace_b.read_with(cx_b, |workspace, cx| {
5619            workspace
5620                .active_item(cx)
5621                .unwrap()
5622                .downcast::<Editor>()
5623                .unwrap()
5624        });
5625        assert!(cx_b.read(|cx| editor_b2.is_focused(cx)));
5626        assert_eq!(
5627            editor_b2.read_with(cx_b, |editor, cx| editor.project_path(cx)),
5628            Some((worktree_id, "2.txt").into())
5629        );
5630        assert_eq!(
5631            editor_b2.read_with(cx_b, |editor, cx| editor.selections.ranges(cx)),
5632            vec![2..3]
5633        );
5634        assert_eq!(
5635            editor_b1.read_with(cx_b, |editor, cx| editor.selections.ranges(cx)),
5636            vec![0..1]
5637        );
5638
5639        // When client A activates a different editor, client B does so as well.
5640        workspace_a.update(cx_a, |workspace, cx| {
5641            workspace.activate_item(&editor_a1, cx)
5642        });
5643        workspace_b
5644            .condition(cx_b, |workspace, cx| {
5645                workspace.active_item(cx).unwrap().id() == editor_b1.id()
5646            })
5647            .await;
5648
5649        // When client A navigates back and forth, client B does so as well.
5650        workspace_a
5651            .update(cx_a, |workspace, cx| {
5652                workspace::Pane::go_back(workspace, None, cx)
5653            })
5654            .await;
5655        workspace_b
5656            .condition(cx_b, |workspace, cx| {
5657                workspace.active_item(cx).unwrap().id() == editor_b2.id()
5658            })
5659            .await;
5660
5661        workspace_a
5662            .update(cx_a, |workspace, cx| {
5663                workspace::Pane::go_forward(workspace, None, cx)
5664            })
5665            .await;
5666        workspace_b
5667            .condition(cx_b, |workspace, cx| {
5668                workspace.active_item(cx).unwrap().id() == editor_b1.id()
5669            })
5670            .await;
5671
5672        // Changes to client A's editor are reflected on client B.
5673        editor_a1.update(cx_a, |editor, cx| {
5674            editor.change_selections(None, cx, |s| s.select_ranges([1..1, 2..2]));
5675        });
5676        editor_b1
5677            .condition(cx_b, |editor, cx| {
5678                editor.selections.ranges(cx) == vec![1..1, 2..2]
5679            })
5680            .await;
5681
5682        editor_a1.update(cx_a, |editor, cx| editor.set_text("TWO", cx));
5683        editor_b1
5684            .condition(cx_b, |editor, cx| editor.text(cx) == "TWO")
5685            .await;
5686
5687        editor_a1.update(cx_a, |editor, cx| {
5688            editor.change_selections(None, cx, |s| s.select_ranges([3..3]));
5689            editor.set_scroll_position(vec2f(0., 100.), cx);
5690        });
5691        editor_b1
5692            .condition(cx_b, |editor, cx| {
5693                editor.selections.ranges(cx) == vec![3..3]
5694            })
5695            .await;
5696
5697        // After unfollowing, client B stops receiving updates from client A.
5698        workspace_b.update(cx_b, |workspace, cx| {
5699            workspace.unfollow(&workspace.active_pane().clone(), cx)
5700        });
5701        workspace_a.update(cx_a, |workspace, cx| {
5702            workspace.activate_item(&editor_a2, cx)
5703        });
5704        cx_a.foreground().run_until_parked();
5705        assert_eq!(
5706            workspace_b.read_with(cx_b, |workspace, cx| workspace
5707                .active_item(cx)
5708                .unwrap()
5709                .id()),
5710            editor_b1.id()
5711        );
5712
5713        // Client A starts following client B.
5714        workspace_a
5715            .update(cx_a, |workspace, cx| {
5716                workspace
5717                    .toggle_follow(&ToggleFollow(client_b_id), cx)
5718                    .unwrap()
5719            })
5720            .await
5721            .unwrap();
5722        assert_eq!(
5723            workspace_a.read_with(cx_a, |workspace, _| workspace.leader_for_pane(&pane_a)),
5724            Some(client_b_id)
5725        );
5726        assert_eq!(
5727            workspace_a.read_with(cx_a, |workspace, cx| workspace
5728                .active_item(cx)
5729                .unwrap()
5730                .id()),
5731            editor_a1.id()
5732        );
5733
5734        // Following interrupts when client B disconnects.
5735        client_b.disconnect(&cx_b.to_async()).unwrap();
5736        cx_a.foreground().run_until_parked();
5737        assert_eq!(
5738            workspace_a.read_with(cx_a, |workspace, _| workspace.leader_for_pane(&pane_a)),
5739            None
5740        );
5741    }
5742
5743    #[gpui::test(iterations = 10)]
5744    async fn test_peers_following_each_other(cx_a: &mut TestAppContext, cx_b: &mut TestAppContext) {
5745        cx_a.foreground().forbid_parking();
5746        let fs = FakeFs::new(cx_a.background());
5747
5748        // 2 clients connect to a server.
5749        let mut server = TestServer::start(cx_a.foreground(), cx_a.background()).await;
5750        let mut client_a = server.create_client(cx_a, "user_a").await;
5751        let mut client_b = server.create_client(cx_b, "user_b").await;
5752        server
5753            .make_contacts(vec![(&client_a, cx_a), (&client_b, cx_b)])
5754            .await;
5755        cx_a.update(editor::init);
5756        cx_b.update(editor::init);
5757
5758        // Client A shares a project.
5759        fs.insert_tree(
5760            "/a",
5761            json!({
5762                "1.txt": "one",
5763                "2.txt": "two",
5764                "3.txt": "three",
5765                "4.txt": "four",
5766            }),
5767        )
5768        .await;
5769        let (project_a, worktree_id) = client_a.build_local_project(fs.clone(), "/a", cx_a).await;
5770
5771        // Client B joins the project.
5772        let project_b = client_b.build_remote_project(&project_a, cx_a, cx_b).await;
5773
5774        // Client A opens some editors.
5775        let workspace_a = client_a.build_workspace(&project_a, cx_a);
5776        let pane_a1 = workspace_a.read_with(cx_a, |workspace, _| workspace.active_pane().clone());
5777        let _editor_a1 = workspace_a
5778            .update(cx_a, |workspace, cx| {
5779                workspace.open_path((worktree_id, "1.txt"), true, cx)
5780            })
5781            .await
5782            .unwrap()
5783            .downcast::<Editor>()
5784            .unwrap();
5785
5786        // Client B opens an editor.
5787        let workspace_b = client_b.build_workspace(&project_b, cx_b);
5788        let pane_b1 = workspace_b.read_with(cx_b, |workspace, _| workspace.active_pane().clone());
5789        let _editor_b1 = workspace_b
5790            .update(cx_b, |workspace, cx| {
5791                workspace.open_path((worktree_id, "2.txt"), true, cx)
5792            })
5793            .await
5794            .unwrap()
5795            .downcast::<Editor>()
5796            .unwrap();
5797
5798        // Clients A and B follow each other in split panes
5799        workspace_a
5800            .update(cx_a, |workspace, cx| {
5801                workspace.split_pane(workspace.active_pane().clone(), SplitDirection::Right, cx);
5802                assert_ne!(*workspace.active_pane(), pane_a1);
5803                let leader_id = *project_a.read(cx).collaborators().keys().next().unwrap();
5804                workspace
5805                    .toggle_follow(&workspace::ToggleFollow(leader_id), cx)
5806                    .unwrap()
5807            })
5808            .await
5809            .unwrap();
5810        workspace_b
5811            .update(cx_b, |workspace, cx| {
5812                workspace.split_pane(workspace.active_pane().clone(), SplitDirection::Right, cx);
5813                assert_ne!(*workspace.active_pane(), pane_b1);
5814                let leader_id = *project_b.read(cx).collaborators().keys().next().unwrap();
5815                workspace
5816                    .toggle_follow(&workspace::ToggleFollow(leader_id), cx)
5817                    .unwrap()
5818            })
5819            .await
5820            .unwrap();
5821
5822        workspace_a
5823            .update(cx_a, |workspace, cx| {
5824                workspace.activate_next_pane(cx);
5825                assert_eq!(*workspace.active_pane(), pane_a1);
5826                workspace.open_path((worktree_id, "3.txt"), true, cx)
5827            })
5828            .await
5829            .unwrap();
5830        workspace_b
5831            .update(cx_b, |workspace, cx| {
5832                workspace.activate_next_pane(cx);
5833                assert_eq!(*workspace.active_pane(), pane_b1);
5834                workspace.open_path((worktree_id, "4.txt"), true, cx)
5835            })
5836            .await
5837            .unwrap();
5838        cx_a.foreground().run_until_parked();
5839
5840        // Ensure leader updates don't change the active pane of followers
5841        workspace_a.read_with(cx_a, |workspace, _| {
5842            assert_eq!(*workspace.active_pane(), pane_a1);
5843        });
5844        workspace_b.read_with(cx_b, |workspace, _| {
5845            assert_eq!(*workspace.active_pane(), pane_b1);
5846        });
5847
5848        // Ensure peers following each other doesn't cause an infinite loop.
5849        assert_eq!(
5850            workspace_a.read_with(cx_a, |workspace, cx| workspace
5851                .active_item(cx)
5852                .unwrap()
5853                .project_path(cx)),
5854            Some((worktree_id, "3.txt").into())
5855        );
5856        workspace_a.update(cx_a, |workspace, cx| {
5857            assert_eq!(
5858                workspace.active_item(cx).unwrap().project_path(cx),
5859                Some((worktree_id, "3.txt").into())
5860            );
5861            workspace.activate_next_pane(cx);
5862            assert_eq!(
5863                workspace.active_item(cx).unwrap().project_path(cx),
5864                Some((worktree_id, "4.txt").into())
5865            );
5866        });
5867        workspace_b.update(cx_b, |workspace, cx| {
5868            assert_eq!(
5869                workspace.active_item(cx).unwrap().project_path(cx),
5870                Some((worktree_id, "4.txt").into())
5871            );
5872            workspace.activate_next_pane(cx);
5873            assert_eq!(
5874                workspace.active_item(cx).unwrap().project_path(cx),
5875                Some((worktree_id, "3.txt").into())
5876            );
5877        });
5878    }
5879
5880    #[gpui::test(iterations = 10)]
5881    async fn test_auto_unfollowing(cx_a: &mut TestAppContext, cx_b: &mut TestAppContext) {
5882        cx_a.foreground().forbid_parking();
5883        let fs = FakeFs::new(cx_a.background());
5884
5885        // 2 clients connect to a server.
5886        let mut server = TestServer::start(cx_a.foreground(), cx_a.background()).await;
5887        let mut client_a = server.create_client(cx_a, "user_a").await;
5888        let mut client_b = server.create_client(cx_b, "user_b").await;
5889        server
5890            .make_contacts(vec![(&client_a, cx_a), (&client_b, cx_b)])
5891            .await;
5892        cx_a.update(editor::init);
5893        cx_b.update(editor::init);
5894
5895        // Client A shares a project.
5896        fs.insert_tree(
5897            "/a",
5898            json!({
5899                "1.txt": "one",
5900                "2.txt": "two",
5901                "3.txt": "three",
5902            }),
5903        )
5904        .await;
5905        let (project_a, worktree_id) = client_a.build_local_project(fs.clone(), "/a", cx_a).await;
5906
5907        // Client B joins the project.
5908        let project_b = client_b.build_remote_project(&project_a, cx_a, cx_b).await;
5909
5910        // Client A opens some editors.
5911        let workspace_a = client_a.build_workspace(&project_a, cx_a);
5912        let _editor_a1 = workspace_a
5913            .update(cx_a, |workspace, cx| {
5914                workspace.open_path((worktree_id, "1.txt"), true, cx)
5915            })
5916            .await
5917            .unwrap()
5918            .downcast::<Editor>()
5919            .unwrap();
5920
5921        // Client B starts following client A.
5922        let workspace_b = client_b.build_workspace(&project_b, cx_b);
5923        let pane_b = workspace_b.read_with(cx_b, |workspace, _| workspace.active_pane().clone());
5924        let leader_id = project_b.read_with(cx_b, |project, _| {
5925            project.collaborators().values().next().unwrap().peer_id
5926        });
5927        workspace_b
5928            .update(cx_b, |workspace, cx| {
5929                workspace
5930                    .toggle_follow(&ToggleFollow(leader_id), cx)
5931                    .unwrap()
5932            })
5933            .await
5934            .unwrap();
5935        assert_eq!(
5936            workspace_b.read_with(cx_b, |workspace, _| workspace.leader_for_pane(&pane_b)),
5937            Some(leader_id)
5938        );
5939        let editor_b2 = workspace_b.read_with(cx_b, |workspace, cx| {
5940            workspace
5941                .active_item(cx)
5942                .unwrap()
5943                .downcast::<Editor>()
5944                .unwrap()
5945        });
5946
5947        // When client B moves, it automatically stops following client A.
5948        editor_b2.update(cx_b, |editor, cx| editor.move_right(&editor::MoveRight, cx));
5949        assert_eq!(
5950            workspace_b.read_with(cx_b, |workspace, _| workspace.leader_for_pane(&pane_b)),
5951            None
5952        );
5953
5954        workspace_b
5955            .update(cx_b, |workspace, cx| {
5956                workspace
5957                    .toggle_follow(&ToggleFollow(leader_id), cx)
5958                    .unwrap()
5959            })
5960            .await
5961            .unwrap();
5962        assert_eq!(
5963            workspace_b.read_with(cx_b, |workspace, _| workspace.leader_for_pane(&pane_b)),
5964            Some(leader_id)
5965        );
5966
5967        // When client B edits, it automatically stops following client A.
5968        editor_b2.update(cx_b, |editor, cx| editor.insert("X", cx));
5969        assert_eq!(
5970            workspace_b.read_with(cx_b, |workspace, _| workspace.leader_for_pane(&pane_b)),
5971            None
5972        );
5973
5974        workspace_b
5975            .update(cx_b, |workspace, cx| {
5976                workspace
5977                    .toggle_follow(&ToggleFollow(leader_id), cx)
5978                    .unwrap()
5979            })
5980            .await
5981            .unwrap();
5982        assert_eq!(
5983            workspace_b.read_with(cx_b, |workspace, _| workspace.leader_for_pane(&pane_b)),
5984            Some(leader_id)
5985        );
5986
5987        // When client B scrolls, it automatically stops following client A.
5988        editor_b2.update(cx_b, |editor, cx| {
5989            editor.set_scroll_position(vec2f(0., 3.), cx)
5990        });
5991        assert_eq!(
5992            workspace_b.read_with(cx_b, |workspace, _| workspace.leader_for_pane(&pane_b)),
5993            None
5994        );
5995
5996        workspace_b
5997            .update(cx_b, |workspace, cx| {
5998                workspace
5999                    .toggle_follow(&ToggleFollow(leader_id), cx)
6000                    .unwrap()
6001            })
6002            .await
6003            .unwrap();
6004        assert_eq!(
6005            workspace_b.read_with(cx_b, |workspace, _| workspace.leader_for_pane(&pane_b)),
6006            Some(leader_id)
6007        );
6008
6009        // When client B activates a different pane, it continues following client A in the original pane.
6010        workspace_b.update(cx_b, |workspace, cx| {
6011            workspace.split_pane(pane_b.clone(), SplitDirection::Right, cx)
6012        });
6013        assert_eq!(
6014            workspace_b.read_with(cx_b, |workspace, _| workspace.leader_for_pane(&pane_b)),
6015            Some(leader_id)
6016        );
6017
6018        workspace_b.update(cx_b, |workspace, cx| workspace.activate_next_pane(cx));
6019        assert_eq!(
6020            workspace_b.read_with(cx_b, |workspace, _| workspace.leader_for_pane(&pane_b)),
6021            Some(leader_id)
6022        );
6023
6024        // When client B activates a different item in the original pane, it automatically stops following client A.
6025        workspace_b
6026            .update(cx_b, |workspace, cx| {
6027                workspace.open_path((worktree_id, "2.txt"), true, cx)
6028            })
6029            .await
6030            .unwrap();
6031        assert_eq!(
6032            workspace_b.read_with(cx_b, |workspace, _| workspace.leader_for_pane(&pane_b)),
6033            None
6034        );
6035    }
6036
6037    #[gpui::test(iterations = 100)]
6038    async fn test_random_collaboration(
6039        cx: &mut TestAppContext,
6040        deterministic: Arc<Deterministic>,
6041        rng: StdRng,
6042    ) {
6043        cx.foreground().forbid_parking();
6044        let max_peers = env::var("MAX_PEERS")
6045            .map(|i| i.parse().expect("invalid `MAX_PEERS` variable"))
6046            .unwrap_or(5);
6047        assert!(max_peers <= 5);
6048
6049        let max_operations = env::var("OPERATIONS")
6050            .map(|i| i.parse().expect("invalid `OPERATIONS` variable"))
6051            .unwrap_or(10);
6052
6053        let rng = Arc::new(Mutex::new(rng));
6054
6055        let guest_lang_registry = Arc::new(LanguageRegistry::test());
6056        let host_language_registry = Arc::new(LanguageRegistry::test());
6057
6058        let fs = FakeFs::new(cx.background());
6059        fs.insert_tree("/_collab", json!({"init": ""})).await;
6060
6061        let mut server = TestServer::start(cx.foreground(), cx.background()).await;
6062        let db = server.app_state.db.clone();
6063        let host_user_id = db.create_user("host", false).await.unwrap();
6064        for username in ["guest-1", "guest-2", "guest-3", "guest-4"] {
6065            let guest_user_id = db.create_user(username, false).await.unwrap();
6066            server
6067                .app_state
6068                .db
6069                .send_contact_request(guest_user_id, host_user_id)
6070                .await
6071                .unwrap();
6072            server
6073                .app_state
6074                .db
6075                .respond_to_contact_request(host_user_id, guest_user_id, true)
6076                .await
6077                .unwrap();
6078        }
6079
6080        let mut clients = Vec::new();
6081        let mut user_ids = Vec::new();
6082        let mut op_start_signals = Vec::new();
6083
6084        let mut next_entity_id = 100000;
6085        let mut host_cx = TestAppContext::new(
6086            cx.foreground_platform(),
6087            cx.platform(),
6088            deterministic.build_foreground(next_entity_id),
6089            deterministic.build_background(),
6090            cx.font_cache(),
6091            cx.leak_detector(),
6092            next_entity_id,
6093        );
6094        let host = server.create_client(&mut host_cx, "host").await;
6095        let host_project = host_cx.update(|cx| {
6096            Project::local(
6097                host.client.clone(),
6098                host.user_store.clone(),
6099                host_language_registry.clone(),
6100                fs.clone(),
6101                cx,
6102            )
6103        });
6104        let host_project_id = host_project
6105            .update(&mut host_cx, |p, _| p.next_remote_id())
6106            .await;
6107
6108        let (collab_worktree, _) = host_project
6109            .update(&mut host_cx, |project, cx| {
6110                project.find_or_create_local_worktree("/_collab", true, cx)
6111            })
6112            .await
6113            .unwrap();
6114        collab_worktree
6115            .read_with(&host_cx, |tree, _| tree.as_local().unwrap().scan_complete())
6116            .await;
6117
6118        // Set up fake language servers.
6119        let mut language = Language::new(
6120            LanguageConfig {
6121                name: "Rust".into(),
6122                path_suffixes: vec!["rs".to_string()],
6123                ..Default::default()
6124            },
6125            None,
6126        );
6127        let _fake_servers = language.set_fake_lsp_adapter(FakeLspAdapter {
6128            name: "the-fake-language-server",
6129            capabilities: lsp::LanguageServer::full_capabilities(),
6130            initializer: Some(Box::new({
6131                let rng = rng.clone();
6132                let fs = fs.clone();
6133                let project = host_project.downgrade();
6134                move |fake_server: &mut FakeLanguageServer| {
6135                    fake_server.handle_request::<lsp::request::Completion, _, _>(
6136                        |_, _| async move {
6137                            Ok(Some(lsp::CompletionResponse::Array(vec![
6138                                lsp::CompletionItem {
6139                                    text_edit: Some(lsp::CompletionTextEdit::Edit(lsp::TextEdit {
6140                                        range: lsp::Range::new(
6141                                            lsp::Position::new(0, 0),
6142                                            lsp::Position::new(0, 0),
6143                                        ),
6144                                        new_text: "the-new-text".to_string(),
6145                                    })),
6146                                    ..Default::default()
6147                                },
6148                            ])))
6149                        },
6150                    );
6151
6152                    fake_server.handle_request::<lsp::request::CodeActionRequest, _, _>(
6153                        |_, _| async move {
6154                            Ok(Some(vec![lsp::CodeActionOrCommand::CodeAction(
6155                                lsp::CodeAction {
6156                                    title: "the-code-action".to_string(),
6157                                    ..Default::default()
6158                                },
6159                            )]))
6160                        },
6161                    );
6162
6163                    fake_server.handle_request::<lsp::request::PrepareRenameRequest, _, _>(
6164                        |params, _| async move {
6165                            Ok(Some(lsp::PrepareRenameResponse::Range(lsp::Range::new(
6166                                params.position,
6167                                params.position,
6168                            ))))
6169                        },
6170                    );
6171
6172                    fake_server.handle_request::<lsp::request::GotoDefinition, _, _>({
6173                        let fs = fs.clone();
6174                        let rng = rng.clone();
6175                        move |_, _| {
6176                            let fs = fs.clone();
6177                            let rng = rng.clone();
6178                            async move {
6179                                let files = fs.files().await;
6180                                let mut rng = rng.lock();
6181                                let count = rng.gen_range::<usize, _>(1..3);
6182                                let files = (0..count)
6183                                    .map(|_| files.choose(&mut *rng).unwrap())
6184                                    .collect::<Vec<_>>();
6185                                log::info!("LSP: Returning definitions in files {:?}", &files);
6186                                Ok(Some(lsp::GotoDefinitionResponse::Array(
6187                                    files
6188                                        .into_iter()
6189                                        .map(|file| lsp::Location {
6190                                            uri: lsp::Url::from_file_path(file).unwrap(),
6191                                            range: Default::default(),
6192                                        })
6193                                        .collect(),
6194                                )))
6195                            }
6196                        }
6197                    });
6198
6199                    fake_server.handle_request::<lsp::request::DocumentHighlightRequest, _, _>({
6200                        let rng = rng.clone();
6201                        let project = project.clone();
6202                        move |params, mut cx| {
6203                            let highlights = if let Some(project) = project.upgrade(&cx) {
6204                                project.update(&mut cx, |project, cx| {
6205                                    let path = params
6206                                        .text_document_position_params
6207                                        .text_document
6208                                        .uri
6209                                        .to_file_path()
6210                                        .unwrap();
6211                                    let (worktree, relative_path) =
6212                                        project.find_local_worktree(&path, cx)?;
6213                                    let project_path =
6214                                        ProjectPath::from((worktree.read(cx).id(), relative_path));
6215                                    let buffer =
6216                                        project.get_open_buffer(&project_path, cx)?.read(cx);
6217
6218                                    let mut highlights = Vec::new();
6219                                    let highlight_count = rng.lock().gen_range(1..=5);
6220                                    let mut prev_end = 0;
6221                                    for _ in 0..highlight_count {
6222                                        let range =
6223                                            buffer.random_byte_range(prev_end, &mut *rng.lock());
6224
6225                                        highlights.push(lsp::DocumentHighlight {
6226                                            range: range_to_lsp(range.to_point_utf16(buffer)),
6227                                            kind: Some(lsp::DocumentHighlightKind::READ),
6228                                        });
6229                                        prev_end = range.end;
6230                                    }
6231                                    Some(highlights)
6232                                })
6233                            } else {
6234                                None
6235                            };
6236                            async move { Ok(highlights) }
6237                        }
6238                    });
6239                }
6240            })),
6241            ..Default::default()
6242        });
6243        host_language_registry.add(Arc::new(language));
6244
6245        let op_start_signal = futures::channel::mpsc::unbounded();
6246        user_ids.push(host.current_user_id(&host_cx));
6247        op_start_signals.push(op_start_signal.0);
6248        clients.push(host_cx.foreground().spawn(host.simulate_host(
6249            host_project,
6250            op_start_signal.1,
6251            rng.clone(),
6252            host_cx,
6253        )));
6254
6255        let disconnect_host_at = if rng.lock().gen_bool(0.2) {
6256            rng.lock().gen_range(0..max_operations)
6257        } else {
6258            max_operations
6259        };
6260        let mut available_guests = vec![
6261            "guest-1".to_string(),
6262            "guest-2".to_string(),
6263            "guest-3".to_string(),
6264            "guest-4".to_string(),
6265        ];
6266        let mut operations = 0;
6267        while operations < max_operations {
6268            if operations == disconnect_host_at {
6269                server.disconnect_client(user_ids[0]);
6270                cx.foreground().advance_clock(RECEIVE_TIMEOUT);
6271                drop(op_start_signals);
6272                let mut clients = futures::future::join_all(clients).await;
6273                cx.foreground().run_until_parked();
6274
6275                let (host, mut host_cx, host_err) = clients.remove(0);
6276                if let Some(host_err) = host_err {
6277                    log::error!("host error - {:?}", host_err);
6278                }
6279                host.project
6280                    .as_ref()
6281                    .unwrap()
6282                    .read_with(&host_cx, |project, _| assert!(!project.is_shared()));
6283                for (guest, mut guest_cx, guest_err) in clients {
6284                    if let Some(guest_err) = guest_err {
6285                        log::error!("{} error - {:?}", guest.username, guest_err);
6286                    }
6287
6288                    let contacts = server
6289                        .app_state
6290                        .db
6291                        .get_contacts(guest.current_user_id(&guest_cx))
6292                        .await
6293                        .unwrap();
6294                    let contacts = server
6295                        .store
6296                        .read()
6297                        .await
6298                        .build_initial_contacts_update(contacts)
6299                        .contacts;
6300                    assert!(!contacts
6301                        .iter()
6302                        .flat_map(|contact| &contact.projects)
6303                        .any(|project| project.id == host_project_id));
6304                    guest
6305                        .project
6306                        .as_ref()
6307                        .unwrap()
6308                        .read_with(&guest_cx, |project, _| assert!(project.is_read_only()));
6309                    guest_cx.update(|_| drop(guest));
6310                }
6311                host_cx.update(|_| drop(host));
6312
6313                return;
6314            }
6315
6316            let distribution = rng.lock().gen_range(0..100);
6317            match distribution {
6318                0..=19 if !available_guests.is_empty() => {
6319                    let guest_ix = rng.lock().gen_range(0..available_guests.len());
6320                    let guest_username = available_guests.remove(guest_ix);
6321                    log::info!("Adding new connection for {}", guest_username);
6322                    next_entity_id += 100000;
6323                    let mut guest_cx = TestAppContext::new(
6324                        cx.foreground_platform(),
6325                        cx.platform(),
6326                        deterministic.build_foreground(next_entity_id),
6327                        deterministic.build_background(),
6328                        cx.font_cache(),
6329                        cx.leak_detector(),
6330                        next_entity_id,
6331                    );
6332                    let guest = server.create_client(&mut guest_cx, &guest_username).await;
6333                    let guest_project = Project::remote(
6334                        host_project_id,
6335                        guest.client.clone(),
6336                        guest.user_store.clone(),
6337                        guest_lang_registry.clone(),
6338                        FakeFs::new(cx.background()),
6339                        &mut guest_cx.to_async(),
6340                    )
6341                    .await
6342                    .unwrap();
6343                    let op_start_signal = futures::channel::mpsc::unbounded();
6344                    user_ids.push(guest.current_user_id(&guest_cx));
6345                    op_start_signals.push(op_start_signal.0);
6346                    clients.push(guest_cx.foreground().spawn(guest.simulate_guest(
6347                        guest_username.clone(),
6348                        guest_project,
6349                        op_start_signal.1,
6350                        rng.clone(),
6351                        guest_cx,
6352                    )));
6353
6354                    log::info!("Added connection for {}", guest_username);
6355                    operations += 1;
6356                }
6357                20..=29 if clients.len() > 1 => {
6358                    let guest_ix = rng.lock().gen_range(1..clients.len());
6359                    log::info!("Removing guest {}", user_ids[guest_ix]);
6360                    let removed_guest_id = user_ids.remove(guest_ix);
6361                    let guest = clients.remove(guest_ix);
6362                    op_start_signals.remove(guest_ix);
6363                    server.forbid_connections();
6364                    server.disconnect_client(removed_guest_id);
6365                    cx.foreground().advance_clock(RECEIVE_TIMEOUT);
6366                    let (guest, mut guest_cx, guest_err) = guest.await;
6367                    server.allow_connections();
6368
6369                    if let Some(guest_err) = guest_err {
6370                        log::error!("{} error - {:?}", guest.username, guest_err);
6371                    }
6372                    guest
6373                        .project
6374                        .as_ref()
6375                        .unwrap()
6376                        .read_with(&guest_cx, |project, _| assert!(project.is_read_only()));
6377                    for user_id in &user_ids {
6378                        let contacts = server.app_state.db.get_contacts(*user_id).await.unwrap();
6379                        let contacts = server
6380                            .store
6381                            .read()
6382                            .await
6383                            .build_initial_contacts_update(contacts)
6384                            .contacts;
6385                        for contact in contacts {
6386                            if contact.online {
6387                                assert_ne!(
6388                                    contact.user_id, removed_guest_id.0 as u64,
6389                                    "removed guest is still a contact of another peer"
6390                                );
6391                            }
6392                            for project in contact.projects {
6393                                for project_guest_id in project.guests {
6394                                    assert_ne!(
6395                                        project_guest_id, removed_guest_id.0 as u64,
6396                                        "removed guest appears as still participating on a project"
6397                                    );
6398                                }
6399                            }
6400                        }
6401                    }
6402
6403                    log::info!("{} removed", guest.username);
6404                    available_guests.push(guest.username.clone());
6405                    guest_cx.update(|_| drop(guest));
6406
6407                    operations += 1;
6408                }
6409                _ => {
6410                    while operations < max_operations && rng.lock().gen_bool(0.7) {
6411                        op_start_signals
6412                            .choose(&mut *rng.lock())
6413                            .unwrap()
6414                            .unbounded_send(())
6415                            .unwrap();
6416                        operations += 1;
6417                    }
6418
6419                    if rng.lock().gen_bool(0.8) {
6420                        cx.foreground().run_until_parked();
6421                    }
6422                }
6423            }
6424        }
6425
6426        drop(op_start_signals);
6427        let mut clients = futures::future::join_all(clients).await;
6428        cx.foreground().run_until_parked();
6429
6430        let (host_client, mut host_cx, host_err) = clients.remove(0);
6431        if let Some(host_err) = host_err {
6432            panic!("host error - {:?}", host_err);
6433        }
6434        let host_project = host_client.project.as_ref().unwrap();
6435        let host_worktree_snapshots = host_project.read_with(&host_cx, |project, cx| {
6436            project
6437                .worktrees(cx)
6438                .map(|worktree| {
6439                    let snapshot = worktree.read(cx).snapshot();
6440                    (snapshot.id(), snapshot)
6441                })
6442                .collect::<BTreeMap<_, _>>()
6443        });
6444
6445        host_client
6446            .project
6447            .as_ref()
6448            .unwrap()
6449            .read_with(&host_cx, |project, cx| project.check_invariants(cx));
6450
6451        for (guest_client, mut guest_cx, guest_err) in clients.into_iter() {
6452            if let Some(guest_err) = guest_err {
6453                panic!("{} error - {:?}", guest_client.username, guest_err);
6454            }
6455            let worktree_snapshots =
6456                guest_client
6457                    .project
6458                    .as_ref()
6459                    .unwrap()
6460                    .read_with(&guest_cx, |project, cx| {
6461                        project
6462                            .worktrees(cx)
6463                            .map(|worktree| {
6464                                let worktree = worktree.read(cx);
6465                                (worktree.id(), worktree.snapshot())
6466                            })
6467                            .collect::<BTreeMap<_, _>>()
6468                    });
6469
6470            assert_eq!(
6471                worktree_snapshots.keys().collect::<Vec<_>>(),
6472                host_worktree_snapshots.keys().collect::<Vec<_>>(),
6473                "{} has different worktrees than the host",
6474                guest_client.username
6475            );
6476            for (id, host_snapshot) in &host_worktree_snapshots {
6477                let guest_snapshot = &worktree_snapshots[id];
6478                assert_eq!(
6479                    guest_snapshot.root_name(),
6480                    host_snapshot.root_name(),
6481                    "{} has different root name than the host for worktree {}",
6482                    guest_client.username,
6483                    id
6484                );
6485                assert_eq!(
6486                    guest_snapshot.entries(false).collect::<Vec<_>>(),
6487                    host_snapshot.entries(false).collect::<Vec<_>>(),
6488                    "{} has different snapshot than the host for worktree {}",
6489                    guest_client.username,
6490                    id
6491                );
6492                assert_eq!(guest_snapshot.scan_id(), host_snapshot.scan_id());
6493            }
6494
6495            guest_client
6496                .project
6497                .as_ref()
6498                .unwrap()
6499                .read_with(&guest_cx, |project, cx| project.check_invariants(cx));
6500
6501            for guest_buffer in &guest_client.buffers {
6502                let buffer_id = guest_buffer.read_with(&guest_cx, |buffer, _| buffer.remote_id());
6503                let host_buffer = host_project.read_with(&host_cx, |project, cx| {
6504                    project.buffer_for_id(buffer_id, cx).expect(&format!(
6505                        "host does not have buffer for guest:{}, peer:{}, id:{}",
6506                        guest_client.username, guest_client.peer_id, buffer_id
6507                    ))
6508                });
6509                let path = host_buffer
6510                    .read_with(&host_cx, |buffer, cx| buffer.file().unwrap().full_path(cx));
6511
6512                assert_eq!(
6513                    guest_buffer.read_with(&guest_cx, |buffer, _| buffer.deferred_ops_len()),
6514                    0,
6515                    "{}, buffer {}, path {:?} has deferred operations",
6516                    guest_client.username,
6517                    buffer_id,
6518                    path,
6519                );
6520                assert_eq!(
6521                    guest_buffer.read_with(&guest_cx, |buffer, _| buffer.text()),
6522                    host_buffer.read_with(&host_cx, |buffer, _| buffer.text()),
6523                    "{}, buffer {}, path {:?}, differs from the host's buffer",
6524                    guest_client.username,
6525                    buffer_id,
6526                    path
6527                );
6528            }
6529
6530            guest_cx.update(|_| drop(guest_client));
6531        }
6532
6533        host_cx.update(|_| drop(host_client));
6534    }
6535
    /// In-process server harness used by the collaboration tests.
    ///
    /// Wraps the `Server` under test together with the knobs the tests use to
    /// simulate network conditions (killing individual connections, refusing new
    /// ones) and to wait for server-side state changes.
    struct TestServer {
        // Created with `Peer::new()` in `start` and reset when the harness is dropped.
        peer: Arc<Peer>,
        // Application state built from the test database by `build_app_state`.
        app_state: Arc<AppState>,
        // The server under test; also exposed through `Deref`.
        server: Arc<Server>,
        // Foreground executor; `condition` uses it to check that parking is
        // forbidden and to mark the spans during which it is waiting.
        foreground: Rc<executor::Foreground>,
        // Receiving half of the channel whose sender is handed to `Server::new`;
        // `condition` awaits it between predicate checks.
        notifications: mpsc::UnboundedReceiver<()>,
        // Per-user flag returned by `Connection::in_memory`; `disconnect_client`
        // removes the entry and sets the flag to `true`.
        connection_killers: Arc<Mutex<HashMap<UserId, Arc<AtomicBool>>>>,
        // While `true`, the connection override installed by `create_client`
        // fails with "server is forbidding connections".
        forbid_connections: Arc<AtomicBool>,
        // Never read after construction; presumably held so the test database
        // outlives the server (it is declared last, so it is dropped last).
        _test_db: TestDb,
    }
6546
6547    impl TestServer {
6548        async fn start(
6549            foreground: Rc<executor::Foreground>,
6550            background: Arc<executor::Background>,
6551        ) -> Self {
6552            let test_db = TestDb::fake(background);
6553            let app_state = Self::build_app_state(&test_db).await;
6554            let peer = Peer::new();
6555            let notifications = mpsc::unbounded();
6556            let server = Server::new(app_state.clone(), Some(notifications.0));
6557            Self {
6558                peer,
6559                app_state,
6560                server,
6561                foreground,
6562                notifications: notifications.1,
6563                connection_killers: Default::default(),
6564                forbid_connections: Default::default(),
6565                _test_db: test_db,
6566            }
6567        }
6568
6569        async fn create_client(&mut self, cx: &mut TestAppContext, name: &str) -> TestClient {
6570            cx.update(|cx| {
6571                let settings = Settings::test(cx);
6572                cx.set_global(settings);
6573            });
6574
6575            let http = FakeHttpClient::with_404_response();
6576            let user_id =
6577                if let Ok(Some(user)) = self.app_state.db.get_user_by_github_login(name).await {
6578                    user.id
6579                } else {
6580                    self.app_state.db.create_user(name, false).await.unwrap()
6581                };
6582            let client_name = name.to_string();
6583            let mut client = Client::new(http.clone());
6584            let server = self.server.clone();
6585            let db = self.app_state.db.clone();
6586            let connection_killers = self.connection_killers.clone();
6587            let forbid_connections = self.forbid_connections.clone();
6588            let (connection_id_tx, mut connection_id_rx) = mpsc::channel(16);
6589
6590            Arc::get_mut(&mut client)
6591                .unwrap()
6592                .override_authenticate(move |cx| {
6593                    cx.spawn(|_| async move {
6594                        let access_token = "the-token".to_string();
6595                        Ok(Credentials {
6596                            user_id: user_id.0 as u64,
6597                            access_token,
6598                        })
6599                    })
6600                })
6601                .override_establish_connection(move |credentials, cx| {
6602                    assert_eq!(credentials.user_id, user_id.0 as u64);
6603                    assert_eq!(credentials.access_token, "the-token");
6604
6605                    let server = server.clone();
6606                    let db = db.clone();
6607                    let connection_killers = connection_killers.clone();
6608                    let forbid_connections = forbid_connections.clone();
6609                    let client_name = client_name.clone();
6610                    let connection_id_tx = connection_id_tx.clone();
6611                    cx.spawn(move |cx| async move {
6612                        if forbid_connections.load(SeqCst) {
6613                            Err(EstablishConnectionError::other(anyhow!(
6614                                "server is forbidding connections"
6615                            )))
6616                        } else {
6617                            let (client_conn, server_conn, killed) =
6618                                Connection::in_memory(cx.background());
6619                            connection_killers.lock().insert(user_id, killed);
6620                            let user = db.get_user_by_id(user_id).await.unwrap().unwrap();
6621                            cx.background()
6622                                .spawn(server.handle_connection(
6623                                    server_conn,
6624                                    client_name,
6625                                    user,
6626                                    Some(connection_id_tx),
6627                                    cx.background(),
6628                                ))
6629                                .detach();
6630                            Ok(client_conn)
6631                        }
6632                    })
6633                });
6634
6635            Channel::init(&client);
6636            Project::init(&client);
6637            cx.update(|cx| {
6638                workspace::init(&client, cx);
6639            });
6640
6641            let user_store = cx.add_model(|cx| UserStore::new(client.clone(), http, cx));
6642            client
6643                .authenticate_and_connect(false, &cx.to_async())
6644                .await
6645                .unwrap();
6646            let peer_id = PeerId(connection_id_rx.next().await.unwrap().0);
6647
6648            let client = TestClient {
6649                client,
6650                peer_id,
6651                username: name.to_string(),
6652                user_store,
6653                language_registry: Arc::new(LanguageRegistry::test()),
6654                project: Default::default(),
6655                buffers: Default::default(),
6656            };
6657            client.wait_for_current_user(cx).await;
6658            client
6659        }
6660
6661        fn disconnect_client(&self, user_id: UserId) {
6662            self.connection_killers
6663                .lock()
6664                .remove(&user_id)
6665                .unwrap()
6666                .store(true, SeqCst);
6667        }
6668
6669        fn forbid_connections(&self) {
6670            self.forbid_connections.store(true, SeqCst);
6671        }
6672
6673        fn allow_connections(&self) {
6674            self.forbid_connections.store(false, SeqCst);
6675        }
6676
6677        async fn make_contacts(&self, mut clients: Vec<(&TestClient, &mut TestAppContext)>) {
6678            while let Some((client_a, cx_a)) = clients.pop() {
6679                for (client_b, cx_b) in &mut clients {
6680                    client_a
6681                        .user_store
6682                        .update(cx_a, |store, cx| {
6683                            store.request_contact(client_b.user_id().unwrap(), cx)
6684                        })
6685                        .await
6686                        .unwrap();
6687                    cx_a.foreground().run_until_parked();
6688                    client_b
6689                        .user_store
6690                        .update(*cx_b, |store, cx| {
6691                            store.respond_to_contact_request(client_a.user_id().unwrap(), true, cx)
6692                        })
6693                        .await
6694                        .unwrap();
6695                }
6696            }
6697        }
6698
6699        async fn build_app_state(test_db: &TestDb) -> Arc<AppState> {
6700            Arc::new(AppState {
6701                db: test_db.db().clone(),
6702                api_token: Default::default(),
6703                invite_link_prefix: Default::default(),
6704            })
6705        }
6706
6707        async fn state<'a>(&'a self) -> RwLockReadGuard<'a, Store> {
6708            self.server.store.read().await
6709        }
6710
6711        async fn condition<F>(&mut self, mut predicate: F)
6712        where
6713            F: FnMut(&Store) -> bool,
6714        {
6715            assert!(
6716                self.foreground.parking_forbidden(),
6717                "you must call forbid_parking to use server conditions so we don't block indefinitely"
6718            );
6719            while !(predicate)(&*self.server.store.read().await) {
6720                self.foreground.start_waiting();
6721                self.notifications.next().await;
6722                self.foreground.finish_waiting();
6723            }
6724        }
6725    }
6726
6727    impl Deref for TestServer {
6728        type Target = Server;
6729
6730        fn deref(&self) -> &Self::Target {
6731            &self.server
6732        }
6733    }
6734
6735    impl Drop for TestServer {
6736        fn drop(&mut self) {
6737            self.peer.reset();
6738        }
6739    }
6740
    /// A simulated collaborator: a connected `Client` plus the per-client state
    /// the tests inspect or mutate.
    struct TestClient {
        // The underlying client; also exposed through `Deref`.
        client: Arc<Client>,
        // The name the client was created with (used as its GitHub login).
        username: String,
        // Built from the first connection id the server reports in `create_client`.
        pub peer_id: PeerId,
        pub user_store: ModelHandle<UserStore>,
        // Initialized with `LanguageRegistry::test()` in `create_client`.
        language_registry: Arc<LanguageRegistry>,
        // Set by `build_local_project` / `build_remote_project`; `None` until then.
        project: Option<ModelHandle<Project>>,
        // Buffers this client has opened during simulation.
        buffers: HashSet<ModelHandle<language::Buffer>>,
    }
6750
6751    impl Deref for TestClient {
6752        type Target = Arc<Client>;
6753
6754        fn deref(&self) -> &Self::Target {
6755            &self.client
6756        }
6757    }
6758
6759    struct ContactsSummary {
6760        pub current: Vec<String>,
6761        pub outgoing_requests: Vec<String>,
6762        pub incoming_requests: Vec<String>,
6763    }
6764
6765    impl TestClient {
6766        pub fn current_user_id(&self, cx: &TestAppContext) -> UserId {
6767            UserId::from_proto(
6768                self.user_store
6769                    .read_with(cx, |user_store, _| user_store.current_user().unwrap().id),
6770            )
6771        }
6772
6773        async fn wait_for_current_user(&self, cx: &TestAppContext) {
6774            let mut authed_user = self
6775                .user_store
6776                .read_with(cx, |user_store, _| user_store.watch_current_user());
6777            while authed_user.next().await.unwrap().is_none() {}
6778        }
6779
6780        async fn clear_contacts(&self, cx: &mut TestAppContext) {
6781            self.user_store
6782                .update(cx, |store, _| store.clear_contacts())
6783                .await;
6784        }
6785
6786        fn summarize_contacts(&self, cx: &TestAppContext) -> ContactsSummary {
6787            self.user_store.read_with(cx, |store, _| ContactsSummary {
6788                current: store
6789                    .contacts()
6790                    .iter()
6791                    .map(|contact| contact.user.github_login.clone())
6792                    .collect(),
6793                outgoing_requests: store
6794                    .outgoing_contact_requests()
6795                    .iter()
6796                    .map(|user| user.github_login.clone())
6797                    .collect(),
6798                incoming_requests: store
6799                    .incoming_contact_requests()
6800                    .iter()
6801                    .map(|user| user.github_login.clone())
6802                    .collect(),
6803            })
6804        }
6805
6806        async fn build_local_project(
6807            &mut self,
6808            fs: Arc<FakeFs>,
6809            root_path: impl AsRef<Path>,
6810            cx: &mut TestAppContext,
6811        ) -> (ModelHandle<Project>, WorktreeId) {
6812            let project = cx.update(|cx| {
6813                Project::local(
6814                    self.client.clone(),
6815                    self.user_store.clone(),
6816                    self.language_registry.clone(),
6817                    fs,
6818                    cx,
6819                )
6820            });
6821            self.project = Some(project.clone());
6822            let (worktree, _) = project
6823                .update(cx, |p, cx| {
6824                    p.find_or_create_local_worktree(root_path, true, cx)
6825                })
6826                .await
6827                .unwrap();
6828            worktree
6829                .read_with(cx, |tree, _| tree.as_local().unwrap().scan_complete())
6830                .await;
6831            project
6832                .update(cx, |project, _| project.next_remote_id())
6833                .await;
6834            (project, worktree.read_with(cx, |tree, _| tree.id()))
6835        }
6836
6837        async fn build_remote_project(
6838            &mut self,
6839            host_project: &ModelHandle<Project>,
6840            host_cx: &mut TestAppContext,
6841            guest_cx: &mut TestAppContext,
6842        ) -> ModelHandle<Project> {
6843            let host_project_id = host_project
6844                .read_with(host_cx, |project, _| project.next_remote_id())
6845                .await;
6846            let guest_user_id = self.user_id().unwrap();
6847            let languages =
6848                host_project.read_with(host_cx, |project, _| project.languages().clone());
6849            let project_b = guest_cx.spawn(|mut cx| {
6850                let user_store = self.user_store.clone();
6851                let guest_client = self.client.clone();
6852                async move {
6853                    Project::remote(
6854                        host_project_id,
6855                        guest_client,
6856                        user_store.clone(),
6857                        languages,
6858                        FakeFs::new(cx.background()),
6859                        &mut cx,
6860                    )
6861                    .await
6862                    .unwrap()
6863                }
6864            });
6865            host_cx.foreground().run_until_parked();
6866            host_project.update(host_cx, |project, cx| {
6867                project.respond_to_join_request(guest_user_id, true, cx)
6868            });
6869            let project = project_b.await;
6870            self.project = Some(project.clone());
6871            project
6872        }
6873
6874        fn build_workspace(
6875            &self,
6876            project: &ModelHandle<Project>,
6877            cx: &mut TestAppContext,
6878        ) -> ViewHandle<Workspace> {
6879            let (window_id, _) = cx.add_window(|_| EmptyView);
6880            cx.add_view(window_id, |cx| {
6881                let fs = project.read(cx).fs().clone();
6882                Workspace::new(
6883                    &WorkspaceParams {
6884                        fs,
6885                        project: project.clone(),
6886                        user_store: self.user_store.clone(),
6887                        languages: self.language_registry.clone(),
6888                        themes: ThemeRegistry::new((), cx.font_cache().clone()),
6889                        channel_list: cx.add_model(|cx| {
6890                            ChannelList::new(self.user_store.clone(), self.client.clone(), cx)
6891                        }),
6892                        client: self.client.clone(),
6893                    },
6894                    cx,
6895                )
6896            })
6897        }
6898
6899        async fn simulate_host(
6900            mut self,
6901            project: ModelHandle<Project>,
6902            op_start_signal: futures::channel::mpsc::UnboundedReceiver<()>,
6903            rng: Arc<Mutex<StdRng>>,
6904            mut cx: TestAppContext,
6905        ) -> (Self, TestAppContext, Option<anyhow::Error>) {
6906            async fn simulate_host_internal(
6907                client: &mut TestClient,
6908                project: ModelHandle<Project>,
6909                mut op_start_signal: futures::channel::mpsc::UnboundedReceiver<()>,
6910                rng: Arc<Mutex<StdRng>>,
6911                cx: &mut TestAppContext,
6912            ) -> anyhow::Result<()> {
6913                let fs = project.read_with(cx, |project, _| project.fs().clone());
6914
6915                cx.update(|cx| {
6916                    cx.subscribe(&project, move |project, event, cx| {
6917                        if let project::Event::ContactRequestedJoin(user) = event {
6918                            log::info!("Host: accepting join request from {}", user.github_login);
6919                            project.update(cx, |project, cx| {
6920                                project.respond_to_join_request(user.id, true, cx)
6921                            });
6922                        }
6923                    })
6924                    .detach();
6925                });
6926
6927                while op_start_signal.next().await.is_some() {
6928                    let distribution = rng.lock().gen_range::<usize, _>(0..100);
6929                    let files = fs.as_fake().files().await;
6930                    match distribution {
6931                        0..=19 if !files.is_empty() => {
6932                            let path = files.choose(&mut *rng.lock()).unwrap();
6933                            let mut path = path.as_path();
6934                            while let Some(parent_path) = path.parent() {
6935                                path = parent_path;
6936                                if rng.lock().gen() {
6937                                    break;
6938                                }
6939                            }
6940
6941                            log::info!("Host: find/create local worktree {:?}", path);
6942                            let find_or_create_worktree = project.update(cx, |project, cx| {
6943                                project.find_or_create_local_worktree(path, true, cx)
6944                            });
6945                            if rng.lock().gen() {
6946                                cx.background().spawn(find_or_create_worktree).detach();
6947                            } else {
6948                                find_or_create_worktree.await?;
6949                            }
6950                        }
6951                        20..=79 if !files.is_empty() => {
6952                            let buffer = if client.buffers.is_empty() || rng.lock().gen() {
6953                                let file = files.choose(&mut *rng.lock()).unwrap();
6954                                let (worktree, path) = project
6955                                    .update(cx, |project, cx| {
6956                                        project.find_or_create_local_worktree(
6957                                            file.clone(),
6958                                            true,
6959                                            cx,
6960                                        )
6961                                    })
6962                                    .await?;
6963                                let project_path =
6964                                    worktree.read_with(cx, |worktree, _| (worktree.id(), path));
6965                                log::info!(
6966                                    "Host: opening path {:?}, worktree {}, relative_path {:?}",
6967                                    file,
6968                                    project_path.0,
6969                                    project_path.1
6970                                );
6971                                let buffer = project
6972                                    .update(cx, |project, cx| project.open_buffer(project_path, cx))
6973                                    .await
6974                                    .unwrap();
6975                                client.buffers.insert(buffer.clone());
6976                                buffer
6977                            } else {
6978                                client
6979                                    .buffers
6980                                    .iter()
6981                                    .choose(&mut *rng.lock())
6982                                    .unwrap()
6983                                    .clone()
6984                            };
6985
6986                            if rng.lock().gen_bool(0.1) {
6987                                cx.update(|cx| {
6988                                    log::info!(
6989                                        "Host: dropping buffer {:?}",
6990                                        buffer.read(cx).file().unwrap().full_path(cx)
6991                                    );
6992                                    client.buffers.remove(&buffer);
6993                                    drop(buffer);
6994                                });
6995                            } else {
6996                                buffer.update(cx, |buffer, cx| {
6997                                    log::info!(
6998                                        "Host: updating buffer {:?} ({})",
6999                                        buffer.file().unwrap().full_path(cx),
7000                                        buffer.remote_id()
7001                                    );
7002
7003                                    if rng.lock().gen_bool(0.7) {
7004                                        buffer.randomly_edit(&mut *rng.lock(), 5, cx);
7005                                    } else {
7006                                        buffer.randomly_undo_redo(&mut *rng.lock(), cx);
7007                                    }
7008                                });
7009                            }
7010                        }
7011                        _ => loop {
7012                            let path_component_count = rng.lock().gen_range::<usize, _>(1..=5);
7013                            let mut path = PathBuf::new();
7014                            path.push("/");
7015                            for _ in 0..path_component_count {
7016                                let letter = rng.lock().gen_range(b'a'..=b'z');
7017                                path.push(std::str::from_utf8(&[letter]).unwrap());
7018                            }
7019                            path.set_extension("rs");
7020                            let parent_path = path.parent().unwrap();
7021
7022                            log::info!("Host: creating file {:?}", path,);
7023
7024                            if fs.create_dir(&parent_path).await.is_ok()
7025                                && fs.create_file(&path, Default::default()).await.is_ok()
7026                            {
7027                                break;
7028                            } else {
7029                                log::info!("Host: cannot create file");
7030                            }
7031                        },
7032                    }
7033
7034                    cx.background().simulate_random_delay().await;
7035                }
7036
7037                Ok(())
7038            }
7039
7040            let result =
7041                simulate_host_internal(&mut self, project.clone(), op_start_signal, rng, &mut cx)
7042                    .await;
7043            log::info!("Host done");
7044            self.project = Some(project);
7045            (self, cx, result.err())
7046        }
7047
            /// Plays the guest role in a randomized collaboration run against `project`.
            ///
            /// Each item received on `op_start_signal` triggers one randomly chosen guest
            /// operation (opening, dropping, editing or saving a buffer, issuing a project
            /// request, or creating a file). The simulation stops when the signal stream
            /// ends or when an awaited operation fails. Log lines are prefixed with
            /// `guest_username`.
            ///
            /// Returns the client with `project` stored in its `project` field, the test
            /// context, and the error that interrupted the simulation, if any.
7048        pub async fn simulate_guest(
7049            mut self,
7050            guest_username: String,
7051            project: ModelHandle<Project>,
7052            op_start_signal: futures::channel::mpsc::UnboundedReceiver<()>,
7053            rng: Arc<Mutex<StdRng>>,
7054            mut cx: TestAppContext,
7055        ) -> (Self, TestAppContext, Option<anyhow::Error>) {
                // The loop lives in a nested function so that `?` can bail out early while
                // the outer function still hands `self` and `cx` back to the caller.
7056            async fn simulate_guest_internal(
7057                client: &mut TestClient,
7058                guest_username: &str,
7059                project: ModelHandle<Project>,
7060                mut op_start_signal: futures::channel::mpsc::UnboundedReceiver<()>,
7061                rng: Arc<Mutex<StdRng>>,
7062                cx: &mut TestAppContext,
7063            ) -> anyhow::Result<()> {
7064                while op_start_signal.next().await.is_some() {
                        // Choose the buffer for this round: open a new one when none is held
                        // yet or when the random boolean says so, otherwise reuse a held one.
7065                    let buffer = if client.buffers.is_empty() || rng.lock().gen() {
                            // Candidates are visible worktrees that contain at least one file.
                            // When there is none, wait a random delay and move on to the next
                            // signal.
7066                        let worktree = if let Some(worktree) =
7067                            project.read_with(cx, |project, cx| {
7068                                project
7069                                    .worktrees(&cx)
7070                                    .filter(|worktree| {
7071                                        let worktree = worktree.read(cx);
7072                                        worktree.is_visible()
7073                                            && worktree.entries(false).any(|e| e.is_file())
7074                                    })
7075                                    .choose(&mut *rng.lock())
7076                            }) {
7077                            worktree
7078                        } else {
7079                            cx.background().simulate_random_delay().await;
7080                            continue;
7081                        };
7082
7083                        let (worktree_root_name, project_path) =
7084                            worktree.read_with(cx, |worktree, _| {
                                    // The filter above only accepted worktrees with a file
                                    // entry, so `choose` is expected to return `Some` here.
7085                                let entry = worktree
7086                                    .entries(false)
7087                                    .filter(|e| e.is_file())
7088                                    .choose(&mut *rng.lock())
7089                                    .unwrap();
7090                                (
7091                                    worktree.root_name().to_string(),
7092                                    (worktree.id(), entry.path.clone()),
7093                                )
7094                            });
7095                        log::info!(
7096                            "{}: opening path {:?} in worktree {} ({})",
7097                            guest_username,
7098                            project_path.1,
7099                            project_path.0,
7100                            worktree_root_name,
7101                        );
7102                        let buffer = project
7103                            .update(cx, |project, cx| {
7104                                project.open_buffer(project_path.clone(), cx)
7105                            })
7106                            .await?;
7107                        log::info!(
7108                            "{}: opened path {:?} in worktree {} ({}) with buffer id {}",
7109                            guest_username,
7110                            project_path.1,
7111                            project_path.0,
7112                            worktree_root_name,
7113                            buffer.read_with(cx, |buffer, _| buffer.remote_id())
7114                        );
7115                        client.buffers.insert(buffer.clone());
7116                        buffer
7117                    } else {
7118                        client
7119                            .buffers
7120                            .iter()
7121                            .choose(&mut *rng.lock())
7122                            .unwrap()
7123                            .clone()
7124                    };
7125
                        // Pick the operation. Values of 70 and above, and 30..=39 when the
                        // buffer is not dirty, fall through to the random edit arm at the end
                        // of the `match`.
7126                    let choice = rng.lock().gen_range(0..100);
7127                    match choice {
7128                        0..=9 => {
                                // Stop holding this buffer.
7129                            cx.update(|cx| {
7130                                log::info!(
7131                                    "{}: dropping buffer {:?}",
7132                                    guest_username,
7133                                    buffer.read(cx).file().unwrap().full_path(cx)
7134                                );
7135                                client.buffers.remove(&buffer);
7136                                drop(buffer);
7137                            });
7138                        }
7139                        10..=19 => {
7140                            let completions = project.update(cx, |project, cx| {
7141                                log::info!(
7142                                    "{}: requesting completions for buffer {} ({:?})",
7143                                    guest_username,
7144                                    buffer.read(cx).remote_id(),
7145                                    buffer.read(cx).file().unwrap().full_path(cx)
7146                                );
7147                                let offset = rng.lock().gen_range(0..=buffer.read(cx).len());
7148                                project.completions(&buffer, offset, cx)
7149                            });
                                // Await the request on the background executor, then either
                                // detach that task (`gen_bool(0.3)`) or wait for it and
                                // propagate its error. The request arms below follow the same
                                // pattern.
7150                            let completions = cx.background().spawn(async move {
7151                                completions
7152                                    .await
7153                                    .map_err(|err| anyhow!("completions request failed: {:?}", err))
7154                            });
7155                            if rng.lock().gen_bool(0.3) {
7156                                log::info!("{}: detaching completions request", guest_username);
7157                                cx.update(|cx| completions.detach_and_log_err(cx));
7158                            } else {
7159                                completions.await?;
7160                            }
7161                        }
7162                        20..=29 => {
7163                            let code_actions = project.update(cx, |project, cx| {
7164                                log::info!(
7165                                    "{}: requesting code actions for buffer {} ({:?})",
7166                                    guest_username,
7167                                    buffer.read(cx).remote_id(),
7168                                    buffer.read(cx).file().unwrap().full_path(cx)
7169                                );
7170                                let range = buffer.read(cx).random_byte_range(0, &mut *rng.lock());
7171                                project.code_actions(&buffer, range, cx)
7172                            });
7173                            let code_actions = cx.background().spawn(async move {
7174                                code_actions.await.map_err(|err| {
7175                                    anyhow!("code actions request failed: {:?}", err)
7176                                })
7177                            });
7178                            if rng.lock().gen_bool(0.3) {
7179                                log::info!("{}: detaching code actions request", guest_username);
7180                                cx.update(|cx| code_actions.detach_and_log_err(cx));
7181                            } else {
7182                                code_actions.await?;
7183                            }
7184                        }
7185                        30..=39 if buffer.read_with(cx, |buffer, _| buffer.is_dirty()) => {
7186                            let (requested_version, save) = buffer.update(cx, |buffer, cx| {
7187                                log::info!(
7188                                    "{}: saving buffer {} ({:?})",
7189                                    guest_username,
7190                                    buffer.remote_id(),
7191                                    buffer.file().unwrap().full_path(cx)
7192                                );
                                    // Remember the version at request time; the background
                                    // task asserts that the saved version has observed all
                                    // of it.
7193                                (buffer.version(), buffer.save(cx))
7194                            });
7195                            let save = cx.background().spawn(async move {
7196                                let (saved_version, _) = save
7197                                    .await
7198                                    .map_err(|err| anyhow!("save request failed: {:?}", err))?;
7199                                assert!(saved_version.observed_all(&requested_version));
7200                                Ok::<_, anyhow::Error>(())
7201                            });
7202                            if rng.lock().gen_bool(0.3) {
7203                                log::info!("{}: detaching save request", guest_username);
7204                                cx.update(|cx| save.detach_and_log_err(cx));
7205                            } else {
7206                                save.await?;
7207                            }
7208                        }
7209                        40..=44 => {
7210                            let prepare_rename = project.update(cx, |project, cx| {
7211                                log::info!(
7212                                    "{}: preparing rename for buffer {} ({:?})",
7213                                    guest_username,
7214                                    buffer.read(cx).remote_id(),
7215                                    buffer.read(cx).file().unwrap().full_path(cx)
7216                                );
7217                                let offset = rng.lock().gen_range(0..=buffer.read(cx).len());
7218                                project.prepare_rename(buffer, offset, cx)
7219                            });
7220                            let prepare_rename = cx.background().spawn(async move {
7221                                prepare_rename.await.map_err(|err| {
7222                                    anyhow!("prepare rename request failed: {:?}", err)
7223                                })
7224                            });
7225                            if rng.lock().gen_bool(0.3) {
7226                                log::info!("{}: detaching prepare rename request", guest_username);
7227                                cx.update(|cx| prepare_rename.detach_and_log_err(cx));
7228                            } else {
7229                                prepare_rename.await?;
7230                            }
7231                        }
7232                        45..=49 => {
7233                            let definitions = project.update(cx, |project, cx| {
7234                                log::info!(
7235                                    "{}: requesting definitions for buffer {} ({:?})",
7236                                    guest_username,
7237                                    buffer.read(cx).remote_id(),
7238                                    buffer.read(cx).file().unwrap().full_path(cx)
7239                                );
7240                                let offset = rng.lock().gen_range(0..=buffer.read(cx).len());
7241                                project.definition(&buffer, offset, cx)
7242                            });
7243                            let definitions = cx.background().spawn(async move {
7244                                definitions
7245                                    .await
7246                                    .map_err(|err| anyhow!("definitions request failed: {:?}", err))
7247                            });
7248                            if rng.lock().gen_bool(0.3) {
7249                                log::info!("{}: detaching definitions request", guest_username);
7250                                cx.update(|cx| definitions.detach_and_log_err(cx));
7251                            } else {
                                    // Keep the buffers of the returned locations.
7252                                client
7253                                    .buffers
7254                                    .extend(definitions.await?.into_iter().map(|loc| loc.buffer));
7255                            }
7256                        }
7257                        50..=54 => {
7258                            let highlights = project.update(cx, |project, cx| {
7259                                log::info!(
7260                                    "{}: requesting highlights for buffer {} ({:?})",
7261                                    guest_username,
7262                                    buffer.read(cx).remote_id(),
7263                                    buffer.read(cx).file().unwrap().full_path(cx)
7264                                );
7265                                let offset = rng.lock().gen_range(0..=buffer.read(cx).len());
7266                                project.document_highlights(&buffer, offset, cx)
7267                            });
7268                            let highlights = cx.background().spawn(async move {
7269                                highlights
7270                                    .await
7271                                    .map_err(|err| anyhow!("highlights request failed: {:?}", err))
7272                            });
7273                            if rng.lock().gen_bool(0.3) {
7274                                log::info!("{}: detaching highlights request", guest_username);
7275                                cx.update(|cx| highlights.detach_and_log_err(cx));
7276                            } else {
7277                                highlights.await?;
7278                            }
7279                        }
7280                        55..=59 => {
7281                            let search = project.update(cx, |project, cx| {
                                    // The query is a single random lowercase letter.
7282                                let query = rng.lock().gen_range('a'..='z');
7283                                log::info!("{}: project-wide search {:?}", guest_username, query);
7284                                project.search(SearchQuery::text(query, false, false), cx)
7285                            });
7286                            let search = cx.background().spawn(async move {
7287                                search
7288                                    .await
7289                                    .map_err(|err| anyhow!("search request failed: {:?}", err))
7290                            });
7291                            if rng.lock().gen_bool(0.3) {
7292                                log::info!("{}: detaching search request", guest_username);
7293                                cx.update(|cx| search.detach_and_log_err(cx));
7294                            } else {
                                    // Keep every buffer that appears as a key in the results.
7295                                client.buffers.extend(search.await?.into_keys());
7296                            }
7297                        }
7298                        60..=69 => {
                                // Create a new file in a visible worktree that already contains
                                // a file and whose root entry is a directory.
                                // NOTE(review): this `unwrap` panics when no worktree passes the
                                // filter; unlike the buffer-opening path above there is no
                                // delay-and-`continue` fallback. Confirm such a worktree always
                                // exists when this arm runs.
7299                            let worktree = project
7300                                .read_with(cx, |project, cx| {
7301                                    project
7302                                        .worktrees(&cx)
7303                                        .filter(|worktree| {
7304                                            let worktree = worktree.read(cx);
7305                                            worktree.is_visible()
7306                                                && worktree.entries(false).any(|e| e.is_file())
7307                                                && worktree
7308                                                    .root_entry()
7309                                                    .map_or(false, |e| e.is_dir())
7310                                        })
7311                                        .choose(&mut *rng.lock())
7312                                })
7313                                .unwrap();
7314                            let (worktree_id, worktree_root_name) = worktree
7315                                .read_with(cx, |worktree, _| {
7316                                    (worktree.id(), worktree.root_name().to_string())
7317                                });
7318
                                // Random file name: ten lowercase letters plus the `rs`
                                // extension.
7319                            let mut new_name = String::new();
7320                            for _ in 0..10 {
7321                                let letter = rng.lock().gen_range('a'..='z');
7322                                new_name.push(letter);
7323                            }
7324                            let mut new_path = PathBuf::new();
7325                            new_path.push(new_name);
7326                            new_path.set_extension("rs");
7327                            log::info!(
7328                                "{}: creating {:?} in worktree {} ({})",
7329                                guest_username,
7330                                new_path,
7331                                worktree_id,
7332                                worktree_root_name,
7333                            );
                                // NOTE(review): the value returned from `create_entry` is
                                // unwrapped before it is awaited. Confirm it cannot be empty
                                // for a path inside the chosen worktree.
7334                            project
7335                                .update(cx, |project, cx| {
7336                                    project.create_entry((worktree_id, new_path), false, cx)
7337                                })
7338                                .unwrap()
7339                                .await?;
7340                        }
7341                        _ => {
                                // Default operation: randomly edit the buffer
                                // (`gen_bool(0.7)`) or randomly undo/redo.
7342                            buffer.update(cx, |buffer, cx| {
7343                                log::info!(
7344                                    "{}: updating buffer {} ({:?})",
7345                                    guest_username,
7346                                    buffer.remote_id(),
7347                                    buffer.file().unwrap().full_path(cx)
7348                                );
7349                                if rng.lock().gen_bool(0.7) {
7350                                    buffer.randomly_edit(&mut *rng.lock(), 5, cx);
7351                                } else {
7352                                    buffer.randomly_undo_redo(&mut *rng.lock(), cx);
7353                                }
7354                            });
7355                        }
7356                    }
7357                    cx.background().simulate_random_delay().await;
7358                }
7359                Ok(())
7360            }
7361
7362            let result = simulate_guest_internal(
7363                &mut self,
7364                &guest_username,
7365                project.clone(),
7366                op_start_signal,
7367                rng,
7368                &mut cx,
7369            )
7370            .await;
7371            log::info!("{}: done", guest_username);
7372
                // Store the project handle on the client so it is returned to the caller
                // together with it.
7373            self.project = Some(project);
7374            (self, cx, result.err())
7375        }
7376    }
7377
7378    impl Drop for TestClient {
7379        fn drop(&mut self) {
7380            self.client.tear_down();
7381        }
7382    }
7383
7384    impl Executor for Arc<gpui::executor::Background> {
7385        type Sleep = gpui::executor::Timer;
7386
7387        fn spawn_detached<F: 'static + Send + Future<Output = ()>>(&self, future: F) {
7388            self.spawn(future).detach();
7389        }
7390
7391        fn sleep(&self, duration: Duration) -> Self::Sleep {
7392            self.as_ref().timer(duration)
7393        }
7394    }
7395
7396    fn channel_messages(channel: &Channel) -> Vec<(String, String, bool)> {
7397        channel
7398            .messages()
7399            .cursor::<()>()
7400            .map(|m| {
7401                (
7402                    m.sender.github_login.clone(),
7403                    m.body.clone(),
7404                    m.is_pending(),
7405                )
7406            })
7407            .collect()
7408    }
7409
7410    struct EmptyView;
7411
7412    impl gpui::Entity for EmptyView {
7413        type Event = ();
7414    }
7415
7416    impl gpui::View for EmptyView {
7417        fn ui_name() -> &'static str {
7418            "empty view"
7419        }
7420
7421        fn render(&mut self, _: &mut gpui::RenderContext<Self>) -> gpui::ElementBox {
7422            gpui::Element::boxed(gpui::elements::Empty::new())
7423        }
7424    }
7425}