rpc.rs

   1mod store;
   2
   3use crate::{
   4    auth,
   5    db::{self, ChannelId, MessageId, UserId},
   6    AppState, Result,
   7};
   8use anyhow::anyhow;
   9use async_tungstenite::tungstenite::{
  10    protocol::CloseFrame as TungsteniteCloseFrame, Message as TungsteniteMessage,
  11};
  12use axum::{
  13    body::Body,
  14    extract::{
  15        ws::{CloseFrame as AxumCloseFrame, Message as AxumMessage},
  16        ConnectInfo, WebSocketUpgrade,
  17    },
  18    headers::{Header, HeaderName},
  19    http::StatusCode,
  20    middleware,
  21    response::IntoResponse,
  22    routing::get,
  23    Extension, Router, TypedHeader,
  24};
  25use collections::HashMap;
  26use futures::{channel::mpsc, future::BoxFuture, FutureExt, SinkExt, StreamExt, TryStreamExt};
  27use lazy_static::lazy_static;
  28use rpc::{
  29    proto::{self, AnyTypedEnvelope, EntityMessage, EnvelopedMessage, RequestMessage},
  30    Connection, ConnectionId, Peer, Receipt, TypedEnvelope,
  31};
  32use std::{
  33    any::TypeId,
  34    future::Future,
  35    marker::PhantomData,
  36    net::SocketAddr,
  37    ops::{Deref, DerefMut},
  38    rc::Rc,
  39    sync::{
  40        atomic::{AtomicBool, Ordering::SeqCst},
  41        Arc,
  42    },
  43    time::Duration,
  44};
  45use store::{Store, Worktree};
  46use time::OffsetDateTime;
  47use tokio::{
  48    sync::{RwLock, RwLockReadGuard, RwLockWriteGuard},
  49    time::Sleep,
  50};
  51use tower::ServiceBuilder;
  52use tracing::{info_span, Instrument};
  53
  54type MessageHandler =
  55    Box<dyn Send + Sync + Fn(Arc<Server>, Box<dyn AnyTypedEnvelope>) -> BoxFuture<'static, ()>>;
  56
  57struct Response<R> {
  58    server: Arc<Server>,
  59    receipt: Receipt<R>,
  60    responded: Arc<AtomicBool>,
  61}
  62
  63impl<R: RequestMessage> Response<R> {
  64    fn send(self, payload: R::Response) -> Result<()> {
  65        self.responded.store(true, SeqCst);
  66        self.server.peer.respond(self.receipt, payload)?;
  67        Ok(())
  68    }
  69
  70    fn into_receipt(self) -> Receipt<R> {
  71        self.responded.store(true, SeqCst);
  72        self.receipt
  73    }
  74}
  75
  76pub struct Server {
  77    peer: Arc<Peer>,
  78    store: RwLock<Store>,
  79    app_state: Arc<AppState>,
  80    handlers: HashMap<TypeId, MessageHandler>,
  81    notifications: Option<mpsc::UnboundedSender<()>>,
  82}
  83
  84pub trait Executor: Send + Clone {
  85    type Sleep: Send + Future;
  86    fn spawn_detached<F: 'static + Send + Future<Output = ()>>(&self, future: F);
  87    fn sleep(&self, duration: Duration) -> Self::Sleep;
  88}
  89
  90#[derive(Clone)]
  91pub struct RealExecutor;
  92
  93const MESSAGE_COUNT_PER_PAGE: usize = 100;
  94const MAX_MESSAGE_LEN: usize = 1024;
  95
  96struct StoreReadGuard<'a> {
  97    guard: RwLockReadGuard<'a, Store>,
  98    _not_send: PhantomData<Rc<()>>,
  99}
 100
 101struct StoreWriteGuard<'a> {
 102    guard: RwLockWriteGuard<'a, Store>,
 103    _not_send: PhantomData<Rc<()>>,
 104}
 105
 106impl Server {
 107    pub fn new(
 108        app_state: Arc<AppState>,
 109        notifications: Option<mpsc::UnboundedSender<()>>,
 110    ) -> Arc<Self> {
 111        let mut server = Self {
 112            peer: Peer::new(),
 113            app_state,
 114            store: Default::default(),
 115            handlers: Default::default(),
 116            notifications,
 117        };
 118
 119        server
 120            .add_request_handler(Server::ping)
 121            .add_request_handler(Server::register_project)
 122            .add_message_handler(Server::unregister_project)
 123            .add_request_handler(Server::join_project)
 124            .add_message_handler(Server::leave_project)
 125            .add_message_handler(Server::respond_to_join_project_request)
 126            .add_request_handler(Server::register_worktree)
 127            .add_message_handler(Server::unregister_worktree)
 128            .add_request_handler(Server::update_worktree)
 129            .add_message_handler(Server::start_language_server)
 130            .add_message_handler(Server::update_language_server)
 131            .add_message_handler(Server::update_diagnostic_summary)
 132            .add_request_handler(Server::forward_project_request::<proto::GetDefinition>)
 133            .add_request_handler(Server::forward_project_request::<proto::GetReferences>)
 134            .add_request_handler(Server::forward_project_request::<proto::SearchProject>)
 135            .add_request_handler(Server::forward_project_request::<proto::GetDocumentHighlights>)
 136            .add_request_handler(Server::forward_project_request::<proto::GetProjectSymbols>)
 137            .add_request_handler(Server::forward_project_request::<proto::OpenBufferForSymbol>)
 138            .add_request_handler(Server::forward_project_request::<proto::OpenBufferById>)
 139            .add_request_handler(Server::forward_project_request::<proto::OpenBufferByPath>)
 140            .add_request_handler(Server::forward_project_request::<proto::GetCompletions>)
 141            .add_request_handler(
 142                Server::forward_project_request::<proto::ApplyCompletionAdditionalEdits>,
 143            )
 144            .add_request_handler(Server::forward_project_request::<proto::GetCodeActions>)
 145            .add_request_handler(Server::forward_project_request::<proto::ApplyCodeAction>)
 146            .add_request_handler(Server::forward_project_request::<proto::PrepareRename>)
 147            .add_request_handler(Server::forward_project_request::<proto::PerformRename>)
 148            .add_request_handler(Server::forward_project_request::<proto::ReloadBuffers>)
 149            .add_request_handler(Server::forward_project_request::<proto::FormatBuffers>)
 150            .add_request_handler(Server::forward_project_request::<proto::CreateProjectEntry>)
 151            .add_request_handler(Server::forward_project_request::<proto::RenameProjectEntry>)
 152            .add_request_handler(Server::forward_project_request::<proto::DeleteProjectEntry>)
 153            .add_request_handler(Server::update_buffer)
 154            .add_message_handler(Server::update_buffer_file)
 155            .add_message_handler(Server::buffer_reloaded)
 156            .add_message_handler(Server::buffer_saved)
 157            .add_request_handler(Server::save_buffer)
 158            .add_request_handler(Server::get_channels)
 159            .add_request_handler(Server::get_users)
 160            .add_request_handler(Server::fuzzy_search_users)
 161            .add_request_handler(Server::request_contact)
 162            .add_request_handler(Server::remove_contact)
 163            .add_request_handler(Server::respond_to_contact_request)
 164            .add_request_handler(Server::join_channel)
 165            .add_message_handler(Server::leave_channel)
 166            .add_request_handler(Server::send_channel_message)
 167            .add_request_handler(Server::follow)
 168            .add_message_handler(Server::unfollow)
 169            .add_message_handler(Server::update_followers)
 170            .add_request_handler(Server::get_channel_messages);
 171
 172        Arc::new(server)
 173    }
 174
 175    fn add_message_handler<F, Fut, M>(&mut self, handler: F) -> &mut Self
 176    where
 177        F: 'static + Send + Sync + Fn(Arc<Self>, TypedEnvelope<M>) -> Fut,
 178        Fut: 'static + Send + Future<Output = Result<()>>,
 179        M: EnvelopedMessage,
 180    {
 181        let prev_handler = self.handlers.insert(
 182            TypeId::of::<M>(),
 183            Box::new(move |server, envelope| {
 184                let envelope = envelope.into_any().downcast::<TypedEnvelope<M>>().unwrap();
 185                let span = info_span!(
 186                    "handle message",
 187                    payload_type = envelope.payload_type_name(),
 188                    payload = format!("{:?}", envelope.payload).as_str(),
 189                );
 190                let future = (handler)(server, *envelope);
 191                async move {
 192                    if let Err(error) = future.await {
 193                        tracing::error!(%error, "error handling message");
 194                    }
 195                }
 196                .instrument(span)
 197                .boxed()
 198            }),
 199        );
 200        if prev_handler.is_some() {
 201            panic!("registered a handler for the same message twice");
 202        }
 203        self
 204    }
 205
 206    /// Handle a request while holding a lock to the store. This is useful when we're registering
 207    /// a connection but we want to respond on the connection before anybody else can send on it.
 208    fn add_request_handler<F, Fut, M>(&mut self, handler: F) -> &mut Self
 209    where
 210        F: 'static + Send + Sync + Fn(Arc<Self>, TypedEnvelope<M>, Response<M>) -> Fut,
 211        Fut: Send + Future<Output = Result<()>>,
 212        M: RequestMessage,
 213    {
 214        let handler = Arc::new(handler);
 215        self.add_message_handler(move |server, envelope| {
 216            let receipt = envelope.receipt();
 217            let handler = handler.clone();
 218            async move {
 219                let responded = Arc::new(AtomicBool::default());
 220                let response = Response {
 221                    server: server.clone(),
 222                    responded: responded.clone(),
 223                    receipt: envelope.receipt(),
 224                };
 225                match (handler)(server.clone(), envelope, response).await {
 226                    Ok(()) => {
 227                        if responded.load(std::sync::atomic::Ordering::SeqCst) {
 228                            Ok(())
 229                        } else {
 230                            Err(anyhow!("handler did not send a response"))?
 231                        }
 232                    }
 233                    Err(error) => {
 234                        server.peer.respond_with_error(
 235                            receipt,
 236                            proto::Error {
 237                                message: error.to_string(),
 238                            },
 239                        )?;
 240                        Err(error)
 241                    }
 242                }
 243            }
 244        })
 245    }
 246
 247    pub fn handle_connection<E: Executor>(
 248        self: &Arc<Self>,
 249        connection: Connection,
 250        address: String,
 251        user_id: UserId,
 252        mut send_connection_id: Option<mpsc::Sender<ConnectionId>>,
 253        executor: E,
 254    ) -> impl Future<Output = Result<()>> {
 255        let mut this = self.clone();
 256        let span = info_span!("handle connection", %user_id, %address);
 257        async move {
 258            let (connection_id, handle_io, mut incoming_rx) = this
 259                .peer
 260                .add_connection(connection, {
 261                    let executor = executor.clone();
 262                    move |duration| {
 263                        let timer = executor.sleep(duration);
 264                        async move {
 265                            timer.await;
 266                        }
 267                    }
 268                })
 269                .await;
 270
 271            tracing::info!(%user_id, %connection_id, %address, "connection opened");
 272
 273            if let Some(send_connection_id) = send_connection_id.as_mut() {
 274                let _ = send_connection_id.send(connection_id).await;
 275            }
 276
 277            let contacts = this.app_state.db.get_contacts(user_id).await?;
 278
 279            {
 280                let mut store = this.store_mut().await;
 281                store.add_connection(connection_id, user_id);
 282                this.peer.send(connection_id, store.build_initial_contacts_update(contacts))?;
 283            }
 284            this.update_user_contacts(user_id).await?;
 285
 286            let handle_io = handle_io.fuse();
 287            futures::pin_mut!(handle_io);
 288            loop {
 289                let next_message = incoming_rx.next().fuse();
 290                futures::pin_mut!(next_message);
 291                futures::select_biased! {
 292                    result = handle_io => {
 293                        if let Err(error) = result {
 294                            tracing::error!(%error, "error handling I/O");
 295                        }
 296                        break;
 297                    }
 298                    message = next_message => {
 299                        if let Some(message) = message {
 300                            let type_name = message.payload_type_name();
 301                            let span = tracing::info_span!("receive message", %user_id, %connection_id, %address, type_name);
 302                            async {
 303                                if let Some(handler) = this.handlers.get(&message.payload_type_id()) {
 304                                    let notifications = this.notifications.clone();
 305                                    let is_background = message.is_background();
 306                                    let handle_message = (handler)(this.clone(), message);
 307                                    let handle_message = async move {
 308                                        handle_message.await;
 309                                        if let Some(mut notifications) = notifications {
 310                                            let _ = notifications.send(()).await;
 311                                        }
 312                                    };
 313                                    if is_background {
 314                                        executor.spawn_detached(handle_message);
 315                                    } else {
 316                                        handle_message.await;
 317                                    }
 318                                } else {
 319                                    tracing::error!("no message handler");
 320                                }
 321                            }.instrument(span).await;
 322                        } else {
 323                            tracing::info!(%user_id, %connection_id, %address, "connection closed");
 324                            break;
 325                        }
 326                    }
 327                }
 328            }
 329
 330            if let Err(error) = this.sign_out(connection_id).await {
 331                tracing::error!(%error, "error signing out");
 332            }
 333
 334            Ok(())
 335        }.instrument(span)
 336    }
 337
 338    async fn sign_out(self: &mut Arc<Self>, connection_id: ConnectionId) -> Result<()> {
 339        self.peer.disconnect(connection_id);
 340
 341        let removed_user_id = {
 342            let mut store = self.store_mut().await;
 343            let removed_connection = store.remove_connection(connection_id)?;
 344
 345            for (project_id, project) in removed_connection.hosted_projects {
 346                broadcast(connection_id, project.guests.keys().copied(), |conn_id| {
 347                    self.peer
 348                        .send(conn_id, proto::UnregisterProject { project_id })
 349                });
 350            }
 351
 352            for project_id in removed_connection.guest_project_ids {
 353                if let Some(project) = store.project(project_id).trace_err() {
 354                    if project.guests.is_empty() {
 355                        self.peer
 356                            .send(
 357                                project.host_connection_id,
 358                                proto::ProjectUnshared { project_id },
 359                            )
 360                            .trace_err();
 361                    } else {
 362                        broadcast(connection_id, project.connection_ids(), |conn_id| {
 363                            self.peer.send(
 364                                conn_id,
 365                                proto::RemoveProjectCollaborator {
 366                                    project_id,
 367                                    peer_id: connection_id.0,
 368                                },
 369                            )
 370                        });
 371                    }
 372                }
 373            }
 374
 375            removed_connection.user_id
 376        };
 377
 378        self.update_user_contacts(removed_user_id).await?;
 379
 380        Ok(())
 381    }
 382
 383    async fn ping(
 384        self: Arc<Server>,
 385        _: TypedEnvelope<proto::Ping>,
 386        response: Response<proto::Ping>,
 387    ) -> Result<()> {
 388        response.send(proto::Ack {})?;
 389        Ok(())
 390    }
 391
 392    async fn register_project(
 393        self: Arc<Server>,
 394        request: TypedEnvelope<proto::RegisterProject>,
 395        response: Response<proto::RegisterProject>,
 396    ) -> Result<()> {
 397        let user_id;
 398        let project_id;
 399        {
 400            let mut state = self.store_mut().await;
 401            user_id = state.user_id_for_connection(request.sender_id)?;
 402            project_id = state.register_project(request.sender_id, user_id);
 403        };
 404        self.update_user_contacts(user_id).await?;
 405        response.send(proto::RegisterProjectResponse { project_id })?;
 406        Ok(())
 407    }
 408
 409    async fn unregister_project(
 410        self: Arc<Server>,
 411        request: TypedEnvelope<proto::UnregisterProject>,
 412    ) -> Result<()> {
 413        let user_id = {
 414            let mut state = self.store_mut().await;
 415            state.unregister_project(request.payload.project_id, request.sender_id)?;
 416            state.user_id_for_connection(request.sender_id)?
 417        };
 418
 419        self.update_user_contacts(user_id).await?;
 420        Ok(())
 421    }
 422
 423    // async fn share_project(
 424    //     self: Arc<Server>,
 425    //     request: TypedEnvelope<proto::ShareProject>,
 426    //     response: Response<proto::ShareProject>,
 427    // ) -> Result<()> {
 428    //     let user_id = {
 429    //         let mut state = self.store_mut().await;
 430    //         state.share_project(request.payload.project_id, request.sender_id)?;
 431    //         state.user_id_for_connection(request.sender_id)?
 432    //     };
 433    //     self.update_user_contacts(user_id).await?;
 434    //     response.send(proto::Ack {})?;
 435    //     Ok(())
 436    // }
 437
 438    async fn update_user_contacts(self: &Arc<Server>, user_id: UserId) -> Result<()> {
 439        let contacts = self.app_state.db.get_contacts(user_id).await?;
 440        let store = self.store().await;
 441        let updated_contact = store.contact_for_user(user_id, false);
 442        for contact in contacts {
 443            if let db::Contact::Accepted {
 444                user_id: contact_user_id,
 445                ..
 446            } = contact
 447            {
 448                for contact_conn_id in store.connection_ids_for_user(contact_user_id) {
 449                    self.peer
 450                        .send(
 451                            contact_conn_id,
 452                            proto::UpdateContacts {
 453                                contacts: vec![updated_contact.clone()],
 454                                remove_contacts: Default::default(),
 455                                incoming_requests: Default::default(),
 456                                remove_incoming_requests: Default::default(),
 457                                outgoing_requests: Default::default(),
 458                                remove_outgoing_requests: Default::default(),
 459                            },
 460                        )
 461                        .trace_err();
 462                }
 463            }
 464        }
 465        Ok(())
 466    }
 467
 468    async fn join_project(
 469        self: Arc<Server>,
 470        request: TypedEnvelope<proto::JoinProject>,
 471        response: Response<proto::JoinProject>,
 472    ) -> Result<()> {
 473        let project_id = request.payload.project_id;
 474        let host_user_id;
 475        let guest_user_id;
 476        let host_connection_id;
 477        {
 478            let state = self.store().await;
 479            let project = state.project(project_id)?;
 480            host_user_id = project.host_user_id;
 481            host_connection_id = project.host_connection_id;
 482            guest_user_id = state.user_id_for_connection(request.sender_id)?;
 483        };
 484
 485        let has_contact = self
 486            .app_state
 487            .db
 488            .has_contact(guest_user_id, host_user_id)
 489            .await?;
 490        if !has_contact {
 491            return Err(anyhow!("no such project"))?;
 492        }
 493
 494        self.store_mut().await.request_join_project(
 495            guest_user_id,
 496            project_id,
 497            response.into_receipt(),
 498        )?;
 499        self.peer.send(
 500            host_connection_id,
 501            proto::RequestJoinProject {
 502                project_id,
 503                requester_id: guest_user_id.to_proto(),
 504            },
 505        )?;
 506        Ok(())
 507    }
 508
 509    async fn respond_to_join_project_request(
 510        self: Arc<Server>,
 511        request: TypedEnvelope<proto::RespondToJoinProjectRequest>,
 512    ) -> Result<()> {
 513        let host_user_id;
 514
 515        {
 516            let mut state = self.store_mut().await;
 517            let project_id = request.payload.project_id;
 518            let project = state.project(project_id)?;
 519            if project.host_connection_id != request.sender_id {
 520                Err(anyhow!("no such connection"))?;
 521            }
 522
 523            host_user_id = project.host_user_id;
 524            let guest_user_id = UserId::from_proto(request.payload.requester_id);
 525
 526            if !request.payload.allow {
 527                let receipts = state
 528                    .deny_join_project_request(request.sender_id, guest_user_id, project_id)
 529                    .ok_or_else(|| anyhow!("no such request"))?;
 530                for receipt in receipts {
 531                    self.peer.respond(
 532                        receipt,
 533                        proto::JoinProjectResponse {
 534                            variant: Some(proto::join_project_response::Variant::Decline(
 535                                proto::join_project_response::Decline {},
 536                            )),
 537                        },
 538                    )?;
 539                }
 540                return Ok(());
 541            }
 542
 543            let (receipts_with_replica_ids, project) = state
 544                .accept_join_project_request(request.sender_id, guest_user_id, project_id)
 545                .ok_or_else(|| anyhow!("no such request"))?;
 546
 547            let peer_count = project.guests.len();
 548            let mut collaborators = Vec::with_capacity(peer_count);
 549            collaborators.push(proto::Collaborator {
 550                peer_id: project.host_connection_id.0,
 551                replica_id: 0,
 552                user_id: project.host_user_id.to_proto(),
 553            });
 554            let worktrees = project
 555                .worktrees
 556                .iter()
 557                .filter_map(|(id, shared_worktree)| {
 558                    let worktree = project.worktrees.get(&id)?;
 559                    Some(proto::Worktree {
 560                        id: *id,
 561                        root_name: worktree.root_name.clone(),
 562                        entries: shared_worktree.entries.values().cloned().collect(),
 563                        diagnostic_summaries: shared_worktree
 564                            .diagnostic_summaries
 565                            .values()
 566                            .cloned()
 567                            .collect(),
 568                        visible: worktree.visible,
 569                        scan_id: shared_worktree.scan_id,
 570                    })
 571                })
 572                .collect::<Vec<_>>();
 573
 574            // Add all guests other than the requesting user's own connections as collaborators
 575            for (peer_conn_id, (peer_replica_id, peer_user_id)) in &project.guests {
 576                if receipts_with_replica_ids
 577                    .iter()
 578                    .all(|(receipt, _)| receipt.sender_id != *peer_conn_id)
 579                {
 580                    collaborators.push(proto::Collaborator {
 581                        peer_id: peer_conn_id.0,
 582                        replica_id: *peer_replica_id as u32,
 583                        user_id: peer_user_id.to_proto(),
 584                    });
 585                }
 586            }
 587
 588            for conn_id in project.connection_ids() {
 589                for (receipt, replica_id) in &receipts_with_replica_ids {
 590                    if conn_id != receipt.sender_id {
 591                        self.peer.send(
 592                            conn_id,
 593                            proto::AddProjectCollaborator {
 594                                project_id,
 595                                collaborator: Some(proto::Collaborator {
 596                                    peer_id: receipt.sender_id.0,
 597                                    replica_id: *replica_id as u32,
 598                                    user_id: guest_user_id.to_proto(),
 599                                }),
 600                            },
 601                        )?;
 602                    }
 603                }
 604            }
 605
 606            for (receipt, replica_id) in receipts_with_replica_ids {
 607                self.peer.respond(
 608                    receipt,
 609                    proto::JoinProjectResponse {
 610                        variant: Some(proto::join_project_response::Variant::Accept(
 611                            proto::join_project_response::Accept {
 612                                worktrees: worktrees.clone(),
 613                                replica_id: replica_id as u32,
 614                                collaborators: collaborators.clone(),
 615                                language_servers: project.language_servers.clone(),
 616                            },
 617                        )),
 618                    },
 619                )?;
 620            }
 621        }
 622
 623        self.update_user_contacts(host_user_id).await?;
 624        Ok(())
 625    }
 626
 627    async fn leave_project(
 628        self: Arc<Server>,
 629        request: TypedEnvelope<proto::LeaveProject>,
 630    ) -> Result<()> {
 631        let sender_id = request.sender_id;
 632        let project_id = request.payload.project_id;
 633        let project;
 634        {
 635            let mut state = self.store_mut().await;
 636            project = state.leave_project(sender_id, project_id)?;
 637            let unshare = project.connection_ids.len() <= 1;
 638            broadcast(sender_id, project.connection_ids, |conn_id| {
 639                self.peer.send(
 640                    conn_id,
 641                    proto::RemoveProjectCollaborator {
 642                        project_id,
 643                        peer_id: sender_id.0,
 644                    },
 645                )
 646            });
 647            if unshare {
 648                self.peer.send(
 649                    project.host_connection_id,
 650                    proto::ProjectUnshared { project_id },
 651                )?;
 652            }
 653        }
 654        self.update_user_contacts(project.host_user_id).await?;
 655        Ok(())
 656    }
 657
 658    async fn register_worktree(
 659        self: Arc<Server>,
 660        request: TypedEnvelope<proto::RegisterWorktree>,
 661        response: Response<proto::RegisterWorktree>,
 662    ) -> Result<()> {
 663        let host_user_id;
 664        {
 665            let mut state = self.store_mut().await;
 666            host_user_id = state.user_id_for_connection(request.sender_id)?;
 667
 668            let guest_connection_ids = state
 669                .read_project(request.payload.project_id, request.sender_id)?
 670                .guest_connection_ids();
 671            state.register_worktree(
 672                request.payload.project_id,
 673                request.payload.worktree_id,
 674                request.sender_id,
 675                Worktree {
 676                    root_name: request.payload.root_name.clone(),
 677                    visible: request.payload.visible,
 678                    ..Default::default()
 679                },
 680            )?;
 681
 682            broadcast(request.sender_id, guest_connection_ids, |connection_id| {
 683                self.peer
 684                    .forward_send(request.sender_id, connection_id, request.payload.clone())
 685            });
 686        }
 687        self.update_user_contacts(host_user_id).await?;
 688        response.send(proto::Ack {})?;
 689        Ok(())
 690    }
 691
 692    async fn unregister_worktree(
 693        self: Arc<Server>,
 694        request: TypedEnvelope<proto::UnregisterWorktree>,
 695    ) -> Result<()> {
 696        let host_user_id;
 697        let project_id = request.payload.project_id;
 698        let worktree_id = request.payload.worktree_id;
 699        {
 700            let mut state = self.store_mut().await;
 701            let (_, guest_connection_ids) =
 702                state.unregister_worktree(project_id, worktree_id, request.sender_id)?;
 703            host_user_id = state.user_id_for_connection(request.sender_id)?;
 704            broadcast(request.sender_id, guest_connection_ids, |conn_id| {
 705                self.peer.send(
 706                    conn_id,
 707                    proto::UnregisterWorktree {
 708                        project_id,
 709                        worktree_id,
 710                    },
 711                )
 712            });
 713        }
 714        self.update_user_contacts(host_user_id).await?;
 715        Ok(())
 716    }
 717
 718    async fn update_worktree(
 719        self: Arc<Server>,
 720        request: TypedEnvelope<proto::UpdateWorktree>,
 721        response: Response<proto::UpdateWorktree>,
 722    ) -> Result<()> {
 723        let connection_ids = self.store_mut().await.update_worktree(
 724            request.sender_id,
 725            request.payload.project_id,
 726            request.payload.worktree_id,
 727            &request.payload.removed_entries,
 728            &request.payload.updated_entries,
 729            request.payload.scan_id,
 730        )?;
 731
 732        broadcast(request.sender_id, connection_ids, |connection_id| {
 733            self.peer
 734                .forward_send(request.sender_id, connection_id, request.payload.clone())
 735        });
 736        response.send(proto::Ack {})?;
 737        Ok(())
 738    }
 739
 740    async fn update_diagnostic_summary(
 741        self: Arc<Server>,
 742        request: TypedEnvelope<proto::UpdateDiagnosticSummary>,
 743    ) -> Result<()> {
 744        let summary = request
 745            .payload
 746            .summary
 747            .clone()
 748            .ok_or_else(|| anyhow!("invalid summary"))?;
 749        let receiver_ids = self.store_mut().await.update_diagnostic_summary(
 750            request.payload.project_id,
 751            request.payload.worktree_id,
 752            request.sender_id,
 753            summary,
 754        )?;
 755
 756        broadcast(request.sender_id, receiver_ids, |connection_id| {
 757            self.peer
 758                .forward_send(request.sender_id, connection_id, request.payload.clone())
 759        });
 760        Ok(())
 761    }
 762
 763    async fn start_language_server(
 764        self: Arc<Server>,
 765        request: TypedEnvelope<proto::StartLanguageServer>,
 766    ) -> Result<()> {
 767        let receiver_ids = self.store_mut().await.start_language_server(
 768            request.payload.project_id,
 769            request.sender_id,
 770            request
 771                .payload
 772                .server
 773                .clone()
 774                .ok_or_else(|| anyhow!("invalid language server"))?,
 775        )?;
 776        broadcast(request.sender_id, receiver_ids, |connection_id| {
 777            self.peer
 778                .forward_send(request.sender_id, connection_id, request.payload.clone())
 779        });
 780        Ok(())
 781    }
 782
 783    async fn update_language_server(
 784        self: Arc<Server>,
 785        request: TypedEnvelope<proto::UpdateLanguageServer>,
 786    ) -> Result<()> {
 787        let receiver_ids = self
 788            .store()
 789            .await
 790            .project_connection_ids(request.payload.project_id, request.sender_id)?;
 791        broadcast(request.sender_id, receiver_ids, |connection_id| {
 792            self.peer
 793                .forward_send(request.sender_id, connection_id, request.payload.clone())
 794        });
 795        Ok(())
 796    }
 797
 798    async fn forward_project_request<T>(
 799        self: Arc<Server>,
 800        request: TypedEnvelope<T>,
 801        response: Response<T>,
 802    ) -> Result<()>
 803    where
 804        T: EntityMessage + RequestMessage,
 805    {
 806        let host_connection_id = self
 807            .store()
 808            .await
 809            .read_project(request.payload.remote_entity_id(), request.sender_id)?
 810            .host_connection_id;
 811
 812        response.send(
 813            self.peer
 814                .forward_request(request.sender_id, host_connection_id, request.payload)
 815                .await?,
 816        )?;
 817        Ok(())
 818    }
 819
 820    async fn save_buffer(
 821        self: Arc<Server>,
 822        request: TypedEnvelope<proto::SaveBuffer>,
 823        response: Response<proto::SaveBuffer>,
 824    ) -> Result<()> {
 825        let host = self
 826            .store()
 827            .await
 828            .read_project(request.payload.project_id, request.sender_id)?
 829            .host_connection_id;
 830        let response_payload = self
 831            .peer
 832            .forward_request(request.sender_id, host, request.payload.clone())
 833            .await?;
 834
 835        let mut guests = self
 836            .store()
 837            .await
 838            .read_project(request.payload.project_id, request.sender_id)?
 839            .connection_ids();
 840        guests.retain(|guest_connection_id| *guest_connection_id != request.sender_id);
 841        broadcast(host, guests, |conn_id| {
 842            self.peer
 843                .forward_send(host, conn_id, response_payload.clone())
 844        });
 845        response.send(response_payload)?;
 846        Ok(())
 847    }
 848
 849    async fn update_buffer(
 850        self: Arc<Server>,
 851        request: TypedEnvelope<proto::UpdateBuffer>,
 852        response: Response<proto::UpdateBuffer>,
 853    ) -> Result<()> {
 854        let receiver_ids = self
 855            .store()
 856            .await
 857            .project_connection_ids(request.payload.project_id, request.sender_id)?;
 858        broadcast(request.sender_id, receiver_ids, |connection_id| {
 859            self.peer
 860                .forward_send(request.sender_id, connection_id, request.payload.clone())
 861        });
 862        response.send(proto::Ack {})?;
 863        Ok(())
 864    }
 865
 866    async fn update_buffer_file(
 867        self: Arc<Server>,
 868        request: TypedEnvelope<proto::UpdateBufferFile>,
 869    ) -> Result<()> {
 870        let receiver_ids = self
 871            .store()
 872            .await
 873            .project_connection_ids(request.payload.project_id, request.sender_id)?;
 874        broadcast(request.sender_id, receiver_ids, |connection_id| {
 875            self.peer
 876                .forward_send(request.sender_id, connection_id, request.payload.clone())
 877        });
 878        Ok(())
 879    }
 880
 881    async fn buffer_reloaded(
 882        self: Arc<Server>,
 883        request: TypedEnvelope<proto::BufferReloaded>,
 884    ) -> Result<()> {
 885        let receiver_ids = self
 886            .store()
 887            .await
 888            .project_connection_ids(request.payload.project_id, request.sender_id)?;
 889        broadcast(request.sender_id, receiver_ids, |connection_id| {
 890            self.peer
 891                .forward_send(request.sender_id, connection_id, request.payload.clone())
 892        });
 893        Ok(())
 894    }
 895
 896    async fn buffer_saved(
 897        self: Arc<Server>,
 898        request: TypedEnvelope<proto::BufferSaved>,
 899    ) -> Result<()> {
 900        let receiver_ids = self
 901            .store()
 902            .await
 903            .project_connection_ids(request.payload.project_id, request.sender_id)?;
 904        broadcast(request.sender_id, receiver_ids, |connection_id| {
 905            self.peer
 906                .forward_send(request.sender_id, connection_id, request.payload.clone())
 907        });
 908        Ok(())
 909    }
 910
 911    async fn follow(
 912        self: Arc<Self>,
 913        request: TypedEnvelope<proto::Follow>,
 914        response: Response<proto::Follow>,
 915    ) -> Result<()> {
 916        let leader_id = ConnectionId(request.payload.leader_id);
 917        let follower_id = request.sender_id;
 918        if !self
 919            .store()
 920            .await
 921            .project_connection_ids(request.payload.project_id, follower_id)?
 922            .contains(&leader_id)
 923        {
 924            Err(anyhow!("no such peer"))?;
 925        }
 926        let mut response_payload = self
 927            .peer
 928            .forward_request(request.sender_id, leader_id, request.payload)
 929            .await?;
 930        response_payload
 931            .views
 932            .retain(|view| view.leader_id != Some(follower_id.0));
 933        response.send(response_payload)?;
 934        Ok(())
 935    }
 936
 937    async fn unfollow(self: Arc<Self>, request: TypedEnvelope<proto::Unfollow>) -> Result<()> {
 938        let leader_id = ConnectionId(request.payload.leader_id);
 939        if !self
 940            .store()
 941            .await
 942            .project_connection_ids(request.payload.project_id, request.sender_id)?
 943            .contains(&leader_id)
 944        {
 945            Err(anyhow!("no such peer"))?;
 946        }
 947        self.peer
 948            .forward_send(request.sender_id, leader_id, request.payload)?;
 949        Ok(())
 950    }
 951
 952    async fn update_followers(
 953        self: Arc<Self>,
 954        request: TypedEnvelope<proto::UpdateFollowers>,
 955    ) -> Result<()> {
 956        let connection_ids = self
 957            .store()
 958            .await
 959            .project_connection_ids(request.payload.project_id, request.sender_id)?;
 960        let leader_id = request
 961            .payload
 962            .variant
 963            .as_ref()
 964            .and_then(|variant| match variant {
 965                proto::update_followers::Variant::CreateView(payload) => payload.leader_id,
 966                proto::update_followers::Variant::UpdateView(payload) => payload.leader_id,
 967                proto::update_followers::Variant::UpdateActiveView(payload) => payload.leader_id,
 968            });
 969        for follower_id in &request.payload.follower_ids {
 970            let follower_id = ConnectionId(*follower_id);
 971            if connection_ids.contains(&follower_id) && Some(follower_id.0) != leader_id {
 972                self.peer
 973                    .forward_send(request.sender_id, follower_id, request.payload.clone())?;
 974            }
 975        }
 976        Ok(())
 977    }
 978
 979    async fn get_channels(
 980        self: Arc<Server>,
 981        request: TypedEnvelope<proto::GetChannels>,
 982        response: Response<proto::GetChannels>,
 983    ) -> Result<()> {
 984        let user_id = self
 985            .store()
 986            .await
 987            .user_id_for_connection(request.sender_id)?;
 988        let channels = self.app_state.db.get_accessible_channels(user_id).await?;
 989        response.send(proto::GetChannelsResponse {
 990            channels: channels
 991                .into_iter()
 992                .map(|chan| proto::Channel {
 993                    id: chan.id.to_proto(),
 994                    name: chan.name,
 995                })
 996                .collect(),
 997        })?;
 998        Ok(())
 999    }
1000
1001    async fn get_users(
1002        self: Arc<Server>,
1003        request: TypedEnvelope<proto::GetUsers>,
1004        response: Response<proto::GetUsers>,
1005    ) -> Result<()> {
1006        let user_ids = request
1007            .payload
1008            .user_ids
1009            .into_iter()
1010            .map(UserId::from_proto)
1011            .collect();
1012        let users = self
1013            .app_state
1014            .db
1015            .get_users_by_ids(user_ids)
1016            .await?
1017            .into_iter()
1018            .map(|user| proto::User {
1019                id: user.id.to_proto(),
1020                avatar_url: format!("https://github.com/{}.png?size=128", user.github_login),
1021                github_login: user.github_login,
1022            })
1023            .collect();
1024        response.send(proto::UsersResponse { users })?;
1025        Ok(())
1026    }
1027
1028    async fn fuzzy_search_users(
1029        self: Arc<Server>,
1030        request: TypedEnvelope<proto::FuzzySearchUsers>,
1031        response: Response<proto::FuzzySearchUsers>,
1032    ) -> Result<()> {
1033        let user_id = self
1034            .store()
1035            .await
1036            .user_id_for_connection(request.sender_id)?;
1037        let query = request.payload.query;
1038        let db = &self.app_state.db;
1039        let users = match query.len() {
1040            0 => vec![],
1041            1 | 2 => db
1042                .get_user_by_github_login(&query)
1043                .await?
1044                .into_iter()
1045                .collect(),
1046            _ => db.fuzzy_search_users(&query, 10).await?,
1047        };
1048        let users = users
1049            .into_iter()
1050            .filter(|user| user.id != user_id)
1051            .map(|user| proto::User {
1052                id: user.id.to_proto(),
1053                avatar_url: format!("https://github.com/{}.png?size=128", user.github_login),
1054                github_login: user.github_login,
1055            })
1056            .collect();
1057        response.send(proto::UsersResponse { users })?;
1058        Ok(())
1059    }
1060
1061    async fn request_contact(
1062        self: Arc<Server>,
1063        request: TypedEnvelope<proto::RequestContact>,
1064        response: Response<proto::RequestContact>,
1065    ) -> Result<()> {
1066        let requester_id = self
1067            .store()
1068            .await
1069            .user_id_for_connection(request.sender_id)?;
1070        let responder_id = UserId::from_proto(request.payload.responder_id);
1071        if requester_id == responder_id {
1072            return Err(anyhow!("cannot add yourself as a contact"))?;
1073        }
1074
1075        self.app_state
1076            .db
1077            .send_contact_request(requester_id, responder_id)
1078            .await?;
1079
1080        // Update outgoing contact requests of requester
1081        let mut update = proto::UpdateContacts::default();
1082        update.outgoing_requests.push(responder_id.to_proto());
1083        for connection_id in self.store().await.connection_ids_for_user(requester_id) {
1084            self.peer.send(connection_id, update.clone())?;
1085        }
1086
1087        // Update incoming contact requests of responder
1088        let mut update = proto::UpdateContacts::default();
1089        update
1090            .incoming_requests
1091            .push(proto::IncomingContactRequest {
1092                requester_id: requester_id.to_proto(),
1093                should_notify: true,
1094            });
1095        for connection_id in self.store().await.connection_ids_for_user(responder_id) {
1096            self.peer.send(connection_id, update.clone())?;
1097        }
1098
1099        response.send(proto::Ack {})?;
1100        Ok(())
1101    }
1102
1103    async fn respond_to_contact_request(
1104        self: Arc<Server>,
1105        request: TypedEnvelope<proto::RespondToContactRequest>,
1106        response: Response<proto::RespondToContactRequest>,
1107    ) -> Result<()> {
1108        let responder_id = self
1109            .store()
1110            .await
1111            .user_id_for_connection(request.sender_id)?;
1112        let requester_id = UserId::from_proto(request.payload.requester_id);
1113        if request.payload.response == proto::ContactRequestResponse::Dismiss as i32 {
1114            self.app_state
1115                .db
1116                .dismiss_contact_notification(responder_id, requester_id)
1117                .await?;
1118        } else {
1119            let accept = request.payload.response == proto::ContactRequestResponse::Accept as i32;
1120            self.app_state
1121                .db
1122                .respond_to_contact_request(responder_id, requester_id, accept)
1123                .await?;
1124
1125            let store = self.store().await;
1126            // Update responder with new contact
1127            let mut update = proto::UpdateContacts::default();
1128            if accept {
1129                update
1130                    .contacts
1131                    .push(store.contact_for_user(requester_id, false));
1132            }
1133            update
1134                .remove_incoming_requests
1135                .push(requester_id.to_proto());
1136            for connection_id in store.connection_ids_for_user(responder_id) {
1137                self.peer.send(connection_id, update.clone())?;
1138            }
1139
1140            // Update requester with new contact
1141            let mut update = proto::UpdateContacts::default();
1142            if accept {
1143                update
1144                    .contacts
1145                    .push(store.contact_for_user(responder_id, true));
1146            }
1147            update
1148                .remove_outgoing_requests
1149                .push(responder_id.to_proto());
1150            for connection_id in store.connection_ids_for_user(requester_id) {
1151                self.peer.send(connection_id, update.clone())?;
1152            }
1153        }
1154
1155        response.send(proto::Ack {})?;
1156        Ok(())
1157    }
1158
1159    async fn remove_contact(
1160        self: Arc<Server>,
1161        request: TypedEnvelope<proto::RemoveContact>,
1162        response: Response<proto::RemoveContact>,
1163    ) -> Result<()> {
1164        let requester_id = self
1165            .store()
1166            .await
1167            .user_id_for_connection(request.sender_id)?;
1168        let responder_id = UserId::from_proto(request.payload.user_id);
1169        self.app_state
1170            .db
1171            .remove_contact(requester_id, responder_id)
1172            .await?;
1173
1174        // Update outgoing contact requests of requester
1175        let mut update = proto::UpdateContacts::default();
1176        update
1177            .remove_outgoing_requests
1178            .push(responder_id.to_proto());
1179        for connection_id in self.store().await.connection_ids_for_user(requester_id) {
1180            self.peer.send(connection_id, update.clone())?;
1181        }
1182
1183        // Update incoming contact requests of responder
1184        let mut update = proto::UpdateContacts::default();
1185        update
1186            .remove_incoming_requests
1187            .push(requester_id.to_proto());
1188        for connection_id in self.store().await.connection_ids_for_user(responder_id) {
1189            self.peer.send(connection_id, update.clone())?;
1190        }
1191
1192        response.send(proto::Ack {})?;
1193        Ok(())
1194    }
1195
1196    // #[instrument(skip(self, state, user_ids))]
1197    // fn update_contacts_for_users<'a>(
1198    //     self: &Arc<Self>,
1199    //     state: &Store,
1200    //     user_ids: impl IntoIterator<Item = &'a UserId>,
1201    // ) {
1202    //     for user_id in user_ids {
1203    //         let contacts = state.contacts_for_user(*user_id);
1204    //         for connection_id in state.connection_ids_for_user(*user_id) {
1205    //             self.peer
1206    //                 .send(
1207    //                     connection_id,
1208    //                     proto::UpdateContacts {
1209    //                         contacts: contacts.clone(),
1210    //                         pending_requests_from_user_ids: Default::default(),
1211    //                         pending_requests_to_user_ids: Default::default(),
1212    //                     },
1213    //                 )
1214    //                 .trace_err();
1215    //         }
1216    //     }
1217    // }
1218
1219    async fn join_channel(
1220        self: Arc<Self>,
1221        request: TypedEnvelope<proto::JoinChannel>,
1222        response: Response<proto::JoinChannel>,
1223    ) -> Result<()> {
1224        let user_id = self
1225            .store()
1226            .await
1227            .user_id_for_connection(request.sender_id)?;
1228        let channel_id = ChannelId::from_proto(request.payload.channel_id);
1229        if !self
1230            .app_state
1231            .db
1232            .can_user_access_channel(user_id, channel_id)
1233            .await?
1234        {
1235            Err(anyhow!("access denied"))?;
1236        }
1237
1238        self.store_mut()
1239            .await
1240            .join_channel(request.sender_id, channel_id);
1241        let messages = self
1242            .app_state
1243            .db
1244            .get_channel_messages(channel_id, MESSAGE_COUNT_PER_PAGE, None)
1245            .await?
1246            .into_iter()
1247            .map(|msg| proto::ChannelMessage {
1248                id: msg.id.to_proto(),
1249                body: msg.body,
1250                timestamp: msg.sent_at.unix_timestamp() as u64,
1251                sender_id: msg.sender_id.to_proto(),
1252                nonce: Some(msg.nonce.as_u128().into()),
1253            })
1254            .collect::<Vec<_>>();
1255        response.send(proto::JoinChannelResponse {
1256            done: messages.len() < MESSAGE_COUNT_PER_PAGE,
1257            messages,
1258        })?;
1259        Ok(())
1260    }
1261
1262    async fn leave_channel(
1263        self: Arc<Self>,
1264        request: TypedEnvelope<proto::LeaveChannel>,
1265    ) -> Result<()> {
1266        let user_id = self
1267            .store()
1268            .await
1269            .user_id_for_connection(request.sender_id)?;
1270        let channel_id = ChannelId::from_proto(request.payload.channel_id);
1271        if !self
1272            .app_state
1273            .db
1274            .can_user_access_channel(user_id, channel_id)
1275            .await?
1276        {
1277            Err(anyhow!("access denied"))?;
1278        }
1279
1280        self.store_mut()
1281            .await
1282            .leave_channel(request.sender_id, channel_id);
1283
1284        Ok(())
1285    }
1286
1287    async fn send_channel_message(
1288        self: Arc<Self>,
1289        request: TypedEnvelope<proto::SendChannelMessage>,
1290        response: Response<proto::SendChannelMessage>,
1291    ) -> Result<()> {
1292        let channel_id = ChannelId::from_proto(request.payload.channel_id);
1293        let user_id;
1294        let connection_ids;
1295        {
1296            let state = self.store().await;
1297            user_id = state.user_id_for_connection(request.sender_id)?;
1298            connection_ids = state.channel_connection_ids(channel_id)?;
1299        }
1300
1301        // Validate the message body.
1302        let body = request.payload.body.trim().to_string();
1303        if body.len() > MAX_MESSAGE_LEN {
1304            return Err(anyhow!("message is too long"))?;
1305        }
1306        if body.is_empty() {
1307            return Err(anyhow!("message can't be blank"))?;
1308        }
1309
1310        let timestamp = OffsetDateTime::now_utc();
1311        let nonce = request
1312            .payload
1313            .nonce
1314            .ok_or_else(|| anyhow!("nonce can't be blank"))?;
1315
1316        let message_id = self
1317            .app_state
1318            .db
1319            .create_channel_message(channel_id, user_id, &body, timestamp, nonce.clone().into())
1320            .await?
1321            .to_proto();
1322        let message = proto::ChannelMessage {
1323            sender_id: user_id.to_proto(),
1324            id: message_id,
1325            body,
1326            timestamp: timestamp.unix_timestamp() as u64,
1327            nonce: Some(nonce),
1328        };
1329        broadcast(request.sender_id, connection_ids, |conn_id| {
1330            self.peer.send(
1331                conn_id,
1332                proto::ChannelMessageSent {
1333                    channel_id: channel_id.to_proto(),
1334                    message: Some(message.clone()),
1335                },
1336            )
1337        });
1338        response.send(proto::SendChannelMessageResponse {
1339            message: Some(message),
1340        })?;
1341        Ok(())
1342    }
1343
1344    async fn get_channel_messages(
1345        self: Arc<Self>,
1346        request: TypedEnvelope<proto::GetChannelMessages>,
1347        response: Response<proto::GetChannelMessages>,
1348    ) -> Result<()> {
1349        let user_id = self
1350            .store()
1351            .await
1352            .user_id_for_connection(request.sender_id)?;
1353        let channel_id = ChannelId::from_proto(request.payload.channel_id);
1354        if !self
1355            .app_state
1356            .db
1357            .can_user_access_channel(user_id, channel_id)
1358            .await?
1359        {
1360            Err(anyhow!("access denied"))?;
1361        }
1362
1363        let messages = self
1364            .app_state
1365            .db
1366            .get_channel_messages(
1367                channel_id,
1368                MESSAGE_COUNT_PER_PAGE,
1369                Some(MessageId::from_proto(request.payload.before_message_id)),
1370            )
1371            .await?
1372            .into_iter()
1373            .map(|msg| proto::ChannelMessage {
1374                id: msg.id.to_proto(),
1375                body: msg.body,
1376                timestamp: msg.sent_at.unix_timestamp() as u64,
1377                sender_id: msg.sender_id.to_proto(),
1378                nonce: Some(msg.nonce.as_u128().into()),
1379            })
1380            .collect::<Vec<_>>();
1381        response.send(proto::GetChannelMessagesResponse {
1382            done: messages.len() < MESSAGE_COUNT_PER_PAGE,
1383            messages,
1384        })?;
1385        Ok(())
1386    }
1387
1388    async fn store<'a>(self: &'a Arc<Self>) -> StoreReadGuard<'a> {
1389        #[cfg(test)]
1390        tokio::task::yield_now().await;
1391        let guard = self.store.read().await;
1392        #[cfg(test)]
1393        tokio::task::yield_now().await;
1394        StoreReadGuard {
1395            guard,
1396            _not_send: PhantomData,
1397        }
1398    }
1399
1400    async fn store_mut<'a>(self: &'a Arc<Self>) -> StoreWriteGuard<'a> {
1401        #[cfg(test)]
1402        tokio::task::yield_now().await;
1403        let guard = self.store.write().await;
1404        #[cfg(test)]
1405        tokio::task::yield_now().await;
1406        StoreWriteGuard {
1407            guard,
1408            _not_send: PhantomData,
1409        }
1410    }
1411}
1412
1413impl<'a> Deref for StoreReadGuard<'a> {
1414    type Target = Store;
1415
1416    fn deref(&self) -> &Self::Target {
1417        &*self.guard
1418    }
1419}
1420
1421impl<'a> Deref for StoreWriteGuard<'a> {
1422    type Target = Store;
1423
1424    fn deref(&self) -> &Self::Target {
1425        &*self.guard
1426    }
1427}
1428
1429impl<'a> DerefMut for StoreWriteGuard<'a> {
1430    fn deref_mut(&mut self) -> &mut Self::Target {
1431        &mut *self.guard
1432    }
1433}
1434
1435impl<'a> Drop for StoreWriteGuard<'a> {
1436    fn drop(&mut self) {
1437        #[cfg(test)]
1438        self.check_invariants();
1439
1440        let metrics = self.metrics();
1441        tracing::info!(
1442            connections = metrics.connections,
1443            registered_projects = metrics.registered_projects,
1444            shared_projects = metrics.shared_projects,
1445            "metrics"
1446        );
1447    }
1448}
1449
1450impl Executor for RealExecutor {
1451    type Sleep = Sleep;
1452
1453    fn spawn_detached<F: 'static + Send + Future<Output = ()>>(&self, future: F) {
1454        tokio::task::spawn(future);
1455    }
1456
1457    fn sleep(&self, duration: Duration) -> Self::Sleep {
1458        tokio::time::sleep(duration)
1459    }
1460}
1461
1462fn broadcast<F>(
1463    sender_id: ConnectionId,
1464    receiver_ids: impl IntoIterator<Item = ConnectionId>,
1465    mut f: F,
1466) where
1467    F: FnMut(ConnectionId) -> anyhow::Result<()>,
1468{
1469    for receiver_id in receiver_ids {
1470        if receiver_id != sender_id {
1471            f(receiver_id).trace_err();
1472        }
1473    }
1474}
1475
1476lazy_static! {
1477    static ref ZED_PROTOCOL_VERSION: HeaderName = HeaderName::from_static("x-zed-protocol-version");
1478}
1479
1480pub struct ProtocolVersion(u32);
1481
1482impl Header for ProtocolVersion {
1483    fn name() -> &'static HeaderName {
1484        &ZED_PROTOCOL_VERSION
1485    }
1486
1487    fn decode<'i, I>(values: &mut I) -> Result<Self, axum::headers::Error>
1488    where
1489        Self: Sized,
1490        I: Iterator<Item = &'i axum::http::HeaderValue>,
1491    {
1492        let version = values
1493            .next()
1494            .ok_or_else(|| axum::headers::Error::invalid())?
1495            .to_str()
1496            .map_err(|_| axum::headers::Error::invalid())?
1497            .parse()
1498            .map_err(|_| axum::headers::Error::invalid())?;
1499        Ok(Self(version))
1500    }
1501
1502    fn encode<E: Extend<axum::http::HeaderValue>>(&self, values: &mut E) {
1503        values.extend([self.0.to_string().parse().unwrap()]);
1504    }
1505}
1506
1507pub fn routes(app_state: Arc<AppState>) -> Router<Body> {
1508    let server = Server::new(app_state.clone(), None);
1509    Router::new()
1510        .route("/rpc", get(handle_websocket_request))
1511        .layer(
1512            ServiceBuilder::new()
1513                .layer(Extension(app_state))
1514                .layer(middleware::from_fn(auth::validate_header))
1515                .layer(Extension(server)),
1516        )
1517}
1518
1519pub async fn handle_websocket_request(
1520    TypedHeader(ProtocolVersion(protocol_version)): TypedHeader<ProtocolVersion>,
1521    ConnectInfo(socket_address): ConnectInfo<SocketAddr>,
1522    Extension(server): Extension<Arc<Server>>,
1523    Extension(user_id): Extension<UserId>,
1524    ws: WebSocketUpgrade,
1525) -> axum::response::Response {
1526    if protocol_version != rpc::PROTOCOL_VERSION {
1527        return (
1528            StatusCode::UPGRADE_REQUIRED,
1529            "client must be upgraded".to_string(),
1530        )
1531            .into_response();
1532    }
1533    let socket_address = socket_address.to_string();
1534    ws.on_upgrade(move |socket| {
1535        use util::ResultExt;
1536        let socket = socket
1537            .map_ok(to_tungstenite_message)
1538            .err_into()
1539            .with(|message| async move { Ok(to_axum_message(message)) });
1540        let connection = Connection::new(Box::pin(socket));
1541        async move {
1542            server
1543                .handle_connection(connection, socket_address, user_id, None, RealExecutor)
1544                .await
1545                .log_err();
1546        }
1547    })
1548}
1549
1550fn to_axum_message(message: TungsteniteMessage) -> AxumMessage {
1551    match message {
1552        TungsteniteMessage::Text(payload) => AxumMessage::Text(payload),
1553        TungsteniteMessage::Binary(payload) => AxumMessage::Binary(payload),
1554        TungsteniteMessage::Ping(payload) => AxumMessage::Ping(payload),
1555        TungsteniteMessage::Pong(payload) => AxumMessage::Pong(payload),
1556        TungsteniteMessage::Close(frame) => AxumMessage::Close(frame.map(|frame| AxumCloseFrame {
1557            code: frame.code.into(),
1558            reason: frame.reason,
1559        })),
1560    }
1561}
1562
1563fn to_tungstenite_message(message: AxumMessage) -> TungsteniteMessage {
1564    match message {
1565        AxumMessage::Text(payload) => TungsteniteMessage::Text(payload),
1566        AxumMessage::Binary(payload) => TungsteniteMessage::Binary(payload),
1567        AxumMessage::Ping(payload) => TungsteniteMessage::Ping(payload),
1568        AxumMessage::Pong(payload) => TungsteniteMessage::Pong(payload),
1569        AxumMessage::Close(frame) => {
1570            TungsteniteMessage::Close(frame.map(|frame| TungsteniteCloseFrame {
1571                code: frame.code.into(),
1572                reason: frame.reason,
1573            }))
1574        }
1575    }
1576}
1577
1578pub trait ResultExt {
1579    type Ok;
1580
1581    fn trace_err(self) -> Option<Self::Ok>;
1582}
1583
1584impl<T, E> ResultExt for Result<T, E>
1585where
1586    E: std::fmt::Debug,
1587{
1588    type Ok = T;
1589
1590    fn trace_err(self) -> Option<T> {
1591        match self {
1592            Ok(value) => Some(value),
1593            Err(error) => {
1594                tracing::error!("{:?}", error);
1595                None
1596            }
1597        }
1598    }
1599}
1600
1601#[cfg(test)]
1602mod tests {
1603    use super::*;
1604    use crate::{
1605        db::{tests::TestDb, UserId},
1606        AppState,
1607    };
1608    use ::rpc::Peer;
1609    use client::{
1610        self, test::FakeHttpClient, Channel, ChannelDetails, ChannelList, Client, Credentials,
1611        EstablishConnectionError, UserStore, RECEIVE_TIMEOUT,
1612    };
1613    use collections::{BTreeMap, HashSet};
1614    use editor::{
1615        self, ConfirmCodeAction, ConfirmCompletion, ConfirmRename, Editor, Input, Redo, Rename,
1616        ToOffset, ToggleCodeActions, Undo,
1617    };
1618    use gpui::{
1619        executor::{self, Deterministic},
1620        geometry::vector::vec2f,
1621        ModelHandle, TestAppContext, ViewHandle,
1622    };
1623    use language::{
1624        range_to_lsp, tree_sitter_rust, Diagnostic, DiagnosticEntry, FakeLspAdapter, Language,
1625        LanguageConfig, LanguageRegistry, OffsetRangeExt, Point, Rope,
1626    };
1627    use lsp::{self, FakeLanguageServer};
1628    use parking_lot::Mutex;
1629    use project::{
1630        fs::{FakeFs, Fs as _},
1631        search::SearchQuery,
1632        worktree::WorktreeHandle,
1633        DiagnosticSummary, Project, ProjectPath, WorktreeId,
1634    };
1635    use rand::prelude::*;
1636    use rpc::PeerId;
1637    use serde_json::json;
1638    use settings::Settings;
1639    use sqlx::types::time::OffsetDateTime;
1640    use std::{
1641        env,
1642        ops::Deref,
1643        path::{Path, PathBuf},
1644        rc::Rc,
1645        sync::{
1646            atomic::{AtomicBool, Ordering::SeqCst},
1647            Arc,
1648        },
1649        time::Duration,
1650    };
1651    use theme::ThemeRegistry;
1652    use workspace::{Item, SplitDirection, ToggleFollow, Workspace, WorkspaceParams};
1653
1654    #[cfg(test)]
1655    #[ctor::ctor]
1656    fn init_logger() {
1657        if std::env::var("RUST_LOG").is_ok() {
1658            env_logger::init();
1659        }
1660    }
1661
1662    #[gpui::test(iterations = 10)]
1663    async fn test_share_project(cx_a: &mut TestAppContext, cx_b: &mut TestAppContext) {
1664        let (window_b, _) = cx_b.add_window(|_| EmptyView);
1665        let lang_registry = Arc::new(LanguageRegistry::test());
1666        let fs = FakeFs::new(cx_a.background());
1667        cx_a.foreground().forbid_parking();
1668
1669        // Connect to a server as 2 clients.
1670        let mut server = TestServer::start(cx_a.foreground(), cx_a.background()).await;
1671        let client_a = server.create_client(cx_a, "user_a").await;
1672        let mut client_b = server.create_client(cx_b, "user_b").await;
1673        server
1674            .make_contacts(vec![(&client_a, cx_a), (&client_b, cx_b)])
1675            .await;
1676
1677        // Share a project as client A
1678        fs.insert_tree(
1679            "/a",
1680            json!({
1681                "a.txt": "a-contents",
1682                "b.txt": "b-contents",
1683            }),
1684        )
1685        .await;
1686        let project_a = cx_a.update(|cx| {
1687            Project::local(
1688                client_a.clone(),
1689                client_a.user_store.clone(),
1690                lang_registry.clone(),
1691                fs.clone(),
1692                cx,
1693            )
1694        });
1695        let (worktree_a, _) = project_a
1696            .update(cx_a, |p, cx| {
1697                p.find_or_create_local_worktree("/a", true, cx)
1698            })
1699            .await
1700            .unwrap();
1701        let worktree_id = worktree_a.read_with(cx_a, |tree, _| tree.id());
1702        worktree_a
1703            .read_with(cx_a, |tree, _| tree.as_local().unwrap().scan_complete())
1704            .await;
1705
1706        // Join that project as client B
1707        let client_b_peer_id = client_b.peer_id;
1708        let project_b = client_b.build_remote_project(&project_a, cx_a, cx_b).await;
1709
1710        let replica_id_b = project_b.read_with(cx_b, |project, _| {
1711            assert_eq!(
1712                project
1713                    .collaborators()
1714                    .get(&client_a.peer_id)
1715                    .unwrap()
1716                    .user
1717                    .github_login,
1718                "user_a"
1719            );
1720            project.replica_id()
1721        });
1722        project_a
1723            .condition(&cx_a, |tree, _| {
1724                tree.collaborators()
1725                    .get(&client_b_peer_id)
1726                    .map_or(false, |collaborator| {
1727                        collaborator.replica_id == replica_id_b
1728                            && collaborator.user.github_login == "user_b"
1729                    })
1730            })
1731            .await;
1732
1733        // Open the same file as client B and client A.
1734        let buffer_b = project_b
1735            .update(cx_b, |p, cx| p.open_buffer((worktree_id, "b.txt"), cx))
1736            .await
1737            .unwrap();
1738        buffer_b.read_with(cx_b, |buf, _| assert_eq!(buf.text(), "b-contents"));
1739        project_a.read_with(cx_a, |project, cx| {
1740            assert!(project.has_open_buffer((worktree_id, "b.txt"), cx))
1741        });
1742        let buffer_a = project_a
1743            .update(cx_a, |p, cx| p.open_buffer((worktree_id, "b.txt"), cx))
1744            .await
1745            .unwrap();
1746
1747        let editor_b = cx_b.add_view(window_b, |cx| Editor::for_buffer(buffer_b, None, cx));
1748
1749        // TODO
1750        // // Create a selection set as client B and see that selection set as client A.
1751        // buffer_a
1752        //     .condition(&cx_a, |buffer, _| buffer.selection_sets().count() == 1)
1753        //     .await;
1754
1755        // Edit the buffer as client B and see that edit as client A.
1756        editor_b.update(cx_b, |editor, cx| {
1757            editor.handle_input(&Input("ok, ".into()), cx)
1758        });
1759        buffer_a
1760            .condition(&cx_a, |buffer, _| buffer.text() == "ok, b-contents")
1761            .await;
1762
1763        // TODO
1764        // // Remove the selection set as client B, see those selections disappear as client A.
1765        cx_b.update(move |_| drop(editor_b));
1766        // buffer_a
1767        //     .condition(&cx_a, |buffer, _| buffer.selection_sets().count() == 0)
1768        //     .await;
1769
1770        // Dropping the client B's project removes client B from client A's collaborators.
1771        cx_b.update(move |_| {
1772            drop(client_b.project.take());
1773            drop(project_b);
1774        });
1775        project_a
1776            .condition(&cx_a, |project, _| project.collaborators().is_empty())
1777            .await;
1778    }
1779
1780    #[gpui::test(iterations = 10)]
1781    async fn test_unshare_project(
1782        deterministic: Arc<Deterministic>,
1783        cx_a: &mut TestAppContext,
1784        cx_b: &mut TestAppContext,
1785    ) {
1786        let lang_registry = Arc::new(LanguageRegistry::test());
1787        let fs = FakeFs::new(cx_a.background());
1788        cx_a.foreground().forbid_parking();
1789
1790        // Connect to a server as 2 clients.
1791        let mut server = TestServer::start(cx_a.foreground(), cx_a.background()).await;
1792        let client_a = server.create_client(cx_a, "user_a").await;
1793        let mut client_b = server.create_client(cx_b, "user_b").await;
1794        server
1795            .make_contacts(vec![(&client_a, cx_a), (&client_b, cx_b)])
1796            .await;
1797
1798        // Share a project as client A
1799        fs.insert_tree(
1800            "/a",
1801            json!({
1802                "a.txt": "a-contents",
1803                "b.txt": "b-contents",
1804            }),
1805        )
1806        .await;
1807        let project_a = cx_a.update(|cx| {
1808            Project::local(
1809                client_a.clone(),
1810                client_a.user_store.clone(),
1811                lang_registry.clone(),
1812                fs.clone(),
1813                cx,
1814            )
1815        });
1816        let (worktree_a, _) = project_a
1817            .update(cx_a, |p, cx| {
1818                p.find_or_create_local_worktree("/a", true, cx)
1819            })
1820            .await
1821            .unwrap();
1822        worktree_a
1823            .read_with(cx_a, |tree, _| tree.as_local().unwrap().scan_complete())
1824            .await;
1825        let worktree_id = worktree_a.read_with(cx_a, |tree, _| tree.id());
1826
1827        // Join that project as client B
1828        let project_b = client_b.build_remote_project(&project_a, cx_a, cx_b).await;
1829        assert!(worktree_a.read_with(cx_a, |tree, _| tree.as_local().unwrap().is_shared()));
1830        project_b
1831            .update(cx_b, |p, cx| p.open_buffer((worktree_id, "a.txt"), cx))
1832            .await
1833            .unwrap();
1834
1835        // When client B leaves the project, it gets automatically unshared.
1836        cx_b.update(|_| {
1837            drop(client_b.project.take());
1838            drop(project_b);
1839        });
1840        deterministic.run_until_parked();
1841        assert!(worktree_a.read_with(cx_a, |tree, _| !tree.as_local().unwrap().is_shared()));
1842
1843        // When client B joins again, the project gets re-shared.
1844        let project_b2 = client_b.build_remote_project(&project_a, cx_a, cx_b).await;
1845        assert!(worktree_a.read_with(cx_a, |tree, _| tree.as_local().unwrap().is_shared()));
1846        project_b2
1847            .update(cx_b, |p, cx| p.open_buffer((worktree_id, "a.txt"), cx))
1848            .await
1849            .unwrap();
1850    }
1851
1852    #[gpui::test(iterations = 10)]
1853    async fn test_host_disconnect(cx_a: &mut TestAppContext, cx_b: &mut TestAppContext) {
1854        let lang_registry = Arc::new(LanguageRegistry::test());
1855        let fs = FakeFs::new(cx_a.background());
1856        cx_a.foreground().forbid_parking();
1857
1858        // Connect to a server as 2 clients.
1859        let mut server = TestServer::start(cx_a.foreground(), cx_a.background()).await;
1860        let client_a = server.create_client(cx_a, "user_a").await;
1861        let mut client_b = server.create_client(cx_b, "user_b").await;
1862        server
1863            .make_contacts(vec![(&client_a, cx_a), (&client_b, cx_b)])
1864            .await;
1865
1866        // Share a project as client A
1867        fs.insert_tree(
1868            "/a",
1869            json!({
1870                "a.txt": "a-contents",
1871                "b.txt": "b-contents",
1872            }),
1873        )
1874        .await;
1875        let project_a = cx_a.update(|cx| {
1876            Project::local(
1877                client_a.clone(),
1878                client_a.user_store.clone(),
1879                lang_registry.clone(),
1880                fs.clone(),
1881                cx,
1882            )
1883        });
1884        let (worktree_a, _) = project_a
1885            .update(cx_a, |p, cx| {
1886                p.find_or_create_local_worktree("/a", true, cx)
1887            })
1888            .await
1889            .unwrap();
1890        worktree_a
1891            .read_with(cx_a, |tree, _| tree.as_local().unwrap().scan_complete())
1892            .await;
1893        let worktree_id = worktree_a.read_with(cx_a, |tree, _| tree.id());
1894
1895        // Join that project as client B
1896        let project_b = client_b.build_remote_project(&project_a, cx_a, cx_b).await;
1897        assert!(worktree_a.read_with(cx_a, |tree, _| tree.as_local().unwrap().is_shared()));
1898        project_b
1899            .update(cx_b, |p, cx| p.open_buffer((worktree_id, "a.txt"), cx))
1900            .await
1901            .unwrap();
1902
1903        // Drop client A's connection. Collaborators should disappear and the project should not be shown as shared.
1904        server.disconnect_client(client_a.current_user_id(cx_a));
1905        cx_a.foreground().advance_clock(rpc::RECEIVE_TIMEOUT);
1906        project_a
1907            .condition(cx_a, |project, _| project.collaborators().is_empty())
1908            .await;
1909        project_a.read_with(cx_a, |project, _| assert!(!project.is_shared()));
1910        project_b
1911            .condition(cx_b, |project, _| project.is_read_only())
1912            .await;
1913        assert!(worktree_a.read_with(cx_a, |tree, _| !tree.as_local().unwrap().is_shared()));
1914        cx_b.update(|_| {
1915            drop(project_b);
1916        });
1917
1918        // Ensure guests can still join.
1919        let project_b2 = client_b.build_remote_project(&project_a, cx_a, cx_b).await;
1920        assert!(worktree_a.read_with(cx_a, |tree, _| tree.as_local().unwrap().is_shared()));
1921        project_b2
1922            .update(cx_b, |p, cx| p.open_buffer((worktree_id, "a.txt"), cx))
1923            .await
1924            .unwrap();
1925    }
1926
1927    #[gpui::test(iterations = 10)]
1928    async fn test_propagate_saves_and_fs_changes(
1929        cx_a: &mut TestAppContext,
1930        cx_b: &mut TestAppContext,
1931        cx_c: &mut TestAppContext,
1932    ) {
1933        let lang_registry = Arc::new(LanguageRegistry::test());
1934        let fs = FakeFs::new(cx_a.background());
1935        cx_a.foreground().forbid_parking();
1936
1937        // Connect to a server as 3 clients.
1938        let mut server = TestServer::start(cx_a.foreground(), cx_a.background()).await;
1939        let client_a = server.create_client(cx_a, "user_a").await;
1940        let mut client_b = server.create_client(cx_b, "user_b").await;
1941        let mut client_c = server.create_client(cx_c, "user_c").await;
1942        server
1943            .make_contacts(vec![
1944                (&client_a, cx_a),
1945                (&client_b, cx_b),
1946                (&client_c, cx_c),
1947            ])
1948            .await;
1949
1950        // Share a worktree as client A.
1951        fs.insert_tree(
1952            "/a",
1953            json!({
1954                "file1": "",
1955                "file2": ""
1956            }),
1957        )
1958        .await;
1959        let project_a = cx_a.update(|cx| {
1960            Project::local(
1961                client_a.clone(),
1962                client_a.user_store.clone(),
1963                lang_registry.clone(),
1964                fs.clone(),
1965                cx,
1966            )
1967        });
1968        let (worktree_a, _) = project_a
1969            .update(cx_a, |p, cx| {
1970                p.find_or_create_local_worktree("/a", true, cx)
1971            })
1972            .await
1973            .unwrap();
1974        worktree_a
1975            .read_with(cx_a, |tree, _| tree.as_local().unwrap().scan_complete())
1976            .await;
1977        let worktree_id = worktree_a.read_with(cx_a, |tree, _| tree.id());
1978
1979        // Join that worktree as clients B and C.
1980        let project_b = client_b.build_remote_project(&project_a, cx_a, cx_b).await;
1981        let project_c = client_c.build_remote_project(&project_a, cx_a, cx_c).await;
1982        let worktree_b = project_b.read_with(cx_b, |p, cx| p.worktrees(cx).next().unwrap());
1983        let worktree_c = project_c.read_with(cx_c, |p, cx| p.worktrees(cx).next().unwrap());
1984
1985        // Open and edit a buffer as both guests B and C.
1986        let buffer_b = project_b
1987            .update(cx_b, |p, cx| p.open_buffer((worktree_id, "file1"), cx))
1988            .await
1989            .unwrap();
1990        let buffer_c = project_c
1991            .update(cx_c, |p, cx| p.open_buffer((worktree_id, "file1"), cx))
1992            .await
1993            .unwrap();
1994        buffer_b.update(cx_b, |buf, cx| buf.edit([(0..0, "i-am-b, ")], cx));
1995        buffer_c.update(cx_c, |buf, cx| buf.edit([(0..0, "i-am-c, ")], cx));
1996
1997        // Open and edit that buffer as the host.
1998        let buffer_a = project_a
1999            .update(cx_a, |p, cx| p.open_buffer((worktree_id, "file1"), cx))
2000            .await
2001            .unwrap();
2002
2003        buffer_a
2004            .condition(cx_a, |buf, _| buf.text() == "i-am-c, i-am-b, ")
2005            .await;
2006        buffer_a.update(cx_a, |buf, cx| {
2007            buf.edit([(buf.len()..buf.len(), "i-am-a")], cx)
2008        });
2009
2010        // Wait for edits to propagate
2011        buffer_a
2012            .condition(cx_a, |buf, _| buf.text() == "i-am-c, i-am-b, i-am-a")
2013            .await;
2014        buffer_b
2015            .condition(cx_b, |buf, _| buf.text() == "i-am-c, i-am-b, i-am-a")
2016            .await;
2017        buffer_c
2018            .condition(cx_c, |buf, _| buf.text() == "i-am-c, i-am-b, i-am-a")
2019            .await;
2020
2021        // Edit the buffer as the host and concurrently save as guest B.
2022        let save_b = buffer_b.update(cx_b, |buf, cx| buf.save(cx));
2023        buffer_a.update(cx_a, |buf, cx| buf.edit([(0..0, "hi-a, ")], cx));
2024        save_b.await.unwrap();
2025        assert_eq!(
2026            fs.load("/a/file1".as_ref()).await.unwrap(),
2027            "hi-a, i-am-c, i-am-b, i-am-a"
2028        );
2029        buffer_a.read_with(cx_a, |buf, _| assert!(!buf.is_dirty()));
2030        buffer_b.read_with(cx_b, |buf, _| assert!(!buf.is_dirty()));
2031        buffer_c.condition(cx_c, |buf, _| !buf.is_dirty()).await;
2032
2033        worktree_a.flush_fs_events(cx_a).await;
2034
2035        // Make changes on host's file system, see those changes on guest worktrees.
2036        fs.rename(
2037            "/a/file1".as_ref(),
2038            "/a/file1-renamed".as_ref(),
2039            Default::default(),
2040        )
2041        .await
2042        .unwrap();
2043
2044        fs.rename("/a/file2".as_ref(), "/a/file3".as_ref(), Default::default())
2045            .await
2046            .unwrap();
2047        fs.insert_file(Path::new("/a/file4"), "4".into()).await;
2048
2049        worktree_a
2050            .condition(&cx_a, |tree, _| {
2051                tree.paths()
2052                    .map(|p| p.to_string_lossy())
2053                    .collect::<Vec<_>>()
2054                    == ["file1-renamed", "file3", "file4"]
2055            })
2056            .await;
2057        worktree_b
2058            .condition(&cx_b, |tree, _| {
2059                tree.paths()
2060                    .map(|p| p.to_string_lossy())
2061                    .collect::<Vec<_>>()
2062                    == ["file1-renamed", "file3", "file4"]
2063            })
2064            .await;
2065        worktree_c
2066            .condition(&cx_c, |tree, _| {
2067                tree.paths()
2068                    .map(|p| p.to_string_lossy())
2069                    .collect::<Vec<_>>()
2070                    == ["file1-renamed", "file3", "file4"]
2071            })
2072            .await;
2073
2074        // Ensure buffer files are updated as well.
2075        buffer_a
2076            .condition(&cx_a, |buf, _| {
2077                buf.file().unwrap().path().to_str() == Some("file1-renamed")
2078            })
2079            .await;
2080        buffer_b
2081            .condition(&cx_b, |buf, _| {
2082                buf.file().unwrap().path().to_str() == Some("file1-renamed")
2083            })
2084            .await;
2085        buffer_c
2086            .condition(&cx_c, |buf, _| {
2087                buf.file().unwrap().path().to_str() == Some("file1-renamed")
2088            })
2089            .await;
2090    }
2091
2092    #[gpui::test(iterations = 10)]
2093    async fn test_fs_operations(
2094        executor: Arc<Deterministic>,
2095        cx_a: &mut TestAppContext,
2096        cx_b: &mut TestAppContext,
2097    ) {
2098        executor.forbid_parking();
2099        let fs = FakeFs::new(cx_a.background());
2100
2101        // Connect to a server as 2 clients.
2102        let mut server = TestServer::start(cx_a.foreground(), cx_a.background()).await;
2103        let mut client_a = server.create_client(cx_a, "user_a").await;
2104        let mut client_b = server.create_client(cx_b, "user_b").await;
2105        server
2106            .make_contacts(vec![(&client_a, cx_a), (&client_b, cx_b)])
2107            .await;
2108
2109        // Share a project as client A
2110        fs.insert_tree(
2111            "/dir",
2112            json!({
2113                "a.txt": "a-contents",
2114                "b.txt": "b-contents",
2115            }),
2116        )
2117        .await;
2118
2119        let (project_a, worktree_id) = client_a.build_local_project(fs, "/dir", cx_a).await;
2120        let project_b = client_b.build_remote_project(&project_a, cx_a, cx_b).await;
2121
2122        let worktree_a =
2123            project_a.read_with(cx_a, |project, cx| project.worktrees(cx).next().unwrap());
2124        let worktree_b =
2125            project_b.read_with(cx_b, |project, cx| project.worktrees(cx).next().unwrap());
2126
2127        let entry = project_b
2128            .update(cx_b, |project, cx| {
2129                project
2130                    .create_entry((worktree_id, "c.txt"), false, cx)
2131                    .unwrap()
2132            })
2133            .await
2134            .unwrap();
2135        worktree_a.read_with(cx_a, |worktree, _| {
2136            assert_eq!(
2137                worktree
2138                    .paths()
2139                    .map(|p| p.to_string_lossy())
2140                    .collect::<Vec<_>>(),
2141                ["a.txt", "b.txt", "c.txt"]
2142            );
2143        });
2144        worktree_b.read_with(cx_b, |worktree, _| {
2145            assert_eq!(
2146                worktree
2147                    .paths()
2148                    .map(|p| p.to_string_lossy())
2149                    .collect::<Vec<_>>(),
2150                ["a.txt", "b.txt", "c.txt"]
2151            );
2152        });
2153
2154        project_b
2155            .update(cx_b, |project, cx| {
2156                project.rename_entry(entry.id, Path::new("d.txt"), cx)
2157            })
2158            .unwrap()
2159            .await
2160            .unwrap();
2161        worktree_a.read_with(cx_a, |worktree, _| {
2162            assert_eq!(
2163                worktree
2164                    .paths()
2165                    .map(|p| p.to_string_lossy())
2166                    .collect::<Vec<_>>(),
2167                ["a.txt", "b.txt", "d.txt"]
2168            );
2169        });
2170        worktree_b.read_with(cx_b, |worktree, _| {
2171            assert_eq!(
2172                worktree
2173                    .paths()
2174                    .map(|p| p.to_string_lossy())
2175                    .collect::<Vec<_>>(),
2176                ["a.txt", "b.txt", "d.txt"]
2177            );
2178        });
2179
2180        let dir_entry = project_b
2181            .update(cx_b, |project, cx| {
2182                project
2183                    .create_entry((worktree_id, "DIR"), true, cx)
2184                    .unwrap()
2185            })
2186            .await
2187            .unwrap();
2188        worktree_a.read_with(cx_a, |worktree, _| {
2189            assert_eq!(
2190                worktree
2191                    .paths()
2192                    .map(|p| p.to_string_lossy())
2193                    .collect::<Vec<_>>(),
2194                ["DIR", "a.txt", "b.txt", "d.txt"]
2195            );
2196        });
2197        worktree_b.read_with(cx_b, |worktree, _| {
2198            assert_eq!(
2199                worktree
2200                    .paths()
2201                    .map(|p| p.to_string_lossy())
2202                    .collect::<Vec<_>>(),
2203                ["DIR", "a.txt", "b.txt", "d.txt"]
2204            );
2205        });
2206
2207        project_b
2208            .update(cx_b, |project, cx| {
2209                project.delete_entry(dir_entry.id, cx).unwrap()
2210            })
2211            .await
2212            .unwrap();
2213        worktree_a.read_with(cx_a, |worktree, _| {
2214            assert_eq!(
2215                worktree
2216                    .paths()
2217                    .map(|p| p.to_string_lossy())
2218                    .collect::<Vec<_>>(),
2219                ["a.txt", "b.txt", "d.txt"]
2220            );
2221        });
2222        worktree_b.read_with(cx_b, |worktree, _| {
2223            assert_eq!(
2224                worktree
2225                    .paths()
2226                    .map(|p| p.to_string_lossy())
2227                    .collect::<Vec<_>>(),
2228                ["a.txt", "b.txt", "d.txt"]
2229            );
2230        });
2231
2232        project_b
2233            .update(cx_b, |project, cx| {
2234                project.delete_entry(entry.id, cx).unwrap()
2235            })
2236            .await
2237            .unwrap();
2238        worktree_a.read_with(cx_a, |worktree, _| {
2239            assert_eq!(
2240                worktree
2241                    .paths()
2242                    .map(|p| p.to_string_lossy())
2243                    .collect::<Vec<_>>(),
2244                ["a.txt", "b.txt"]
2245            );
2246        });
2247        worktree_b.read_with(cx_b, |worktree, _| {
2248            assert_eq!(
2249                worktree
2250                    .paths()
2251                    .map(|p| p.to_string_lossy())
2252                    .collect::<Vec<_>>(),
2253                ["a.txt", "b.txt"]
2254            );
2255        });
2256    }
2257
2258    #[gpui::test(iterations = 10)]
2259    async fn test_buffer_conflict_after_save(cx_a: &mut TestAppContext, cx_b: &mut TestAppContext) {
2260        cx_a.foreground().forbid_parking();
2261        let lang_registry = Arc::new(LanguageRegistry::test());
2262        let fs = FakeFs::new(cx_a.background());
2263
2264        // Connect to a server as 2 clients.
2265        let mut server = TestServer::start(cx_a.foreground(), cx_a.background()).await;
2266        let client_a = server.create_client(cx_a, "user_a").await;
2267        let mut client_b = server.create_client(cx_b, "user_b").await;
2268        server
2269            .make_contacts(vec![(&client_a, cx_a), (&client_b, cx_b)])
2270            .await;
2271
2272        // Share a project as client A
2273        fs.insert_tree(
2274            "/dir",
2275            json!({
2276                "a.txt": "a-contents",
2277            }),
2278        )
2279        .await;
2280
2281        let project_a = cx_a.update(|cx| {
2282            Project::local(
2283                client_a.clone(),
2284                client_a.user_store.clone(),
2285                lang_registry.clone(),
2286                fs.clone(),
2287                cx,
2288            )
2289        });
2290        let (worktree_a, _) = project_a
2291            .update(cx_a, |p, cx| {
2292                p.find_or_create_local_worktree("/dir", true, cx)
2293            })
2294            .await
2295            .unwrap();
2296        worktree_a
2297            .read_with(cx_a, |tree, _| tree.as_local().unwrap().scan_complete())
2298            .await;
2299        let worktree_id = worktree_a.read_with(cx_a, |tree, _| tree.id());
2300
2301        // Join that project as client B
2302        let project_b = client_b.build_remote_project(&project_a, cx_a, cx_b).await;
2303
2304        // Open a buffer as client B
2305        let buffer_b = project_b
2306            .update(cx_b, |p, cx| p.open_buffer((worktree_id, "a.txt"), cx))
2307            .await
2308            .unwrap();
2309
2310        buffer_b.update(cx_b, |buf, cx| buf.edit([(0..0, "world ")], cx));
2311        buffer_b.read_with(cx_b, |buf, _| {
2312            assert!(buf.is_dirty());
2313            assert!(!buf.has_conflict());
2314        });
2315
2316        buffer_b.update(cx_b, |buf, cx| buf.save(cx)).await.unwrap();
2317        buffer_b
2318            .condition(&cx_b, |buffer_b, _| !buffer_b.is_dirty())
2319            .await;
2320        buffer_b.read_with(cx_b, |buf, _| {
2321            assert!(!buf.has_conflict());
2322        });
2323
2324        buffer_b.update(cx_b, |buf, cx| buf.edit([(0..0, "hello ")], cx));
2325        buffer_b.read_with(cx_b, |buf, _| {
2326            assert!(buf.is_dirty());
2327            assert!(!buf.has_conflict());
2328        });
2329    }
2330
2331    #[gpui::test(iterations = 10)]
2332    async fn test_buffer_reloading(cx_a: &mut TestAppContext, cx_b: &mut TestAppContext) {
2333        cx_a.foreground().forbid_parking();
2334        let lang_registry = Arc::new(LanguageRegistry::test());
2335        let fs = FakeFs::new(cx_a.background());
2336
2337        // Connect to a server as 2 clients.
2338        let mut server = TestServer::start(cx_a.foreground(), cx_a.background()).await;
2339        let client_a = server.create_client(cx_a, "user_a").await;
2340        let mut client_b = server.create_client(cx_b, "user_b").await;
2341        server
2342            .make_contacts(vec![(&client_a, cx_a), (&client_b, cx_b)])
2343            .await;
2344
2345        // Share a project as client A
2346        fs.insert_tree(
2347            "/dir",
2348            json!({
2349                "a.txt": "a-contents",
2350            }),
2351        )
2352        .await;
2353
2354        let project_a = cx_a.update(|cx| {
2355            Project::local(
2356                client_a.clone(),
2357                client_a.user_store.clone(),
2358                lang_registry.clone(),
2359                fs.clone(),
2360                cx,
2361            )
2362        });
2363        let (worktree_a, _) = project_a
2364            .update(cx_a, |p, cx| {
2365                p.find_or_create_local_worktree("/dir", true, cx)
2366            })
2367            .await
2368            .unwrap();
2369        worktree_a
2370            .read_with(cx_a, |tree, _| tree.as_local().unwrap().scan_complete())
2371            .await;
2372        let worktree_id = worktree_a.read_with(cx_a, |tree, _| tree.id());
2373
2374        // Join that project as client B
2375        let project_b = client_b.build_remote_project(&project_a, cx_a, cx_b).await;
2376        let _worktree_b = project_b.update(cx_b, |p, cx| p.worktrees(cx).next().unwrap());
2377
2378        // Open a buffer as client B
2379        let buffer_b = project_b
2380            .update(cx_b, |p, cx| p.open_buffer((worktree_id, "a.txt"), cx))
2381            .await
2382            .unwrap();
2383        buffer_b.read_with(cx_b, |buf, _| {
2384            assert!(!buf.is_dirty());
2385            assert!(!buf.has_conflict());
2386        });
2387
2388        fs.save(Path::new("/dir/a.txt"), &"new contents".into())
2389            .await
2390            .unwrap();
2391        buffer_b
2392            .condition(&cx_b, |buf, _| {
2393                buf.text() == "new contents" && !buf.is_dirty()
2394            })
2395            .await;
2396        buffer_b.read_with(cx_b, |buf, _| {
2397            assert!(!buf.has_conflict());
2398        });
2399    }
2400
2401    #[gpui::test(iterations = 10)]
2402    async fn test_editing_while_guest_opens_buffer(
2403        cx_a: &mut TestAppContext,
2404        cx_b: &mut TestAppContext,
2405    ) {
2406        cx_a.foreground().forbid_parking();
2407        let lang_registry = Arc::new(LanguageRegistry::test());
2408        let fs = FakeFs::new(cx_a.background());
2409
2410        // Connect to a server as 2 clients.
2411        let mut server = TestServer::start(cx_a.foreground(), cx_a.background()).await;
2412        let client_a = server.create_client(cx_a, "user_a").await;
2413        let mut client_b = server.create_client(cx_b, "user_b").await;
2414        server
2415            .make_contacts(vec![(&client_a, cx_a), (&client_b, cx_b)])
2416            .await;
2417
2418        // Share a project as client A
2419        fs.insert_tree(
2420            "/dir",
2421            json!({
2422                "a.txt": "a-contents",
2423            }),
2424        )
2425        .await;
2426        let project_a = cx_a.update(|cx| {
2427            Project::local(
2428                client_a.clone(),
2429                client_a.user_store.clone(),
2430                lang_registry.clone(),
2431                fs.clone(),
2432                cx,
2433            )
2434        });
2435        let (worktree_a, _) = project_a
2436            .update(cx_a, |p, cx| {
2437                p.find_or_create_local_worktree("/dir", true, cx)
2438            })
2439            .await
2440            .unwrap();
2441        worktree_a
2442            .read_with(cx_a, |tree, _| tree.as_local().unwrap().scan_complete())
2443            .await;
2444        let worktree_id = worktree_a.read_with(cx_a, |tree, _| tree.id());
2445
2446        // Join that project as client B
2447        let project_b = client_b.build_remote_project(&project_a, cx_a, cx_b).await;
2448
2449        // Open a buffer as client A
2450        let buffer_a = project_a
2451            .update(cx_a, |p, cx| p.open_buffer((worktree_id, "a.txt"), cx))
2452            .await
2453            .unwrap();
2454
2455        // Start opening the same buffer as client B
2456        let buffer_b = cx_b
2457            .background()
2458            .spawn(project_b.update(cx_b, |p, cx| p.open_buffer((worktree_id, "a.txt"), cx)));
2459
2460        // Edit the buffer as client A while client B is still opening it.
2461        cx_b.background().simulate_random_delay().await;
2462        buffer_a.update(cx_a, |buf, cx| buf.edit([(0..0, "X")], cx));
2463        cx_b.background().simulate_random_delay().await;
2464        buffer_a.update(cx_a, |buf, cx| buf.edit([(1..1, "Y")], cx));
2465
2466        let text = buffer_a.read_with(cx_a, |buf, _| buf.text());
2467        let buffer_b = buffer_b.await.unwrap();
2468        buffer_b.condition(&cx_b, |buf, _| buf.text() == text).await;
2469    }
2470
2471    #[gpui::test(iterations = 10)]
2472    async fn test_leaving_worktree_while_opening_buffer(
2473        cx_a: &mut TestAppContext,
2474        cx_b: &mut TestAppContext,
2475    ) {
2476        cx_a.foreground().forbid_parking();
2477        let lang_registry = Arc::new(LanguageRegistry::test());
2478        let fs = FakeFs::new(cx_a.background());
2479
2480        // Connect to a server as 2 clients.
2481        let mut server = TestServer::start(cx_a.foreground(), cx_a.background()).await;
2482        let client_a = server.create_client(cx_a, "user_a").await;
2483        let mut client_b = server.create_client(cx_b, "user_b").await;
2484        server
2485            .make_contacts(vec![(&client_a, cx_a), (&client_b, cx_b)])
2486            .await;
2487
2488        // Share a project as client A
2489        fs.insert_tree(
2490            "/dir",
2491            json!({
2492                "a.txt": "a-contents",
2493            }),
2494        )
2495        .await;
2496        let project_a = cx_a.update(|cx| {
2497            Project::local(
2498                client_a.clone(),
2499                client_a.user_store.clone(),
2500                lang_registry.clone(),
2501                fs.clone(),
2502                cx,
2503            )
2504        });
2505        let (worktree_a, _) = project_a
2506            .update(cx_a, |p, cx| {
2507                p.find_or_create_local_worktree("/dir", true, cx)
2508            })
2509            .await
2510            .unwrap();
2511        worktree_a
2512            .read_with(cx_a, |tree, _| tree.as_local().unwrap().scan_complete())
2513            .await;
2514        let worktree_id = worktree_a.read_with(cx_a, |tree, _| tree.id());
2515
2516        // Join that project as client B
2517        let project_b = client_b.build_remote_project(&project_a, cx_a, cx_b).await;
2518
2519        // See that a guest has joined as client A.
2520        project_a
2521            .condition(&cx_a, |p, _| p.collaborators().len() == 1)
2522            .await;
2523
2524        // Begin opening a buffer as client B, but leave the project before the open completes.
2525        let buffer_b = cx_b
2526            .background()
2527            .spawn(project_b.update(cx_b, |p, cx| p.open_buffer((worktree_id, "a.txt"), cx)));
2528        cx_b.update(|_| {
2529            drop(client_b.project.take());
2530            drop(project_b);
2531        });
2532        drop(buffer_b);
2533
2534        // See that the guest has left.
2535        project_a
2536            .condition(&cx_a, |p, _| p.collaborators().len() == 0)
2537            .await;
2538    }
2539
2540    #[gpui::test(iterations = 10)]
2541    async fn test_leaving_project(cx_a: &mut TestAppContext, cx_b: &mut TestAppContext) {
2542        cx_a.foreground().forbid_parking();
2543        let lang_registry = Arc::new(LanguageRegistry::test());
2544        let fs = FakeFs::new(cx_a.background());
2545
2546        // Connect to a server as 2 clients.
2547        let mut server = TestServer::start(cx_a.foreground(), cx_a.background()).await;
2548        let client_a = server.create_client(cx_a, "user_a").await;
2549        let mut client_b = server.create_client(cx_b, "user_b").await;
2550        server
2551            .make_contacts(vec![(&client_a, cx_a), (&client_b, cx_b)])
2552            .await;
2553
2554        // Share a project as client A
2555        fs.insert_tree(
2556            "/a",
2557            json!({
2558                "a.txt": "a-contents",
2559                "b.txt": "b-contents",
2560            }),
2561        )
2562        .await;
2563        let project_a = cx_a.update(|cx| {
2564            Project::local(
2565                client_a.clone(),
2566                client_a.user_store.clone(),
2567                lang_registry.clone(),
2568                fs.clone(),
2569                cx,
2570            )
2571        });
2572        let (worktree_a, _) = project_a
2573            .update(cx_a, |p, cx| {
2574                p.find_or_create_local_worktree("/a", true, cx)
2575            })
2576            .await
2577            .unwrap();
2578        worktree_a
2579            .read_with(cx_a, |tree, _| tree.as_local().unwrap().scan_complete())
2580            .await;
2581
2582        // Join that project as client B
2583        let _project_b = client_b.build_remote_project(&project_a, cx_a, cx_b).await;
2584
2585        // Client A sees that a guest has joined.
2586        project_a
2587            .condition(cx_a, |p, _| p.collaborators().len() == 1)
2588            .await;
2589
2590        // Drop client B's connection and ensure client A observes client B leaving the project.
2591        client_b.disconnect(&cx_b.to_async()).unwrap();
2592        project_a
2593            .condition(cx_a, |p, _| p.collaborators().len() == 0)
2594            .await;
2595
2596        // Rejoin the project as client B
2597        let _project_b = client_b.build_remote_project(&project_a, cx_a, cx_b).await;
2598
2599        // Client A sees that a guest has re-joined.
2600        project_a
2601            .condition(cx_a, |p, _| p.collaborators().len() == 1)
2602            .await;
2603
2604        // Simulate connection loss for client B and ensure client A observes client B leaving the project.
2605        client_b.wait_for_current_user(cx_b).await;
2606        server.disconnect_client(client_b.current_user_id(cx_b));
2607        cx_a.foreground().advance_clock(rpc::RECEIVE_TIMEOUT);
2608        project_a
2609            .condition(cx_a, |p, _| p.collaborators().len() == 0)
2610            .await;
2611    }
2612
2613    #[gpui::test(iterations = 10)]
2614    async fn test_collaborating_with_diagnostics(
2615        deterministic: Arc<Deterministic>,
2616        cx_a: &mut TestAppContext,
2617        cx_b: &mut TestAppContext,
2618        cx_c: &mut TestAppContext,
2619    ) {
2620        deterministic.forbid_parking();
2621        let lang_registry = Arc::new(LanguageRegistry::test());
2622        let fs = FakeFs::new(cx_a.background());
2623
2624        // Set up a fake language server.
2625        let mut language = Language::new(
2626            LanguageConfig {
2627                name: "Rust".into(),
2628                path_suffixes: vec!["rs".to_string()],
2629                ..Default::default()
2630            },
2631            Some(tree_sitter_rust::language()),
2632        );
2633        let mut fake_language_servers = language.set_fake_lsp_adapter(Default::default());
2634        lang_registry.add(Arc::new(language));
2635
2636        // Connect to a server as 2 clients.
2637        let mut server = TestServer::start(cx_a.foreground(), cx_a.background()).await;
2638        let client_a = server.create_client(cx_a, "user_a").await;
2639        let mut client_b = server.create_client(cx_b, "user_b").await;
2640        let mut client_c = server.create_client(cx_c, "user_c").await;
2641        server
2642            .make_contacts(vec![
2643                (&client_a, cx_a),
2644                (&client_b, cx_b),
2645                (&client_c, cx_c),
2646            ])
2647            .await;
2648
2649        // Share a project as client A
2650        fs.insert_tree(
2651            "/a",
2652            json!({
2653                "a.rs": "let one = two",
2654                "other.rs": "",
2655            }),
2656        )
2657        .await;
2658        let project_a = cx_a.update(|cx| {
2659            Project::local(
2660                client_a.clone(),
2661                client_a.user_store.clone(),
2662                lang_registry.clone(),
2663                fs.clone(),
2664                cx,
2665            )
2666        });
2667        let (worktree_a, _) = project_a
2668            .update(cx_a, |p, cx| {
2669                p.find_or_create_local_worktree("/a", true, cx)
2670            })
2671            .await
2672            .unwrap();
2673        worktree_a
2674            .read_with(cx_a, |tree, _| tree.as_local().unwrap().scan_complete())
2675            .await;
2676        let project_id = project_a.update(cx_a, |p, _| p.next_remote_id()).await;
2677        let worktree_id = worktree_a.read_with(cx_a, |tree, _| tree.id());
2678
2679        // Cause the language server to start.
2680        let _buffer = cx_a
2681            .background()
2682            .spawn(project_a.update(cx_a, |project, cx| {
2683                project.open_buffer(
2684                    ProjectPath {
2685                        worktree_id,
2686                        path: Path::new("other.rs").into(),
2687                    },
2688                    cx,
2689                )
2690            }))
2691            .await
2692            .unwrap();
2693
2694        // Join the worktree as client B.
2695        let project_b = client_b.build_remote_project(&project_a, cx_a, cx_b).await;
2696
2697        // Simulate a language server reporting errors for a file.
2698        let mut fake_language_server = fake_language_servers.next().await.unwrap();
2699        fake_language_server
2700            .receive_notification::<lsp::notification::DidOpenTextDocument>()
2701            .await;
2702        fake_language_server.notify::<lsp::notification::PublishDiagnostics>(
2703            lsp::PublishDiagnosticsParams {
2704                uri: lsp::Url::from_file_path("/a/a.rs").unwrap(),
2705                version: None,
2706                diagnostics: vec![lsp::Diagnostic {
2707                    severity: Some(lsp::DiagnosticSeverity::ERROR),
2708                    range: lsp::Range::new(lsp::Position::new(0, 4), lsp::Position::new(0, 7)),
2709                    message: "message 1".to_string(),
2710                    ..Default::default()
2711                }],
2712            },
2713        );
2714
2715        // Wait for server to see the diagnostics update.
2716        deterministic.run_until_parked();
2717        {
2718            let store = server.store.read().await;
2719            let project = store.project(project_id).unwrap();
2720            let worktree = project.worktrees.get(&worktree_id.to_proto()).unwrap();
2721            assert!(!worktree.diagnostic_summaries.is_empty());
2722        }
2723
2724        // Ensure client B observes the new diagnostics.
2725        project_b.read_with(cx_b, |project, cx| {
2726            assert_eq!(
2727                project.diagnostic_summaries(cx).collect::<Vec<_>>(),
2728                &[(
2729                    ProjectPath {
2730                        worktree_id,
2731                        path: Arc::from(Path::new("a.rs")),
2732                    },
2733                    DiagnosticSummary {
2734                        error_count: 1,
2735                        warning_count: 0,
2736                        ..Default::default()
2737                    },
2738                )]
2739            )
2740        });
2741
2742        // Join project as client C and observe the diagnostics.
2743        let project_c = client_c.build_remote_project(&project_a, cx_a, cx_c).await;
2744        project_c.read_with(cx_c, |project, cx| {
2745            assert_eq!(
2746                project.diagnostic_summaries(cx).collect::<Vec<_>>(),
2747                &[(
2748                    ProjectPath {
2749                        worktree_id,
2750                        path: Arc::from(Path::new("a.rs")),
2751                    },
2752                    DiagnosticSummary {
2753                        error_count: 1,
2754                        warning_count: 0,
2755                        ..Default::default()
2756                    },
2757                )]
2758            )
2759        });
2760
2761        // Simulate a language server reporting more errors for a file.
2762        fake_language_server.notify::<lsp::notification::PublishDiagnostics>(
2763            lsp::PublishDiagnosticsParams {
2764                uri: lsp::Url::from_file_path("/a/a.rs").unwrap(),
2765                version: None,
2766                diagnostics: vec![
2767                    lsp::Diagnostic {
2768                        severity: Some(lsp::DiagnosticSeverity::ERROR),
2769                        range: lsp::Range::new(lsp::Position::new(0, 4), lsp::Position::new(0, 7)),
2770                        message: "message 1".to_string(),
2771                        ..Default::default()
2772                    },
2773                    lsp::Diagnostic {
2774                        severity: Some(lsp::DiagnosticSeverity::WARNING),
2775                        range: lsp::Range::new(
2776                            lsp::Position::new(0, 10),
2777                            lsp::Position::new(0, 13),
2778                        ),
2779                        message: "message 2".to_string(),
2780                        ..Default::default()
2781                    },
2782                ],
2783            },
2784        );
2785
2786        // Clients B and C get the updated summaries
2787        deterministic.run_until_parked();
2788        project_b.read_with(cx_b, |project, cx| {
2789            assert_eq!(
2790                project.diagnostic_summaries(cx).collect::<Vec<_>>(),
2791                [(
2792                    ProjectPath {
2793                        worktree_id,
2794                        path: Arc::from(Path::new("a.rs")),
2795                    },
2796                    DiagnosticSummary {
2797                        error_count: 1,
2798                        warning_count: 1,
2799                        ..Default::default()
2800                    },
2801                )]
2802            );
2803        });
2804        project_c.read_with(cx_c, |project, cx| {
2805            assert_eq!(
2806                project.diagnostic_summaries(cx).collect::<Vec<_>>(),
2807                [(
2808                    ProjectPath {
2809                        worktree_id,
2810                        path: Arc::from(Path::new("a.rs")),
2811                    },
2812                    DiagnosticSummary {
2813                        error_count: 1,
2814                        warning_count: 1,
2815                        ..Default::default()
2816                    },
2817                )]
2818            );
2819        });
2820
2821        // Open the file with the errors on client B. They should be present.
2822        let buffer_b = cx_b
2823            .background()
2824            .spawn(project_b.update(cx_b, |p, cx| p.open_buffer((worktree_id, "a.rs"), cx)))
2825            .await
2826            .unwrap();
2827
2828        buffer_b.read_with(cx_b, |buffer, _| {
2829            assert_eq!(
2830                buffer
2831                    .snapshot()
2832                    .diagnostics_in_range::<_, Point>(0..buffer.len(), false)
2833                    .map(|entry| entry)
2834                    .collect::<Vec<_>>(),
2835                &[
2836                    DiagnosticEntry {
2837                        range: Point::new(0, 4)..Point::new(0, 7),
2838                        diagnostic: Diagnostic {
2839                            group_id: 0,
2840                            message: "message 1".to_string(),
2841                            severity: lsp::DiagnosticSeverity::ERROR,
2842                            is_primary: true,
2843                            ..Default::default()
2844                        }
2845                    },
2846                    DiagnosticEntry {
2847                        range: Point::new(0, 10)..Point::new(0, 13),
2848                        diagnostic: Diagnostic {
2849                            group_id: 1,
2850                            severity: lsp::DiagnosticSeverity::WARNING,
2851                            message: "message 2".to_string(),
2852                            is_primary: true,
2853                            ..Default::default()
2854                        }
2855                    }
2856                ]
2857            );
2858        });
2859
2860        // Simulate a language server reporting no errors for a file.
2861        fake_language_server.notify::<lsp::notification::PublishDiagnostics>(
2862            lsp::PublishDiagnosticsParams {
2863                uri: lsp::Url::from_file_path("/a/a.rs").unwrap(),
2864                version: None,
2865                diagnostics: vec![],
2866            },
2867        );
2868        deterministic.run_until_parked();
2869        project_a.read_with(cx_a, |project, cx| {
2870            assert_eq!(project.diagnostic_summaries(cx).collect::<Vec<_>>(), [])
2871        });
2872        project_b.read_with(cx_b, |project, cx| {
2873            assert_eq!(project.diagnostic_summaries(cx).collect::<Vec<_>>(), [])
2874        });
2875        project_c.read_with(cx_c, |project, cx| {
2876            assert_eq!(project.diagnostic_summaries(cx).collect::<Vec<_>>(), [])
2877        });
2878    }
2879
2880    #[gpui::test(iterations = 10)]
2881    async fn test_collaborating_with_completion(
2882        cx_a: &mut TestAppContext,
2883        cx_b: &mut TestAppContext,
2884    ) {
2885        cx_a.foreground().forbid_parking();
2886        let lang_registry = Arc::new(LanguageRegistry::test());
2887        let fs = FakeFs::new(cx_a.background());
2888
2889        // Set up a fake language server.
2890        let mut language = Language::new(
2891            LanguageConfig {
2892                name: "Rust".into(),
2893                path_suffixes: vec!["rs".to_string()],
2894                ..Default::default()
2895            },
2896            Some(tree_sitter_rust::language()),
2897        );
2898        let mut fake_language_servers = language.set_fake_lsp_adapter(FakeLspAdapter {
2899            capabilities: lsp::ServerCapabilities {
2900                completion_provider: Some(lsp::CompletionOptions {
2901                    trigger_characters: Some(vec![".".to_string()]),
2902                    ..Default::default()
2903                }),
2904                ..Default::default()
2905            },
2906            ..Default::default()
2907        });
2908        lang_registry.add(Arc::new(language));
2909
2910        // Connect to a server as 2 clients.
2911        let mut server = TestServer::start(cx_a.foreground(), cx_a.background()).await;
2912        let client_a = server.create_client(cx_a, "user_a").await;
2913        let mut client_b = server.create_client(cx_b, "user_b").await;
2914        server
2915            .make_contacts(vec![(&client_a, cx_a), (&client_b, cx_b)])
2916            .await;
2917
2918        // Share a project as client A
2919        fs.insert_tree(
2920            "/a",
2921            json!({
2922                "main.rs": "fn main() { a }",
2923                "other.rs": "",
2924            }),
2925        )
2926        .await;
2927        let project_a = cx_a.update(|cx| {
2928            Project::local(
2929                client_a.clone(),
2930                client_a.user_store.clone(),
2931                lang_registry.clone(),
2932                fs.clone(),
2933                cx,
2934            )
2935        });
2936        let (worktree_a, _) = project_a
2937            .update(cx_a, |p, cx| {
2938                p.find_or_create_local_worktree("/a", true, cx)
2939            })
2940            .await
2941            .unwrap();
2942        worktree_a
2943            .read_with(cx_a, |tree, _| tree.as_local().unwrap().scan_complete())
2944            .await;
2945        let worktree_id = worktree_a.read_with(cx_a, |tree, _| tree.id());
2946
2947        // Join the worktree as client B.
2948        let project_b = client_b.build_remote_project(&project_a, cx_a, cx_b).await;
2949
2950        // Open a file in an editor as the guest.
2951        let buffer_b = project_b
2952            .update(cx_b, |p, cx| p.open_buffer((worktree_id, "main.rs"), cx))
2953            .await
2954            .unwrap();
2955        let (window_b, _) = cx_b.add_window(|_| EmptyView);
2956        let editor_b = cx_b.add_view(window_b, |cx| {
2957            Editor::for_buffer(buffer_b.clone(), Some(project_b.clone()), cx)
2958        });
2959
2960        let fake_language_server = fake_language_servers.next().await.unwrap();
2961        buffer_b
2962            .condition(&cx_b, |buffer, _| !buffer.completion_triggers().is_empty())
2963            .await;
2964
2965        // Type a completion trigger character as the guest.
2966        editor_b.update(cx_b, |editor, cx| {
2967            editor.select_ranges([13..13], None, cx);
2968            editor.handle_input(&Input(".".into()), cx);
2969            cx.focus(&editor_b);
2970        });
2971
2972        // Receive a completion request as the host's language server.
2973        // Return some completions from the host's language server.
2974        cx_a.foreground().start_waiting();
2975        fake_language_server
2976            .handle_request::<lsp::request::Completion, _, _>(|params, _| async move {
2977                assert_eq!(
2978                    params.text_document_position.text_document.uri,
2979                    lsp::Url::from_file_path("/a/main.rs").unwrap(),
2980                );
2981                assert_eq!(
2982                    params.text_document_position.position,
2983                    lsp::Position::new(0, 14),
2984                );
2985
2986                Ok(Some(lsp::CompletionResponse::Array(vec![
2987                    lsp::CompletionItem {
2988                        label: "first_method(…)".into(),
2989                        detail: Some("fn(&mut self, B) -> C".into()),
2990                        text_edit: Some(lsp::CompletionTextEdit::Edit(lsp::TextEdit {
2991                            new_text: "first_method($1)".to_string(),
2992                            range: lsp::Range::new(
2993                                lsp::Position::new(0, 14),
2994                                lsp::Position::new(0, 14),
2995                            ),
2996                        })),
2997                        insert_text_format: Some(lsp::InsertTextFormat::SNIPPET),
2998                        ..Default::default()
2999                    },
3000                    lsp::CompletionItem {
3001                        label: "second_method(…)".into(),
3002                        detail: Some("fn(&mut self, C) -> D<E>".into()),
3003                        text_edit: Some(lsp::CompletionTextEdit::Edit(lsp::TextEdit {
3004                            new_text: "second_method()".to_string(),
3005                            range: lsp::Range::new(
3006                                lsp::Position::new(0, 14),
3007                                lsp::Position::new(0, 14),
3008                            ),
3009                        })),
3010                        insert_text_format: Some(lsp::InsertTextFormat::SNIPPET),
3011                        ..Default::default()
3012                    },
3013                ])))
3014            })
3015            .next()
3016            .await
3017            .unwrap();
3018        cx_a.foreground().finish_waiting();
3019
3020        // Open the buffer on the host.
3021        let buffer_a = project_a
3022            .update(cx_a, |p, cx| p.open_buffer((worktree_id, "main.rs"), cx))
3023            .await
3024            .unwrap();
3025        buffer_a
3026            .condition(&cx_a, |buffer, _| buffer.text() == "fn main() { a. }")
3027            .await;
3028
3029        // Confirm a completion on the guest.
3030        editor_b
3031            .condition(&cx_b, |editor, _| editor.context_menu_visible())
3032            .await;
3033        editor_b.update(cx_b, |editor, cx| {
3034            editor.confirm_completion(&ConfirmCompletion { item_ix: Some(0) }, cx);
3035            assert_eq!(editor.text(cx), "fn main() { a.first_method() }");
3036        });
3037
3038        // Return a resolved completion from the host's language server.
3039        // The resolved completion has an additional text edit.
3040        fake_language_server.handle_request::<lsp::request::ResolveCompletionItem, _, _>(
3041            |params, _| async move {
3042                assert_eq!(params.label, "first_method(…)");
3043                Ok(lsp::CompletionItem {
3044                    label: "first_method(…)".into(),
3045                    detail: Some("fn(&mut self, B) -> C".into()),
3046                    text_edit: Some(lsp::CompletionTextEdit::Edit(lsp::TextEdit {
3047                        new_text: "first_method($1)".to_string(),
3048                        range: lsp::Range::new(
3049                            lsp::Position::new(0, 14),
3050                            lsp::Position::new(0, 14),
3051                        ),
3052                    })),
3053                    additional_text_edits: Some(vec![lsp::TextEdit {
3054                        new_text: "use d::SomeTrait;\n".to_string(),
3055                        range: lsp::Range::new(lsp::Position::new(0, 0), lsp::Position::new(0, 0)),
3056                    }]),
3057                    insert_text_format: Some(lsp::InsertTextFormat::SNIPPET),
3058                    ..Default::default()
3059                })
3060            },
3061        );
3062
3063        // The additional edit is applied.
3064        buffer_a
3065            .condition(&cx_a, |buffer, _| {
3066                buffer.text() == "use d::SomeTrait;\nfn main() { a.first_method() }"
3067            })
3068            .await;
3069        buffer_b
3070            .condition(&cx_b, |buffer, _| {
3071                buffer.text() == "use d::SomeTrait;\nfn main() { a.first_method() }"
3072            })
3073            .await;
3074    }
3075
3076    #[gpui::test(iterations = 10)]
3077    async fn test_reloading_buffer_manually(cx_a: &mut TestAppContext, cx_b: &mut TestAppContext) {
3078        cx_a.foreground().forbid_parking();
3079        let lang_registry = Arc::new(LanguageRegistry::test());
3080        let fs = FakeFs::new(cx_a.background());
3081
3082        // Connect to a server as 2 clients.
3083        let mut server = TestServer::start(cx_a.foreground(), cx_a.background()).await;
3084        let client_a = server.create_client(cx_a, "user_a").await;
3085        let mut client_b = server.create_client(cx_b, "user_b").await;
3086        server
3087            .make_contacts(vec![(&client_a, cx_a), (&client_b, cx_b)])
3088            .await;
3089
3090        // Share a project as client A
3091        fs.insert_tree(
3092            "/a",
3093            json!({
3094                "a.rs": "let one = 1;",
3095            }),
3096        )
3097        .await;
3098        let project_a = cx_a.update(|cx| {
3099            Project::local(
3100                client_a.clone(),
3101                client_a.user_store.clone(),
3102                lang_registry.clone(),
3103                fs.clone(),
3104                cx,
3105            )
3106        });
3107        let (worktree_a, _) = project_a
3108            .update(cx_a, |p, cx| {
3109                p.find_or_create_local_worktree("/a", true, cx)
3110            })
3111            .await
3112            .unwrap();
3113        worktree_a
3114            .read_with(cx_a, |tree, _| tree.as_local().unwrap().scan_complete())
3115            .await;
3116        let worktree_id = worktree_a.read_with(cx_a, |tree, _| tree.id());
3117        let buffer_a = project_a
3118            .update(cx_a, |p, cx| p.open_buffer((worktree_id, "a.rs"), cx))
3119            .await
3120            .unwrap();
3121
3122        // Join the worktree as client B.
3123        let project_b = client_b.build_remote_project(&project_a, cx_a, cx_b).await;
3124
3125        let buffer_b = cx_b
3126            .background()
3127            .spawn(project_b.update(cx_b, |p, cx| p.open_buffer((worktree_id, "a.rs"), cx)))
3128            .await
3129            .unwrap();
3130        buffer_b.update(cx_b, |buffer, cx| {
3131            buffer.edit([(4..7, "six")], cx);
3132            buffer.edit([(10..11, "6")], cx);
3133            assert_eq!(buffer.text(), "let six = 6;");
3134            assert!(buffer.is_dirty());
3135            assert!(!buffer.has_conflict());
3136        });
3137        buffer_a
3138            .condition(cx_a, |buffer, _| buffer.text() == "let six = 6;")
3139            .await;
3140
3141        fs.save(Path::new("/a/a.rs"), &Rope::from("let seven = 7;"))
3142            .await
3143            .unwrap();
3144        buffer_a
3145            .condition(cx_a, |buffer, _| buffer.has_conflict())
3146            .await;
3147        buffer_b
3148            .condition(cx_b, |buffer, _| buffer.has_conflict())
3149            .await;
3150
3151        project_b
3152            .update(cx_b, |project, cx| {
3153                project.reload_buffers(HashSet::from_iter([buffer_b.clone()]), true, cx)
3154            })
3155            .await
3156            .unwrap();
3157        buffer_a.read_with(cx_a, |buffer, _| {
3158            assert_eq!(buffer.text(), "let seven = 7;");
3159            assert!(!buffer.is_dirty());
3160            assert!(!buffer.has_conflict());
3161        });
3162        buffer_b.read_with(cx_b, |buffer, _| {
3163            assert_eq!(buffer.text(), "let seven = 7;");
3164            assert!(!buffer.is_dirty());
3165            assert!(!buffer.has_conflict());
3166        });
3167
3168        buffer_a.update(cx_a, |buffer, cx| {
3169            // Undoing on the host is a no-op when the reload was initiated by the guest.
3170            buffer.undo(cx);
3171            assert_eq!(buffer.text(), "let seven = 7;");
3172            assert!(!buffer.is_dirty());
3173            assert!(!buffer.has_conflict());
3174        });
3175        buffer_b.update(cx_b, |buffer, cx| {
3176            // Undoing on the guest rolls back the buffer to before it was reloaded but the conflict gets cleared.
3177            buffer.undo(cx);
3178            assert_eq!(buffer.text(), "let six = 6;");
3179            assert!(buffer.is_dirty());
3180            assert!(!buffer.has_conflict());
3181        });
3182    }
3183
3184    #[gpui::test(iterations = 10)]
3185    async fn test_formatting_buffer(cx_a: &mut TestAppContext, cx_b: &mut TestAppContext) {
3186        cx_a.foreground().forbid_parking();
3187        let lang_registry = Arc::new(LanguageRegistry::test());
3188        let fs = FakeFs::new(cx_a.background());
3189
3190        // Set up a fake language server.
3191        let mut language = Language::new(
3192            LanguageConfig {
3193                name: "Rust".into(),
3194                path_suffixes: vec!["rs".to_string()],
3195                ..Default::default()
3196            },
3197            Some(tree_sitter_rust::language()),
3198        );
3199        let mut fake_language_servers = language.set_fake_lsp_adapter(Default::default());
3200        lang_registry.add(Arc::new(language));
3201
3202        // Connect to a server as 2 clients.
3203        let mut server = TestServer::start(cx_a.foreground(), cx_a.background()).await;
3204        let client_a = server.create_client(cx_a, "user_a").await;
3205        let mut client_b = server.create_client(cx_b, "user_b").await;
3206        server
3207            .make_contacts(vec![(&client_a, cx_a), (&client_b, cx_b)])
3208            .await;
3209
3210        // Share a project as client A
3211        fs.insert_tree(
3212            "/a",
3213            json!({
3214                "a.rs": "let one = two",
3215            }),
3216        )
3217        .await;
3218        let project_a = cx_a.update(|cx| {
3219            Project::local(
3220                client_a.clone(),
3221                client_a.user_store.clone(),
3222                lang_registry.clone(),
3223                fs.clone(),
3224                cx,
3225            )
3226        });
3227        let (worktree_a, _) = project_a
3228            .update(cx_a, |p, cx| {
3229                p.find_or_create_local_worktree("/a", true, cx)
3230            })
3231            .await
3232            .unwrap();
3233        worktree_a
3234            .read_with(cx_a, |tree, _| tree.as_local().unwrap().scan_complete())
3235            .await;
3236        let worktree_id = worktree_a.read_with(cx_a, |tree, _| tree.id());
3237
3238        // Join the project as client B.
3239        let project_b = client_b.build_remote_project(&project_a, cx_a, cx_b).await;
3240
3241        let buffer_b = cx_b
3242            .background()
3243            .spawn(project_b.update(cx_b, |p, cx| p.open_buffer((worktree_id, "a.rs"), cx)))
3244            .await
3245            .unwrap();
3246
3247        let fake_language_server = fake_language_servers.next().await.unwrap();
3248        fake_language_server.handle_request::<lsp::request::Formatting, _, _>(|_, _| async move {
3249            Ok(Some(vec![
3250                lsp::TextEdit {
3251                    range: lsp::Range::new(lsp::Position::new(0, 4), lsp::Position::new(0, 4)),
3252                    new_text: "h".to_string(),
3253                },
3254                lsp::TextEdit {
3255                    range: lsp::Range::new(lsp::Position::new(0, 7), lsp::Position::new(0, 7)),
3256                    new_text: "y".to_string(),
3257                },
3258            ]))
3259        });
3260
3261        project_b
3262            .update(cx_b, |project, cx| {
3263                project.format(HashSet::from_iter([buffer_b.clone()]), true, cx)
3264            })
3265            .await
3266            .unwrap();
3267        assert_eq!(
3268            buffer_b.read_with(cx_b, |buffer, _| buffer.text()),
3269            "let honey = two"
3270        );
3271    }
3272
3273    #[gpui::test(iterations = 10)]
3274    async fn test_definition(cx_a: &mut TestAppContext, cx_b: &mut TestAppContext) {
3275        cx_a.foreground().forbid_parking();
3276        let lang_registry = Arc::new(LanguageRegistry::test());
3277        let fs = FakeFs::new(cx_a.background());
3278        fs.insert_tree(
3279            "/root-1",
3280            json!({
3281                "a.rs": "const ONE: usize = b::TWO + b::THREE;",
3282            }),
3283        )
3284        .await;
3285        fs.insert_tree(
3286            "/root-2",
3287            json!({
3288                "b.rs": "const TWO: usize = 2;\nconst THREE: usize = 3;",
3289            }),
3290        )
3291        .await;
3292
3293        // Set up a fake language server.
3294        let mut language = Language::new(
3295            LanguageConfig {
3296                name: "Rust".into(),
3297                path_suffixes: vec!["rs".to_string()],
3298                ..Default::default()
3299            },
3300            Some(tree_sitter_rust::language()),
3301        );
3302        let mut fake_language_servers = language.set_fake_lsp_adapter(Default::default());
3303        lang_registry.add(Arc::new(language));
3304
3305        // Connect to a server as 2 clients.
3306        let mut server = TestServer::start(cx_a.foreground(), cx_a.background()).await;
3307        let client_a = server.create_client(cx_a, "user_a").await;
3308        let mut client_b = server.create_client(cx_b, "user_b").await;
3309        server
3310            .make_contacts(vec![(&client_a, cx_a), (&client_b, cx_b)])
3311            .await;
3312
3313        // Share a project as client A
3314        let project_a = cx_a.update(|cx| {
3315            Project::local(
3316                client_a.clone(),
3317                client_a.user_store.clone(),
3318                lang_registry.clone(),
3319                fs.clone(),
3320                cx,
3321            )
3322        });
3323        let (worktree_a, _) = project_a
3324            .update(cx_a, |p, cx| {
3325                p.find_or_create_local_worktree("/root-1", true, cx)
3326            })
3327            .await
3328            .unwrap();
3329        worktree_a
3330            .read_with(cx_a, |tree, _| tree.as_local().unwrap().scan_complete())
3331            .await;
3332        let worktree_id = worktree_a.read_with(cx_a, |tree, _| tree.id());
3333
3334        // Join the project as client B.
3335        let project_b = client_b.build_remote_project(&project_a, cx_a, cx_b).await;
3336
3337        // Open the file on client B.
3338        let buffer_b = cx_b
3339            .background()
3340            .spawn(project_b.update(cx_b, |p, cx| p.open_buffer((worktree_id, "a.rs"), cx)))
3341            .await
3342            .unwrap();
3343
3344        // Request the definition of a symbol as the guest.
3345        let fake_language_server = fake_language_servers.next().await.unwrap();
3346        fake_language_server.handle_request::<lsp::request::GotoDefinition, _, _>(
3347            |_, _| async move {
3348                Ok(Some(lsp::GotoDefinitionResponse::Scalar(
3349                    lsp::Location::new(
3350                        lsp::Url::from_file_path("/root-2/b.rs").unwrap(),
3351                        lsp::Range::new(lsp::Position::new(0, 6), lsp::Position::new(0, 9)),
3352                    ),
3353                )))
3354            },
3355        );
3356
3357        let definitions_1 = project_b
3358            .update(cx_b, |p, cx| p.definition(&buffer_b, 23, cx))
3359            .await
3360            .unwrap();
3361        cx_b.read(|cx| {
3362            assert_eq!(definitions_1.len(), 1);
3363            assert_eq!(project_b.read(cx).worktrees(cx).count(), 2);
3364            let target_buffer = definitions_1[0].buffer.read(cx);
3365            assert_eq!(
3366                target_buffer.text(),
3367                "const TWO: usize = 2;\nconst THREE: usize = 3;"
3368            );
3369            assert_eq!(
3370                definitions_1[0].range.to_point(target_buffer),
3371                Point::new(0, 6)..Point::new(0, 9)
3372            );
3373        });
3374
3375        // Try getting more definitions for the same buffer, ensuring the buffer gets reused from
3376        // the previous call to `definition`.
3377        fake_language_server.handle_request::<lsp::request::GotoDefinition, _, _>(
3378            |_, _| async move {
3379                Ok(Some(lsp::GotoDefinitionResponse::Scalar(
3380                    lsp::Location::new(
3381                        lsp::Url::from_file_path("/root-2/b.rs").unwrap(),
3382                        lsp::Range::new(lsp::Position::new(1, 6), lsp::Position::new(1, 11)),
3383                    ),
3384                )))
3385            },
3386        );
3387
3388        let definitions_2 = project_b
3389            .update(cx_b, |p, cx| p.definition(&buffer_b, 33, cx))
3390            .await
3391            .unwrap();
3392        cx_b.read(|cx| {
3393            assert_eq!(definitions_2.len(), 1);
3394            assert_eq!(project_b.read(cx).worktrees(cx).count(), 2);
3395            let target_buffer = definitions_2[0].buffer.read(cx);
3396            assert_eq!(
3397                target_buffer.text(),
3398                "const TWO: usize = 2;\nconst THREE: usize = 3;"
3399            );
3400            assert_eq!(
3401                definitions_2[0].range.to_point(target_buffer),
3402                Point::new(1, 6)..Point::new(1, 11)
3403            );
3404        });
3405        assert_eq!(definitions_1[0].buffer, definitions_2[0].buffer);
3406    }
3407
3408    #[gpui::test(iterations = 10)]
3409    async fn test_references(cx_a: &mut TestAppContext, cx_b: &mut TestAppContext) {
3410        cx_a.foreground().forbid_parking();
3411        let lang_registry = Arc::new(LanguageRegistry::test());
3412        let fs = FakeFs::new(cx_a.background());
3413        fs.insert_tree(
3414            "/root-1",
3415            json!({
3416                "one.rs": "const ONE: usize = 1;",
3417                "two.rs": "const TWO: usize = one::ONE + one::ONE;",
3418            }),
3419        )
3420        .await;
3421        fs.insert_tree(
3422            "/root-2",
3423            json!({
3424                "three.rs": "const THREE: usize = two::TWO + one::ONE;",
3425            }),
3426        )
3427        .await;
3428
3429        // Set up a fake language server.
3430        let mut language = Language::new(
3431            LanguageConfig {
3432                name: "Rust".into(),
3433                path_suffixes: vec!["rs".to_string()],
3434                ..Default::default()
3435            },
3436            Some(tree_sitter_rust::language()),
3437        );
3438        let mut fake_language_servers = language.set_fake_lsp_adapter(Default::default());
3439        lang_registry.add(Arc::new(language));
3440
3441        // Connect to a server as 2 clients.
3442        let mut server = TestServer::start(cx_a.foreground(), cx_a.background()).await;
3443        let client_a = server.create_client(cx_a, "user_a").await;
3444        let mut client_b = server.create_client(cx_b, "user_b").await;
3445        server
3446            .make_contacts(vec![(&client_a, cx_a), (&client_b, cx_b)])
3447            .await;
3448
3449        // Share a project as client A
3450        let project_a = cx_a.update(|cx| {
3451            Project::local(
3452                client_a.clone(),
3453                client_a.user_store.clone(),
3454                lang_registry.clone(),
3455                fs.clone(),
3456                cx,
3457            )
3458        });
3459        let (worktree_a, _) = project_a
3460            .update(cx_a, |p, cx| {
3461                p.find_or_create_local_worktree("/root-1", true, cx)
3462            })
3463            .await
3464            .unwrap();
3465        worktree_a
3466            .read_with(cx_a, |tree, _| tree.as_local().unwrap().scan_complete())
3467            .await;
3468        let worktree_id = worktree_a.read_with(cx_a, |tree, _| tree.id());
3469
3470        // Join the worktree as client B.
3471        let project_b = client_b.build_remote_project(&project_a, cx_a, cx_b).await;
3472
3473        // Open the file on client B.
3474        let buffer_b = cx_b
3475            .background()
3476            .spawn(project_b.update(cx_b, |p, cx| p.open_buffer((worktree_id, "one.rs"), cx)))
3477            .await
3478            .unwrap();
3479
3480        // Request references to a symbol as the guest.
3481        let fake_language_server = fake_language_servers.next().await.unwrap();
3482        fake_language_server.handle_request::<lsp::request::References, _, _>(
3483            |params, _| async move {
3484                assert_eq!(
3485                    params.text_document_position.text_document.uri.as_str(),
3486                    "file:///root-1/one.rs"
3487                );
3488                Ok(Some(vec![
3489                    lsp::Location {
3490                        uri: lsp::Url::from_file_path("/root-1/two.rs").unwrap(),
3491                        range: lsp::Range::new(
3492                            lsp::Position::new(0, 24),
3493                            lsp::Position::new(0, 27),
3494                        ),
3495                    },
3496                    lsp::Location {
3497                        uri: lsp::Url::from_file_path("/root-1/two.rs").unwrap(),
3498                        range: lsp::Range::new(
3499                            lsp::Position::new(0, 35),
3500                            lsp::Position::new(0, 38),
3501                        ),
3502                    },
3503                    lsp::Location {
3504                        uri: lsp::Url::from_file_path("/root-2/three.rs").unwrap(),
3505                        range: lsp::Range::new(
3506                            lsp::Position::new(0, 37),
3507                            lsp::Position::new(0, 40),
3508                        ),
3509                    },
3510                ]))
3511            },
3512        );
3513
3514        let references = project_b
3515            .update(cx_b, |p, cx| p.references(&buffer_b, 7, cx))
3516            .await
3517            .unwrap();
3518        cx_b.read(|cx| {
3519            assert_eq!(references.len(), 3);
3520            assert_eq!(project_b.read(cx).worktrees(cx).count(), 2);
3521
3522            let two_buffer = references[0].buffer.read(cx);
3523            let three_buffer = references[2].buffer.read(cx);
3524            assert_eq!(
3525                two_buffer.file().unwrap().path().as_ref(),
3526                Path::new("two.rs")
3527            );
3528            assert_eq!(references[1].buffer, references[0].buffer);
3529            assert_eq!(
3530                three_buffer.file().unwrap().full_path(cx),
3531                Path::new("three.rs")
3532            );
3533
3534            assert_eq!(references[0].range.to_offset(&two_buffer), 24..27);
3535            assert_eq!(references[1].range.to_offset(&two_buffer), 35..38);
3536            assert_eq!(references[2].range.to_offset(&three_buffer), 37..40);
3537        });
3538    }
3539
3540    #[gpui::test(iterations = 10)]
3541    async fn test_project_search(cx_a: &mut TestAppContext, cx_b: &mut TestAppContext) {
3542        cx_a.foreground().forbid_parking();
3543        let lang_registry = Arc::new(LanguageRegistry::test());
3544        let fs = FakeFs::new(cx_a.background());
3545        fs.insert_tree(
3546            "/root-1",
3547            json!({
3548                "a": "hello world",
3549                "b": "goodnight moon",
3550                "c": "a world of goo",
3551                "d": "world champion of clown world",
3552            }),
3553        )
3554        .await;
3555        fs.insert_tree(
3556            "/root-2",
3557            json!({
3558                "e": "disney world is fun",
3559            }),
3560        )
3561        .await;
3562
3563        // Connect to a server as 2 clients.
3564        let mut server = TestServer::start(cx_a.foreground(), cx_a.background()).await;
3565        let client_a = server.create_client(cx_a, "user_a").await;
3566        let mut client_b = server.create_client(cx_b, "user_b").await;
3567        server
3568            .make_contacts(vec![(&client_a, cx_a), (&client_b, cx_b)])
3569            .await;
3570
3571        // Share a project as client A
3572        let project_a = cx_a.update(|cx| {
3573            Project::local(
3574                client_a.clone(),
3575                client_a.user_store.clone(),
3576                lang_registry.clone(),
3577                fs.clone(),
3578                cx,
3579            )
3580        });
3581
3582        let (worktree_1, _) = project_a
3583            .update(cx_a, |p, cx| {
3584                p.find_or_create_local_worktree("/root-1", true, cx)
3585            })
3586            .await
3587            .unwrap();
3588        worktree_1
3589            .read_with(cx_a, |tree, _| tree.as_local().unwrap().scan_complete())
3590            .await;
3591        let (worktree_2, _) = project_a
3592            .update(cx_a, |p, cx| {
3593                p.find_or_create_local_worktree("/root-2", true, cx)
3594            })
3595            .await
3596            .unwrap();
3597        worktree_2
3598            .read_with(cx_a, |tree, _| tree.as_local().unwrap().scan_complete())
3599            .await;
3600
3601        // Join the worktree as client B.
3602        let project_b = client_b.build_remote_project(&project_a, cx_a, cx_b).await;
3603        let results = project_b
3604            .update(cx_b, |project, cx| {
3605                project.search(SearchQuery::text("world", false, false), cx)
3606            })
3607            .await
3608            .unwrap();
3609
3610        let mut ranges_by_path = results
3611            .into_iter()
3612            .map(|(buffer, ranges)| {
3613                buffer.read_with(cx_b, |buffer, cx| {
3614                    let path = buffer.file().unwrap().full_path(cx);
3615                    let offset_ranges = ranges
3616                        .into_iter()
3617                        .map(|range| range.to_offset(buffer))
3618                        .collect::<Vec<_>>();
3619                    (path, offset_ranges)
3620                })
3621            })
3622            .collect::<Vec<_>>();
3623        ranges_by_path.sort_by_key(|(path, _)| path.clone());
3624
3625        assert_eq!(
3626            ranges_by_path,
3627            &[
3628                (PathBuf::from("root-1/a"), vec![6..11]),
3629                (PathBuf::from("root-1/c"), vec![2..7]),
3630                (PathBuf::from("root-1/d"), vec![0..5, 24..29]),
3631                (PathBuf::from("root-2/e"), vec![7..12]),
3632            ]
3633        );
3634    }
3635
3636    #[gpui::test(iterations = 10)]
3637    async fn test_document_highlights(cx_a: &mut TestAppContext, cx_b: &mut TestAppContext) {
3638        cx_a.foreground().forbid_parking();
3639        let lang_registry = Arc::new(LanguageRegistry::test());
3640        let fs = FakeFs::new(cx_a.background());
3641        fs.insert_tree(
3642            "/root-1",
3643            json!({
3644                "main.rs": "fn double(number: i32) -> i32 { number + number }",
3645            }),
3646        )
3647        .await;
3648
3649        // Set up a fake language server.
3650        let mut language = Language::new(
3651            LanguageConfig {
3652                name: "Rust".into(),
3653                path_suffixes: vec!["rs".to_string()],
3654                ..Default::default()
3655            },
3656            Some(tree_sitter_rust::language()),
3657        );
3658        let mut fake_language_servers = language.set_fake_lsp_adapter(Default::default());
3659        lang_registry.add(Arc::new(language));
3660
3661        // Connect to a server as 2 clients.
3662        let mut server = TestServer::start(cx_a.foreground(), cx_a.background()).await;
3663        let client_a = server.create_client(cx_a, "user_a").await;
3664        let mut client_b = server.create_client(cx_b, "user_b").await;
3665        server
3666            .make_contacts(vec![(&client_a, cx_a), (&client_b, cx_b)])
3667            .await;
3668
3669        // Share a project as client A
3670        let project_a = cx_a.update(|cx| {
3671            Project::local(
3672                client_a.clone(),
3673                client_a.user_store.clone(),
3674                lang_registry.clone(),
3675                fs.clone(),
3676                cx,
3677            )
3678        });
3679        let (worktree_a, _) = project_a
3680            .update(cx_a, |p, cx| {
3681                p.find_or_create_local_worktree("/root-1", true, cx)
3682            })
3683            .await
3684            .unwrap();
3685        worktree_a
3686            .read_with(cx_a, |tree, _| tree.as_local().unwrap().scan_complete())
3687            .await;
3688        let worktree_id = worktree_a.read_with(cx_a, |tree, _| tree.id());
3689
3690        // Join the worktree as client B.
3691        let project_b = client_b.build_remote_project(&project_a, cx_a, cx_b).await;
3692
3693        // Open the file on client B.
3694        let buffer_b = cx_b
3695            .background()
3696            .spawn(project_b.update(cx_b, |p, cx| p.open_buffer((worktree_id, "main.rs"), cx)))
3697            .await
3698            .unwrap();
3699
3700        // Request document highlights as the guest.
3701        let fake_language_server = fake_language_servers.next().await.unwrap();
3702        fake_language_server.handle_request::<lsp::request::DocumentHighlightRequest, _, _>(
3703            |params, _| async move {
3704                assert_eq!(
3705                    params
3706                        .text_document_position_params
3707                        .text_document
3708                        .uri
3709                        .as_str(),
3710                    "file:///root-1/main.rs"
3711                );
3712                assert_eq!(
3713                    params.text_document_position_params.position,
3714                    lsp::Position::new(0, 34)
3715                );
3716                Ok(Some(vec![
3717                    lsp::DocumentHighlight {
3718                        kind: Some(lsp::DocumentHighlightKind::WRITE),
3719                        range: lsp::Range::new(
3720                            lsp::Position::new(0, 10),
3721                            lsp::Position::new(0, 16),
3722                        ),
3723                    },
3724                    lsp::DocumentHighlight {
3725                        kind: Some(lsp::DocumentHighlightKind::READ),
3726                        range: lsp::Range::new(
3727                            lsp::Position::new(0, 32),
3728                            lsp::Position::new(0, 38),
3729                        ),
3730                    },
3731                    lsp::DocumentHighlight {
3732                        kind: Some(lsp::DocumentHighlightKind::READ),
3733                        range: lsp::Range::new(
3734                            lsp::Position::new(0, 41),
3735                            lsp::Position::new(0, 47),
3736                        ),
3737                    },
3738                ]))
3739            },
3740        );
3741
3742        let highlights = project_b
3743            .update(cx_b, |p, cx| p.document_highlights(&buffer_b, 34, cx))
3744            .await
3745            .unwrap();
3746        buffer_b.read_with(cx_b, |buffer, _| {
3747            let snapshot = buffer.snapshot();
3748
3749            let highlights = highlights
3750                .into_iter()
3751                .map(|highlight| (highlight.kind, highlight.range.to_offset(&snapshot)))
3752                .collect::<Vec<_>>();
3753            assert_eq!(
3754                highlights,
3755                &[
3756                    (lsp::DocumentHighlightKind::WRITE, 10..16),
3757                    (lsp::DocumentHighlightKind::READ, 32..38),
3758                    (lsp::DocumentHighlightKind::READ, 41..47)
3759                ]
3760            )
3761        });
3762    }
3763
3764    #[gpui::test(iterations = 10)]
3765    async fn test_project_symbols(cx_a: &mut TestAppContext, cx_b: &mut TestAppContext) {
3766        cx_a.foreground().forbid_parking();
3767        let lang_registry = Arc::new(LanguageRegistry::test());
3768        let fs = FakeFs::new(cx_a.background());
3769        fs.insert_tree(
3770            "/code",
3771            json!({
3772                "crate-1": {
3773                    "one.rs": "const ONE: usize = 1;",
3774                },
3775                "crate-2": {
3776                    "two.rs": "const TWO: usize = 2; const THREE: usize = 3;",
3777                },
3778                "private": {
3779                    "passwords.txt": "the-password",
3780                }
3781            }),
3782        )
3783        .await;
3784
3785        // Set up a fake language server.
3786        let mut language = Language::new(
3787            LanguageConfig {
3788                name: "Rust".into(),
3789                path_suffixes: vec!["rs".to_string()],
3790                ..Default::default()
3791            },
3792            Some(tree_sitter_rust::language()),
3793        );
3794        let mut fake_language_servers = language.set_fake_lsp_adapter(Default::default());
3795        lang_registry.add(Arc::new(language));
3796
3797        // Connect to a server as 2 clients.
3798        let mut server = TestServer::start(cx_a.foreground(), cx_a.background()).await;
3799        let client_a = server.create_client(cx_a, "user_a").await;
3800        let mut client_b = server.create_client(cx_b, "user_b").await;
3801        server
3802            .make_contacts(vec![(&client_a, cx_a), (&client_b, cx_b)])
3803            .await;
3804
3805        // Share a project as client A
3806        let project_a = cx_a.update(|cx| {
3807            Project::local(
3808                client_a.clone(),
3809                client_a.user_store.clone(),
3810                lang_registry.clone(),
3811                fs.clone(),
3812                cx,
3813            )
3814        });
3815        let (worktree_a, _) = project_a
3816            .update(cx_a, |p, cx| {
3817                p.find_or_create_local_worktree("/code/crate-1", true, cx)
3818            })
3819            .await
3820            .unwrap();
3821        worktree_a
3822            .read_with(cx_a, |tree, _| tree.as_local().unwrap().scan_complete())
3823            .await;
3824        let worktree_id = worktree_a.read_with(cx_a, |tree, _| tree.id());
3825
3826        // Join the worktree as client B.
3827        let project_b = client_b.build_remote_project(&project_a, cx_a, cx_b).await;
3828
3829        // Cause the language server to start.
3830        let _buffer = cx_b
3831            .background()
3832            .spawn(project_b.update(cx_b, |p, cx| p.open_buffer((worktree_id, "one.rs"), cx)))
3833            .await
3834            .unwrap();
3835
3836        let fake_language_server = fake_language_servers.next().await.unwrap();
3837        fake_language_server.handle_request::<lsp::request::WorkspaceSymbol, _, _>(
3838            |_, _| async move {
3839                #[allow(deprecated)]
3840                Ok(Some(vec![lsp::SymbolInformation {
3841                    name: "TWO".into(),
3842                    location: lsp::Location {
3843                        uri: lsp::Url::from_file_path("/code/crate-2/two.rs").unwrap(),
3844                        range: lsp::Range::new(lsp::Position::new(0, 6), lsp::Position::new(0, 9)),
3845                    },
3846                    kind: lsp::SymbolKind::CONSTANT,
3847                    tags: None,
3848                    container_name: None,
3849                    deprecated: None,
3850                }]))
3851            },
3852        );
3853
3854        // Request the definition of a symbol as the guest.
3855        let symbols = project_b
3856            .update(cx_b, |p, cx| p.symbols("two", cx))
3857            .await
3858            .unwrap();
3859        assert_eq!(symbols.len(), 1);
3860        assert_eq!(symbols[0].name, "TWO");
3861
3862        // Open one of the returned symbols.
3863        let buffer_b_2 = project_b
3864            .update(cx_b, |project, cx| {
3865                project.open_buffer_for_symbol(&symbols[0], cx)
3866            })
3867            .await
3868            .unwrap();
3869        buffer_b_2.read_with(cx_b, |buffer, _| {
3870            assert_eq!(
3871                buffer.file().unwrap().path().as_ref(),
3872                Path::new("../crate-2/two.rs")
3873            );
3874        });
3875
3876        // Attempt to craft a symbol and violate host's privacy by opening an arbitrary file.
3877        let mut fake_symbol = symbols[0].clone();
3878        fake_symbol.path = Path::new("/code/secrets").into();
3879        let error = project_b
3880            .update(cx_b, |project, cx| {
3881                project.open_buffer_for_symbol(&fake_symbol, cx)
3882            })
3883            .await
3884            .unwrap_err();
3885        assert!(error.to_string().contains("invalid symbol signature"));
3886    }
3887
3888    #[gpui::test(iterations = 10)]
3889    async fn test_open_buffer_while_getting_definition_pointing_to_it(
3890        cx_a: &mut TestAppContext,
3891        cx_b: &mut TestAppContext,
3892        mut rng: StdRng,
3893    ) {
3894        cx_a.foreground().forbid_parking();
3895        let lang_registry = Arc::new(LanguageRegistry::test());
3896        let fs = FakeFs::new(cx_a.background());
3897        fs.insert_tree(
3898            "/root",
3899            json!({
3900                "a.rs": "const ONE: usize = b::TWO;",
3901                "b.rs": "const TWO: usize = 2",
3902            }),
3903        )
3904        .await;
3905
3906        // Set up a fake language server.
3907        let mut language = Language::new(
3908            LanguageConfig {
3909                name: "Rust".into(),
3910                path_suffixes: vec!["rs".to_string()],
3911                ..Default::default()
3912            },
3913            Some(tree_sitter_rust::language()),
3914        );
3915        let mut fake_language_servers = language.set_fake_lsp_adapter(Default::default());
3916        lang_registry.add(Arc::new(language));
3917
3918        // Connect to a server as 2 clients.
3919        let mut server = TestServer::start(cx_a.foreground(), cx_a.background()).await;
3920        let client_a = server.create_client(cx_a, "user_a").await;
3921        let mut client_b = server.create_client(cx_b, "user_b").await;
3922        server
3923            .make_contacts(vec![(&client_a, cx_a), (&client_b, cx_b)])
3924            .await;
3925
3926        // Share a project as client A
3927        let project_a = cx_a.update(|cx| {
3928            Project::local(
3929                client_a.clone(),
3930                client_a.user_store.clone(),
3931                lang_registry.clone(),
3932                fs.clone(),
3933                cx,
3934            )
3935        });
3936
3937        let (worktree_a, _) = project_a
3938            .update(cx_a, |p, cx| {
3939                p.find_or_create_local_worktree("/root", true, cx)
3940            })
3941            .await
3942            .unwrap();
3943        worktree_a
3944            .read_with(cx_a, |tree, _| tree.as_local().unwrap().scan_complete())
3945            .await;
3946        let worktree_id = worktree_a.read_with(cx_a, |tree, _| tree.id());
3947
3948        // Join the project as client B.
3949        let project_b = client_b.build_remote_project(&project_a, cx_a, cx_b).await;
3950
3951        let buffer_b1 = cx_b
3952            .background()
3953            .spawn(project_b.update(cx_b, |p, cx| p.open_buffer((worktree_id, "a.rs"), cx)))
3954            .await
3955            .unwrap();
3956
3957        let fake_language_server = fake_language_servers.next().await.unwrap();
3958        fake_language_server.handle_request::<lsp::request::GotoDefinition, _, _>(
3959            |_, _| async move {
3960                Ok(Some(lsp::GotoDefinitionResponse::Scalar(
3961                    lsp::Location::new(
3962                        lsp::Url::from_file_path("/root/b.rs").unwrap(),
3963                        lsp::Range::new(lsp::Position::new(0, 6), lsp::Position::new(0, 9)),
3964                    ),
3965                )))
3966            },
3967        );
3968
3969        let definitions;
3970        let buffer_b2;
3971        if rng.gen() {
3972            definitions = project_b.update(cx_b, |p, cx| p.definition(&buffer_b1, 23, cx));
3973            buffer_b2 = project_b.update(cx_b, |p, cx| p.open_buffer((worktree_id, "b.rs"), cx));
3974        } else {
3975            buffer_b2 = project_b.update(cx_b, |p, cx| p.open_buffer((worktree_id, "b.rs"), cx));
3976            definitions = project_b.update(cx_b, |p, cx| p.definition(&buffer_b1, 23, cx));
3977        }
3978
3979        let buffer_b2 = buffer_b2.await.unwrap();
3980        let definitions = definitions.await.unwrap();
3981        assert_eq!(definitions.len(), 1);
3982        assert_eq!(definitions[0].buffer, buffer_b2);
3983    }
3984
3985    #[gpui::test(iterations = 10)]
3986    async fn test_collaborating_with_code_actions(
3987        cx_a: &mut TestAppContext,
3988        cx_b: &mut TestAppContext,
3989    ) {
3990        cx_a.foreground().forbid_parking();
3991        let lang_registry = Arc::new(LanguageRegistry::test());
3992        let fs = FakeFs::new(cx_a.background());
3993        cx_b.update(|cx| editor::init(cx));
3994
3995        // Set up a fake language server.
3996        let mut language = Language::new(
3997            LanguageConfig {
3998                name: "Rust".into(),
3999                path_suffixes: vec!["rs".to_string()],
4000                ..Default::default()
4001            },
4002            Some(tree_sitter_rust::language()),
4003        );
4004        let mut fake_language_servers = language.set_fake_lsp_adapter(Default::default());
4005        lang_registry.add(Arc::new(language));
4006
4007        // Connect to a server as 2 clients.
4008        let mut server = TestServer::start(cx_a.foreground(), cx_a.background()).await;
4009        let client_a = server.create_client(cx_a, "user_a").await;
4010        let mut client_b = server.create_client(cx_b, "user_b").await;
4011        server
4012            .make_contacts(vec![(&client_a, cx_a), (&client_b, cx_b)])
4013            .await;
4014
4015        // Share a project as client A
4016        fs.insert_tree(
4017            "/a",
4018            json!({
4019                "main.rs": "mod other;\nfn main() { let foo = other::foo(); }",
4020                "other.rs": "pub fn foo() -> usize { 4 }",
4021            }),
4022        )
4023        .await;
4024        let project_a = cx_a.update(|cx| {
4025            Project::local(
4026                client_a.clone(),
4027                client_a.user_store.clone(),
4028                lang_registry.clone(),
4029                fs.clone(),
4030                cx,
4031            )
4032        });
4033        let (worktree_a, _) = project_a
4034            .update(cx_a, |p, cx| {
4035                p.find_or_create_local_worktree("/a", true, cx)
4036            })
4037            .await
4038            .unwrap();
4039        worktree_a
4040            .read_with(cx_a, |tree, _| tree.as_local().unwrap().scan_complete())
4041            .await;
4042        let worktree_id = worktree_a.read_with(cx_a, |tree, _| tree.id());
4043
4044        // Join the project as client B.
4045        let project_b = client_b.build_remote_project(&project_a, cx_a, cx_b).await;
4046        let mut params = cx_b.update(WorkspaceParams::test);
4047        params.languages = lang_registry.clone();
4048        params.project = project_b.clone();
4049        params.client = client_b.client.clone();
4050        params.user_store = client_b.user_store.clone();
4051
4052        let (_window_b, workspace_b) = cx_b.add_window(|cx| Workspace::new(&params, cx));
4053        let editor_b = workspace_b
4054            .update(cx_b, |workspace, cx| {
4055                workspace.open_path((worktree_id, "main.rs"), true, cx)
4056            })
4057            .await
4058            .unwrap()
4059            .downcast::<Editor>()
4060            .unwrap();
4061
4062        let mut fake_language_server = fake_language_servers.next().await.unwrap();
4063        fake_language_server
4064            .handle_request::<lsp::request::CodeActionRequest, _, _>(|params, _| async move {
4065                assert_eq!(
4066                    params.text_document.uri,
4067                    lsp::Url::from_file_path("/a/main.rs").unwrap(),
4068                );
4069                assert_eq!(params.range.start, lsp::Position::new(0, 0));
4070                assert_eq!(params.range.end, lsp::Position::new(0, 0));
4071                Ok(None)
4072            })
4073            .next()
4074            .await;
4075
4076        // Move cursor to a location that contains code actions.
4077        editor_b.update(cx_b, |editor, cx| {
4078            editor.select_ranges([Point::new(1, 31)..Point::new(1, 31)], None, cx);
4079            cx.focus(&editor_b);
4080        });
4081
4082        fake_language_server
4083            .handle_request::<lsp::request::CodeActionRequest, _, _>(|params, _| async move {
4084                assert_eq!(
4085                    params.text_document.uri,
4086                    lsp::Url::from_file_path("/a/main.rs").unwrap(),
4087                );
4088                assert_eq!(params.range.start, lsp::Position::new(1, 31));
4089                assert_eq!(params.range.end, lsp::Position::new(1, 31));
4090
4091                Ok(Some(vec![lsp::CodeActionOrCommand::CodeAction(
4092                    lsp::CodeAction {
4093                        title: "Inline into all callers".to_string(),
4094                        edit: Some(lsp::WorkspaceEdit {
4095                            changes: Some(
4096                                [
4097                                    (
4098                                        lsp::Url::from_file_path("/a/main.rs").unwrap(),
4099                                        vec![lsp::TextEdit::new(
4100                                            lsp::Range::new(
4101                                                lsp::Position::new(1, 22),
4102                                                lsp::Position::new(1, 34),
4103                                            ),
4104                                            "4".to_string(),
4105                                        )],
4106                                    ),
4107                                    (
4108                                        lsp::Url::from_file_path("/a/other.rs").unwrap(),
4109                                        vec![lsp::TextEdit::new(
4110                                            lsp::Range::new(
4111                                                lsp::Position::new(0, 0),
4112                                                lsp::Position::new(0, 27),
4113                                            ),
4114                                            "".to_string(),
4115                                        )],
4116                                    ),
4117                                ]
4118                                .into_iter()
4119                                .collect(),
4120                            ),
4121                            ..Default::default()
4122                        }),
4123                        data: Some(json!({
4124                            "codeActionParams": {
4125                                "range": {
4126                                    "start": {"line": 1, "column": 31},
4127                                    "end": {"line": 1, "column": 31},
4128                                }
4129                            }
4130                        })),
4131                        ..Default::default()
4132                    },
4133                )]))
4134            })
4135            .next()
4136            .await;
4137
4138        // Toggle code actions and wait for them to display.
4139        editor_b.update(cx_b, |editor, cx| {
4140            editor.toggle_code_actions(
4141                &ToggleCodeActions {
4142                    deployed_from_indicator: false,
4143                },
4144                cx,
4145            );
4146        });
4147        editor_b
4148            .condition(&cx_b, |editor, _| editor.context_menu_visible())
4149            .await;
4150
4151        fake_language_server.remove_request_handler::<lsp::request::CodeActionRequest>();
4152
4153        // Confirming the code action will trigger a resolve request.
4154        let confirm_action = workspace_b
4155            .update(cx_b, |workspace, cx| {
4156                Editor::confirm_code_action(workspace, &ConfirmCodeAction { item_ix: Some(0) }, cx)
4157            })
4158            .unwrap();
4159        fake_language_server.handle_request::<lsp::request::CodeActionResolveRequest, _, _>(
4160            |_, _| async move {
4161                Ok(lsp::CodeAction {
4162                    title: "Inline into all callers".to_string(),
4163                    edit: Some(lsp::WorkspaceEdit {
4164                        changes: Some(
4165                            [
4166                                (
4167                                    lsp::Url::from_file_path("/a/main.rs").unwrap(),
4168                                    vec![lsp::TextEdit::new(
4169                                        lsp::Range::new(
4170                                            lsp::Position::new(1, 22),
4171                                            lsp::Position::new(1, 34),
4172                                        ),
4173                                        "4".to_string(),
4174                                    )],
4175                                ),
4176                                (
4177                                    lsp::Url::from_file_path("/a/other.rs").unwrap(),
4178                                    vec![lsp::TextEdit::new(
4179                                        lsp::Range::new(
4180                                            lsp::Position::new(0, 0),
4181                                            lsp::Position::new(0, 27),
4182                                        ),
4183                                        "".to_string(),
4184                                    )],
4185                                ),
4186                            ]
4187                            .into_iter()
4188                            .collect(),
4189                        ),
4190                        ..Default::default()
4191                    }),
4192                    ..Default::default()
4193                })
4194            },
4195        );
4196
4197        // After the action is confirmed, an editor containing both modified files is opened.
4198        confirm_action.await.unwrap();
4199        let code_action_editor = workspace_b.read_with(cx_b, |workspace, cx| {
4200            workspace
4201                .active_item(cx)
4202                .unwrap()
4203                .downcast::<Editor>()
4204                .unwrap()
4205        });
4206        code_action_editor.update(cx_b, |editor, cx| {
4207            assert_eq!(editor.text(cx), "mod other;\nfn main() { let foo = 4; }\n");
4208            editor.undo(&Undo, cx);
4209            assert_eq!(
4210                editor.text(cx),
4211                "mod other;\nfn main() { let foo = other::foo(); }\npub fn foo() -> usize { 4 }"
4212            );
4213            editor.redo(&Redo, cx);
4214            assert_eq!(editor.text(cx), "mod other;\nfn main() { let foo = 4; }\n");
4215        });
4216    }
4217
4218    #[gpui::test(iterations = 10)]
4219    async fn test_collaborating_with_renames(cx_a: &mut TestAppContext, cx_b: &mut TestAppContext) {
4220        cx_a.foreground().forbid_parking();
4221        let lang_registry = Arc::new(LanguageRegistry::test());
4222        let fs = FakeFs::new(cx_a.background());
4223        cx_b.update(|cx| editor::init(cx));
4224
4225        // Set up a fake language server.
4226        let mut language = Language::new(
4227            LanguageConfig {
4228                name: "Rust".into(),
4229                path_suffixes: vec!["rs".to_string()],
4230                ..Default::default()
4231            },
4232            Some(tree_sitter_rust::language()),
4233        );
4234        let mut fake_language_servers = language.set_fake_lsp_adapter(FakeLspAdapter {
4235            capabilities: lsp::ServerCapabilities {
4236                rename_provider: Some(lsp::OneOf::Right(lsp::RenameOptions {
4237                    prepare_provider: Some(true),
4238                    work_done_progress_options: Default::default(),
4239                })),
4240                ..Default::default()
4241            },
4242            ..Default::default()
4243        });
4244        lang_registry.add(Arc::new(language));
4245
4246        // Connect to a server as 2 clients.
4247        let mut server = TestServer::start(cx_a.foreground(), cx_a.background()).await;
4248        let client_a = server.create_client(cx_a, "user_a").await;
4249        let mut client_b = server.create_client(cx_b, "user_b").await;
4250        server
4251            .make_contacts(vec![(&client_a, cx_a), (&client_b, cx_b)])
4252            .await;
4253
4254        // Share a project as client A
4255        fs.insert_tree(
4256            "/dir",
4257            json!({
4258                "one.rs": "const ONE: usize = 1;",
4259                "two.rs": "const TWO: usize = one::ONE + one::ONE;"
4260            }),
4261        )
4262        .await;
4263        let project_a = cx_a.update(|cx| {
4264            Project::local(
4265                client_a.clone(),
4266                client_a.user_store.clone(),
4267                lang_registry.clone(),
4268                fs.clone(),
4269                cx,
4270            )
4271        });
4272        let (worktree_a, _) = project_a
4273            .update(cx_a, |p, cx| {
4274                p.find_or_create_local_worktree("/dir", true, cx)
4275            })
4276            .await
4277            .unwrap();
4278        worktree_a
4279            .read_with(cx_a, |tree, _| tree.as_local().unwrap().scan_complete())
4280            .await;
4281        let worktree_id = worktree_a.read_with(cx_a, |tree, _| tree.id());
4282
4283        // Join the worktree as client B.
4284        let project_b = client_b.build_remote_project(&project_a, cx_a, cx_b).await;
4285        let mut params = cx_b.update(WorkspaceParams::test);
4286        params.languages = lang_registry.clone();
4287        params.project = project_b.clone();
4288        params.client = client_b.client.clone();
4289        params.user_store = client_b.user_store.clone();
4290
4291        let (_window_b, workspace_b) = cx_b.add_window(|cx| Workspace::new(&params, cx));
4292        let editor_b = workspace_b
4293            .update(cx_b, |workspace, cx| {
4294                workspace.open_path((worktree_id, "one.rs"), true, cx)
4295            })
4296            .await
4297            .unwrap()
4298            .downcast::<Editor>()
4299            .unwrap();
4300        let fake_language_server = fake_language_servers.next().await.unwrap();
4301
4302        // Move cursor to a location that can be renamed.
4303        let prepare_rename = editor_b.update(cx_b, |editor, cx| {
4304            editor.select_ranges([7..7], None, cx);
4305            editor.rename(&Rename, cx).unwrap()
4306        });
4307
4308        fake_language_server
4309            .handle_request::<lsp::request::PrepareRenameRequest, _, _>(|params, _| async move {
4310                assert_eq!(params.text_document.uri.as_str(), "file:///dir/one.rs");
4311                assert_eq!(params.position, lsp::Position::new(0, 7));
4312                Ok(Some(lsp::PrepareRenameResponse::Range(lsp::Range::new(
4313                    lsp::Position::new(0, 6),
4314                    lsp::Position::new(0, 9),
4315                ))))
4316            })
4317            .next()
4318            .await
4319            .unwrap();
4320        prepare_rename.await.unwrap();
4321        editor_b.update(cx_b, |editor, cx| {
4322            let rename = editor.pending_rename().unwrap();
4323            let buffer = editor.buffer().read(cx).snapshot(cx);
4324            assert_eq!(
4325                rename.range.start.to_offset(&buffer)..rename.range.end.to_offset(&buffer),
4326                6..9
4327            );
4328            rename.editor.update(cx, |rename_editor, cx| {
4329                rename_editor.buffer().update(cx, |rename_buffer, cx| {
4330                    rename_buffer.edit([(0..3, "THREE")], cx);
4331                });
4332            });
4333        });
4334
4335        let confirm_rename = workspace_b.update(cx_b, |workspace, cx| {
4336            Editor::confirm_rename(workspace, &ConfirmRename, cx).unwrap()
4337        });
4338        fake_language_server
4339            .handle_request::<lsp::request::Rename, _, _>(|params, _| async move {
4340                assert_eq!(
4341                    params.text_document_position.text_document.uri.as_str(),
4342                    "file:///dir/one.rs"
4343                );
4344                assert_eq!(
4345                    params.text_document_position.position,
4346                    lsp::Position::new(0, 6)
4347                );
4348                assert_eq!(params.new_name, "THREE");
4349                Ok(Some(lsp::WorkspaceEdit {
4350                    changes: Some(
4351                        [
4352                            (
4353                                lsp::Url::from_file_path("/dir/one.rs").unwrap(),
4354                                vec![lsp::TextEdit::new(
4355                                    lsp::Range::new(
4356                                        lsp::Position::new(0, 6),
4357                                        lsp::Position::new(0, 9),
4358                                    ),
4359                                    "THREE".to_string(),
4360                                )],
4361                            ),
4362                            (
4363                                lsp::Url::from_file_path("/dir/two.rs").unwrap(),
4364                                vec![
4365                                    lsp::TextEdit::new(
4366                                        lsp::Range::new(
4367                                            lsp::Position::new(0, 24),
4368                                            lsp::Position::new(0, 27),
4369                                        ),
4370                                        "THREE".to_string(),
4371                                    ),
4372                                    lsp::TextEdit::new(
4373                                        lsp::Range::new(
4374                                            lsp::Position::new(0, 35),
4375                                            lsp::Position::new(0, 38),
4376                                        ),
4377                                        "THREE".to_string(),
4378                                    ),
4379                                ],
4380                            ),
4381                        ]
4382                        .into_iter()
4383                        .collect(),
4384                    ),
4385                    ..Default::default()
4386                }))
4387            })
4388            .next()
4389            .await
4390            .unwrap();
4391        confirm_rename.await.unwrap();
4392
4393        let rename_editor = workspace_b.read_with(cx_b, |workspace, cx| {
4394            workspace
4395                .active_item(cx)
4396                .unwrap()
4397                .downcast::<Editor>()
4398                .unwrap()
4399        });
4400        rename_editor.update(cx_b, |editor, cx| {
4401            assert_eq!(
4402                editor.text(cx),
4403                "const THREE: usize = 1;\nconst TWO: usize = one::THREE + one::THREE;"
4404            );
4405            editor.undo(&Undo, cx);
4406            assert_eq!(
4407                editor.text(cx),
4408                "const ONE: usize = 1;\nconst TWO: usize = one::ONE + one::ONE;"
4409            );
4410            editor.redo(&Redo, cx);
4411            assert_eq!(
4412                editor.text(cx),
4413                "const THREE: usize = 1;\nconst TWO: usize = one::THREE + one::THREE;"
4414            );
4415        });
4416
4417        // Ensure temporary rename edits cannot be undone/redone.
4418        editor_b.update(cx_b, |editor, cx| {
4419            editor.undo(&Undo, cx);
4420            assert_eq!(editor.text(cx), "const ONE: usize = 1;");
4421            editor.undo(&Undo, cx);
4422            assert_eq!(editor.text(cx), "const ONE: usize = 1;");
4423            editor.redo(&Redo, cx);
4424            assert_eq!(editor.text(cx), "const THREE: usize = 1;");
4425        })
4426    }
4427
4428    #[gpui::test(iterations = 10)]
4429    async fn test_basic_chat(cx_a: &mut TestAppContext, cx_b: &mut TestAppContext) {
4430        cx_a.foreground().forbid_parking();
4431
4432        // Connect to a server as 2 clients.
4433        let mut server = TestServer::start(cx_a.foreground(), cx_a.background()).await;
4434        let client_a = server.create_client(cx_a, "user_a").await;
4435        let client_b = server.create_client(cx_b, "user_b").await;
4436
4437        // Create an org that includes these 2 users.
4438        let db = &server.app_state.db;
4439        let org_id = db.create_org("Test Org", "test-org").await.unwrap();
4440        db.add_org_member(org_id, client_a.current_user_id(&cx_a), false)
4441            .await
4442            .unwrap();
4443        db.add_org_member(org_id, client_b.current_user_id(&cx_b), false)
4444            .await
4445            .unwrap();
4446
4447        // Create a channel that includes all the users.
4448        let channel_id = db.create_org_channel(org_id, "test-channel").await.unwrap();
4449        db.add_channel_member(channel_id, client_a.current_user_id(&cx_a), false)
4450            .await
4451            .unwrap();
4452        db.add_channel_member(channel_id, client_b.current_user_id(&cx_b), false)
4453            .await
4454            .unwrap();
4455        db.create_channel_message(
4456            channel_id,
4457            client_b.current_user_id(&cx_b),
4458            "hello A, it's B.",
4459            OffsetDateTime::now_utc(),
4460            1,
4461        )
4462        .await
4463        .unwrap();
4464
4465        let channels_a = cx_a
4466            .add_model(|cx| ChannelList::new(client_a.user_store.clone(), client_a.clone(), cx));
4467        channels_a
4468            .condition(cx_a, |list, _| list.available_channels().is_some())
4469            .await;
4470        channels_a.read_with(cx_a, |list, _| {
4471            assert_eq!(
4472                list.available_channels().unwrap(),
4473                &[ChannelDetails {
4474                    id: channel_id.to_proto(),
4475                    name: "test-channel".to_string()
4476                }]
4477            )
4478        });
4479        let channel_a = channels_a.update(cx_a, |this, cx| {
4480            this.get_channel(channel_id.to_proto(), cx).unwrap()
4481        });
4482        channel_a.read_with(cx_a, |channel, _| assert!(channel.messages().is_empty()));
4483        channel_a
4484            .condition(&cx_a, |channel, _| {
4485                channel_messages(channel)
4486                    == [("user_b".to_string(), "hello A, it's B.".to_string(), false)]
4487            })
4488            .await;
4489
4490        let channels_b = cx_b
4491            .add_model(|cx| ChannelList::new(client_b.user_store.clone(), client_b.clone(), cx));
4492        channels_b
4493            .condition(cx_b, |list, _| list.available_channels().is_some())
4494            .await;
4495        channels_b.read_with(cx_b, |list, _| {
4496            assert_eq!(
4497                list.available_channels().unwrap(),
4498                &[ChannelDetails {
4499                    id: channel_id.to_proto(),
4500                    name: "test-channel".to_string()
4501                }]
4502            )
4503        });
4504
4505        let channel_b = channels_b.update(cx_b, |this, cx| {
4506            this.get_channel(channel_id.to_proto(), cx).unwrap()
4507        });
4508        channel_b.read_with(cx_b, |channel, _| assert!(channel.messages().is_empty()));
4509        channel_b
4510            .condition(&cx_b, |channel, _| {
4511                channel_messages(channel)
4512                    == [("user_b".to_string(), "hello A, it's B.".to_string(), false)]
4513            })
4514            .await;
4515
4516        channel_a
4517            .update(cx_a, |channel, cx| {
4518                channel
4519                    .send_message("oh, hi B.".to_string(), cx)
4520                    .unwrap()
4521                    .detach();
4522                let task = channel.send_message("sup".to_string(), cx).unwrap();
4523                assert_eq!(
4524                    channel_messages(channel),
4525                    &[
4526                        ("user_b".to_string(), "hello A, it's B.".to_string(), false),
4527                        ("user_a".to_string(), "oh, hi B.".to_string(), true),
4528                        ("user_a".to_string(), "sup".to_string(), true)
4529                    ]
4530                );
4531                task
4532            })
4533            .await
4534            .unwrap();
4535
4536        channel_b
4537            .condition(&cx_b, |channel, _| {
4538                channel_messages(channel)
4539                    == [
4540                        ("user_b".to_string(), "hello A, it's B.".to_string(), false),
4541                        ("user_a".to_string(), "oh, hi B.".to_string(), false),
4542                        ("user_a".to_string(), "sup".to_string(), false),
4543                    ]
4544            })
4545            .await;
4546
4547        assert_eq!(
4548            server
4549                .state()
4550                .await
4551                .channel(channel_id)
4552                .unwrap()
4553                .connection_ids
4554                .len(),
4555            2
4556        );
4557        cx_b.update(|_| drop(channel_b));
4558        server
4559            .condition(|state| state.channel(channel_id).unwrap().connection_ids.len() == 1)
4560            .await;
4561
4562        cx_a.update(|_| drop(channel_a));
4563        server
4564            .condition(|state| state.channel(channel_id).is_none())
4565            .await;
4566    }
4567
4568    #[gpui::test(iterations = 10)]
4569    async fn test_chat_message_validation(cx_a: &mut TestAppContext) {
4570        cx_a.foreground().forbid_parking();
4571
4572        let mut server = TestServer::start(cx_a.foreground(), cx_a.background()).await;
4573        let client_a = server.create_client(cx_a, "user_a").await;
4574
4575        let db = &server.app_state.db;
4576        let org_id = db.create_org("Test Org", "test-org").await.unwrap();
4577        let channel_id = db.create_org_channel(org_id, "test-channel").await.unwrap();
4578        db.add_org_member(org_id, client_a.current_user_id(&cx_a), false)
4579            .await
4580            .unwrap();
4581        db.add_channel_member(channel_id, client_a.current_user_id(&cx_a), false)
4582            .await
4583            .unwrap();
4584
4585        let channels_a = cx_a
4586            .add_model(|cx| ChannelList::new(client_a.user_store.clone(), client_a.clone(), cx));
4587        channels_a
4588            .condition(cx_a, |list, _| list.available_channels().is_some())
4589            .await;
4590        let channel_a = channels_a.update(cx_a, |this, cx| {
4591            this.get_channel(channel_id.to_proto(), cx).unwrap()
4592        });
4593
4594        // Messages aren't allowed to be too long.
4595        channel_a
4596            .update(cx_a, |channel, cx| {
4597                let long_body = "this is long.\n".repeat(1024);
4598                channel.send_message(long_body, cx).unwrap()
4599            })
4600            .await
4601            .unwrap_err();
4602
4603        // Messages aren't allowed to be blank.
4604        channel_a.update(cx_a, |channel, cx| {
4605            channel.send_message(String::new(), cx).unwrap_err()
4606        });
4607
4608        // Leading and trailing whitespace are trimmed.
4609        channel_a
4610            .update(cx_a, |channel, cx| {
4611                channel
4612                    .send_message("\n surrounded by whitespace  \n".to_string(), cx)
4613                    .unwrap()
4614            })
4615            .await
4616            .unwrap();
4617        assert_eq!(
4618            db.get_channel_messages(channel_id, 10, None)
4619                .await
4620                .unwrap()
4621                .iter()
4622                .map(|m| &m.body)
4623                .collect::<Vec<_>>(),
4624            &["surrounded by whitespace"]
4625        );
4626    }
4627
4628    #[gpui::test(iterations = 10)]
4629    async fn test_chat_reconnection(cx_a: &mut TestAppContext, cx_b: &mut TestAppContext) {
4630        cx_a.foreground().forbid_parking();
4631
4632        // Connect to a server as 2 clients.
4633        let mut server = TestServer::start(cx_a.foreground(), cx_a.background()).await;
4634        let client_a = server.create_client(cx_a, "user_a").await;
4635        let client_b = server.create_client(cx_b, "user_b").await;
4636        let mut status_b = client_b.status();
4637
4638        // Create an org that includes these 2 users.
4639        let db = &server.app_state.db;
4640        let org_id = db.create_org("Test Org", "test-org").await.unwrap();
4641        db.add_org_member(org_id, client_a.current_user_id(&cx_a), false)
4642            .await
4643            .unwrap();
4644        db.add_org_member(org_id, client_b.current_user_id(&cx_b), false)
4645            .await
4646            .unwrap();
4647
4648        // Create a channel that includes all the users.
4649        let channel_id = db.create_org_channel(org_id, "test-channel").await.unwrap();
4650        db.add_channel_member(channel_id, client_a.current_user_id(&cx_a), false)
4651            .await
4652            .unwrap();
4653        db.add_channel_member(channel_id, client_b.current_user_id(&cx_b), false)
4654            .await
4655            .unwrap();
4656        db.create_channel_message(
4657            channel_id,
4658            client_b.current_user_id(&cx_b),
4659            "hello A, it's B.",
4660            OffsetDateTime::now_utc(),
4661            2,
4662        )
4663        .await
4664        .unwrap();
4665
4666        let channels_a = cx_a
4667            .add_model(|cx| ChannelList::new(client_a.user_store.clone(), client_a.clone(), cx));
4668        channels_a
4669            .condition(cx_a, |list, _| list.available_channels().is_some())
4670            .await;
4671
4672        channels_a.read_with(cx_a, |list, _| {
4673            assert_eq!(
4674                list.available_channels().unwrap(),
4675                &[ChannelDetails {
4676                    id: channel_id.to_proto(),
4677                    name: "test-channel".to_string()
4678                }]
4679            )
4680        });
4681        let channel_a = channels_a.update(cx_a, |this, cx| {
4682            this.get_channel(channel_id.to_proto(), cx).unwrap()
4683        });
4684        channel_a.read_with(cx_a, |channel, _| assert!(channel.messages().is_empty()));
4685        channel_a
4686            .condition(&cx_a, |channel, _| {
4687                channel_messages(channel)
4688                    == [("user_b".to_string(), "hello A, it's B.".to_string(), false)]
4689            })
4690            .await;
4691
4692        let channels_b = cx_b
4693            .add_model(|cx| ChannelList::new(client_b.user_store.clone(), client_b.clone(), cx));
4694        channels_b
4695            .condition(cx_b, |list, _| list.available_channels().is_some())
4696            .await;
4697        channels_b.read_with(cx_b, |list, _| {
4698            assert_eq!(
4699                list.available_channels().unwrap(),
4700                &[ChannelDetails {
4701                    id: channel_id.to_proto(),
4702                    name: "test-channel".to_string()
4703                }]
4704            )
4705        });
4706
4707        let channel_b = channels_b.update(cx_b, |this, cx| {
4708            this.get_channel(channel_id.to_proto(), cx).unwrap()
4709        });
4710        channel_b.read_with(cx_b, |channel, _| assert!(channel.messages().is_empty()));
4711        channel_b
4712            .condition(&cx_b, |channel, _| {
4713                channel_messages(channel)
4714                    == [("user_b".to_string(), "hello A, it's B.".to_string(), false)]
4715            })
4716            .await;
4717
4718        // Disconnect client B, ensuring we can still access its cached channel data.
4719        server.forbid_connections();
4720        server.disconnect_client(client_b.current_user_id(&cx_b));
4721        cx_b.foreground().advance_clock(rpc::RECEIVE_TIMEOUT);
4722        while !matches!(
4723            status_b.next().await,
4724            Some(client::Status::ReconnectionError { .. })
4725        ) {}
4726
4727        channels_b.read_with(cx_b, |channels, _| {
4728            assert_eq!(
4729                channels.available_channels().unwrap(),
4730                [ChannelDetails {
4731                    id: channel_id.to_proto(),
4732                    name: "test-channel".to_string()
4733                }]
4734            )
4735        });
4736        channel_b.read_with(cx_b, |channel, _| {
4737            assert_eq!(
4738                channel_messages(channel),
4739                [("user_b".to_string(), "hello A, it's B.".to_string(), false)]
4740            )
4741        });
4742
4743        // Send a message from client B while it is disconnected.
4744        channel_b
4745            .update(cx_b, |channel, cx| {
4746                let task = channel
4747                    .send_message("can you see this?".to_string(), cx)
4748                    .unwrap();
4749                assert_eq!(
4750                    channel_messages(channel),
4751                    &[
4752                        ("user_b".to_string(), "hello A, it's B.".to_string(), false),
4753                        ("user_b".to_string(), "can you see this?".to_string(), true)
4754                    ]
4755                );
4756                task
4757            })
4758            .await
4759            .unwrap_err();
4760
4761        // Send a message from client A while B is disconnected.
4762        channel_a
4763            .update(cx_a, |channel, cx| {
4764                channel
4765                    .send_message("oh, hi B.".to_string(), cx)
4766                    .unwrap()
4767                    .detach();
4768                let task = channel.send_message("sup".to_string(), cx).unwrap();
4769                assert_eq!(
4770                    channel_messages(channel),
4771                    &[
4772                        ("user_b".to_string(), "hello A, it's B.".to_string(), false),
4773                        ("user_a".to_string(), "oh, hi B.".to_string(), true),
4774                        ("user_a".to_string(), "sup".to_string(), true)
4775                    ]
4776                );
4777                task
4778            })
4779            .await
4780            .unwrap();
4781
4782        // Give client B a chance to reconnect.
4783        server.allow_connections();
4784        cx_b.foreground().advance_clock(Duration::from_secs(10));
4785
4786        // Verify that B sees the new messages upon reconnection, as well as the message client B
4787        // sent while offline.
4788        channel_b
4789            .condition(&cx_b, |channel, _| {
4790                channel_messages(channel)
4791                    == [
4792                        ("user_b".to_string(), "hello A, it's B.".to_string(), false),
4793                        ("user_a".to_string(), "oh, hi B.".to_string(), false),
4794                        ("user_a".to_string(), "sup".to_string(), false),
4795                        ("user_b".to_string(), "can you see this?".to_string(), false),
4796                    ]
4797            })
4798            .await;
4799
4800        // Ensure client A and B can communicate normally after reconnection.
4801        channel_a
4802            .update(cx_a, |channel, cx| {
4803                channel.send_message("you online?".to_string(), cx).unwrap()
4804            })
4805            .await
4806            .unwrap();
4807        channel_b
4808            .condition(&cx_b, |channel, _| {
4809                channel_messages(channel)
4810                    == [
4811                        ("user_b".to_string(), "hello A, it's B.".to_string(), false),
4812                        ("user_a".to_string(), "oh, hi B.".to_string(), false),
4813                        ("user_a".to_string(), "sup".to_string(), false),
4814                        ("user_b".to_string(), "can you see this?".to_string(), false),
4815                        ("user_a".to_string(), "you online?".to_string(), false),
4816                    ]
4817            })
4818            .await;
4819
4820        channel_b
4821            .update(cx_b, |channel, cx| {
4822                channel.send_message("yep".to_string(), cx).unwrap()
4823            })
4824            .await
4825            .unwrap();
4826        channel_a
4827            .condition(&cx_a, |channel, _| {
4828                channel_messages(channel)
4829                    == [
4830                        ("user_b".to_string(), "hello A, it's B.".to_string(), false),
4831                        ("user_a".to_string(), "oh, hi B.".to_string(), false),
4832                        ("user_a".to_string(), "sup".to_string(), false),
4833                        ("user_b".to_string(), "can you see this?".to_string(), false),
4834                        ("user_a".to_string(), "you online?".to_string(), false),
4835                        ("user_b".to_string(), "yep".to_string(), false),
4836                    ]
4837            })
4838            .await;
4839    }
4840
4841    #[gpui::test(iterations = 10)]
4842    async fn test_contacts(
4843        deterministic: Arc<Deterministic>,
4844        cx_a: &mut TestAppContext,
4845        cx_b: &mut TestAppContext,
4846        cx_c: &mut TestAppContext,
4847    ) {
4848        cx_a.foreground().forbid_parking();
4849
4850        // Connect to a server as 3 clients.
4851        let mut server = TestServer::start(cx_a.foreground(), cx_a.background()).await;
4852        let mut client_a = server.create_client(cx_a, "user_a").await;
4853        let mut client_b = server.create_client(cx_b, "user_b").await;
4854        let client_c = server.create_client(cx_c, "user_c").await;
4855        server
4856            .make_contacts(vec![
4857                (&client_a, cx_a),
4858                (&client_b, cx_b),
4859                (&client_c, cx_c),
4860            ])
4861            .await;
4862
4863        deterministic.run_until_parked();
4864        for (client, cx) in [(&client_a, &cx_a), (&client_b, &cx_b), (&client_c, &cx_c)] {
4865            client.user_store.read_with(*cx, |store, _| {
4866                assert_eq!(
4867                    contacts(store),
4868                    [
4869                        ("user_a", true, vec![]),
4870                        ("user_b", true, vec![]),
4871                        ("user_c", true, vec![])
4872                    ]
4873                )
4874            });
4875        }
4876
4877        // Share a project as client A.
4878        let fs = FakeFs::new(cx_a.background());
4879        fs.create_dir(Path::new("/a")).await.unwrap();
4880        let (project_a, _) = client_a.build_local_project(fs, "/a", cx_a).await;
4881
4882        deterministic.run_until_parked();
4883        for (client, cx) in [(&client_a, &cx_a), (&client_b, &cx_b), (&client_c, &cx_c)] {
4884            client.user_store.read_with(*cx, |store, _| {
4885                assert_eq!(
4886                    contacts(store),
4887                    [
4888                        ("user_a", true, vec![("a", vec![])]),
4889                        ("user_b", true, vec![]),
4890                        ("user_c", true, vec![])
4891                    ]
4892                )
4893            });
4894        }
4895
4896        let _project_b = client_b.build_remote_project(&project_a, cx_a, cx_b).await;
4897
4898        deterministic.run_until_parked();
4899        for (client, cx) in [(&client_a, &cx_a), (&client_b, &cx_b), (&client_c, &cx_c)] {
4900            client.user_store.read_with(*cx, |store, _| {
4901                assert_eq!(
4902                    contacts(store),
4903                    [
4904                        ("user_a", true, vec![("a", vec!["user_b"])]),
4905                        ("user_b", true, vec![]),
4906                        ("user_c", true, vec![])
4907                    ]
4908                )
4909            });
4910        }
4911
4912        // Add a local project as client B
4913        let fs = FakeFs::new(cx_b.background());
4914        fs.create_dir(Path::new("/b")).await.unwrap();
4915        let (_project_b, _) = client_b.build_local_project(fs, "/b", cx_a).await;
4916
4917        deterministic.run_until_parked();
4918        for (client, cx) in [(&client_a, &cx_a), (&client_b, &cx_b), (&client_c, &cx_c)] {
4919            client.user_store.read_with(*cx, |store, _| {
4920                assert_eq!(
4921                    contacts(store),
4922                    [
4923                        ("user_a", true, vec![("a", vec!["user_b"])]),
4924                        ("user_b", true, vec![("b", vec![])]),
4925                        ("user_c", true, vec![])
4926                    ]
4927                )
4928            });
4929        }
4930
4931        project_a
4932            .condition(&cx_a, |project, _| {
4933                project.collaborators().contains_key(&client_b.peer_id)
4934            })
4935            .await;
4936
4937        client_a.project.take();
4938        cx_a.update(move |_| drop(project_a));
4939        deterministic.run_until_parked();
4940        for (client, cx) in [(&client_a, &cx_a), (&client_b, &cx_b), (&client_c, &cx_c)] {
4941            client.user_store.read_with(*cx, |store, _| {
4942                assert_eq!(
4943                    contacts(store),
4944                    [
4945                        ("user_a", true, vec![]),
4946                        ("user_b", true, vec![("b", vec![])]),
4947                        ("user_c", true, vec![])
4948                    ]
4949                )
4950            });
4951        }
4952
4953        server.disconnect_client(client_c.current_user_id(cx_c));
4954        server.forbid_connections();
4955        deterministic.advance_clock(rpc::RECEIVE_TIMEOUT);
4956        for (client, cx) in [(&client_a, &cx_a), (&client_b, &cx_b)] {
4957            client.user_store.read_with(*cx, |store, _| {
4958                assert_eq!(
4959                    contacts(store),
4960                    [
4961                        ("user_a", true, vec![]),
4962                        ("user_b", true, vec![("b", vec![])]),
4963                        ("user_c", false, vec![])
4964                    ]
4965                )
4966            });
4967        }
4968        client_c
4969            .user_store
4970            .read_with(cx_c, |store, _| assert_eq!(contacts(store), []));
4971
4972        server.allow_connections();
4973        client_c
4974            .authenticate_and_connect(false, &cx_c.to_async())
4975            .await
4976            .unwrap();
4977
4978        deterministic.run_until_parked();
4979        for (client, cx) in [(&client_a, &cx_a), (&client_b, &cx_b), (&client_c, &cx_c)] {
4980            client.user_store.read_with(*cx, |store, _| {
4981                assert_eq!(
4982                    contacts(store),
4983                    [
4984                        ("user_a", true, vec![]),
4985                        ("user_b", true, vec![("b", vec![])]),
4986                        ("user_c", true, vec![])
4987                    ]
4988                )
4989            });
4990        }
4991
4992        fn contacts(user_store: &UserStore) -> Vec<(&str, bool, Vec<(&str, Vec<&str>)>)> {
4993            user_store
4994                .contacts()
4995                .iter()
4996                .map(|contact| {
4997                    let projects = contact
4998                        .projects
4999                        .iter()
5000                        .map(|p| {
5001                            (
5002                                p.worktree_root_names[0].as_str(),
5003                                p.guests.iter().map(|p| p.github_login.as_str()).collect(),
5004                            )
5005                        })
5006                        .collect();
5007                    (contact.user.github_login.as_str(), contact.online, projects)
5008                })
5009                .collect()
5010        }
5011    }
5012
5013    #[gpui::test(iterations = 10)]
5014    async fn test_contact_requests(
5015        executor: Arc<Deterministic>,
5016        cx_a: &mut TestAppContext,
5017        cx_a2: &mut TestAppContext,
5018        cx_b: &mut TestAppContext,
5019        cx_b2: &mut TestAppContext,
5020        cx_c: &mut TestAppContext,
5021        cx_c2: &mut TestAppContext,
5022    ) {
5023        cx_a.foreground().forbid_parking();
5024
5025        // Connect to a server as 3 clients.
5026        let mut server = TestServer::start(cx_a.foreground(), cx_a.background()).await;
5027        let client_a = server.create_client(cx_a, "user_a").await;
5028        let client_a2 = server.create_client(cx_a2, "user_a").await;
5029        let client_b = server.create_client(cx_b, "user_b").await;
5030        let client_b2 = server.create_client(cx_b2, "user_b").await;
5031        let client_c = server.create_client(cx_c, "user_c").await;
5032        let client_c2 = server.create_client(cx_c2, "user_c").await;
5033
5034        assert_eq!(client_a.user_id().unwrap(), client_a2.user_id().unwrap());
5035        assert_eq!(client_b.user_id().unwrap(), client_b2.user_id().unwrap());
5036        assert_eq!(client_c.user_id().unwrap(), client_c2.user_id().unwrap());
5037
5038        // User A and User C request that user B become their contact.
5039        client_a
5040            .user_store
5041            .update(cx_a, |store, cx| {
5042                store.request_contact(client_b.user_id().unwrap(), cx)
5043            })
5044            .await
5045            .unwrap();
5046        client_c
5047            .user_store
5048            .update(cx_c, |store, cx| {
5049                store.request_contact(client_b.user_id().unwrap(), cx)
5050            })
5051            .await
5052            .unwrap();
5053        executor.run_until_parked();
5054
5055        // All users see the pending request appear in all their clients.
5056        assert_eq!(
5057            client_a.summarize_contacts(&cx_a).outgoing_requests,
5058            &["user_b"]
5059        );
5060        assert_eq!(
5061            client_a2.summarize_contacts(&cx_a2).outgoing_requests,
5062            &["user_b"]
5063        );
5064        assert_eq!(
5065            client_b.summarize_contacts(&cx_b).incoming_requests,
5066            &["user_a", "user_c"]
5067        );
5068        assert_eq!(
5069            client_b2.summarize_contacts(&cx_b2).incoming_requests,
5070            &["user_a", "user_c"]
5071        );
5072        assert_eq!(
5073            client_c.summarize_contacts(&cx_c).outgoing_requests,
5074            &["user_b"]
5075        );
5076        assert_eq!(
5077            client_c2.summarize_contacts(&cx_c2).outgoing_requests,
5078            &["user_b"]
5079        );
5080
5081        // Contact requests are present upon connecting (tested here via disconnect/reconnect)
5082        disconnect_and_reconnect(&client_a, cx_a).await;
5083        disconnect_and_reconnect(&client_b, cx_b).await;
5084        disconnect_and_reconnect(&client_c, cx_c).await;
5085        executor.run_until_parked();
5086        assert_eq!(
5087            client_a.summarize_contacts(&cx_a).outgoing_requests,
5088            &["user_b"]
5089        );
5090        assert_eq!(
5091            client_b.summarize_contacts(&cx_b).incoming_requests,
5092            &["user_a", "user_c"]
5093        );
5094        assert_eq!(
5095            client_c.summarize_contacts(&cx_c).outgoing_requests,
5096            &["user_b"]
5097        );
5098
5099        // User B accepts the request from user A.
5100        client_b
5101            .user_store
5102            .update(cx_b, |store, cx| {
5103                store.respond_to_contact_request(client_a.user_id().unwrap(), true, cx)
5104            })
5105            .await
5106            .unwrap();
5107
5108        executor.run_until_parked();
5109
5110        // User B sees user A as their contact now in all client, and the incoming request from them is removed.
5111        let contacts_b = client_b.summarize_contacts(&cx_b);
5112        assert_eq!(contacts_b.current, &["user_a", "user_b"]);
5113        assert_eq!(contacts_b.incoming_requests, &["user_c"]);
5114        let contacts_b2 = client_b2.summarize_contacts(&cx_b2);
5115        assert_eq!(contacts_b2.current, &["user_a", "user_b"]);
5116        assert_eq!(contacts_b2.incoming_requests, &["user_c"]);
5117
5118        // User A sees user B as their contact now in all clients, and the outgoing request to them is removed.
5119        let contacts_a = client_a.summarize_contacts(&cx_a);
5120        assert_eq!(contacts_a.current, &["user_a", "user_b"]);
5121        assert!(contacts_a.outgoing_requests.is_empty());
5122        let contacts_a2 = client_a2.summarize_contacts(&cx_a2);
5123        assert_eq!(contacts_a2.current, &["user_a", "user_b"]);
5124        assert!(contacts_a2.outgoing_requests.is_empty());
5125
5126        // Contacts are present upon connecting (tested here via disconnect/reconnect)
5127        disconnect_and_reconnect(&client_a, cx_a).await;
5128        disconnect_and_reconnect(&client_b, cx_b).await;
5129        disconnect_and_reconnect(&client_c, cx_c).await;
5130        executor.run_until_parked();
5131        assert_eq!(
5132            client_a.summarize_contacts(&cx_a).current,
5133            &["user_a", "user_b"]
5134        );
5135        assert_eq!(
5136            client_b.summarize_contacts(&cx_b).current,
5137            &["user_a", "user_b"]
5138        );
5139        assert_eq!(
5140            client_b.summarize_contacts(&cx_b).incoming_requests,
5141            &["user_c"]
5142        );
5143        assert_eq!(client_c.summarize_contacts(&cx_c).current, &["user_c"]);
5144        assert_eq!(
5145            client_c.summarize_contacts(&cx_c).outgoing_requests,
5146            &["user_b"]
5147        );
5148
5149        // User B rejects the request from user C.
5150        client_b
5151            .user_store
5152            .update(cx_b, |store, cx| {
5153                store.respond_to_contact_request(client_c.user_id().unwrap(), false, cx)
5154            })
5155            .await
5156            .unwrap();
5157
5158        executor.run_until_parked();
5159
5160        // User B doesn't see user C as their contact, and the incoming request from them is removed.
5161        let contacts_b = client_b.summarize_contacts(&cx_b);
5162        assert_eq!(contacts_b.current, &["user_a", "user_b"]);
5163        assert!(contacts_b.incoming_requests.is_empty());
5164        let contacts_b2 = client_b2.summarize_contacts(&cx_b2);
5165        assert_eq!(contacts_b2.current, &["user_a", "user_b"]);
5166        assert!(contacts_b2.incoming_requests.is_empty());
5167
5168        // User C doesn't see user B as their contact, and the outgoing request to them is removed.
5169        let contacts_c = client_c.summarize_contacts(&cx_c);
5170        assert_eq!(contacts_c.current, &["user_c"]);
5171        assert!(contacts_c.outgoing_requests.is_empty());
5172        let contacts_c2 = client_c2.summarize_contacts(&cx_c2);
5173        assert_eq!(contacts_c2.current, &["user_c"]);
5174        assert!(contacts_c2.outgoing_requests.is_empty());
5175
5176        // Incoming/outgoing requests are not present upon connecting (tested here via disconnect/reconnect)
5177        disconnect_and_reconnect(&client_a, cx_a).await;
5178        disconnect_and_reconnect(&client_b, cx_b).await;
5179        disconnect_and_reconnect(&client_c, cx_c).await;
5180        executor.run_until_parked();
5181        assert_eq!(
5182            client_a.summarize_contacts(&cx_a).current,
5183            &["user_a", "user_b"]
5184        );
5185        assert_eq!(
5186            client_b.summarize_contacts(&cx_b).current,
5187            &["user_a", "user_b"]
5188        );
5189        assert!(client_b
5190            .summarize_contacts(&cx_b)
5191            .incoming_requests
5192            .is_empty());
5193        assert_eq!(client_c.summarize_contacts(&cx_c).current, &["user_c"]);
5194        assert!(client_c
5195            .summarize_contacts(&cx_c)
5196            .outgoing_requests
5197            .is_empty());
5198
5199        async fn disconnect_and_reconnect(client: &TestClient, cx: &mut TestAppContext) {
5200            client.disconnect(&cx.to_async()).unwrap();
5201            client.clear_contacts(cx).await;
5202            client
5203                .authenticate_and_connect(false, &cx.to_async())
5204                .await
5205                .unwrap();
5206        }
5207    }
5208
5209    #[gpui::test(iterations = 10)]
5210    async fn test_following(cx_a: &mut TestAppContext, cx_b: &mut TestAppContext) {
5211        cx_a.foreground().forbid_parking();
5212        let fs = FakeFs::new(cx_a.background());
5213
5214        // 2 clients connect to a server.
5215        let mut server = TestServer::start(cx_a.foreground(), cx_a.background()).await;
5216        let mut client_a = server.create_client(cx_a, "user_a").await;
5217        let mut client_b = server.create_client(cx_b, "user_b").await;
5218        server
5219            .make_contacts(vec![(&client_a, cx_a), (&client_b, cx_b)])
5220            .await;
5221        cx_a.update(editor::init);
5222        cx_b.update(editor::init);
5223
5224        // Client A shares a project.
5225        fs.insert_tree(
5226            "/a",
5227            json!({
5228                "1.txt": "one",
5229                "2.txt": "two",
5230                "3.txt": "three",
5231            }),
5232        )
5233        .await;
5234        let (project_a, worktree_id) = client_a.build_local_project(fs.clone(), "/a", cx_a).await;
5235
5236        // Client B joins the project.
5237        let project_b = client_b.build_remote_project(&project_a, cx_a, cx_b).await;
5238
5239        // Client A opens some editors.
5240        let workspace_a = client_a.build_workspace(&project_a, cx_a);
5241        let pane_a = workspace_a.read_with(cx_a, |workspace, _| workspace.active_pane().clone());
5242        let editor_a1 = workspace_a
5243            .update(cx_a, |workspace, cx| {
5244                workspace.open_path((worktree_id, "1.txt"), true, cx)
5245            })
5246            .await
5247            .unwrap()
5248            .downcast::<Editor>()
5249            .unwrap();
5250        let editor_a2 = workspace_a
5251            .update(cx_a, |workspace, cx| {
5252                workspace.open_path((worktree_id, "2.txt"), true, cx)
5253            })
5254            .await
5255            .unwrap()
5256            .downcast::<Editor>()
5257            .unwrap();
5258
5259        // Client B opens an editor.
5260        let workspace_b = client_b.build_workspace(&project_b, cx_b);
5261        let editor_b1 = workspace_b
5262            .update(cx_b, |workspace, cx| {
5263                workspace.open_path((worktree_id, "1.txt"), true, cx)
5264            })
5265            .await
5266            .unwrap()
5267            .downcast::<Editor>()
5268            .unwrap();
5269
5270        let client_a_id = project_b.read_with(cx_b, |project, _| {
5271            project.collaborators().values().next().unwrap().peer_id
5272        });
5273        let client_b_id = project_a.read_with(cx_a, |project, _| {
5274            project.collaborators().values().next().unwrap().peer_id
5275        });
5276
5277        // When client B starts following client A, all visible view states are replicated to client B.
5278        editor_a1.update(cx_a, |editor, cx| editor.select_ranges([0..1], None, cx));
5279        editor_a2.update(cx_a, |editor, cx| editor.select_ranges([2..3], None, cx));
5280        workspace_b
5281            .update(cx_b, |workspace, cx| {
5282                workspace
5283                    .toggle_follow(&ToggleFollow(client_a_id), cx)
5284                    .unwrap()
5285            })
5286            .await
5287            .unwrap();
5288
5289        let editor_b2 = workspace_b.read_with(cx_b, |workspace, cx| {
5290            workspace
5291                .active_item(cx)
5292                .unwrap()
5293                .downcast::<Editor>()
5294                .unwrap()
5295        });
5296        assert!(cx_b.read(|cx| editor_b2.is_focused(cx)));
5297        assert_eq!(
5298            editor_b2.read_with(cx_b, |editor, cx| editor.project_path(cx)),
5299            Some((worktree_id, "2.txt").into())
5300        );
5301        assert_eq!(
5302            editor_b2.read_with(cx_b, |editor, cx| editor.selected_ranges(cx)),
5303            vec![2..3]
5304        );
5305        assert_eq!(
5306            editor_b1.read_with(cx_b, |editor, cx| editor.selected_ranges(cx)),
5307            vec![0..1]
5308        );
5309
5310        // When client A activates a different editor, client B does so as well.
5311        workspace_a.update(cx_a, |workspace, cx| {
5312            workspace.activate_item(&editor_a1, cx)
5313        });
5314        workspace_b
5315            .condition(cx_b, |workspace, cx| {
5316                workspace.active_item(cx).unwrap().id() == editor_b1.id()
5317            })
5318            .await;
5319
5320        // When client A navigates back and forth, client B does so as well.
5321        workspace_a
5322            .update(cx_a, |workspace, cx| {
5323                workspace::Pane::go_back(workspace, None, cx)
5324            })
5325            .await;
5326        workspace_b
5327            .condition(cx_b, |workspace, cx| {
5328                workspace.active_item(cx).unwrap().id() == editor_b2.id()
5329            })
5330            .await;
5331
5332        workspace_a
5333            .update(cx_a, |workspace, cx| {
5334                workspace::Pane::go_forward(workspace, None, cx)
5335            })
5336            .await;
5337        workspace_b
5338            .condition(cx_b, |workspace, cx| {
5339                workspace.active_item(cx).unwrap().id() == editor_b1.id()
5340            })
5341            .await;
5342
5343        // Changes to client A's editor are reflected on client B.
5344        editor_a1.update(cx_a, |editor, cx| {
5345            editor.select_ranges([1..1, 2..2], None, cx);
5346        });
5347        editor_b1
5348            .condition(cx_b, |editor, cx| {
5349                editor.selected_ranges(cx) == vec![1..1, 2..2]
5350            })
5351            .await;
5352
5353        editor_a1.update(cx_a, |editor, cx| editor.set_text("TWO", cx));
5354        editor_b1
5355            .condition(cx_b, |editor, cx| editor.text(cx) == "TWO")
5356            .await;
5357
5358        editor_a1.update(cx_a, |editor, cx| {
5359            editor.select_ranges([3..3], None, cx);
5360            editor.set_scroll_position(vec2f(0., 100.), cx);
5361        });
5362        editor_b1
5363            .condition(cx_b, |editor, cx| editor.selected_ranges(cx) == vec![3..3])
5364            .await;
5365
5366        // After unfollowing, client B stops receiving updates from client A.
5367        workspace_b.update(cx_b, |workspace, cx| {
5368            workspace.unfollow(&workspace.active_pane().clone(), cx)
5369        });
5370        workspace_a.update(cx_a, |workspace, cx| {
5371            workspace.activate_item(&editor_a2, cx)
5372        });
5373        cx_a.foreground().run_until_parked();
5374        assert_eq!(
5375            workspace_b.read_with(cx_b, |workspace, cx| workspace
5376                .active_item(cx)
5377                .unwrap()
5378                .id()),
5379            editor_b1.id()
5380        );
5381
5382        // Client A starts following client B.
5383        workspace_a
5384            .update(cx_a, |workspace, cx| {
5385                workspace
5386                    .toggle_follow(&ToggleFollow(client_b_id), cx)
5387                    .unwrap()
5388            })
5389            .await
5390            .unwrap();
5391        assert_eq!(
5392            workspace_a.read_with(cx_a, |workspace, _| workspace.leader_for_pane(&pane_a)),
5393            Some(client_b_id)
5394        );
5395        assert_eq!(
5396            workspace_a.read_with(cx_a, |workspace, cx| workspace
5397                .active_item(cx)
5398                .unwrap()
5399                .id()),
5400            editor_a1.id()
5401        );
5402
5403        // Following interrupts when client B disconnects.
5404        client_b.disconnect(&cx_b.to_async()).unwrap();
5405        cx_a.foreground().run_until_parked();
5406        assert_eq!(
5407            workspace_a.read_with(cx_a, |workspace, _| workspace.leader_for_pane(&pane_a)),
5408            None
5409        );
5410    }
5411
5412    #[gpui::test(iterations = 10)]
5413    async fn test_peers_following_each_other(cx_a: &mut TestAppContext, cx_b: &mut TestAppContext) {
5414        cx_a.foreground().forbid_parking();
5415        let fs = FakeFs::new(cx_a.background());
5416
5417        // 2 clients connect to a server.
5418        let mut server = TestServer::start(cx_a.foreground(), cx_a.background()).await;
5419        let mut client_a = server.create_client(cx_a, "user_a").await;
5420        let mut client_b = server.create_client(cx_b, "user_b").await;
5421        server
5422            .make_contacts(vec![(&client_a, cx_a), (&client_b, cx_b)])
5423            .await;
5424        cx_a.update(editor::init);
5425        cx_b.update(editor::init);
5426
5427        // Client A shares a project.
5428        fs.insert_tree(
5429            "/a",
5430            json!({
5431                "1.txt": "one",
5432                "2.txt": "two",
5433                "3.txt": "three",
5434                "4.txt": "four",
5435            }),
5436        )
5437        .await;
5438        let (project_a, worktree_id) = client_a.build_local_project(fs.clone(), "/a", cx_a).await;
5439
5440        // Client B joins the project.
5441        let project_b = client_b.build_remote_project(&project_a, cx_a, cx_b).await;
5442
5443        // Client A opens some editors.
5444        let workspace_a = client_a.build_workspace(&project_a, cx_a);
5445        let pane_a1 = workspace_a.read_with(cx_a, |workspace, _| workspace.active_pane().clone());
5446        let _editor_a1 = workspace_a
5447            .update(cx_a, |workspace, cx| {
5448                workspace.open_path((worktree_id, "1.txt"), true, cx)
5449            })
5450            .await
5451            .unwrap()
5452            .downcast::<Editor>()
5453            .unwrap();
5454
5455        // Client B opens an editor.
5456        let workspace_b = client_b.build_workspace(&project_b, cx_b);
5457        let pane_b1 = workspace_b.read_with(cx_b, |workspace, _| workspace.active_pane().clone());
5458        let _editor_b1 = workspace_b
5459            .update(cx_b, |workspace, cx| {
5460                workspace.open_path((worktree_id, "2.txt"), true, cx)
5461            })
5462            .await
5463            .unwrap()
5464            .downcast::<Editor>()
5465            .unwrap();
5466
5467        // Clients A and B follow each other in split panes
5468        workspace_a
5469            .update(cx_a, |workspace, cx| {
5470                workspace.split_pane(workspace.active_pane().clone(), SplitDirection::Right, cx);
5471                assert_ne!(*workspace.active_pane(), pane_a1);
5472                let leader_id = *project_a.read(cx).collaborators().keys().next().unwrap();
5473                workspace
5474                    .toggle_follow(&workspace::ToggleFollow(leader_id), cx)
5475                    .unwrap()
5476            })
5477            .await
5478            .unwrap();
5479        workspace_b
5480            .update(cx_b, |workspace, cx| {
5481                workspace.split_pane(workspace.active_pane().clone(), SplitDirection::Right, cx);
5482                assert_ne!(*workspace.active_pane(), pane_b1);
5483                let leader_id = *project_b.read(cx).collaborators().keys().next().unwrap();
5484                workspace
5485                    .toggle_follow(&workspace::ToggleFollow(leader_id), cx)
5486                    .unwrap()
5487            })
5488            .await
5489            .unwrap();
5490
5491        workspace_a
5492            .update(cx_a, |workspace, cx| {
5493                workspace.activate_next_pane(cx);
5494                assert_eq!(*workspace.active_pane(), pane_a1);
5495                workspace.open_path((worktree_id, "3.txt"), true, cx)
5496            })
5497            .await
5498            .unwrap();
5499        workspace_b
5500            .update(cx_b, |workspace, cx| {
5501                workspace.activate_next_pane(cx);
5502                assert_eq!(*workspace.active_pane(), pane_b1);
5503                workspace.open_path((worktree_id, "4.txt"), true, cx)
5504            })
5505            .await
5506            .unwrap();
5507        cx_a.foreground().run_until_parked();
5508
5509        // Ensure leader updates don't change the active pane of followers
5510        workspace_a.read_with(cx_a, |workspace, _| {
5511            assert_eq!(*workspace.active_pane(), pane_a1);
5512        });
5513        workspace_b.read_with(cx_b, |workspace, _| {
5514            assert_eq!(*workspace.active_pane(), pane_b1);
5515        });
5516
5517        // Ensure peers following each other doesn't cause an infinite loop.
5518        assert_eq!(
5519            workspace_a.read_with(cx_a, |workspace, cx| workspace
5520                .active_item(cx)
5521                .unwrap()
5522                .project_path(cx)),
5523            Some((worktree_id, "3.txt").into())
5524        );
5525        workspace_a.update(cx_a, |workspace, cx| {
5526            assert_eq!(
5527                workspace.active_item(cx).unwrap().project_path(cx),
5528                Some((worktree_id, "3.txt").into())
5529            );
5530            workspace.activate_next_pane(cx);
5531            assert_eq!(
5532                workspace.active_item(cx).unwrap().project_path(cx),
5533                Some((worktree_id, "4.txt").into())
5534            );
5535        });
5536        workspace_b.update(cx_b, |workspace, cx| {
5537            assert_eq!(
5538                workspace.active_item(cx).unwrap().project_path(cx),
5539                Some((worktree_id, "4.txt").into())
5540            );
5541            workspace.activate_next_pane(cx);
5542            assert_eq!(
5543                workspace.active_item(cx).unwrap().project_path(cx),
5544                Some((worktree_id, "3.txt").into())
5545            );
5546        });
5547    }
5548
5549    #[gpui::test(iterations = 10)]
5550    async fn test_auto_unfollowing(cx_a: &mut TestAppContext, cx_b: &mut TestAppContext) {
5551        cx_a.foreground().forbid_parking();
5552        let fs = FakeFs::new(cx_a.background());
5553
5554        // 2 clients connect to a server.
5555        let mut server = TestServer::start(cx_a.foreground(), cx_a.background()).await;
5556        let mut client_a = server.create_client(cx_a, "user_a").await;
5557        let mut client_b = server.create_client(cx_b, "user_b").await;
5558        server
5559            .make_contacts(vec![(&client_a, cx_a), (&client_b, cx_b)])
5560            .await;
5561        cx_a.update(editor::init);
5562        cx_b.update(editor::init);
5563
5564        // Client A shares a project.
5565        fs.insert_tree(
5566            "/a",
5567            json!({
5568                "1.txt": "one",
5569                "2.txt": "two",
5570                "3.txt": "three",
5571            }),
5572        )
5573        .await;
5574        let (project_a, worktree_id) = client_a.build_local_project(fs.clone(), "/a", cx_a).await;
5575
5576        // Client B joins the project.
5577        let project_b = client_b.build_remote_project(&project_a, cx_a, cx_b).await;
5578
5579        // Client A opens some editors.
5580        let workspace_a = client_a.build_workspace(&project_a, cx_a);
5581        let _editor_a1 = workspace_a
5582            .update(cx_a, |workspace, cx| {
5583                workspace.open_path((worktree_id, "1.txt"), true, cx)
5584            })
5585            .await
5586            .unwrap()
5587            .downcast::<Editor>()
5588            .unwrap();
5589
5590        // Client B starts following client A.
5591        let workspace_b = client_b.build_workspace(&project_b, cx_b);
5592        let pane_b = workspace_b.read_with(cx_b, |workspace, _| workspace.active_pane().clone());
5593        let leader_id = project_b.read_with(cx_b, |project, _| {
5594            project.collaborators().values().next().unwrap().peer_id
5595        });
5596        workspace_b
5597            .update(cx_b, |workspace, cx| {
5598                workspace
5599                    .toggle_follow(&ToggleFollow(leader_id), cx)
5600                    .unwrap()
5601            })
5602            .await
5603            .unwrap();
5604        assert_eq!(
5605            workspace_b.read_with(cx_b, |workspace, _| workspace.leader_for_pane(&pane_b)),
5606            Some(leader_id)
5607        );
5608        let editor_b2 = workspace_b.read_with(cx_b, |workspace, cx| {
5609            workspace
5610                .active_item(cx)
5611                .unwrap()
5612                .downcast::<Editor>()
5613                .unwrap()
5614        });
5615
5616        // When client B moves, it automatically stops following client A.
5617        editor_b2.update(cx_b, |editor, cx| editor.move_right(&editor::MoveRight, cx));
5618        assert_eq!(
5619            workspace_b.read_with(cx_b, |workspace, _| workspace.leader_for_pane(&pane_b)),
5620            None
5621        );
5622
5623        workspace_b
5624            .update(cx_b, |workspace, cx| {
5625                workspace
5626                    .toggle_follow(&ToggleFollow(leader_id), cx)
5627                    .unwrap()
5628            })
5629            .await
5630            .unwrap();
5631        assert_eq!(
5632            workspace_b.read_with(cx_b, |workspace, _| workspace.leader_for_pane(&pane_b)),
5633            Some(leader_id)
5634        );
5635
5636        // When client B edits, it automatically stops following client A.
5637        editor_b2.update(cx_b, |editor, cx| editor.insert("X", cx));
5638        assert_eq!(
5639            workspace_b.read_with(cx_b, |workspace, _| workspace.leader_for_pane(&pane_b)),
5640            None
5641        );
5642
5643        workspace_b
5644            .update(cx_b, |workspace, cx| {
5645                workspace
5646                    .toggle_follow(&ToggleFollow(leader_id), cx)
5647                    .unwrap()
5648            })
5649            .await
5650            .unwrap();
5651        assert_eq!(
5652            workspace_b.read_with(cx_b, |workspace, _| workspace.leader_for_pane(&pane_b)),
5653            Some(leader_id)
5654        );
5655
5656        // When client B scrolls, it automatically stops following client A.
5657        editor_b2.update(cx_b, |editor, cx| {
5658            editor.set_scroll_position(vec2f(0., 3.), cx)
5659        });
5660        assert_eq!(
5661            workspace_b.read_with(cx_b, |workspace, _| workspace.leader_for_pane(&pane_b)),
5662            None
5663        );
5664
5665        workspace_b
5666            .update(cx_b, |workspace, cx| {
5667                workspace
5668                    .toggle_follow(&ToggleFollow(leader_id), cx)
5669                    .unwrap()
5670            })
5671            .await
5672            .unwrap();
5673        assert_eq!(
5674            workspace_b.read_with(cx_b, |workspace, _| workspace.leader_for_pane(&pane_b)),
5675            Some(leader_id)
5676        );
5677
5678        // When client B activates a different pane, it continues following client A in the original pane.
5679        workspace_b.update(cx_b, |workspace, cx| {
5680            workspace.split_pane(pane_b.clone(), SplitDirection::Right, cx)
5681        });
5682        assert_eq!(
5683            workspace_b.read_with(cx_b, |workspace, _| workspace.leader_for_pane(&pane_b)),
5684            Some(leader_id)
5685        );
5686
5687        workspace_b.update(cx_b, |workspace, cx| workspace.activate_next_pane(cx));
5688        assert_eq!(
5689            workspace_b.read_with(cx_b, |workspace, _| workspace.leader_for_pane(&pane_b)),
5690            Some(leader_id)
5691        );
5692
5693        // When client B activates a different item in the original pane, it automatically stops following client A.
5694        workspace_b
5695            .update(cx_b, |workspace, cx| {
5696                workspace.open_path((worktree_id, "2.txt"), true, cx)
5697            })
5698            .await
5699            .unwrap();
5700        assert_eq!(
5701            workspace_b.read_with(cx_b, |workspace, _| workspace.leader_for_pane(&pane_b)),
5702            None
5703        );
5704    }
5705
5706    #[gpui::test(iterations = 100)]
5707    async fn test_random_collaboration(
5708        cx: &mut TestAppContext,
5709        deterministic: Arc<Deterministic>,
5710        rng: StdRng,
5711    ) {
5712        cx.foreground().forbid_parking();
5713        let max_peers = env::var("MAX_PEERS")
5714            .map(|i| i.parse().expect("invalid `MAX_PEERS` variable"))
5715            .unwrap_or(5);
5716        assert!(max_peers <= 5);
5717
5718        let max_operations = env::var("OPERATIONS")
5719            .map(|i| i.parse().expect("invalid `OPERATIONS` variable"))
5720            .unwrap_or(10);
5721
5722        let rng = Arc::new(Mutex::new(rng));
5723
5724        let guest_lang_registry = Arc::new(LanguageRegistry::test());
5725        let host_language_registry = Arc::new(LanguageRegistry::test());
5726
5727        let fs = FakeFs::new(cx.background());
5728        fs.insert_tree("/_collab", json!({"init": ""})).await;
5729
5730        let mut server = TestServer::start(cx.foreground(), cx.background()).await;
5731        let db = server.app_state.db.clone();
5732        let host_user_id = db.create_user("host", false).await.unwrap();
5733        for username in ["guest-1", "guest-2", "guest-3", "guest-4"] {
5734            let guest_user_id = db.create_user(username, false).await.unwrap();
5735            server
5736                .app_state
5737                .db
5738                .send_contact_request(guest_user_id, host_user_id)
5739                .await
5740                .unwrap();
5741            server
5742                .app_state
5743                .db
5744                .respond_to_contact_request(host_user_id, guest_user_id, true)
5745                .await
5746                .unwrap();
5747        }
5748
5749        let mut clients = Vec::new();
5750        let mut user_ids = Vec::new();
5751        let mut op_start_signals = Vec::new();
5752
5753        let mut next_entity_id = 100000;
5754        let mut host_cx = TestAppContext::new(
5755            cx.foreground_platform(),
5756            cx.platform(),
5757            deterministic.build_foreground(next_entity_id),
5758            deterministic.build_background(),
5759            cx.font_cache(),
5760            cx.leak_detector(),
5761            next_entity_id,
5762        );
5763        let host = server.create_client(&mut host_cx, "host").await;
5764        let host_project = host_cx.update(|cx| {
5765            Project::local(
5766                host.client.clone(),
5767                host.user_store.clone(),
5768                host_language_registry.clone(),
5769                fs.clone(),
5770                cx,
5771            )
5772        });
5773        let host_project_id = host_project
5774            .update(&mut host_cx, |p, _| p.next_remote_id())
5775            .await;
5776
5777        let (collab_worktree, _) = host_project
5778            .update(&mut host_cx, |project, cx| {
5779                project.find_or_create_local_worktree("/_collab", true, cx)
5780            })
5781            .await
5782            .unwrap();
5783        collab_worktree
5784            .read_with(&host_cx, |tree, _| tree.as_local().unwrap().scan_complete())
5785            .await;
5786
5787        // Set up fake language servers.
5788        let mut language = Language::new(
5789            LanguageConfig {
5790                name: "Rust".into(),
5791                path_suffixes: vec!["rs".to_string()],
5792                ..Default::default()
5793            },
5794            None,
5795        );
5796        let _fake_servers = language.set_fake_lsp_adapter(FakeLspAdapter {
5797            name: "the-fake-language-server",
5798            capabilities: lsp::LanguageServer::full_capabilities(),
5799            initializer: Some(Box::new({
5800                let rng = rng.clone();
5801                let fs = fs.clone();
5802                let project = host_project.downgrade();
5803                move |fake_server: &mut FakeLanguageServer| {
5804                    fake_server.handle_request::<lsp::request::Completion, _, _>(
5805                        |_, _| async move {
5806                            Ok(Some(lsp::CompletionResponse::Array(vec![
5807                                lsp::CompletionItem {
5808                                    text_edit: Some(lsp::CompletionTextEdit::Edit(lsp::TextEdit {
5809                                        range: lsp::Range::new(
5810                                            lsp::Position::new(0, 0),
5811                                            lsp::Position::new(0, 0),
5812                                        ),
5813                                        new_text: "the-new-text".to_string(),
5814                                    })),
5815                                    ..Default::default()
5816                                },
5817                            ])))
5818                        },
5819                    );
5820
5821                    fake_server.handle_request::<lsp::request::CodeActionRequest, _, _>(
5822                        |_, _| async move {
5823                            Ok(Some(vec![lsp::CodeActionOrCommand::CodeAction(
5824                                lsp::CodeAction {
5825                                    title: "the-code-action".to_string(),
5826                                    ..Default::default()
5827                                },
5828                            )]))
5829                        },
5830                    );
5831
5832                    fake_server.handle_request::<lsp::request::PrepareRenameRequest, _, _>(
5833                        |params, _| async move {
5834                            Ok(Some(lsp::PrepareRenameResponse::Range(lsp::Range::new(
5835                                params.position,
5836                                params.position,
5837                            ))))
5838                        },
5839                    );
5840
5841                    fake_server.handle_request::<lsp::request::GotoDefinition, _, _>({
5842                        let fs = fs.clone();
5843                        let rng = rng.clone();
5844                        move |_, _| {
5845                            let fs = fs.clone();
5846                            let rng = rng.clone();
5847                            async move {
5848                                let files = fs.files().await;
5849                                let mut rng = rng.lock();
5850                                let count = rng.gen_range::<usize, _>(1..3);
5851                                let files = (0..count)
5852                                    .map(|_| files.choose(&mut *rng).unwrap())
5853                                    .collect::<Vec<_>>();
5854                                log::info!("LSP: Returning definitions in files {:?}", &files);
5855                                Ok(Some(lsp::GotoDefinitionResponse::Array(
5856                                    files
5857                                        .into_iter()
5858                                        .map(|file| lsp::Location {
5859                                            uri: lsp::Url::from_file_path(file).unwrap(),
5860                                            range: Default::default(),
5861                                        })
5862                                        .collect(),
5863                                )))
5864                            }
5865                        }
5866                    });
5867
5868                    fake_server.handle_request::<lsp::request::DocumentHighlightRequest, _, _>({
5869                        let rng = rng.clone();
5870                        let project = project.clone();
5871                        move |params, mut cx| {
5872                            let highlights = if let Some(project) = project.upgrade(&cx) {
5873                                project.update(&mut cx, |project, cx| {
5874                                    let path = params
5875                                        .text_document_position_params
5876                                        .text_document
5877                                        .uri
5878                                        .to_file_path()
5879                                        .unwrap();
5880                                    let (worktree, relative_path) =
5881                                        project.find_local_worktree(&path, cx)?;
5882                                    let project_path =
5883                                        ProjectPath::from((worktree.read(cx).id(), relative_path));
5884                                    let buffer =
5885                                        project.get_open_buffer(&project_path, cx)?.read(cx);
5886
5887                                    let mut highlights = Vec::new();
5888                                    let highlight_count = rng.lock().gen_range(1..=5);
5889                                    let mut prev_end = 0;
5890                                    for _ in 0..highlight_count {
5891                                        let range =
5892                                            buffer.random_byte_range(prev_end, &mut *rng.lock());
5893
5894                                        highlights.push(lsp::DocumentHighlight {
5895                                            range: range_to_lsp(range.to_point_utf16(buffer)),
5896                                            kind: Some(lsp::DocumentHighlightKind::READ),
5897                                        });
5898                                        prev_end = range.end;
5899                                    }
5900                                    Some(highlights)
5901                                })
5902                            } else {
5903                                None
5904                            };
5905                            async move { Ok(highlights) }
5906                        }
5907                    });
5908                }
5909            })),
5910            ..Default::default()
5911        });
5912        host_language_registry.add(Arc::new(language));
5913
5914        let op_start_signal = futures::channel::mpsc::unbounded();
5915        user_ids.push(host.current_user_id(&host_cx));
5916        op_start_signals.push(op_start_signal.0);
5917        clients.push(host_cx.foreground().spawn(host.simulate_host(
5918            host_project,
5919            op_start_signal.1,
5920            rng.clone(),
5921            host_cx,
5922        )));
5923
5924        let disconnect_host_at = if rng.lock().gen_bool(0.2) {
5925            rng.lock().gen_range(0..max_operations)
5926        } else {
5927            max_operations
5928        };
5929        let mut available_guests = vec![
5930            "guest-1".to_string(),
5931            "guest-2".to_string(),
5932            "guest-3".to_string(),
5933            "guest-4".to_string(),
5934        ];
5935        let mut operations = 0;
5936        while operations < max_operations {
5937            if operations == disconnect_host_at {
5938                server.disconnect_client(user_ids[0]);
5939                cx.foreground().advance_clock(RECEIVE_TIMEOUT);
5940                drop(op_start_signals);
5941                let mut clients = futures::future::join_all(clients).await;
5942                cx.foreground().run_until_parked();
5943
5944                let (host, mut host_cx, host_err) = clients.remove(0);
5945                if let Some(host_err) = host_err {
5946                    log::error!("host error - {:?}", host_err);
5947                }
5948                host.project
5949                    .as_ref()
5950                    .unwrap()
5951                    .read_with(&host_cx, |project, _| assert!(!project.is_shared()));
5952                for (guest, mut guest_cx, guest_err) in clients {
5953                    if let Some(guest_err) = guest_err {
5954                        log::error!("{} error - {:?}", guest.username, guest_err);
5955                    }
5956                    // TODO
5957                    // let contacts = server
5958                    //     .store
5959                    //     .read()
5960                    //     .await
5961                    //     .contacts_for_user(guest.current_user_id(&guest_cx));
5962                    // assert!(!contacts
5963                    //     .iter()
5964                    //     .flat_map(|contact| &contact.projects)
5965                    //     .any(|project| project.id == host_project_id));
5966                    guest
5967                        .project
5968                        .as_ref()
5969                        .unwrap()
5970                        .read_with(&guest_cx, |project, _| assert!(project.is_read_only()));
5971                    guest_cx.update(|_| drop(guest));
5972                }
5973                host_cx.update(|_| drop(host));
5974
5975                return;
5976            }
5977
5978            let distribution = rng.lock().gen_range(0..100);
5979            match distribution {
5980                0..=19 if !available_guests.is_empty() => {
5981                    let guest_ix = rng.lock().gen_range(0..available_guests.len());
5982                    let guest_username = available_guests.remove(guest_ix);
5983                    log::info!("Adding new connection for {}", guest_username);
5984                    next_entity_id += 100000;
5985                    let mut guest_cx = TestAppContext::new(
5986                        cx.foreground_platform(),
5987                        cx.platform(),
5988                        deterministic.build_foreground(next_entity_id),
5989                        deterministic.build_background(),
5990                        cx.font_cache(),
5991                        cx.leak_detector(),
5992                        next_entity_id,
5993                    );
5994                    let guest = server.create_client(&mut guest_cx, &guest_username).await;
5995                    let guest_project = Project::remote(
5996                        host_project_id,
5997                        guest.client.clone(),
5998                        guest.user_store.clone(),
5999                        guest_lang_registry.clone(),
6000                        FakeFs::new(cx.background()),
6001                        &mut guest_cx.to_async(),
6002                    )
6003                    .await
6004                    .unwrap();
6005                    let op_start_signal = futures::channel::mpsc::unbounded();
6006                    user_ids.push(guest.current_user_id(&guest_cx));
6007                    op_start_signals.push(op_start_signal.0);
6008                    clients.push(guest_cx.foreground().spawn(guest.simulate_guest(
6009                        guest_username.clone(),
6010                        guest_project,
6011                        op_start_signal.1,
6012                        rng.clone(),
6013                        guest_cx,
6014                    )));
6015
6016                    log::info!("Added connection for {}", guest_username);
6017                    operations += 1;
6018                }
6019                20..=29 if clients.len() > 1 => {
6020                    let guest_ix = rng.lock().gen_range(1..clients.len());
6021                    log::info!("Removing guest {}", user_ids[guest_ix]);
6022                    let removed_guest_id = user_ids.remove(guest_ix);
6023                    let guest = clients.remove(guest_ix);
6024                    op_start_signals.remove(guest_ix);
6025                    server.disconnect_client(removed_guest_id);
6026                    cx.foreground().advance_clock(RECEIVE_TIMEOUT);
6027                    let (guest, mut guest_cx, guest_err) = guest.await;
6028                    if let Some(guest_err) = guest_err {
6029                        log::error!("{} error - {:?}", guest.username, guest_err);
6030                    }
6031                    guest
6032                        .project
6033                        .as_ref()
6034                        .unwrap()
6035                        .read_with(&guest_cx, |project, _| assert!(project.is_read_only()));
6036                    // TODO
6037                    // for user_id in &user_ids {
6038                    //     for contact in server.store.read().await.contacts_for_user(*user_id) {
6039                    //         assert_ne!(
6040                    //             contact.user_id, removed_guest_id.0 as u64,
6041                    //             "removed guest is still a contact of another peer"
6042                    //         );
6043                    //         for project in contact.projects {
6044                    //             for project_guest_id in project.guests {
6045                    //                 assert_ne!(
6046                    //                     project_guest_id, removed_guest_id.0 as u64,
6047                    //                     "removed guest appears as still participating on a project"
6048                    //                 );
6049                    //             }
6050                    //         }
6051                    //     }
6052                    // }
6053
6054                    log::info!("{} removed", guest.username);
6055                    available_guests.push(guest.username.clone());
6056                    guest_cx.update(|_| drop(guest));
6057
6058                    operations += 1;
6059                }
6060                _ => {
6061                    while operations < max_operations && rng.lock().gen_bool(0.7) {
6062                        op_start_signals
6063                            .choose(&mut *rng.lock())
6064                            .unwrap()
6065                            .unbounded_send(())
6066                            .unwrap();
6067                        operations += 1;
6068                    }
6069
6070                    if rng.lock().gen_bool(0.8) {
6071                        cx.foreground().run_until_parked();
6072                    }
6073                }
6074            }
6075        }
6076
6077        drop(op_start_signals);
6078        let mut clients = futures::future::join_all(clients).await;
6079        cx.foreground().run_until_parked();
6080
6081        let (host_client, mut host_cx, host_err) = clients.remove(0);
6082        if let Some(host_err) = host_err {
6083            panic!("host error - {:?}", host_err);
6084        }
6085        let host_project = host_client.project.as_ref().unwrap();
6086        let host_worktree_snapshots = host_project.read_with(&host_cx, |project, cx| {
6087            project
6088                .worktrees(cx)
6089                .map(|worktree| {
6090                    let snapshot = worktree.read(cx).snapshot();
6091                    (snapshot.id(), snapshot)
6092                })
6093                .collect::<BTreeMap<_, _>>()
6094        });
6095
6096        host_client
6097            .project
6098            .as_ref()
6099            .unwrap()
6100            .read_with(&host_cx, |project, cx| project.check_invariants(cx));
6101
6102        for (guest_client, mut guest_cx, guest_err) in clients.into_iter() {
6103            if let Some(guest_err) = guest_err {
6104                panic!("{} error - {:?}", guest_client.username, guest_err);
6105            }
6106            let worktree_snapshots =
6107                guest_client
6108                    .project
6109                    .as_ref()
6110                    .unwrap()
6111                    .read_with(&guest_cx, |project, cx| {
6112                        project
6113                            .worktrees(cx)
6114                            .map(|worktree| {
6115                                let worktree = worktree.read(cx);
6116                                (worktree.id(), worktree.snapshot())
6117                            })
6118                            .collect::<BTreeMap<_, _>>()
6119                    });
6120
6121            assert_eq!(
6122                worktree_snapshots.keys().collect::<Vec<_>>(),
6123                host_worktree_snapshots.keys().collect::<Vec<_>>(),
6124                "{} has different worktrees than the host",
6125                guest_client.username
6126            );
6127            for (id, host_snapshot) in &host_worktree_snapshots {
6128                let guest_snapshot = &worktree_snapshots[id];
6129                assert_eq!(
6130                    guest_snapshot.root_name(),
6131                    host_snapshot.root_name(),
6132                    "{} has different root name than the host for worktree {}",
6133                    guest_client.username,
6134                    id
6135                );
6136                assert_eq!(
6137                    guest_snapshot.entries(false).collect::<Vec<_>>(),
6138                    host_snapshot.entries(false).collect::<Vec<_>>(),
6139                    "{} has different snapshot than the host for worktree {}",
6140                    guest_client.username,
6141                    id
6142                );
6143                assert_eq!(guest_snapshot.scan_id(), host_snapshot.scan_id());
6144            }
6145
6146            guest_client
6147                .project
6148                .as_ref()
6149                .unwrap()
6150                .read_with(&guest_cx, |project, cx| project.check_invariants(cx));
6151
6152            for guest_buffer in &guest_client.buffers {
6153                let buffer_id = guest_buffer.read_with(&guest_cx, |buffer, _| buffer.remote_id());
6154                let host_buffer = host_project.read_with(&host_cx, |project, cx| {
6155                    project.buffer_for_id(buffer_id, cx).expect(&format!(
6156                        "host does not have buffer for guest:{}, peer:{}, id:{}",
6157                        guest_client.username, guest_client.peer_id, buffer_id
6158                    ))
6159                });
6160                let path = host_buffer
6161                    .read_with(&host_cx, |buffer, cx| buffer.file().unwrap().full_path(cx));
6162
6163                assert_eq!(
6164                    guest_buffer.read_with(&guest_cx, |buffer, _| buffer.deferred_ops_len()),
6165                    0,
6166                    "{}, buffer {}, path {:?} has deferred operations",
6167                    guest_client.username,
6168                    buffer_id,
6169                    path,
6170                );
6171                assert_eq!(
6172                    guest_buffer.read_with(&guest_cx, |buffer, _| buffer.text()),
6173                    host_buffer.read_with(&host_cx, |buffer, _| buffer.text()),
6174                    "{}, buffer {}, path {:?}, differs from the host's buffer",
6175                    guest_client.username,
6176                    buffer_id,
6177                    path
6178                );
6179            }
6180
6181            guest_cx.update(|_| drop(guest_client));
6182        }
6183
6184        host_cx.update(|_| drop(host_client));
6185    }
6186
6187    struct TestServer {
6188        peer: Arc<Peer>,
6189        app_state: Arc<AppState>,
6190        server: Arc<Server>,
6191        foreground: Rc<executor::Foreground>,
6192        notifications: mpsc::UnboundedReceiver<()>,
6193        connection_killers: Arc<Mutex<HashMap<UserId, Arc<AtomicBool>>>>,
6194        forbid_connections: Arc<AtomicBool>,
6195        _test_db: TestDb,
6196    }
6197
6198    impl TestServer {
6199        async fn start(
6200            foreground: Rc<executor::Foreground>,
6201            background: Arc<executor::Background>,
6202        ) -> Self {
6203            let test_db = TestDb::fake(background);
6204            let app_state = Self::build_app_state(&test_db).await;
6205            let peer = Peer::new();
6206            let notifications = mpsc::unbounded();
6207            let server = Server::new(app_state.clone(), Some(notifications.0));
6208            Self {
6209                peer,
6210                app_state,
6211                server,
6212                foreground,
6213                notifications: notifications.1,
6214                connection_killers: Default::default(),
6215                forbid_connections: Default::default(),
6216                _test_db: test_db,
6217            }
6218        }
6219
6220        async fn create_client(&mut self, cx: &mut TestAppContext, name: &str) -> TestClient {
6221            cx.update(|cx| {
6222                let settings = Settings::test(cx);
6223                cx.set_global(settings);
6224            });
6225
6226            let http = FakeHttpClient::with_404_response();
6227            let user_id =
6228                if let Ok(Some(user)) = self.app_state.db.get_user_by_github_login(name).await {
6229                    user.id
6230                } else {
6231                    self.app_state.db.create_user(name, false).await.unwrap()
6232                };
6233            let client_name = name.to_string();
6234            let mut client = Client::new(http.clone());
6235            let server = self.server.clone();
6236            let connection_killers = self.connection_killers.clone();
6237            let forbid_connections = self.forbid_connections.clone();
6238            let (connection_id_tx, mut connection_id_rx) = mpsc::channel(16);
6239
6240            Arc::get_mut(&mut client)
6241                .unwrap()
6242                .override_authenticate(move |cx| {
6243                    cx.spawn(|_| async move {
6244                        let access_token = "the-token".to_string();
6245                        Ok(Credentials {
6246                            user_id: user_id.0 as u64,
6247                            access_token,
6248                        })
6249                    })
6250                })
6251                .override_establish_connection(move |credentials, cx| {
6252                    assert_eq!(credentials.user_id, user_id.0 as u64);
6253                    assert_eq!(credentials.access_token, "the-token");
6254
6255                    let server = server.clone();
6256                    let connection_killers = connection_killers.clone();
6257                    let forbid_connections = forbid_connections.clone();
6258                    let client_name = client_name.clone();
6259                    let connection_id_tx = connection_id_tx.clone();
6260                    cx.spawn(move |cx| async move {
6261                        if forbid_connections.load(SeqCst) {
6262                            Err(EstablishConnectionError::other(anyhow!(
6263                                "server is forbidding connections"
6264                            )))
6265                        } else {
6266                            let (client_conn, server_conn, killed) =
6267                                Connection::in_memory(cx.background());
6268                            connection_killers.lock().insert(user_id, killed);
6269                            cx.background()
6270                                .spawn(server.handle_connection(
6271                                    server_conn,
6272                                    client_name,
6273                                    user_id,
6274                                    Some(connection_id_tx),
6275                                    cx.background(),
6276                                ))
6277                                .detach();
6278                            Ok(client_conn)
6279                        }
6280                    })
6281                });
6282
6283            client
6284                .authenticate_and_connect(false, &cx.to_async())
6285                .await
6286                .unwrap();
6287
6288            Channel::init(&client);
6289            Project::init(&client);
6290            cx.update(|cx| {
6291                workspace::init(&client, cx);
6292            });
6293
6294            let peer_id = PeerId(connection_id_rx.next().await.unwrap().0);
6295            let user_store = cx.add_model(|cx| UserStore::new(client.clone(), http, cx));
6296
6297            let client = TestClient {
6298                client,
6299                peer_id,
6300                username: name.to_string(),
6301                user_store,
6302                language_registry: Arc::new(LanguageRegistry::test()),
6303                project: Default::default(),
6304                buffers: Default::default(),
6305            };
6306            client.wait_for_current_user(cx).await;
6307            client
6308        }
6309
6310        fn disconnect_client(&self, user_id: UserId) {
6311            self.connection_killers
6312                .lock()
6313                .remove(&user_id)
6314                .unwrap()
6315                .store(true, SeqCst);
6316        }
6317
6318        fn forbid_connections(&self) {
6319            self.forbid_connections.store(true, SeqCst);
6320        }
6321
6322        fn allow_connections(&self) {
6323            self.forbid_connections.store(false, SeqCst);
6324        }
6325
6326        async fn make_contacts(&self, mut clients: Vec<(&TestClient, &mut TestAppContext)>) {
6327            while let Some((client_a, cx_a)) = clients.pop() {
6328                for (client_b, cx_b) in &mut clients {
6329                    client_a
6330                        .user_store
6331                        .update(cx_a, |store, cx| {
6332                            store.request_contact(client_b.user_id().unwrap(), cx)
6333                        })
6334                        .await
6335                        .unwrap();
6336                    cx_a.foreground().run_until_parked();
6337                    client_b
6338                        .user_store
6339                        .update(*cx_b, |store, cx| {
6340                            store.respond_to_contact_request(client_a.user_id().unwrap(), true, cx)
6341                        })
6342                        .await
6343                        .unwrap();
6344                }
6345            }
6346        }
6347
6348        async fn build_app_state(test_db: &TestDb) -> Arc<AppState> {
6349            Arc::new(AppState {
6350                db: test_db.db().clone(),
6351                api_token: Default::default(),
6352            })
6353        }
6354
6355        async fn state<'a>(&'a self) -> RwLockReadGuard<'a, Store> {
6356            self.server.store.read().await
6357        }
6358
6359        async fn condition<F>(&mut self, mut predicate: F)
6360        where
6361            F: FnMut(&Store) -> bool,
6362        {
6363            assert!(
6364                self.foreground.parking_forbidden(),
6365                "you must call forbid_parking to use server conditions so we don't block indefinitely"
6366            );
6367            while !(predicate)(&*self.server.store.read().await) {
6368                self.foreground.start_waiting();
6369                self.notifications.next().await;
6370                self.foreground.finish_waiting();
6371            }
6372        }
6373    }
6374
6375    impl Deref for TestServer {
6376        type Target = Server;
6377
6378        fn deref(&self) -> &Self::Target {
6379            &self.server
6380        }
6381    }
6382
6383    impl Drop for TestServer {
6384        fn drop(&mut self) {
6385            self.peer.reset();
6386        }
6387    }
6388
6389    struct TestClient {
6390        client: Arc<Client>,
6391        username: String,
6392        pub peer_id: PeerId,
6393        pub user_store: ModelHandle<UserStore>,
6394        language_registry: Arc<LanguageRegistry>,
6395        project: Option<ModelHandle<Project>>,
6396        buffers: HashSet<ModelHandle<language::Buffer>>,
6397    }
6398
6399    impl Deref for TestClient {
6400        type Target = Arc<Client>;
6401
6402        fn deref(&self) -> &Self::Target {
6403            &self.client
6404        }
6405    }
6406
6407    struct ContactsSummary {
6408        pub current: Vec<String>,
6409        pub outgoing_requests: Vec<String>,
6410        pub incoming_requests: Vec<String>,
6411    }
6412
6413    impl TestClient {
6414        pub fn current_user_id(&self, cx: &TestAppContext) -> UserId {
6415            UserId::from_proto(
6416                self.user_store
6417                    .read_with(cx, |user_store, _| user_store.current_user().unwrap().id),
6418            )
6419        }
6420
6421        async fn wait_for_current_user(&self, cx: &TestAppContext) {
6422            let mut authed_user = self
6423                .user_store
6424                .read_with(cx, |user_store, _| user_store.watch_current_user());
6425            while authed_user.next().await.unwrap().is_none() {}
6426        }
6427
6428        async fn clear_contacts(&self, cx: &mut TestAppContext) {
6429            self.user_store
6430                .update(cx, |store, _| store.clear_contacts())
6431                .await;
6432        }
6433
6434        fn summarize_contacts(&self, cx: &TestAppContext) -> ContactsSummary {
6435            self.user_store.read_with(cx, |store, _| ContactsSummary {
6436                current: store
6437                    .contacts()
6438                    .iter()
6439                    .map(|contact| contact.user.github_login.clone())
6440                    .collect(),
6441                outgoing_requests: store
6442                    .outgoing_contact_requests()
6443                    .iter()
6444                    .map(|user| user.github_login.clone())
6445                    .collect(),
6446                incoming_requests: store
6447                    .incoming_contact_requests()
6448                    .iter()
6449                    .map(|user| user.github_login.clone())
6450                    .collect(),
6451            })
6452        }
6453
6454        async fn build_local_project(
6455            &mut self,
6456            fs: Arc<FakeFs>,
6457            root_path: impl AsRef<Path>,
6458            cx: &mut TestAppContext,
6459        ) -> (ModelHandle<Project>, WorktreeId) {
6460            let project = cx.update(|cx| {
6461                Project::local(
6462                    self.client.clone(),
6463                    self.user_store.clone(),
6464                    self.language_registry.clone(),
6465                    fs,
6466                    cx,
6467                )
6468            });
6469            self.project = Some(project.clone());
6470            let (worktree, _) = project
6471                .update(cx, |p, cx| {
6472                    p.find_or_create_local_worktree(root_path, true, cx)
6473                })
6474                .await
6475                .unwrap();
6476            worktree
6477                .read_with(cx, |tree, _| tree.as_local().unwrap().scan_complete())
6478                .await;
6479            project
6480                .update(cx, |project, _| project.next_remote_id())
6481                .await;
6482            (project, worktree.read_with(cx, |tree, _| tree.id()))
6483        }
6484
6485        async fn build_remote_project(
6486            &mut self,
6487            host_project: &ModelHandle<Project>,
6488            host_cx: &mut TestAppContext,
6489            guest_cx: &mut TestAppContext,
6490        ) -> ModelHandle<Project> {
6491            let host_project_id = host_project
6492                .read_with(host_cx, |project, _| project.next_remote_id())
6493                .await;
6494            let guest_user_id = self.user_id().unwrap();
6495            let languages =
6496                host_project.read_with(host_cx, |project, _| project.languages().clone());
6497            let project_b = guest_cx.spawn(|mut cx| {
6498                let user_store = self.user_store.clone();
6499                let guest_client = self.client.clone();
6500                async move {
6501                    Project::remote(
6502                        host_project_id,
6503                        guest_client,
6504                        user_store.clone(),
6505                        languages,
6506                        FakeFs::new(cx.background()),
6507                        &mut cx,
6508                    )
6509                    .await
6510                    .unwrap()
6511                }
6512            });
6513            host_cx.foreground().run_until_parked();
6514            host_project.update(host_cx, |project, cx| {
6515                project.respond_to_join_request(guest_user_id, true, cx)
6516            });
6517            let project = project_b.await;
6518            self.project = Some(project.clone());
6519            project
6520        }
6521
6522        fn build_workspace(
6523            &self,
6524            project: &ModelHandle<Project>,
6525            cx: &mut TestAppContext,
6526        ) -> ViewHandle<Workspace> {
6527            let (window_id, _) = cx.add_window(|_| EmptyView);
6528            cx.add_view(window_id, |cx| {
6529                let fs = project.read(cx).fs().clone();
6530                Workspace::new(
6531                    &WorkspaceParams {
6532                        fs,
6533                        project: project.clone(),
6534                        user_store: self.user_store.clone(),
6535                        languages: self.language_registry.clone(),
6536                        themes: ThemeRegistry::new((), cx.font_cache().clone()),
6537                        channel_list: cx.add_model(|cx| {
6538                            ChannelList::new(self.user_store.clone(), self.client.clone(), cx)
6539                        }),
6540                        client: self.client.clone(),
6541                    },
6542                    cx,
6543                )
6544            })
6545        }
6546
6547        async fn simulate_host(
6548            mut self,
6549            project: ModelHandle<Project>,
6550            op_start_signal: futures::channel::mpsc::UnboundedReceiver<()>,
6551            rng: Arc<Mutex<StdRng>>,
6552            mut cx: TestAppContext,
6553        ) -> (Self, TestAppContext, Option<anyhow::Error>) {
6554            async fn simulate_host_internal(
6555                client: &mut TestClient,
6556                project: ModelHandle<Project>,
6557                mut op_start_signal: futures::channel::mpsc::UnboundedReceiver<()>,
6558                rng: Arc<Mutex<StdRng>>,
6559                cx: &mut TestAppContext,
6560            ) -> anyhow::Result<()> {
6561                let fs = project.read_with(cx, |project, _| project.fs().clone());
6562
6563                cx.update(|cx| {
6564                    cx.subscribe(&project, move |project, event, cx| {
6565                        if let project::Event::ContactRequestedJoin(user) = event {
6566                            log::info!("Host: accepting join request from {}", user.github_login);
6567                            project.update(cx, |project, cx| {
6568                                project.respond_to_join_request(user.id, true, cx)
6569                            });
6570                        }
6571                    })
6572                    .detach();
6573                });
6574
6575                while op_start_signal.next().await.is_some() {
6576                    let distribution = rng.lock().gen_range::<usize, _>(0..100);
6577                    let files = fs.as_fake().files().await;
6578                    match distribution {
6579                        0..=19 if !files.is_empty() => {
6580                            let path = files.choose(&mut *rng.lock()).unwrap();
6581                            let mut path = path.as_path();
6582                            while let Some(parent_path) = path.parent() {
6583                                path = parent_path;
6584                                if rng.lock().gen() {
6585                                    break;
6586                                }
6587                            }
6588
6589                            log::info!("Host: find/create local worktree {:?}", path);
6590                            let find_or_create_worktree = project.update(cx, |project, cx| {
6591                                project.find_or_create_local_worktree(path, true, cx)
6592                            });
6593                            if rng.lock().gen() {
6594                                cx.background().spawn(find_or_create_worktree).detach();
6595                            } else {
6596                                find_or_create_worktree.await?;
6597                            }
6598                        }
6599                        20..=79 if !files.is_empty() => {
6600                            let buffer = if client.buffers.is_empty() || rng.lock().gen() {
6601                                let file = files.choose(&mut *rng.lock()).unwrap();
6602                                let (worktree, path) = project
6603                                    .update(cx, |project, cx| {
6604                                        project.find_or_create_local_worktree(
6605                                            file.clone(),
6606                                            true,
6607                                            cx,
6608                                        )
6609                                    })
6610                                    .await?;
6611                                let project_path =
6612                                    worktree.read_with(cx, |worktree, _| (worktree.id(), path));
6613                                log::info!(
6614                                    "Host: opening path {:?}, worktree {}, relative_path {:?}",
6615                                    file,
6616                                    project_path.0,
6617                                    project_path.1
6618                                );
6619                                let buffer = project
6620                                    .update(cx, |project, cx| project.open_buffer(project_path, cx))
6621                                    .await
6622                                    .unwrap();
6623                                client.buffers.insert(buffer.clone());
6624                                buffer
6625                            } else {
6626                                client
6627                                    .buffers
6628                                    .iter()
6629                                    .choose(&mut *rng.lock())
6630                                    .unwrap()
6631                                    .clone()
6632                            };
6633
6634                            if rng.lock().gen_bool(0.1) {
6635                                cx.update(|cx| {
6636                                    log::info!(
6637                                        "Host: dropping buffer {:?}",
6638                                        buffer.read(cx).file().unwrap().full_path(cx)
6639                                    );
6640                                    client.buffers.remove(&buffer);
6641                                    drop(buffer);
6642                                });
6643                            } else {
6644                                buffer.update(cx, |buffer, cx| {
6645                                    log::info!(
6646                                        "Host: updating buffer {:?} ({})",
6647                                        buffer.file().unwrap().full_path(cx),
6648                                        buffer.remote_id()
6649                                    );
6650
6651                                    if rng.lock().gen_bool(0.7) {
6652                                        buffer.randomly_edit(&mut *rng.lock(), 5, cx);
6653                                    } else {
6654                                        buffer.randomly_undo_redo(&mut *rng.lock(), cx);
6655                                    }
6656                                });
6657                            }
6658                        }
6659                        _ => loop {
6660                            let path_component_count = rng.lock().gen_range::<usize, _>(1..=5);
6661                            let mut path = PathBuf::new();
6662                            path.push("/");
6663                            for _ in 0..path_component_count {
6664                                let letter = rng.lock().gen_range(b'a'..=b'z');
6665                                path.push(std::str::from_utf8(&[letter]).unwrap());
6666                            }
6667                            path.set_extension("rs");
6668                            let parent_path = path.parent().unwrap();
6669
6670                            log::info!("Host: creating file {:?}", path,);
6671
6672                            if fs.create_dir(&parent_path).await.is_ok()
6673                                && fs.create_file(&path, Default::default()).await.is_ok()
6674                            {
6675                                break;
6676                            } else {
6677                                log::info!("Host: cannot create file");
6678                            }
6679                        },
6680                    }
6681
6682                    cx.background().simulate_random_delay().await;
6683                }
6684
6685                Ok(())
6686            }
6687
6688            let result =
6689                simulate_host_internal(&mut self, project.clone(), op_start_signal, rng, &mut cx)
6690                    .await;
6691            log::info!("Host done");
6692            self.project = Some(project);
6693            (self, cx, result.err())
6694        }
6695
6696        pub async fn simulate_guest(
6697            mut self,
6698            guest_username: String,
6699            project: ModelHandle<Project>,
6700            op_start_signal: futures::channel::mpsc::UnboundedReceiver<()>,
6701            rng: Arc<Mutex<StdRng>>,
6702            mut cx: TestAppContext,
6703        ) -> (Self, TestAppContext, Option<anyhow::Error>) {
6704            async fn simulate_guest_internal(
6705                client: &mut TestClient,
6706                guest_username: &str,
6707                project: ModelHandle<Project>,
6708                mut op_start_signal: futures::channel::mpsc::UnboundedReceiver<()>,
6709                rng: Arc<Mutex<StdRng>>,
6710                cx: &mut TestAppContext,
6711            ) -> anyhow::Result<()> {
6712                while op_start_signal.next().await.is_some() {
6713                    let buffer = if client.buffers.is_empty() || rng.lock().gen() {
6714                        let worktree = if let Some(worktree) =
6715                            project.read_with(cx, |project, cx| {
6716                                project
6717                                    .worktrees(&cx)
6718                                    .filter(|worktree| {
6719                                        let worktree = worktree.read(cx);
6720                                        worktree.is_visible()
6721                                            && worktree.entries(false).any(|e| e.is_file())
6722                                    })
6723                                    .choose(&mut *rng.lock())
6724                            }) {
6725                            worktree
6726                        } else {
6727                            cx.background().simulate_random_delay().await;
6728                            continue;
6729                        };
6730
6731                        let (worktree_root_name, project_path) =
6732                            worktree.read_with(cx, |worktree, _| {
6733                                let entry = worktree
6734                                    .entries(false)
6735                                    .filter(|e| e.is_file())
6736                                    .choose(&mut *rng.lock())
6737                                    .unwrap();
6738                                (
6739                                    worktree.root_name().to_string(),
6740                                    (worktree.id(), entry.path.clone()),
6741                                )
6742                            });
6743                        log::info!(
6744                            "{}: opening path {:?} in worktree {} ({})",
6745                            guest_username,
6746                            project_path.1,
6747                            project_path.0,
6748                            worktree_root_name,
6749                        );
6750                        let buffer = project
6751                            .update(cx, |project, cx| {
6752                                project.open_buffer(project_path.clone(), cx)
6753                            })
6754                            .await?;
6755                        log::info!(
6756                            "{}: opened path {:?} in worktree {} ({}) with buffer id {}",
6757                            guest_username,
6758                            project_path.1,
6759                            project_path.0,
6760                            worktree_root_name,
6761                            buffer.read_with(cx, |buffer, _| buffer.remote_id())
6762                        );
6763                        client.buffers.insert(buffer.clone());
6764                        buffer
6765                    } else {
6766                        client
6767                            .buffers
6768                            .iter()
6769                            .choose(&mut *rng.lock())
6770                            .unwrap()
6771                            .clone()
6772                    };
6773
6774                    let choice = rng.lock().gen_range(0..100);
6775                    match choice {
6776                        0..=9 => {
6777                            cx.update(|cx| {
6778                                log::info!(
6779                                    "{}: dropping buffer {:?}",
6780                                    guest_username,
6781                                    buffer.read(cx).file().unwrap().full_path(cx)
6782                                );
6783                                client.buffers.remove(&buffer);
6784                                drop(buffer);
6785                            });
6786                        }
6787                        10..=19 => {
6788                            let completions = project.update(cx, |project, cx| {
6789                                log::info!(
6790                                    "{}: requesting completions for buffer {} ({:?})",
6791                                    guest_username,
6792                                    buffer.read(cx).remote_id(),
6793                                    buffer.read(cx).file().unwrap().full_path(cx)
6794                                );
6795                                let offset = rng.lock().gen_range(0..=buffer.read(cx).len());
6796                                project.completions(&buffer, offset, cx)
6797                            });
6798                            let completions = cx.background().spawn(async move {
6799                                completions
6800                                    .await
6801                                    .map_err(|err| anyhow!("completions request failed: {:?}", err))
6802                            });
6803                            if rng.lock().gen_bool(0.3) {
6804                                log::info!("{}: detaching completions request", guest_username);
6805                                cx.update(|cx| completions.detach_and_log_err(cx));
6806                            } else {
6807                                completions.await?;
6808                            }
6809                        }
6810                        20..=29 => {
6811                            let code_actions = project.update(cx, |project, cx| {
6812                                log::info!(
6813                                    "{}: requesting code actions for buffer {} ({:?})",
6814                                    guest_username,
6815                                    buffer.read(cx).remote_id(),
6816                                    buffer.read(cx).file().unwrap().full_path(cx)
6817                                );
6818                                let range = buffer.read(cx).random_byte_range(0, &mut *rng.lock());
6819                                project.code_actions(&buffer, range, cx)
6820                            });
6821                            let code_actions = cx.background().spawn(async move {
6822                                code_actions.await.map_err(|err| {
6823                                    anyhow!("code actions request failed: {:?}", err)
6824                                })
6825                            });
6826                            if rng.lock().gen_bool(0.3) {
6827                                log::info!("{}: detaching code actions request", guest_username);
6828                                cx.update(|cx| code_actions.detach_and_log_err(cx));
6829                            } else {
6830                                code_actions.await?;
6831                            }
6832                        }
6833                        30..=39 if buffer.read_with(cx, |buffer, _| buffer.is_dirty()) => {
6834                            let (requested_version, save) = buffer.update(cx, |buffer, cx| {
6835                                log::info!(
6836                                    "{}: saving buffer {} ({:?})",
6837                                    guest_username,
6838                                    buffer.remote_id(),
6839                                    buffer.file().unwrap().full_path(cx)
6840                                );
6841                                (buffer.version(), buffer.save(cx))
6842                            });
6843                            let save = cx.background().spawn(async move {
6844                                let (saved_version, _) = save
6845                                    .await
6846                                    .map_err(|err| anyhow!("save request failed: {:?}", err))?;
6847                                assert!(saved_version.observed_all(&requested_version));
6848                                Ok::<_, anyhow::Error>(())
6849                            });
6850                            if rng.lock().gen_bool(0.3) {
6851                                log::info!("{}: detaching save request", guest_username);
6852                                cx.update(|cx| save.detach_and_log_err(cx));
6853                            } else {
6854                                save.await?;
6855                            }
6856                        }
6857                        40..=44 => {
6858                            let prepare_rename = project.update(cx, |project, cx| {
6859                                log::info!(
6860                                    "{}: preparing rename for buffer {} ({:?})",
6861                                    guest_username,
6862                                    buffer.read(cx).remote_id(),
6863                                    buffer.read(cx).file().unwrap().full_path(cx)
6864                                );
6865                                let offset = rng.lock().gen_range(0..=buffer.read(cx).len());
6866                                project.prepare_rename(buffer, offset, cx)
6867                            });
6868                            let prepare_rename = cx.background().spawn(async move {
6869                                prepare_rename.await.map_err(|err| {
6870                                    anyhow!("prepare rename request failed: {:?}", err)
6871                                })
6872                            });
6873                            if rng.lock().gen_bool(0.3) {
6874                                log::info!("{}: detaching prepare rename request", guest_username);
6875                                cx.update(|cx| prepare_rename.detach_and_log_err(cx));
6876                            } else {
6877                                prepare_rename.await?;
6878                            }
6879                        }
6880                        45..=49 => {
6881                            let definitions = project.update(cx, |project, cx| {
6882                                log::info!(
6883                                    "{}: requesting definitions for buffer {} ({:?})",
6884                                    guest_username,
6885                                    buffer.read(cx).remote_id(),
6886                                    buffer.read(cx).file().unwrap().full_path(cx)
6887                                );
6888                                let offset = rng.lock().gen_range(0..=buffer.read(cx).len());
6889                                project.definition(&buffer, offset, cx)
6890                            });
6891                            let definitions = cx.background().spawn(async move {
6892                                definitions
6893                                    .await
6894                                    .map_err(|err| anyhow!("definitions request failed: {:?}", err))
6895                            });
6896                            if rng.lock().gen_bool(0.3) {
6897                                log::info!("{}: detaching definitions request", guest_username);
6898                                cx.update(|cx| definitions.detach_and_log_err(cx));
6899                            } else {
6900                                client
6901                                    .buffers
6902                                    .extend(definitions.await?.into_iter().map(|loc| loc.buffer));
6903                            }
6904                        }
6905                        50..=54 => {
6906                            let highlights = project.update(cx, |project, cx| {
6907                                log::info!(
6908                                    "{}: requesting highlights for buffer {} ({:?})",
6909                                    guest_username,
6910                                    buffer.read(cx).remote_id(),
6911                                    buffer.read(cx).file().unwrap().full_path(cx)
6912                                );
6913                                let offset = rng.lock().gen_range(0..=buffer.read(cx).len());
6914                                project.document_highlights(&buffer, offset, cx)
6915                            });
6916                            let highlights = cx.background().spawn(async move {
6917                                highlights
6918                                    .await
6919                                    .map_err(|err| anyhow!("highlights request failed: {:?}", err))
6920                            });
6921                            if rng.lock().gen_bool(0.3) {
6922                                log::info!("{}: detaching highlights request", guest_username);
6923                                cx.update(|cx| highlights.detach_and_log_err(cx));
6924                            } else {
6925                                highlights.await?;
6926                            }
6927                        }
6928                        55..=59 => {
6929                            let search = project.update(cx, |project, cx| {
6930                                let query = rng.lock().gen_range('a'..='z');
6931                                log::info!("{}: project-wide search {:?}", guest_username, query);
6932                                project.search(SearchQuery::text(query, false, false), cx)
6933                            });
6934                            let search = cx.background().spawn(async move {
6935                                search
6936                                    .await
6937                                    .map_err(|err| anyhow!("search request failed: {:?}", err))
6938                            });
6939                            if rng.lock().gen_bool(0.3) {
6940                                log::info!("{}: detaching search request", guest_username);
6941                                cx.update(|cx| search.detach_and_log_err(cx));
6942                            } else {
6943                                client.buffers.extend(search.await?.into_keys());
6944                            }
6945                        }
6946                        60..=69 => {
6947                            let worktree = project
6948                                .read_with(cx, |project, cx| {
6949                                    project
6950                                        .worktrees(&cx)
6951                                        .filter(|worktree| {
6952                                            let worktree = worktree.read(cx);
6953                                            worktree.is_visible()
6954                                                && worktree.entries(false).any(|e| e.is_file())
6955                                                && worktree
6956                                                    .root_entry()
6957                                                    .map_or(false, |e| e.is_dir())
6958                                        })
6959                                        .choose(&mut *rng.lock())
6960                                })
6961                                .unwrap();
6962                            let (worktree_id, worktree_root_name) = worktree
6963                                .read_with(cx, |worktree, _| {
6964                                    (worktree.id(), worktree.root_name().to_string())
6965                                });
6966
6967                            let mut new_name = String::new();
6968                            for _ in 0..10 {
6969                                let letter = rng.lock().gen_range('a'..='z');
6970                                new_name.push(letter);
6971                            }
6972                            let mut new_path = PathBuf::new();
6973                            new_path.push(new_name);
6974                            new_path.set_extension("rs");
6975                            log::info!(
6976                                "{}: creating {:?} in worktree {} ({})",
6977                                guest_username,
6978                                new_path,
6979                                worktree_id,
6980                                worktree_root_name,
6981                            );
6982                            project
6983                                .update(cx, |project, cx| {
6984                                    project.create_entry((worktree_id, new_path), false, cx)
6985                                })
6986                                .unwrap()
6987                                .await?;
6988                        }
6989                        _ => {
6990                            buffer.update(cx, |buffer, cx| {
6991                                log::info!(
6992                                    "{}: updating buffer {} ({:?})",
6993                                    guest_username,
6994                                    buffer.remote_id(),
6995                                    buffer.file().unwrap().full_path(cx)
6996                                );
6997                                if rng.lock().gen_bool(0.7) {
6998                                    buffer.randomly_edit(&mut *rng.lock(), 5, cx);
6999                                } else {
7000                                    buffer.randomly_undo_redo(&mut *rng.lock(), cx);
7001                                }
7002                            });
7003                        }
7004                    }
7005                    cx.background().simulate_random_delay().await;
7006                }
7007                Ok(())
7008            }
7009
7010            let result = simulate_guest_internal(
7011                &mut self,
7012                &guest_username,
7013                project.clone(),
7014                op_start_signal,
7015                rng,
7016                &mut cx,
7017            )
7018            .await;
7019            log::info!("{}: done", guest_username);
7020
7021            self.project = Some(project);
7022            (self, cx, result.err())
7023        }
7024    }
7025
7026    impl Drop for TestClient {
7027        fn drop(&mut self) {
7028            self.client.tear_down();
7029        }
7030    }
7031
7032    impl Executor for Arc<gpui::executor::Background> {
7033        type Sleep = gpui::executor::Timer;
7034
7035        fn spawn_detached<F: 'static + Send + Future<Output = ()>>(&self, future: F) {
7036            self.spawn(future).detach();
7037        }
7038
7039        fn sleep(&self, duration: Duration) -> Self::Sleep {
7040            self.as_ref().timer(duration)
7041        }
7042    }
7043
7044    fn channel_messages(channel: &Channel) -> Vec<(String, String, bool)> {
7045        channel
7046            .messages()
7047            .cursor::<()>()
7048            .map(|m| {
7049                (
7050                    m.sender.github_login.clone(),
7051                    m.body.clone(),
7052                    m.is_pending(),
7053                )
7054            })
7055            .collect()
7056    }
7057
7058    struct EmptyView;
7059
7060    impl gpui::Entity for EmptyView {
7061        type Event = ();
7062    }
7063
7064    impl gpui::View for EmptyView {
7065        fn ui_name() -> &'static str {
7066            "empty view"
7067        }
7068
7069        fn render(&mut self, _: &mut gpui::RenderContext<Self>) -> gpui::ElementBox {
7070            gpui::Element::boxed(gpui::elements::Empty::new())
7071        }
7072    }
7073}