rpc.rs

   1mod store;
   2
   3use crate::{
   4    auth,
   5    db::{ChannelId, MessageId, UserId},
   6    AppState, Result,
   7};
   8use anyhow::anyhow;
   9use async_tungstenite::tungstenite::{
  10    protocol::CloseFrame as TungsteniteCloseFrame, Message as TungsteniteMessage,
  11};
  12use axum::{
  13    body::Body,
  14    extract::{
  15        ws::{CloseFrame as AxumCloseFrame, Message as AxumMessage},
  16        ConnectInfo, WebSocketUpgrade,
  17    },
  18    headers::{Header, HeaderName},
  19    http::StatusCode,
  20    middleware,
  21    response::IntoResponse,
  22    routing::get,
  23    Extension, Router, TypedHeader,
  24};
  25use collections::HashMap;
  26use futures::{channel::mpsc, future::BoxFuture, FutureExt, SinkExt, StreamExt, TryStreamExt};
  27use lazy_static::lazy_static;
  28use rpc::{
  29    proto::{self, AnyTypedEnvelope, EntityMessage, EnvelopedMessage, RequestMessage},
  30    Connection, ConnectionId, Peer, Receipt, TypedEnvelope,
  31};
  32use std::{
  33    any::TypeId,
  34    future::Future,
  35    marker::PhantomData,
  36    net::SocketAddr,
  37    ops::{Deref, DerefMut},
  38    rc::Rc,
  39    sync::{
  40        atomic::{AtomicBool, Ordering::SeqCst},
  41        Arc,
  42    },
  43    time::Duration,
  44};
  45use store::{Store, Worktree};
  46use time::OffsetDateTime;
  47use tokio::{
  48    sync::{RwLock, RwLockReadGuard, RwLockWriteGuard},
  49    time::Sleep,
  50};
  51use tower::ServiceBuilder;
  52use tracing::{info_span, Instrument};
  53
  54type MessageHandler =
  55    Box<dyn Send + Sync + Fn(Arc<Server>, Box<dyn AnyTypedEnvelope>) -> BoxFuture<'static, ()>>;
  56
    /// Handle passed to a request handler for replying to one request of type `R`.
  57struct Response<R> {
        /// Server whose peer is used to deliver the reply.
  58    server: Arc<Server>,
        /// Receipt of the request being answered.
  59    receipt: Receipt<R>,
        /// Set to `true` by `send`; `add_request_handler` keeps a clone of this flag to
        /// detect handlers that finish without replying.
  60    responded: Arc<AtomicBool>,
  61}
  62
  63impl<R: RequestMessage> Response<R> {
  64    fn send(self, payload: R::Response) -> Result<()> {
  65        self.responded.store(true, SeqCst);
  66        self.server.peer.respond(self.receipt, payload)?;
  67        Ok(())
  68    }
  69}
  70
    /// RPC server state shared (behind an `Arc`) by all connection tasks and handlers.
  71pub struct Server {
        /// Peer used to send, forward and respond to messages on connections.
  72    peer: Arc<Peer>,
        /// Connection/project bookkeeping, guarded by an async read-write lock.
  73    store: RwLock<Store>,
        /// Application state; this file reads the database handle from it (`app_state.db`).
  74    app_state: Arc<AppState>,
        /// Handlers keyed by the `TypeId` of the message type they were registered for.
  75    handlers: HashMap<TypeId, MessageHandler>,
        /// When present, `handle_connection` sends `()` on this channel after each handled
        /// message.
  76    notifications: Option<mpsc::UnboundedSender<()>>,
  77}
  78
  79pub trait Executor: Send + Clone {
  80    type Sleep: Send + Future;
  81    fn spawn_detached<F: 'static + Send + Future<Output = ()>>(&self, future: F);
  82    fn sleep(&self, duration: Duration) -> Self::Sleep;
  83}
  84
    /// Zero-sized, cloneable executor handle.
    ///
    /// NOTE(review): its `Executor` implementation is outside this excerpt — presumably it
    /// delegates to the real async runtime; confirm.
  85#[derive(Clone)]
  86pub struct RealExecutor;
  87
    // NOTE(review): neither constant is used in the visible part of this file; the
    // descriptions below are inferred from the names — confirm against the call sites.
    /// Presumably the number of channel messages returned per page.
  88const MESSAGE_COUNT_PER_PAGE: usize = 100;
    /// Presumably the maximum accepted length of a channel message body.
  89const MAX_MESSAGE_LEN: usize = 1024;
  90
    /// Shared (read) guard over the server's `Store`.
  91struct StoreReadGuard<'a> {
        /// The underlying read guard of the store's `RwLock`.
  92    guard: RwLockReadGuard<'a, Store>,
        /// `Rc` is not `Send`, so this marker makes the whole guard `!Send`.
        /// NOTE(review): presumably so the guard cannot be held across an `.await` in a
        /// future that must be `Send` — confirm.
  93    _not_send: PhantomData<Rc<()>>,
  94}
  95
    /// Exclusive (write) guard over the server's `Store`.
  96struct StoreWriteGuard<'a> {
        /// The underlying write guard of the store's `RwLock`.
  97    guard: RwLockWriteGuard<'a, Store>,
        /// `Rc` is not `Send`, so this marker makes the whole guard `!Send`.
        /// NOTE(review): presumably so the guard cannot be held across an `.await` in a
        /// future that must be `Send` — confirm.
  98    _not_send: PhantomData<Rc<()>>,
  99}
 100
 101impl Server {
 102    pub fn new(
 103        app_state: Arc<AppState>,
 104        notifications: Option<mpsc::UnboundedSender<()>>,
 105    ) -> Arc<Self> {
 106        let mut server = Self {
 107            peer: Peer::new(),
 108            app_state,
 109            store: Default::default(),
 110            handlers: Default::default(),
 111            notifications,
 112        };
 113
 114        server
 115            .add_request_handler(Server::ping)
 116            .add_request_handler(Server::register_project)
 117            .add_message_handler(Server::unregister_project)
 118            .add_request_handler(Server::share_project)
 119            .add_message_handler(Server::unshare_project)
 120            .add_request_handler(Server::join_project)
 121            .add_message_handler(Server::leave_project)
 122            .add_request_handler(Server::register_worktree)
 123            .add_message_handler(Server::unregister_worktree)
 124            .add_request_handler(Server::update_worktree)
 125            .add_message_handler(Server::start_language_server)
 126            .add_message_handler(Server::update_language_server)
 127            .add_message_handler(Server::update_diagnostic_summary)
 128            .add_request_handler(Server::forward_project_request::<proto::GetDefinition>)
 129            .add_request_handler(Server::forward_project_request::<proto::GetReferences>)
 130            .add_request_handler(Server::forward_project_request::<proto::SearchProject>)
 131            .add_request_handler(Server::forward_project_request::<proto::GetDocumentHighlights>)
 132            .add_request_handler(Server::forward_project_request::<proto::GetProjectSymbols>)
 133            .add_request_handler(Server::forward_project_request::<proto::OpenBufferForSymbol>)
 134            .add_request_handler(Server::forward_project_request::<proto::OpenBufferById>)
 135            .add_request_handler(Server::forward_project_request::<proto::OpenBufferByPath>)
 136            .add_request_handler(Server::forward_project_request::<proto::GetCompletions>)
 137            .add_request_handler(
 138                Server::forward_project_request::<proto::ApplyCompletionAdditionalEdits>,
 139            )
 140            .add_request_handler(Server::forward_project_request::<proto::GetCodeActions>)
 141            .add_request_handler(Server::forward_project_request::<proto::ApplyCodeAction>)
 142            .add_request_handler(Server::forward_project_request::<proto::PrepareRename>)
 143            .add_request_handler(Server::forward_project_request::<proto::PerformRename>)
 144            .add_request_handler(Server::forward_project_request::<proto::ReloadBuffers>)
 145            .add_request_handler(Server::forward_project_request::<proto::FormatBuffers>)
 146            .add_request_handler(Server::forward_project_request::<proto::CreateProjectEntry>)
 147            .add_request_handler(Server::forward_project_request::<proto::RenameProjectEntry>)
 148            .add_request_handler(Server::forward_project_request::<proto::DeleteProjectEntry>)
 149            .add_request_handler(Server::update_buffer)
 150            .add_message_handler(Server::update_buffer_file)
 151            .add_message_handler(Server::buffer_reloaded)
 152            .add_message_handler(Server::buffer_saved)
 153            .add_request_handler(Server::save_buffer)
 154            .add_request_handler(Server::get_channels)
 155            .add_request_handler(Server::get_users)
 156            .add_request_handler(Server::fuzzy_search_users)
 157            .add_request_handler(Server::request_contact)
 158            .add_request_handler(Server::remove_contact)
 159            .add_request_handler(Server::respond_to_contact_request)
 160            .add_request_handler(Server::join_channel)
 161            .add_message_handler(Server::leave_channel)
 162            .add_request_handler(Server::send_channel_message)
 163            .add_request_handler(Server::follow)
 164            .add_message_handler(Server::unfollow)
 165            .add_message_handler(Server::update_followers)
 166            .add_request_handler(Server::get_channel_messages);
 167
 168        Arc::new(server)
 169    }
 170
 171    fn add_message_handler<F, Fut, M>(&mut self, handler: F) -> &mut Self
 172    where
 173        F: 'static + Send + Sync + Fn(Arc<Self>, TypedEnvelope<M>) -> Fut,
 174        Fut: 'static + Send + Future<Output = Result<()>>,
 175        M: EnvelopedMessage,
 176    {
 177        let prev_handler = self.handlers.insert(
 178            TypeId::of::<M>(),
 179            Box::new(move |server, envelope| {
 180                let envelope = envelope.into_any().downcast::<TypedEnvelope<M>>().unwrap();
 181                let span = info_span!(
 182                    "handle message",
 183                    payload_type = envelope.payload_type_name(),
 184                    payload = format!("{:?}", envelope.payload).as_str(),
 185                );
 186                let future = (handler)(server, *envelope);
 187                async move {
 188                    if let Err(error) = future.await {
 189                        tracing::error!(%error, "error handling message");
 190                    }
 191                }
 192                .instrument(span)
 193                .boxed()
 194            }),
 195        );
 196        if prev_handler.is_some() {
 197            panic!("registered a handler for the same message twice");
 198        }
 199        self
 200    }
 201
 202    /// Handle a request while holding a lock to the store. This is useful when we're registering
 203    /// a connection but we want to respond on the connection before anybody else can send on it.
 204    fn add_request_handler<F, Fut, M>(&mut self, handler: F) -> &mut Self
 205    where
 206        F: 'static + Send + Sync + Fn(Arc<Self>, TypedEnvelope<M>, Response<M>) -> Fut,
 207        Fut: Send + Future<Output = Result<()>>,
 208        M: RequestMessage,
 209    {
 210        let handler = Arc::new(handler);
 211        self.add_message_handler(move |server, envelope| {
 212            let receipt = envelope.receipt();
 213            let handler = handler.clone();
 214            async move {
 215                let responded = Arc::new(AtomicBool::default());
 216                let response = Response {
 217                    server: server.clone(),
 218                    responded: responded.clone(),
 219                    receipt: envelope.receipt(),
 220                };
 221                match (handler)(server.clone(), envelope, response).await {
 222                    Ok(()) => {
 223                        if responded.load(std::sync::atomic::Ordering::SeqCst) {
 224                            Ok(())
 225                        } else {
 226                            Err(anyhow!("handler did not send a response"))?
 227                        }
 228                    }
 229                    Err(error) => {
 230                        server.peer.respond_with_error(
 231                            receipt,
 232                            proto::Error {
 233                                message: error.to_string(),
 234                            },
 235                        )?;
 236                        Err(error)
 237                    }
 238                }
 239            }
 240        })
 241    }
 242
        /// Serves one client connection from registration until it closes.
        ///
        /// The returned future is instrumented with a "handle connection" span and:
        ///
        /// 1. Registers `connection` with the peer, supplying a timer built on
        ///    `executor.sleep`, and obtains the connection id, the I/O future and the
        ///    stream of incoming messages.
        /// 2. Reports the connection id on `send_connection_id` when a sender was supplied
        ///    (a failed send is ignored).
        /// 3. Loads the user's contacts, records the connection in the store and, while
        ///    still holding the store write guard, sends the initial contacts update; then
        ///    calls `update_user_contacts` for the user.
        /// 4. Loops over the I/O future and the incoming messages until the I/O future
        ///    finishes or the message stream ends, dispatching each message to the handler
        ///    registered for its payload type.
        /// 5. Calls `sign_out`; a failure there is logged rather than returned.
        ///
        /// NOTE(review): the `?` early returns in step 3 leave the function without calling
        /// `sign_out` — confirm the connection is cleaned up elsewhere in that case.
 243    pub fn handle_connection<E: Executor>(
 244        self: &Arc<Self>,
 245        connection: Connection,
 246        address: String,
 247        user_id: UserId,
 248        mut send_connection_id: Option<mpsc::Sender<ConnectionId>>,
 249        executor: E,
 250    ) -> impl Future<Output = Result<()>> {
            // `mut` because `sign_out` takes `&mut Arc<Self>`.
 251        let mut this = self.clone();
 252        let span = info_span!("handle connection", %user_id, %address);
 253        async move {
 254            let (connection_id, handle_io, mut incoming_rx) = this
 255                .peer
 256                .add_connection(connection, {
                        // The timer closure gets its own executor clone; the original is
                        // still needed below for spawning background handlers.
 257                    let executor = executor.clone();
 258                    move |duration| {
 259                        let timer = executor.sleep(duration);
 260                        async move {
 261                            timer.await;
 262                        }
 263                    }
 264                })
 265                .await;
 266
 267            tracing::info!(%user_id, %connection_id, %address, "connection opened");
 268
 269            if let Some(send_connection_id) = send_connection_id.as_mut() {
 270                let _ = send_connection_id.send(connection_id).await;
 271            }
 272
 273            let contacts = this.app_state.db.get_contacts(user_id).await?;
 274
                // The write guard is scoped to this block so the initial contacts update is
                // sent under the lock and the guard is released before the next `.await`.
 275            {
 276                let mut store = this.store_mut().await;
 277                store.add_connection(connection_id, user_id);
 278                this.peer.send(connection_id, store.build_initial_contacts_update(contacts))?;
 279            }
 280            this.update_user_contacts(user_id).await?;
 281
 282            let handle_io = handle_io.fuse();
 283            futures::pin_mut!(handle_io);
 284            loop {
 285                let next_message = incoming_rx.next().fuse();
 286                futures::pin_mut!(next_message);
                    // `select_biased!` gives priority to the branches in the order written,
                    // so the I/O future is checked before the next message.
 287                futures::select_biased! {
 288                    result = handle_io => {
 289                        if let Err(error) = result {
 290                            tracing::error!(%error, "error handling I/O");
 291                        }
 292                        break;
 293                    }
 294                    message = next_message => {
 295                        if let Some(message) = message {
 296                            let type_name = message.payload_type_name();
 297                            let span = tracing::info_span!("receive message", %user_id, %connection_id, %address, type_name);
 298                            async {
 299                                if let Some(handler) = this.handlers.get(&message.payload_type_id()) {
 300                                    let notifications = this.notifications.clone();
 301                                    let is_background = message.is_background();
 302                                    let handle_message = (handler)(this.clone(), message);
                                        // Run the handler, then emit one `()` notification
                                        // if a notification channel was configured.
 303                                    let handle_message = async move {
 304                                        handle_message.await;
 305                                        if let Some(mut notifications) = notifications {
 306                                            let _ = notifications.send(()).await;
 307                                        }
 308                                    };
                                        // Background messages are spawned detached; all
                                        // others are awaited before the next loop turn.
 309                                    if is_background {
 310                                        executor.spawn_detached(handle_message);
 311                                    } else {
 312                                        handle_message.await;
 313                                    }
 314                                } else {
 315                                    tracing::error!("no message handler");
 316                                }
 317                            }.instrument(span).await;
 318                        } else {
                                // The incoming stream ended.
 319                            tracing::info!(%user_id, %connection_id, %address, "connection closed");
 320                            break;
 321                        }
 322                    }
 323                }
 324            }
 325
 326            if let Err(error) = this.sign_out(connection_id).await {
 327                tracing::error!(%error, "error signing out");
 328            }
 329
 330            Ok(())
 331        }.instrument(span)
 332    }
 333
 334    async fn sign_out(self: &mut Arc<Self>, connection_id: ConnectionId) -> Result<()> {
 335        self.peer.disconnect(connection_id);
 336        let removed_connection = self.store_mut().await.remove_connection(connection_id)?;
 337
 338        for (project_id, project) in removed_connection.hosted_projects {
 339            if let Some(share) = project.share {
 340                broadcast(connection_id, share.guests.keys().copied(), |conn_id| {
 341                    self.peer
 342                        .send(conn_id, proto::UnshareProject { project_id })
 343                });
 344            }
 345        }
 346
 347        for (project_id, peer_ids) in removed_connection.guest_project_ids {
 348            broadcast(connection_id, peer_ids, |conn_id| {
 349                self.peer.send(
 350                    conn_id,
 351                    proto::RemoveProjectCollaborator {
 352                        project_id,
 353                        peer_id: connection_id.0,
 354                    },
 355                )
 356            });
 357        }
 358
 359        self.update_user_contacts(removed_connection.user_id)
 360            .await?;
 361
 362        Ok(())
 363    }
 364
 365    async fn ping(
 366        self: Arc<Server>,
 367        _: TypedEnvelope<proto::Ping>,
 368        response: Response<proto::Ping>,
 369    ) -> Result<()> {
 370        response.send(proto::Ack {})?;
 371        Ok(())
 372    }
 373
 374    async fn register_project(
 375        self: Arc<Server>,
 376        request: TypedEnvelope<proto::RegisterProject>,
 377        response: Response<proto::RegisterProject>,
 378    ) -> Result<()> {
 379        let project_id = {
 380            let mut state = self.store_mut().await;
 381            let user_id = state.user_id_for_connection(request.sender_id)?;
 382            state.register_project(request.sender_id, user_id)
 383        };
 384        response.send(proto::RegisterProjectResponse { project_id })?;
 385        Ok(())
 386    }
 387
 388    async fn unregister_project(
 389        self: Arc<Server>,
 390        request: TypedEnvelope<proto::UnregisterProject>,
 391    ) -> Result<()> {
 392        let user_id = {
 393            let mut state = self.store_mut().await;
 394            state.unregister_project(request.payload.project_id, request.sender_id)?;
 395            state.user_id_for_connection(request.sender_id)?
 396        };
 397
 398        self.update_user_contacts(user_id).await?;
 399        Ok(())
 400    }
 401
 402    async fn share_project(
 403        self: Arc<Server>,
 404        request: TypedEnvelope<proto::ShareProject>,
 405        response: Response<proto::ShareProject>,
 406    ) -> Result<()> {
 407        let user_id = {
 408            let mut state = self.store_mut().await;
 409            state.share_project(request.payload.project_id, request.sender_id)?;
 410            state.user_id_for_connection(request.sender_id)?
 411        };
 412        self.update_user_contacts(user_id).await?;
 413        response.send(proto::Ack {})?;
 414        Ok(())
 415    }
 416
 417    async fn update_user_contacts(self: &Arc<Server>, user_id: UserId) -> Result<()> {
 418        let contacts = self.app_state.db.get_contacts(user_id).await?;
 419        let store = self.store().await;
 420        let updated_contact = store.contact_for_user(user_id);
 421        for contact_user_id in contacts.current {
 422            for contact_conn_id in store.connection_ids_for_user(contact_user_id) {
 423                self.peer
 424                    .send(
 425                        contact_conn_id,
 426                        proto::UpdateContacts {
 427                            contacts: vec![updated_contact.clone()],
 428                            remove_contacts: Default::default(),
 429                            incoming_requests: Default::default(),
 430                            remove_incoming_requests: Default::default(),
 431                            outgoing_requests: Default::default(),
 432                            remove_outgoing_requests: Default::default(),
 433                        },
 434                    )
 435                    .trace_err();
 436            }
 437        }
 438        Ok(())
 439    }
 440
 441    async fn unshare_project(
 442        self: Arc<Server>,
 443        request: TypedEnvelope<proto::UnshareProject>,
 444    ) -> Result<()> {
 445        let project_id = request.payload.project_id;
 446        let project;
 447        {
 448            let mut state = self.store_mut().await;
 449            project = state.unshare_project(project_id, request.sender_id)?;
 450            broadcast(request.sender_id, project.connection_ids, |conn_id| {
 451                self.peer
 452                    .send(conn_id, proto::UnshareProject { project_id })
 453            });
 454        }
 455        self.update_user_contacts(project.host_user_id).await?;
 456        Ok(())
 457    }
 458
 459    async fn join_project(
 460        self: Arc<Server>,
 461        request: TypedEnvelope<proto::JoinProject>,
 462        response: Response<proto::JoinProject>,
 463    ) -> Result<()> {
 464        let project_id = request.payload.project_id;
 465        let host_user_id;
 466        let guest_user_id;
 467        {
 468            let state = self.store().await;
 469            host_user_id = state.project(project_id)?.host_user_id;
 470            guest_user_id = state.user_id_for_connection(request.sender_id)?;
 471        };
 472
 473        let guest_contacts = self.app_state.db.get_contacts(guest_user_id).await?;
 474        if !guest_contacts.current.contains(&host_user_id) {
 475            return Err(anyhow!("no such project"))?;
 476        }
 477
 478        {
 479            let state = &mut *self.store_mut().await;
 480            let joined = state.join_project(request.sender_id, guest_user_id, project_id)?;
 481            let share = joined.project.share()?;
 482            let peer_count = share.guests.len();
 483            let mut collaborators = Vec::with_capacity(peer_count);
 484            collaborators.push(proto::Collaborator {
 485                peer_id: joined.project.host_connection_id.0,
 486                replica_id: 0,
 487                user_id: joined.project.host_user_id.to_proto(),
 488            });
 489            let worktrees = share
 490                .worktrees
 491                .iter()
 492                .filter_map(|(id, shared_worktree)| {
 493                    let worktree = joined.project.worktrees.get(&id)?;
 494                    Some(proto::Worktree {
 495                        id: *id,
 496                        root_name: worktree.root_name.clone(),
 497                        entries: shared_worktree.entries.values().cloned().collect(),
 498                        diagnostic_summaries: shared_worktree
 499                            .diagnostic_summaries
 500                            .values()
 501                            .cloned()
 502                            .collect(),
 503                        visible: worktree.visible,
 504                        scan_id: shared_worktree.scan_id,
 505                    })
 506                })
 507                .collect();
 508            for (peer_conn_id, (peer_replica_id, peer_user_id)) in &share.guests {
 509                if *peer_conn_id != request.sender_id {
 510                    collaborators.push(proto::Collaborator {
 511                        peer_id: peer_conn_id.0,
 512                        replica_id: *peer_replica_id as u32,
 513                        user_id: peer_user_id.to_proto(),
 514                    });
 515                }
 516            }
 517            broadcast(
 518                request.sender_id,
 519                joined.project.connection_ids(),
 520                |conn_id| {
 521                    self.peer.send(
 522                        conn_id,
 523                        proto::AddProjectCollaborator {
 524                            project_id,
 525                            collaborator: Some(proto::Collaborator {
 526                                peer_id: request.sender_id.0,
 527                                replica_id: joined.replica_id as u32,
 528                                user_id: guest_user_id.to_proto(),
 529                            }),
 530                        },
 531                    )
 532                },
 533            );
 534            response.send(proto::JoinProjectResponse {
 535                worktrees,
 536                replica_id: joined.replica_id as u32,
 537                collaborators,
 538                language_servers: joined.project.language_servers.clone(),
 539            })?;
 540        }
 541        self.update_user_contacts(host_user_id).await?;
 542        Ok(())
 543    }
 544
 545    async fn leave_project(
 546        self: Arc<Server>,
 547        request: TypedEnvelope<proto::LeaveProject>,
 548    ) -> Result<()> {
 549        let sender_id = request.sender_id;
 550        let project_id = request.payload.project_id;
 551        let project;
 552        {
 553            let mut state = self.store_mut().await;
 554            project = state.leave_project(sender_id, project_id)?;
 555            broadcast(sender_id, project.connection_ids, |conn_id| {
 556                self.peer.send(
 557                    conn_id,
 558                    proto::RemoveProjectCollaborator {
 559                        project_id,
 560                        peer_id: sender_id.0,
 561                    },
 562                )
 563            });
 564        }
 565        self.update_user_contacts(project.host_user_id).await?;
 566        Ok(())
 567    }
 568
 569    async fn register_worktree(
 570        self: Arc<Server>,
 571        request: TypedEnvelope<proto::RegisterWorktree>,
 572        response: Response<proto::RegisterWorktree>,
 573    ) -> Result<()> {
 574        let host_user_id;
 575        {
 576            let mut state = self.store_mut().await;
 577            host_user_id = state.user_id_for_connection(request.sender_id)?;
 578
 579            let guest_connection_ids = state
 580                .read_project(request.payload.project_id, request.sender_id)?
 581                .guest_connection_ids();
 582            state.register_worktree(
 583                request.payload.project_id,
 584                request.payload.worktree_id,
 585                request.sender_id,
 586                Worktree {
 587                    root_name: request.payload.root_name.clone(),
 588                    visible: request.payload.visible,
 589                },
 590            )?;
 591
 592            broadcast(request.sender_id, guest_connection_ids, |connection_id| {
 593                self.peer
 594                    .forward_send(request.sender_id, connection_id, request.payload.clone())
 595            });
 596        }
 597        self.update_user_contacts(host_user_id).await?;
 598        response.send(proto::Ack {})?;
 599        Ok(())
 600    }
 601
 602    async fn unregister_worktree(
 603        self: Arc<Server>,
 604        request: TypedEnvelope<proto::UnregisterWorktree>,
 605    ) -> Result<()> {
 606        let host_user_id;
 607        let project_id = request.payload.project_id;
 608        let worktree_id = request.payload.worktree_id;
 609        {
 610            let mut state = self.store_mut().await;
 611            let (_, guest_connection_ids) =
 612                state.unregister_worktree(project_id, worktree_id, request.sender_id)?;
 613            host_user_id = state.user_id_for_connection(request.sender_id)?;
 614            broadcast(request.sender_id, guest_connection_ids, |conn_id| {
 615                self.peer.send(
 616                    conn_id,
 617                    proto::UnregisterWorktree {
 618                        project_id,
 619                        worktree_id,
 620                    },
 621                )
 622            });
 623        }
 624        self.update_user_contacts(host_user_id).await?;
 625        Ok(())
 626    }
 627
 628    async fn update_worktree(
 629        self: Arc<Server>,
 630        request: TypedEnvelope<proto::UpdateWorktree>,
 631        response: Response<proto::UpdateWorktree>,
 632    ) -> Result<()> {
 633        let connection_ids = self.store_mut().await.update_worktree(
 634            request.sender_id,
 635            request.payload.project_id,
 636            request.payload.worktree_id,
 637            &request.payload.removed_entries,
 638            &request.payload.updated_entries,
 639            request.payload.scan_id,
 640        )?;
 641
 642        broadcast(request.sender_id, connection_ids, |connection_id| {
 643            self.peer
 644                .forward_send(request.sender_id, connection_id, request.payload.clone())
 645        });
 646        response.send(proto::Ack {})?;
 647        Ok(())
 648    }
 649
 650    async fn update_diagnostic_summary(
 651        self: Arc<Server>,
 652        request: TypedEnvelope<proto::UpdateDiagnosticSummary>,
 653    ) -> Result<()> {
 654        let summary = request
 655            .payload
 656            .summary
 657            .clone()
 658            .ok_or_else(|| anyhow!("invalid summary"))?;
 659        let receiver_ids = self.store_mut().await.update_diagnostic_summary(
 660            request.payload.project_id,
 661            request.payload.worktree_id,
 662            request.sender_id,
 663            summary,
 664        )?;
 665
 666        broadcast(request.sender_id, receiver_ids, |connection_id| {
 667            self.peer
 668                .forward_send(request.sender_id, connection_id, request.payload.clone())
 669        });
 670        Ok(())
 671    }
 672
 673    async fn start_language_server(
 674        self: Arc<Server>,
 675        request: TypedEnvelope<proto::StartLanguageServer>,
 676    ) -> Result<()> {
 677        let receiver_ids = self.store_mut().await.start_language_server(
 678            request.payload.project_id,
 679            request.sender_id,
 680            request
 681                .payload
 682                .server
 683                .clone()
 684                .ok_or_else(|| anyhow!("invalid language server"))?,
 685        )?;
 686        broadcast(request.sender_id, receiver_ids, |connection_id| {
 687            self.peer
 688                .forward_send(request.sender_id, connection_id, request.payload.clone())
 689        });
 690        Ok(())
 691    }
 692
 693    async fn update_language_server(
 694        self: Arc<Server>,
 695        request: TypedEnvelope<proto::UpdateLanguageServer>,
 696    ) -> Result<()> {
 697        let receiver_ids = self
 698            .store()
 699            .await
 700            .project_connection_ids(request.payload.project_id, request.sender_id)?;
 701        broadcast(request.sender_id, receiver_ids, |connection_id| {
 702            self.peer
 703                .forward_send(request.sender_id, connection_id, request.payload.clone())
 704        });
 705        Ok(())
 706    }
 707
 708    async fn forward_project_request<T>(
 709        self: Arc<Server>,
 710        request: TypedEnvelope<T>,
 711        response: Response<T>,
 712    ) -> Result<()>
 713    where
 714        T: EntityMessage + RequestMessage,
 715    {
 716        let host_connection_id = self
 717            .store()
 718            .await
 719            .read_project(request.payload.remote_entity_id(), request.sender_id)?
 720            .host_connection_id;
 721
 722        response.send(
 723            self.peer
 724                .forward_request(request.sender_id, host_connection_id, request.payload)
 725                .await?,
 726        )?;
 727        Ok(())
 728    }
 729
 730    async fn save_buffer(
 731        self: Arc<Server>,
 732        request: TypedEnvelope<proto::SaveBuffer>,
 733        response: Response<proto::SaveBuffer>,
 734    ) -> Result<()> {
 735        let host = self
 736            .store()
 737            .await
 738            .read_project(request.payload.project_id, request.sender_id)?
 739            .host_connection_id;
 740        let response_payload = self
 741            .peer
 742            .forward_request(request.sender_id, host, request.payload.clone())
 743            .await?;
 744
 745        let mut guests = self
 746            .store()
 747            .await
 748            .read_project(request.payload.project_id, request.sender_id)?
 749            .connection_ids();
 750        guests.retain(|guest_connection_id| *guest_connection_id != request.sender_id);
 751        broadcast(host, guests, |conn_id| {
 752            self.peer
 753                .forward_send(host, conn_id, response_payload.clone())
 754        });
 755        response.send(response_payload)?;
 756        Ok(())
 757    }
 758
 759    async fn update_buffer(
 760        self: Arc<Server>,
 761        request: TypedEnvelope<proto::UpdateBuffer>,
 762        response: Response<proto::UpdateBuffer>,
 763    ) -> Result<()> {
 764        let receiver_ids = self
 765            .store()
 766            .await
 767            .project_connection_ids(request.payload.project_id, request.sender_id)?;
 768        broadcast(request.sender_id, receiver_ids, |connection_id| {
 769            self.peer
 770                .forward_send(request.sender_id, connection_id, request.payload.clone())
 771        });
 772        response.send(proto::Ack {})?;
 773        Ok(())
 774    }
 775
 776    async fn update_buffer_file(
 777        self: Arc<Server>,
 778        request: TypedEnvelope<proto::UpdateBufferFile>,
 779    ) -> Result<()> {
 780        let receiver_ids = self
 781            .store()
 782            .await
 783            .project_connection_ids(request.payload.project_id, request.sender_id)?;
 784        broadcast(request.sender_id, receiver_ids, |connection_id| {
 785            self.peer
 786                .forward_send(request.sender_id, connection_id, request.payload.clone())
 787        });
 788        Ok(())
 789    }
 790
 791    async fn buffer_reloaded(
 792        self: Arc<Server>,
 793        request: TypedEnvelope<proto::BufferReloaded>,
 794    ) -> Result<()> {
 795        let receiver_ids = self
 796            .store()
 797            .await
 798            .project_connection_ids(request.payload.project_id, request.sender_id)?;
 799        broadcast(request.sender_id, receiver_ids, |connection_id| {
 800            self.peer
 801                .forward_send(request.sender_id, connection_id, request.payload.clone())
 802        });
 803        Ok(())
 804    }
 805
 806    async fn buffer_saved(
 807        self: Arc<Server>,
 808        request: TypedEnvelope<proto::BufferSaved>,
 809    ) -> Result<()> {
 810        let receiver_ids = self
 811            .store()
 812            .await
 813            .project_connection_ids(request.payload.project_id, request.sender_id)?;
 814        broadcast(request.sender_id, receiver_ids, |connection_id| {
 815            self.peer
 816                .forward_send(request.sender_id, connection_id, request.payload.clone())
 817        });
 818        Ok(())
 819    }
 820
 821    async fn follow(
 822        self: Arc<Self>,
 823        request: TypedEnvelope<proto::Follow>,
 824        response: Response<proto::Follow>,
 825    ) -> Result<()> {
 826        let leader_id = ConnectionId(request.payload.leader_id);
 827        let follower_id = request.sender_id;
 828        if !self
 829            .store()
 830            .await
 831            .project_connection_ids(request.payload.project_id, follower_id)?
 832            .contains(&leader_id)
 833        {
 834            Err(anyhow!("no such peer"))?;
 835        }
 836        let mut response_payload = self
 837            .peer
 838            .forward_request(request.sender_id, leader_id, request.payload)
 839            .await?;
 840        response_payload
 841            .views
 842            .retain(|view| view.leader_id != Some(follower_id.0));
 843        response.send(response_payload)?;
 844        Ok(())
 845    }
 846
 847    async fn unfollow(self: Arc<Self>, request: TypedEnvelope<proto::Unfollow>) -> Result<()> {
 848        let leader_id = ConnectionId(request.payload.leader_id);
 849        if !self
 850            .store()
 851            .await
 852            .project_connection_ids(request.payload.project_id, request.sender_id)?
 853            .contains(&leader_id)
 854        {
 855            Err(anyhow!("no such peer"))?;
 856        }
 857        self.peer
 858            .forward_send(request.sender_id, leader_id, request.payload)?;
 859        Ok(())
 860    }
 861
 862    async fn update_followers(
 863        self: Arc<Self>,
 864        request: TypedEnvelope<proto::UpdateFollowers>,
 865    ) -> Result<()> {
 866        let connection_ids = self
 867            .store()
 868            .await
 869            .project_connection_ids(request.payload.project_id, request.sender_id)?;
 870        let leader_id = request
 871            .payload
 872            .variant
 873            .as_ref()
 874            .and_then(|variant| match variant {
 875                proto::update_followers::Variant::CreateView(payload) => payload.leader_id,
 876                proto::update_followers::Variant::UpdateView(payload) => payload.leader_id,
 877                proto::update_followers::Variant::UpdateActiveView(payload) => payload.leader_id,
 878            });
 879        for follower_id in &request.payload.follower_ids {
 880            let follower_id = ConnectionId(*follower_id);
 881            if connection_ids.contains(&follower_id) && Some(follower_id.0) != leader_id {
 882                self.peer
 883                    .forward_send(request.sender_id, follower_id, request.payload.clone())?;
 884            }
 885        }
 886        Ok(())
 887    }
 888
 889    async fn get_channels(
 890        self: Arc<Server>,
 891        request: TypedEnvelope<proto::GetChannels>,
 892        response: Response<proto::GetChannels>,
 893    ) -> Result<()> {
 894        let user_id = self
 895            .store()
 896            .await
 897            .user_id_for_connection(request.sender_id)?;
 898        let channels = self.app_state.db.get_accessible_channels(user_id).await?;
 899        response.send(proto::GetChannelsResponse {
 900            channels: channels
 901                .into_iter()
 902                .map(|chan| proto::Channel {
 903                    id: chan.id.to_proto(),
 904                    name: chan.name,
 905                })
 906                .collect(),
 907        })?;
 908        Ok(())
 909    }
 910
 911    async fn get_users(
 912        self: Arc<Server>,
 913        request: TypedEnvelope<proto::GetUsers>,
 914        response: Response<proto::GetUsers>,
 915    ) -> Result<()> {
 916        let user_ids = request
 917            .payload
 918            .user_ids
 919            .into_iter()
 920            .map(UserId::from_proto)
 921            .collect();
 922        let users = self
 923            .app_state
 924            .db
 925            .get_users_by_ids(user_ids)
 926            .await?
 927            .into_iter()
 928            .map(|user| proto::User {
 929                id: user.id.to_proto(),
 930                avatar_url: format!("https://github.com/{}.png?size=128", user.github_login),
 931                github_login: user.github_login,
 932            })
 933            .collect();
 934        response.send(proto::UsersResponse { users })?;
 935        Ok(())
 936    }
 937
 938    async fn fuzzy_search_users(
 939        self: Arc<Server>,
 940        request: TypedEnvelope<proto::FuzzySearchUsers>,
 941        response: Response<proto::FuzzySearchUsers>,
 942    ) -> Result<()> {
 943        let user_id = self
 944            .store()
 945            .await
 946            .user_id_for_connection(request.sender_id)?;
 947        let query = request.payload.query;
 948        let db = &self.app_state.db;
 949        let users = match query.len() {
 950            0 => vec![],
 951            1 | 2 => db
 952                .get_user_by_github_login(&query)
 953                .await?
 954                .into_iter()
 955                .collect(),
 956            _ => db.fuzzy_search_users(&query, 10).await?,
 957        };
 958        let users = users
 959            .into_iter()
 960            .filter(|user| user.id != user_id)
 961            .map(|user| proto::User {
 962                id: user.id.to_proto(),
 963                avatar_url: format!("https://github.com/{}.png?size=128", user.github_login),
 964                github_login: user.github_login,
 965            })
 966            .collect();
 967        response.send(proto::UsersResponse { users })?;
 968        Ok(())
 969    }
 970
 971    async fn request_contact(
 972        self: Arc<Server>,
 973        request: TypedEnvelope<proto::RequestContact>,
 974        response: Response<proto::RequestContact>,
 975    ) -> Result<()> {
 976        let requester_id = self
 977            .store()
 978            .await
 979            .user_id_for_connection(request.sender_id)?;
 980        let responder_id = UserId::from_proto(request.payload.responder_id);
 981        if requester_id == responder_id {
 982            return Err(anyhow!("cannot add yourself as a contact"))?;
 983        }
 984
 985        self.app_state
 986            .db
 987            .send_contact_request(requester_id, responder_id)
 988            .await?;
 989
 990        // Update outgoing contact requests of requester
 991        let mut update = proto::UpdateContacts::default();
 992        update.outgoing_requests.push(responder_id.to_proto());
 993        for connection_id in self.store().await.connection_ids_for_user(requester_id) {
 994            self.peer.send(connection_id, update.clone())?;
 995        }
 996
 997        // Update incoming contact requests of responder
 998        let mut update = proto::UpdateContacts::default();
 999        update
1000            .incoming_requests
1001            .push(proto::IncomingContactRequest {
1002                requester_id: requester_id.to_proto(),
1003                should_notify: true,
1004            });
1005        for connection_id in self.store().await.connection_ids_for_user(responder_id) {
1006            self.peer.send(connection_id, update.clone())?;
1007        }
1008
1009        response.send(proto::Ack {})?;
1010        Ok(())
1011    }
1012
1013    async fn respond_to_contact_request(
1014        self: Arc<Server>,
1015        request: TypedEnvelope<proto::RespondToContactRequest>,
1016        response: Response<proto::RespondToContactRequest>,
1017    ) -> Result<()> {
1018        let responder_id = self
1019            .store()
1020            .await
1021            .user_id_for_connection(request.sender_id)?;
1022        let requester_id = UserId::from_proto(request.payload.requester_id);
1023        let accept = request.payload.response == proto::ContactRequestResponse::Accept as i32;
1024        self.app_state
1025            .db
1026            .respond_to_contact_request(responder_id, requester_id, accept)
1027            .await?;
1028
1029        let store = self.store().await;
1030        // Update responder with new contact
1031        let mut update = proto::UpdateContacts::default();
1032        if accept {
1033            update.contacts.push(store.contact_for_user(requester_id));
1034        }
1035        update
1036            .remove_incoming_requests
1037            .push(requester_id.to_proto());
1038        for connection_id in store.connection_ids_for_user(responder_id) {
1039            self.peer.send(connection_id, update.clone())?;
1040        }
1041
1042        // Update requester with new contact
1043        let mut update = proto::UpdateContacts::default();
1044        if accept {
1045            update.contacts.push(store.contact_for_user(responder_id));
1046        }
1047        update
1048            .remove_outgoing_requests
1049            .push(responder_id.to_proto());
1050        for connection_id in store.connection_ids_for_user(requester_id) {
1051            self.peer.send(connection_id, update.clone())?;
1052        }
1053
1054        response.send(proto::Ack {})?;
1055        Ok(())
1056    }
1057
1058    async fn remove_contact(
1059        self: Arc<Server>,
1060        request: TypedEnvelope<proto::RemoveContact>,
1061        response: Response<proto::RemoveContact>,
1062    ) -> Result<()> {
1063        let requester_id = self
1064            .store()
1065            .await
1066            .user_id_for_connection(request.sender_id)?;
1067        let responder_id = UserId::from_proto(request.payload.user_id);
1068        self.app_state
1069            .db
1070            .remove_contact(requester_id, responder_id)
1071            .await?;
1072
1073        // Update outgoing contact requests of requester
1074        let mut update = proto::UpdateContacts::default();
1075        update
1076            .remove_outgoing_requests
1077            .push(responder_id.to_proto());
1078        for connection_id in self.store().await.connection_ids_for_user(requester_id) {
1079            self.peer.send(connection_id, update.clone())?;
1080        }
1081
1082        // Update incoming contact requests of responder
1083        let mut update = proto::UpdateContacts::default();
1084        update
1085            .remove_incoming_requests
1086            .push(requester_id.to_proto());
1087        for connection_id in self.store().await.connection_ids_for_user(responder_id) {
1088            self.peer.send(connection_id, update.clone())?;
1089        }
1090
1091        response.send(proto::Ack {})?;
1092        Ok(())
1093    }
1094
1095    // #[instrument(skip(self, state, user_ids))]
1096    // fn update_contacts_for_users<'a>(
1097    //     self: &Arc<Self>,
1098    //     state: &Store,
1099    //     user_ids: impl IntoIterator<Item = &'a UserId>,
1100    // ) {
1101    //     for user_id in user_ids {
1102    //         let contacts = state.contacts_for_user(*user_id);
1103    //         for connection_id in state.connection_ids_for_user(*user_id) {
1104    //             self.peer
1105    //                 .send(
1106    //                     connection_id,
1107    //                     proto::UpdateContacts {
1108    //                         contacts: contacts.clone(),
1109    //                         pending_requests_from_user_ids: Default::default(),
1110    //                         pending_requests_to_user_ids: Default::default(),
1111    //                     },
1112    //                 )
1113    //                 .trace_err();
1114    //         }
1115    //     }
1116    // }
1117
1118    async fn join_channel(
1119        self: Arc<Self>,
1120        request: TypedEnvelope<proto::JoinChannel>,
1121        response: Response<proto::JoinChannel>,
1122    ) -> Result<()> {
1123        let user_id = self
1124            .store()
1125            .await
1126            .user_id_for_connection(request.sender_id)?;
1127        let channel_id = ChannelId::from_proto(request.payload.channel_id);
1128        if !self
1129            .app_state
1130            .db
1131            .can_user_access_channel(user_id, channel_id)
1132            .await?
1133        {
1134            Err(anyhow!("access denied"))?;
1135        }
1136
1137        self.store_mut()
1138            .await
1139            .join_channel(request.sender_id, channel_id);
1140        let messages = self
1141            .app_state
1142            .db
1143            .get_channel_messages(channel_id, MESSAGE_COUNT_PER_PAGE, None)
1144            .await?
1145            .into_iter()
1146            .map(|msg| proto::ChannelMessage {
1147                id: msg.id.to_proto(),
1148                body: msg.body,
1149                timestamp: msg.sent_at.unix_timestamp() as u64,
1150                sender_id: msg.sender_id.to_proto(),
1151                nonce: Some(msg.nonce.as_u128().into()),
1152            })
1153            .collect::<Vec<_>>();
1154        response.send(proto::JoinChannelResponse {
1155            done: messages.len() < MESSAGE_COUNT_PER_PAGE,
1156            messages,
1157        })?;
1158        Ok(())
1159    }
1160
1161    async fn leave_channel(
1162        self: Arc<Self>,
1163        request: TypedEnvelope<proto::LeaveChannel>,
1164    ) -> Result<()> {
1165        let user_id = self
1166            .store()
1167            .await
1168            .user_id_for_connection(request.sender_id)?;
1169        let channel_id = ChannelId::from_proto(request.payload.channel_id);
1170        if !self
1171            .app_state
1172            .db
1173            .can_user_access_channel(user_id, channel_id)
1174            .await?
1175        {
1176            Err(anyhow!("access denied"))?;
1177        }
1178
1179        self.store_mut()
1180            .await
1181            .leave_channel(request.sender_id, channel_id);
1182
1183        Ok(())
1184    }
1185
1186    async fn send_channel_message(
1187        self: Arc<Self>,
1188        request: TypedEnvelope<proto::SendChannelMessage>,
1189        response: Response<proto::SendChannelMessage>,
1190    ) -> Result<()> {
1191        let channel_id = ChannelId::from_proto(request.payload.channel_id);
1192        let user_id;
1193        let connection_ids;
1194        {
1195            let state = self.store().await;
1196            user_id = state.user_id_for_connection(request.sender_id)?;
1197            connection_ids = state.channel_connection_ids(channel_id)?;
1198        }
1199
1200        // Validate the message body.
1201        let body = request.payload.body.trim().to_string();
1202        if body.len() > MAX_MESSAGE_LEN {
1203            return Err(anyhow!("message is too long"))?;
1204        }
1205        if body.is_empty() {
1206            return Err(anyhow!("message can't be blank"))?;
1207        }
1208
1209        let timestamp = OffsetDateTime::now_utc();
1210        let nonce = request
1211            .payload
1212            .nonce
1213            .ok_or_else(|| anyhow!("nonce can't be blank"))?;
1214
1215        let message_id = self
1216            .app_state
1217            .db
1218            .create_channel_message(channel_id, user_id, &body, timestamp, nonce.clone().into())
1219            .await?
1220            .to_proto();
1221        let message = proto::ChannelMessage {
1222            sender_id: user_id.to_proto(),
1223            id: message_id,
1224            body,
1225            timestamp: timestamp.unix_timestamp() as u64,
1226            nonce: Some(nonce),
1227        };
1228        broadcast(request.sender_id, connection_ids, |conn_id| {
1229            self.peer.send(
1230                conn_id,
1231                proto::ChannelMessageSent {
1232                    channel_id: channel_id.to_proto(),
1233                    message: Some(message.clone()),
1234                },
1235            )
1236        });
1237        response.send(proto::SendChannelMessageResponse {
1238            message: Some(message),
1239        })?;
1240        Ok(())
1241    }
1242
1243    async fn get_channel_messages(
1244        self: Arc<Self>,
1245        request: TypedEnvelope<proto::GetChannelMessages>,
1246        response: Response<proto::GetChannelMessages>,
1247    ) -> Result<()> {
1248        let user_id = self
1249            .store()
1250            .await
1251            .user_id_for_connection(request.sender_id)?;
1252        let channel_id = ChannelId::from_proto(request.payload.channel_id);
1253        if !self
1254            .app_state
1255            .db
1256            .can_user_access_channel(user_id, channel_id)
1257            .await?
1258        {
1259            Err(anyhow!("access denied"))?;
1260        }
1261
1262        let messages = self
1263            .app_state
1264            .db
1265            .get_channel_messages(
1266                channel_id,
1267                MESSAGE_COUNT_PER_PAGE,
1268                Some(MessageId::from_proto(request.payload.before_message_id)),
1269            )
1270            .await?
1271            .into_iter()
1272            .map(|msg| proto::ChannelMessage {
1273                id: msg.id.to_proto(),
1274                body: msg.body,
1275                timestamp: msg.sent_at.unix_timestamp() as u64,
1276                sender_id: msg.sender_id.to_proto(),
1277                nonce: Some(msg.nonce.as_u128().into()),
1278            })
1279            .collect::<Vec<_>>();
1280        response.send(proto::GetChannelMessagesResponse {
1281            done: messages.len() < MESSAGE_COUNT_PER_PAGE,
1282            messages,
1283        })?;
1284        Ok(())
1285    }
1286
1287    async fn store<'a>(self: &'a Arc<Self>) -> StoreReadGuard<'a> {
1288        #[cfg(test)]
1289        tokio::task::yield_now().await;
1290        let guard = self.store.read().await;
1291        #[cfg(test)]
1292        tokio::task::yield_now().await;
1293        StoreReadGuard {
1294            guard,
1295            _not_send: PhantomData,
1296        }
1297    }
1298
1299    async fn store_mut<'a>(self: &'a Arc<Self>) -> StoreWriteGuard<'a> {
1300        #[cfg(test)]
1301        tokio::task::yield_now().await;
1302        let guard = self.store.write().await;
1303        #[cfg(test)]
1304        tokio::task::yield_now().await;
1305        StoreWriteGuard {
1306            guard,
1307            _not_send: PhantomData,
1308        }
1309    }
1310}
1311
1312impl<'a> Deref for StoreReadGuard<'a> {
1313    type Target = Store;
1314
1315    fn deref(&self) -> &Self::Target {
1316        &*self.guard
1317    }
1318}
1319
1320impl<'a> Deref for StoreWriteGuard<'a> {
1321    type Target = Store;
1322
1323    fn deref(&self) -> &Self::Target {
1324        &*self.guard
1325    }
1326}
1327
1328impl<'a> DerefMut for StoreWriteGuard<'a> {
1329    fn deref_mut(&mut self) -> &mut Self::Target {
1330        &mut *self.guard
1331    }
1332}
1333
1334impl<'a> Drop for StoreWriteGuard<'a> {
1335    fn drop(&mut self) {
1336        #[cfg(test)]
1337        self.check_invariants();
1338
1339        let metrics = self.metrics();
1340        tracing::info!(
1341            connections = metrics.connections,
1342            registered_projects = metrics.registered_projects,
1343            shared_projects = metrics.shared_projects,
1344            "metrics"
1345        );
1346    }
1347}
1348
1349impl Executor for RealExecutor {
1350    type Sleep = Sleep;
1351
1352    fn spawn_detached<F: 'static + Send + Future<Output = ()>>(&self, future: F) {
1353        tokio::task::spawn(future);
1354    }
1355
1356    fn sleep(&self, duration: Duration) -> Self::Sleep {
1357        tokio::time::sleep(duration)
1358    }
1359}
1360
1361fn broadcast<F>(
1362    sender_id: ConnectionId,
1363    receiver_ids: impl IntoIterator<Item = ConnectionId>,
1364    mut f: F,
1365) where
1366    F: FnMut(ConnectionId) -> anyhow::Result<()>,
1367{
1368    for receiver_id in receiver_ids {
1369        if receiver_id != sender_id {
1370            f(receiver_id).trace_err();
1371        }
1372    }
1373}
1374
lazy_static! {
    // Name of the HTTP header from which `ProtocolVersion` is decoded.
    // `lazy_static!` builds the value on first access.
    static ref ZED_PROTOCOL_VERSION: HeaderName = HeaderName::from_static("x-zed-protocol-version");
}
1378
1379pub struct ProtocolVersion(u32);
1380
1381impl Header for ProtocolVersion {
1382    fn name() -> &'static HeaderName {
1383        &ZED_PROTOCOL_VERSION
1384    }
1385
1386    fn decode<'i, I>(values: &mut I) -> Result<Self, axum::headers::Error>
1387    where
1388        Self: Sized,
1389        I: Iterator<Item = &'i axum::http::HeaderValue>,
1390    {
1391        let version = values
1392            .next()
1393            .ok_or_else(|| axum::headers::Error::invalid())?
1394            .to_str()
1395            .map_err(|_| axum::headers::Error::invalid())?
1396            .parse()
1397            .map_err(|_| axum::headers::Error::invalid())?;
1398        Ok(Self(version))
1399    }
1400
1401    fn encode<E: Extend<axum::http::HeaderValue>>(&self, values: &mut E) {
1402        values.extend([self.0.to_string().parse().unwrap()]);
1403    }
1404}
1405
1406pub fn routes(app_state: Arc<AppState>) -> Router<Body> {
1407    let server = Server::new(app_state.clone(), None);
1408    Router::new()
1409        .route("/rpc", get(handle_websocket_request))
1410        .layer(
1411            ServiceBuilder::new()
1412                .layer(Extension(app_state))
1413                .layer(middleware::from_fn(auth::validate_header))
1414                .layer(Extension(server)),
1415        )
1416}
1417
1418pub async fn handle_websocket_request(
1419    TypedHeader(ProtocolVersion(protocol_version)): TypedHeader<ProtocolVersion>,
1420    ConnectInfo(socket_address): ConnectInfo<SocketAddr>,
1421    Extension(server): Extension<Arc<Server>>,
1422    Extension(user_id): Extension<UserId>,
1423    ws: WebSocketUpgrade,
1424) -> axum::response::Response {
1425    if protocol_version != rpc::PROTOCOL_VERSION {
1426        return (
1427            StatusCode::UPGRADE_REQUIRED,
1428            "client must be upgraded".to_string(),
1429        )
1430            .into_response();
1431    }
1432    let socket_address = socket_address.to_string();
1433    ws.on_upgrade(move |socket| {
1434        use util::ResultExt;
1435        let socket = socket
1436            .map_ok(to_tungstenite_message)
1437            .err_into()
1438            .with(|message| async move { Ok(to_axum_message(message)) });
1439        let connection = Connection::new(Box::pin(socket));
1440        async move {
1441            server
1442                .handle_connection(connection, socket_address, user_id, None, RealExecutor)
1443                .await
1444                .log_err();
1445        }
1446    })
1447}
1448
1449fn to_axum_message(message: TungsteniteMessage) -> AxumMessage {
1450    match message {
1451        TungsteniteMessage::Text(payload) => AxumMessage::Text(payload),
1452        TungsteniteMessage::Binary(payload) => AxumMessage::Binary(payload),
1453        TungsteniteMessage::Ping(payload) => AxumMessage::Ping(payload),
1454        TungsteniteMessage::Pong(payload) => AxumMessage::Pong(payload),
1455        TungsteniteMessage::Close(frame) => AxumMessage::Close(frame.map(|frame| AxumCloseFrame {
1456            code: frame.code.into(),
1457            reason: frame.reason,
1458        })),
1459    }
1460}
1461
1462fn to_tungstenite_message(message: AxumMessage) -> TungsteniteMessage {
1463    match message {
1464        AxumMessage::Text(payload) => TungsteniteMessage::Text(payload),
1465        AxumMessage::Binary(payload) => TungsteniteMessage::Binary(payload),
1466        AxumMessage::Ping(payload) => TungsteniteMessage::Ping(payload),
1467        AxumMessage::Pong(payload) => TungsteniteMessage::Pong(payload),
1468        AxumMessage::Close(frame) => {
1469            TungsteniteMessage::Close(frame.map(|frame| TungsteniteCloseFrame {
1470                code: frame.code.into(),
1471                reason: frame.reason,
1472            }))
1473        }
1474    }
1475}
1476
/// Extension trait for turning a fallible value into an `Option`.
pub trait ResultExt {
    /// The success type produced by `trace_err`.
    type Ok;

    /// Returns the success value as `Some`. The implementation for `Result`
    /// in this file logs the error and returns `None` on failure.
    fn trace_err(self) -> Option<Self::Ok>;
}
1482
1483impl<T, E> ResultExt for Result<T, E>
1484where
1485    E: std::fmt::Debug,
1486{
1487    type Ok = T;
1488
1489    fn trace_err(self) -> Option<T> {
1490        match self {
1491            Ok(value) => Some(value),
1492            Err(error) => {
1493                tracing::error!("{:?}", error);
1494                None
1495            }
1496        }
1497    }
1498}
1499
1500#[cfg(test)]
1501mod tests {
1502    use super::*;
1503    use crate::{
1504        db::{tests::TestDb, UserId},
1505        AppState,
1506    };
1507    use ::rpc::Peer;
1508    use client::{
1509        self, test::FakeHttpClient, Channel, ChannelDetails, ChannelList, Client, Credentials,
1510        EstablishConnectionError, UserStore, RECEIVE_TIMEOUT,
1511    };
1512    use collections::{BTreeMap, HashSet};
1513    use editor::{
1514        self, ConfirmCodeAction, ConfirmCompletion, ConfirmRename, Editor, Input, Redo, Rename,
1515        ToOffset, ToggleCodeActions, Undo,
1516    };
1517    use gpui::{
1518        executor::{self, Deterministic},
1519        geometry::vector::vec2f,
1520        ModelHandle, TestAppContext, ViewHandle,
1521    };
1522    use language::{
1523        range_to_lsp, tree_sitter_rust, Diagnostic, DiagnosticEntry, FakeLspAdapter, Language,
1524        LanguageConfig, LanguageRegistry, OffsetRangeExt, Point, Rope,
1525    };
1526    use lsp::{self, FakeLanguageServer};
1527    use parking_lot::Mutex;
1528    use project::{
1529        fs::{FakeFs, Fs as _},
1530        search::SearchQuery,
1531        worktree::WorktreeHandle,
1532        DiagnosticSummary, Project, ProjectPath, WorktreeId,
1533    };
1534    use rand::prelude::*;
1535    use rpc::PeerId;
1536    use serde_json::json;
1537    use settings::Settings;
1538    use sqlx::types::time::OffsetDateTime;
1539    use std::{
1540        env,
1541        ops::Deref,
1542        path::{Path, PathBuf},
1543        rc::Rc,
1544        sync::{
1545            atomic::{AtomicBool, Ordering::SeqCst},
1546            Arc,
1547        },
1548        time::Duration,
1549    };
1550    use theme::ThemeRegistry;
1551    use workspace::{Item, SplitDirection, ToggleFollow, Workspace, WorkspaceParams};
1552
1553    #[cfg(test)]
1554    #[ctor::ctor]
1555    fn init_logger() {
1556        if std::env::var("RUST_LOG").is_ok() {
1557            env_logger::init();
1558        }
1559    }
1560
    // End-to-end sharing scenario: client A shares a local project, client B
    // joins it remotely, both open the same buffer, an edit made by B shows up
    // for A, and dropping B's project removes B from A's collaborators.
    #[gpui::test(iterations = 10)]
    async fn test_share_project(cx_a: &mut TestAppContext, cx_b: &mut TestAppContext) {
        let (window_b, _) = cx_b.add_window(|_| EmptyView);
        let lang_registry = Arc::new(LanguageRegistry::test());
        let fs = FakeFs::new(cx_a.background());
        cx_a.foreground().forbid_parking();

        // Connect to a server as 2 clients.
        let mut server = TestServer::start(cx_a.foreground(), cx_a.background()).await;
        let client_a = server.create_client(cx_a, "user_a").await;
        let client_b = server.create_client(cx_b, "user_b").await;
        server
            .make_contacts(vec![(&client_a, cx_a), (&client_b, cx_b)])
            .await;

        // Share a project as client A
        fs.insert_tree(
            "/a",
            json!({
                "a.txt": "a-contents",
                "b.txt": "b-contents",
            }),
        )
        .await;
        let project_a = cx_a.update(|cx| {
            Project::local(
                client_a.clone(),
                client_a.user_store.clone(),
                lang_registry.clone(),
                fs.clone(),
                cx,
            )
        });
        let (worktree_a, _) = project_a
            .update(cx_a, |p, cx| {
                p.find_or_create_local_worktree("/a", true, cx)
            })
            .await
            .unwrap();
        let worktree_id = worktree_a.read_with(cx_a, |tree, _| tree.id());
        // Wait for the initial scan of the worktree before sharing.
        worktree_a
            .read_with(cx_a, |tree, _| tree.as_local().unwrap().scan_complete())
            .await;
        let project_id = project_a.update(cx_a, |p, _| p.next_remote_id()).await;
        project_a.update(cx_a, |p, cx| p.share(cx)).await.unwrap();

        // Join that project as client B
        let project_b = Project::remote(
            project_id,
            client_b.clone(),
            client_b.user_store.clone(),
            lang_registry.clone(),
            fs.clone(),
            &mut cx_b.to_async(),
        )
        .await
        .unwrap();

        // Client B should see client A as a collaborator named "user_a".
        let replica_id_b = project_b.read_with(cx_b, |project, _| {
            assert_eq!(
                project
                    .collaborators()
                    .get(&client_a.peer_id)
                    .unwrap()
                    .user
                    .github_login,
                "user_a"
            );
            project.replica_id()
        });
        // Wait until client A sees client B with B's replica id and login.
        project_a
            .condition(&cx_a, |tree, _| {
                tree.collaborators()
                    .get(&client_b.peer_id)
                    .map_or(false, |collaborator| {
                        collaborator.replica_id == replica_id_b
                            && collaborator.user.github_login == "user_b"
                    })
            })
            .await;

        // Open the same file as client B and client A.
        let buffer_b = project_b
            .update(cx_b, |p, cx| p.open_buffer((worktree_id, "b.txt"), cx))
            .await
            .unwrap();
        buffer_b.read_with(cx_b, |buf, _| assert_eq!(buf.text(), "b-contents"));
        // Opening the buffer remotely must have opened it on the host as well.
        project_a.read_with(cx_a, |project, cx| {
            assert!(project.has_open_buffer((worktree_id, "b.txt"), cx))
        });
        let buffer_a = project_a
            .update(cx_a, |p, cx| p.open_buffer((worktree_id, "b.txt"), cx))
            .await
            .unwrap();

        let editor_b = cx_b.add_view(window_b, |cx| Editor::for_buffer(buffer_b, None, cx));

        // TODO: re-enable the selection-set assertion below.
        // // Create a selection set as client B and see that selection set as client A.
        // buffer_a
        //     .condition(&cx_a, |buffer, _| buffer.selection_sets().count() == 1)
        //     .await;

        // Edit the buffer as client B and see that edit as client A.
        editor_b.update(cx_b, |editor, cx| {
            editor.handle_input(&Input("ok, ".into()), cx)
        });
        buffer_a
            .condition(&cx_a, |buffer, _| buffer.text() == "ok, b-contents")
            .await;

        // TODO: re-enable the selection-set assertion below; only the editor
        // drop is currently active.
        // // Remove the selection set as client B, see those selections disappear as client A.
        cx_b.update(move |_| drop(editor_b));
        // buffer_a
        //     .condition(&cx_a, |buffer, _| buffer.selection_sets().count() == 0)
        //     .await;

        // Dropping the client B's project removes client B from client A's collaborators.
        cx_b.update(move |_| drop(project_b));
        project_a
            .condition(&cx_a, |project, _| project.collaborators().is_empty())
            .await;
    }
1685
1686    #[gpui::test(iterations = 10)]
1687    async fn test_unshare_project(cx_a: &mut TestAppContext, cx_b: &mut TestAppContext) {
1688        let lang_registry = Arc::new(LanguageRegistry::test());
1689        let fs = FakeFs::new(cx_a.background());
1690        cx_a.foreground().forbid_parking();
1691
1692        // Connect to a server as 2 clients.
1693        let mut server = TestServer::start(cx_a.foreground(), cx_a.background()).await;
1694        let client_a = server.create_client(cx_a, "user_a").await;
1695        let client_b = server.create_client(cx_b, "user_b").await;
1696        server
1697            .make_contacts(vec![(&client_a, cx_a), (&client_b, cx_b)])
1698            .await;
1699
1700        // Share a project as client A
1701        fs.insert_tree(
1702            "/a",
1703            json!({
1704                "a.txt": "a-contents",
1705                "b.txt": "b-contents",
1706            }),
1707        )
1708        .await;
1709        let project_a = cx_a.update(|cx| {
1710            Project::local(
1711                client_a.clone(),
1712                client_a.user_store.clone(),
1713                lang_registry.clone(),
1714                fs.clone(),
1715                cx,
1716            )
1717        });
1718        let (worktree_a, _) = project_a
1719            .update(cx_a, |p, cx| {
1720                p.find_or_create_local_worktree("/a", true, cx)
1721            })
1722            .await
1723            .unwrap();
1724        worktree_a
1725            .read_with(cx_a, |tree, _| tree.as_local().unwrap().scan_complete())
1726            .await;
1727        let project_id = project_a.update(cx_a, |p, _| p.next_remote_id()).await;
1728        let worktree_id = worktree_a.read_with(cx_a, |tree, _| tree.id());
1729        project_a.update(cx_a, |p, cx| p.share(cx)).await.unwrap();
1730        assert!(worktree_a.read_with(cx_a, |tree, _| tree.as_local().unwrap().is_shared()));
1731
1732        // Join that project as client B
1733        let project_b = Project::remote(
1734            project_id,
1735            client_b.clone(),
1736            client_b.user_store.clone(),
1737            lang_registry.clone(),
1738            fs.clone(),
1739            &mut cx_b.to_async(),
1740        )
1741        .await
1742        .unwrap();
1743        project_b
1744            .update(cx_b, |p, cx| p.open_buffer((worktree_id, "a.txt"), cx))
1745            .await
1746            .unwrap();
1747
1748        // Unshare the project as client A
1749        project_a.update(cx_a, |project, cx| project.unshare(cx));
1750        project_b
1751            .condition(cx_b, |project, _| project.is_read_only())
1752            .await;
1753        assert!(worktree_a.read_with(cx_a, |tree, _| !tree.as_local().unwrap().is_shared()));
1754        cx_b.update(|_| {
1755            drop(project_b);
1756        });
1757
1758        // Share the project again and ensure guests can still join.
1759        project_a
1760            .update(cx_a, |project, cx| project.share(cx))
1761            .await
1762            .unwrap();
1763        assert!(worktree_a.read_with(cx_a, |tree, _| tree.as_local().unwrap().is_shared()));
1764
1765        let project_b2 = Project::remote(
1766            project_id,
1767            client_b.clone(),
1768            client_b.user_store.clone(),
1769            lang_registry.clone(),
1770            fs.clone(),
1771            &mut cx_b.to_async(),
1772        )
1773        .await
1774        .unwrap();
1775        project_b2
1776            .update(cx_b, |p, cx| p.open_buffer((worktree_id, "a.txt"), cx))
1777            .await
1778            .unwrap();
1779    }
1780
1781    #[gpui::test(iterations = 10)]
1782    async fn test_host_disconnect(cx_a: &mut TestAppContext, cx_b: &mut TestAppContext) {
1783        let lang_registry = Arc::new(LanguageRegistry::test());
1784        let fs = FakeFs::new(cx_a.background());
1785        cx_a.foreground().forbid_parking();
1786
1787        // Connect to a server as 2 clients.
1788        let mut server = TestServer::start(cx_a.foreground(), cx_a.background()).await;
1789        let client_a = server.create_client(cx_a, "user_a").await;
1790        let client_b = server.create_client(cx_b, "user_b").await;
1791        server
1792            .make_contacts(vec![(&client_a, cx_a), (&client_b, cx_b)])
1793            .await;
1794
1795        // Share a project as client A
1796        fs.insert_tree(
1797            "/a",
1798            json!({
1799                "a.txt": "a-contents",
1800                "b.txt": "b-contents",
1801            }),
1802        )
1803        .await;
1804        let project_a = cx_a.update(|cx| {
1805            Project::local(
1806                client_a.clone(),
1807                client_a.user_store.clone(),
1808                lang_registry.clone(),
1809                fs.clone(),
1810                cx,
1811            )
1812        });
1813        let (worktree_a, _) = project_a
1814            .update(cx_a, |p, cx| {
1815                p.find_or_create_local_worktree("/a", true, cx)
1816            })
1817            .await
1818            .unwrap();
1819        worktree_a
1820            .read_with(cx_a, |tree, _| tree.as_local().unwrap().scan_complete())
1821            .await;
1822        let project_id = project_a.update(cx_a, |p, _| p.next_remote_id()).await;
1823        let worktree_id = worktree_a.read_with(cx_a, |tree, _| tree.id());
1824        project_a.update(cx_a, |p, cx| p.share(cx)).await.unwrap();
1825        assert!(worktree_a.read_with(cx_a, |tree, _| tree.as_local().unwrap().is_shared()));
1826
1827        // Join that project as client B
1828        let project_b = Project::remote(
1829            project_id,
1830            client_b.clone(),
1831            client_b.user_store.clone(),
1832            lang_registry.clone(),
1833            fs.clone(),
1834            &mut cx_b.to_async(),
1835        )
1836        .await
1837        .unwrap();
1838        project_b
1839            .update(cx_b, |p, cx| p.open_buffer((worktree_id, "a.txt"), cx))
1840            .await
1841            .unwrap();
1842
1843        // Drop client A's connection. Collaborators should disappear and the project should not be shown as shared.
1844        server.disconnect_client(client_a.current_user_id(cx_a));
1845        cx_a.foreground().advance_clock(rpc::RECEIVE_TIMEOUT);
1846        project_a
1847            .condition(cx_a, |project, _| project.collaborators().is_empty())
1848            .await;
1849        project_a.read_with(cx_a, |project, _| assert!(!project.is_shared()));
1850        project_b
1851            .condition(cx_b, |project, _| project.is_read_only())
1852            .await;
1853        assert!(worktree_a.read_with(cx_a, |tree, _| !tree.as_local().unwrap().is_shared()));
1854        cx_b.update(|_| {
1855            drop(project_b);
1856        });
1857
1858        // Await reconnection
1859        let project_id = project_a.update(cx_a, |p, _| p.next_remote_id()).await;
1860
1861        // Share the project again and ensure guests can still join.
1862        project_a
1863            .update(cx_a, |project, cx| project.share(cx))
1864            .await
1865            .unwrap();
1866        assert!(worktree_a.read_with(cx_a, |tree, _| tree.as_local().unwrap().is_shared()));
1867
1868        let project_b2 = Project::remote(
1869            project_id,
1870            client_b.clone(),
1871            client_b.user_store.clone(),
1872            lang_registry.clone(),
1873            fs.clone(),
1874            &mut cx_b.to_async(),
1875        )
1876        .await
1877        .unwrap();
1878        project_b2
1879            .update(cx_b, |p, cx| p.open_buffer((worktree_id, "a.txt"), cx))
1880            .await
1881            .unwrap();
1882    }
1883
1884    #[gpui::test(iterations = 10)]
1885    async fn test_propagate_saves_and_fs_changes(
1886        cx_a: &mut TestAppContext,
1887        cx_b: &mut TestAppContext,
1888        cx_c: &mut TestAppContext,
1889    ) {
1890        let lang_registry = Arc::new(LanguageRegistry::test());
1891        let fs = FakeFs::new(cx_a.background());
1892        cx_a.foreground().forbid_parking();
1893
1894        // Connect to a server as 3 clients.
1895        let mut server = TestServer::start(cx_a.foreground(), cx_a.background()).await;
1896        let client_a = server.create_client(cx_a, "user_a").await;
1897        let client_b = server.create_client(cx_b, "user_b").await;
1898        let client_c = server.create_client(cx_c, "user_c").await;
1899        server
1900            .make_contacts(vec![
1901                (&client_a, cx_a),
1902                (&client_b, cx_b),
1903                (&client_c, cx_c),
1904            ])
1905            .await;
1906
1907        // Share a worktree as client A.
1908        fs.insert_tree(
1909            "/a",
1910            json!({
1911                "file1": "",
1912                "file2": ""
1913            }),
1914        )
1915        .await;
1916        let project_a = cx_a.update(|cx| {
1917            Project::local(
1918                client_a.clone(),
1919                client_a.user_store.clone(),
1920                lang_registry.clone(),
1921                fs.clone(),
1922                cx,
1923            )
1924        });
1925        let (worktree_a, _) = project_a
1926            .update(cx_a, |p, cx| {
1927                p.find_or_create_local_worktree("/a", true, cx)
1928            })
1929            .await
1930            .unwrap();
1931        worktree_a
1932            .read_with(cx_a, |tree, _| tree.as_local().unwrap().scan_complete())
1933            .await;
1934        let project_id = project_a.update(cx_a, |p, _| p.next_remote_id()).await;
1935        let worktree_id = worktree_a.read_with(cx_a, |tree, _| tree.id());
1936        project_a.update(cx_a, |p, cx| p.share(cx)).await.unwrap();
1937
1938        // Join that worktree as clients B and C.
1939        let project_b = Project::remote(
1940            project_id,
1941            client_b.clone(),
1942            client_b.user_store.clone(),
1943            lang_registry.clone(),
1944            fs.clone(),
1945            &mut cx_b.to_async(),
1946        )
1947        .await
1948        .unwrap();
1949        let project_c = Project::remote(
1950            project_id,
1951            client_c.clone(),
1952            client_c.user_store.clone(),
1953            lang_registry.clone(),
1954            fs.clone(),
1955            &mut cx_c.to_async(),
1956        )
1957        .await
1958        .unwrap();
1959        let worktree_b = project_b.read_with(cx_b, |p, cx| p.worktrees(cx).next().unwrap());
1960        let worktree_c = project_c.read_with(cx_c, |p, cx| p.worktrees(cx).next().unwrap());
1961
1962        // Open and edit a buffer as both guests B and C.
1963        let buffer_b = project_b
1964            .update(cx_b, |p, cx| p.open_buffer((worktree_id, "file1"), cx))
1965            .await
1966            .unwrap();
1967        let buffer_c = project_c
1968            .update(cx_c, |p, cx| p.open_buffer((worktree_id, "file1"), cx))
1969            .await
1970            .unwrap();
1971        buffer_b.update(cx_b, |buf, cx| buf.edit([(0..0, "i-am-b, ")], cx));
1972        buffer_c.update(cx_c, |buf, cx| buf.edit([(0..0, "i-am-c, ")], cx));
1973
1974        // Open and edit that buffer as the host.
1975        let buffer_a = project_a
1976            .update(cx_a, |p, cx| p.open_buffer((worktree_id, "file1"), cx))
1977            .await
1978            .unwrap();
1979
1980        buffer_a
1981            .condition(cx_a, |buf, _| buf.text() == "i-am-c, i-am-b, ")
1982            .await;
1983        buffer_a.update(cx_a, |buf, cx| {
1984            buf.edit([(buf.len()..buf.len(), "i-am-a")], cx)
1985        });
1986
1987        // Wait for edits to propagate
1988        buffer_a
1989            .condition(cx_a, |buf, _| buf.text() == "i-am-c, i-am-b, i-am-a")
1990            .await;
1991        buffer_b
1992            .condition(cx_b, |buf, _| buf.text() == "i-am-c, i-am-b, i-am-a")
1993            .await;
1994        buffer_c
1995            .condition(cx_c, |buf, _| buf.text() == "i-am-c, i-am-b, i-am-a")
1996            .await;
1997
1998        // Edit the buffer as the host and concurrently save as guest B.
1999        let save_b = buffer_b.update(cx_b, |buf, cx| buf.save(cx));
2000        buffer_a.update(cx_a, |buf, cx| buf.edit([(0..0, "hi-a, ")], cx));
2001        save_b.await.unwrap();
2002        assert_eq!(
2003            fs.load("/a/file1".as_ref()).await.unwrap(),
2004            "hi-a, i-am-c, i-am-b, i-am-a"
2005        );
2006        buffer_a.read_with(cx_a, |buf, _| assert!(!buf.is_dirty()));
2007        buffer_b.read_with(cx_b, |buf, _| assert!(!buf.is_dirty()));
2008        buffer_c.condition(cx_c, |buf, _| !buf.is_dirty()).await;
2009
2010        worktree_a.flush_fs_events(cx_a).await;
2011
2012        // Make changes on host's file system, see those changes on guest worktrees.
2013        fs.rename(
2014            "/a/file1".as_ref(),
2015            "/a/file1-renamed".as_ref(),
2016            Default::default(),
2017        )
2018        .await
2019        .unwrap();
2020
2021        fs.rename("/a/file2".as_ref(), "/a/file3".as_ref(), Default::default())
2022            .await
2023            .unwrap();
2024        fs.insert_file(Path::new("/a/file4"), "4".into()).await;
2025
2026        worktree_a
2027            .condition(&cx_a, |tree, _| {
2028                tree.paths()
2029                    .map(|p| p.to_string_lossy())
2030                    .collect::<Vec<_>>()
2031                    == ["file1-renamed", "file3", "file4"]
2032            })
2033            .await;
2034        worktree_b
2035            .condition(&cx_b, |tree, _| {
2036                tree.paths()
2037                    .map(|p| p.to_string_lossy())
2038                    .collect::<Vec<_>>()
2039                    == ["file1-renamed", "file3", "file4"]
2040            })
2041            .await;
2042        worktree_c
2043            .condition(&cx_c, |tree, _| {
2044                tree.paths()
2045                    .map(|p| p.to_string_lossy())
2046                    .collect::<Vec<_>>()
2047                    == ["file1-renamed", "file3", "file4"]
2048            })
2049            .await;
2050
2051        // Ensure buffer files are updated as well.
2052        buffer_a
2053            .condition(&cx_a, |buf, _| {
2054                buf.file().unwrap().path().to_str() == Some("file1-renamed")
2055            })
2056            .await;
2057        buffer_b
2058            .condition(&cx_b, |buf, _| {
2059                buf.file().unwrap().path().to_str() == Some("file1-renamed")
2060            })
2061            .await;
2062        buffer_c
2063            .condition(&cx_c, |buf, _| {
2064                buf.file().unwrap().path().to_str() == Some("file1-renamed")
2065            })
2066            .await;
2067    }
2068
2069    #[gpui::test(iterations = 10)]
2070    async fn test_fs_operations(
2071        executor: Arc<Deterministic>,
2072        cx_a: &mut TestAppContext,
2073        cx_b: &mut TestAppContext,
2074    ) {
2075        executor.forbid_parking();
2076        let fs = FakeFs::new(cx_a.background());
2077
2078        // Connect to a server as 2 clients.
2079        let mut server = TestServer::start(cx_a.foreground(), cx_a.background()).await;
2080        let mut client_a = server.create_client(cx_a, "user_a").await;
2081        let mut client_b = server.create_client(cx_b, "user_b").await;
2082        server
2083            .make_contacts(vec![(&client_a, cx_a), (&client_b, cx_b)])
2084            .await;
2085
2086        // Share a project as client A
2087        fs.insert_tree(
2088            "/dir",
2089            json!({
2090                "a.txt": "a-contents",
2091                "b.txt": "b-contents",
2092            }),
2093        )
2094        .await;
2095
2096        let (project_a, worktree_id) = client_a.build_local_project(fs, "/dir", cx_a).await;
2097        let project_id = project_a.read_with(cx_a, |project, _| project.remote_id().unwrap());
2098        project_a
2099            .update(cx_a, |project, cx| project.share(cx))
2100            .await
2101            .unwrap();
2102
2103        let project_b = client_b.build_remote_project(project_id, cx_b).await;
2104
2105        let worktree_a =
2106            project_a.read_with(cx_a, |project, cx| project.worktrees(cx).next().unwrap());
2107        let worktree_b =
2108            project_b.read_with(cx_b, |project, cx| project.worktrees(cx).next().unwrap());
2109
2110        let entry = project_b
2111            .update(cx_b, |project, cx| {
2112                project
2113                    .create_entry((worktree_id, "c.txt"), false, cx)
2114                    .unwrap()
2115            })
2116            .await
2117            .unwrap();
2118        worktree_a.read_with(cx_a, |worktree, _| {
2119            assert_eq!(
2120                worktree
2121                    .paths()
2122                    .map(|p| p.to_string_lossy())
2123                    .collect::<Vec<_>>(),
2124                ["a.txt", "b.txt", "c.txt"]
2125            );
2126        });
2127        worktree_b.read_with(cx_b, |worktree, _| {
2128            assert_eq!(
2129                worktree
2130                    .paths()
2131                    .map(|p| p.to_string_lossy())
2132                    .collect::<Vec<_>>(),
2133                ["a.txt", "b.txt", "c.txt"]
2134            );
2135        });
2136
2137        project_b
2138            .update(cx_b, |project, cx| {
2139                project.rename_entry(entry.id, Path::new("d.txt"), cx)
2140            })
2141            .unwrap()
2142            .await
2143            .unwrap();
2144        worktree_a.read_with(cx_a, |worktree, _| {
2145            assert_eq!(
2146                worktree
2147                    .paths()
2148                    .map(|p| p.to_string_lossy())
2149                    .collect::<Vec<_>>(),
2150                ["a.txt", "b.txt", "d.txt"]
2151            );
2152        });
2153        worktree_b.read_with(cx_b, |worktree, _| {
2154            assert_eq!(
2155                worktree
2156                    .paths()
2157                    .map(|p| p.to_string_lossy())
2158                    .collect::<Vec<_>>(),
2159                ["a.txt", "b.txt", "d.txt"]
2160            );
2161        });
2162
2163        let dir_entry = project_b
2164            .update(cx_b, |project, cx| {
2165                project
2166                    .create_entry((worktree_id, "DIR"), true, cx)
2167                    .unwrap()
2168            })
2169            .await
2170            .unwrap();
2171        worktree_a.read_with(cx_a, |worktree, _| {
2172            assert_eq!(
2173                worktree
2174                    .paths()
2175                    .map(|p| p.to_string_lossy())
2176                    .collect::<Vec<_>>(),
2177                ["DIR", "a.txt", "b.txt", "d.txt"]
2178            );
2179        });
2180        worktree_b.read_with(cx_b, |worktree, _| {
2181            assert_eq!(
2182                worktree
2183                    .paths()
2184                    .map(|p| p.to_string_lossy())
2185                    .collect::<Vec<_>>(),
2186                ["DIR", "a.txt", "b.txt", "d.txt"]
2187            );
2188        });
2189
2190        project_b
2191            .update(cx_b, |project, cx| {
2192                project.delete_entry(dir_entry.id, cx).unwrap()
2193            })
2194            .await
2195            .unwrap();
2196        worktree_a.read_with(cx_a, |worktree, _| {
2197            assert_eq!(
2198                worktree
2199                    .paths()
2200                    .map(|p| p.to_string_lossy())
2201                    .collect::<Vec<_>>(),
2202                ["a.txt", "b.txt", "d.txt"]
2203            );
2204        });
2205        worktree_b.read_with(cx_b, |worktree, _| {
2206            assert_eq!(
2207                worktree
2208                    .paths()
2209                    .map(|p| p.to_string_lossy())
2210                    .collect::<Vec<_>>(),
2211                ["a.txt", "b.txt", "d.txt"]
2212            );
2213        });
2214
2215        project_b
2216            .update(cx_b, |project, cx| {
2217                project.delete_entry(entry.id, cx).unwrap()
2218            })
2219            .await
2220            .unwrap();
2221        worktree_a.read_with(cx_a, |worktree, _| {
2222            assert_eq!(
2223                worktree
2224                    .paths()
2225                    .map(|p| p.to_string_lossy())
2226                    .collect::<Vec<_>>(),
2227                ["a.txt", "b.txt"]
2228            );
2229        });
2230        worktree_b.read_with(cx_b, |worktree, _| {
2231            assert_eq!(
2232                worktree
2233                    .paths()
2234                    .map(|p| p.to_string_lossy())
2235                    .collect::<Vec<_>>(),
2236                ["a.txt", "b.txt"]
2237            );
2238        });
2239    }
2240
2241    #[gpui::test(iterations = 10)]
2242    async fn test_buffer_conflict_after_save(cx_a: &mut TestAppContext, cx_b: &mut TestAppContext) {
2243        cx_a.foreground().forbid_parking();
2244        let lang_registry = Arc::new(LanguageRegistry::test());
2245        let fs = FakeFs::new(cx_a.background());
2246
2247        // Connect to a server as 2 clients.
2248        let mut server = TestServer::start(cx_a.foreground(), cx_a.background()).await;
2249        let client_a = server.create_client(cx_a, "user_a").await;
2250        let client_b = server.create_client(cx_b, "user_b").await;
2251        server
2252            .make_contacts(vec![(&client_a, cx_a), (&client_b, cx_b)])
2253            .await;
2254
2255        // Share a project as client A
2256        fs.insert_tree(
2257            "/dir",
2258            json!({
2259                "a.txt": "a-contents",
2260            }),
2261        )
2262        .await;
2263
2264        let project_a = cx_a.update(|cx| {
2265            Project::local(
2266                client_a.clone(),
2267                client_a.user_store.clone(),
2268                lang_registry.clone(),
2269                fs.clone(),
2270                cx,
2271            )
2272        });
2273        let (worktree_a, _) = project_a
2274            .update(cx_a, |p, cx| {
2275                p.find_or_create_local_worktree("/dir", true, cx)
2276            })
2277            .await
2278            .unwrap();
2279        worktree_a
2280            .read_with(cx_a, |tree, _| tree.as_local().unwrap().scan_complete())
2281            .await;
2282        let project_id = project_a.update(cx_a, |p, _| p.next_remote_id()).await;
2283        let worktree_id = worktree_a.read_with(cx_a, |tree, _| tree.id());
2284        project_a.update(cx_a, |p, cx| p.share(cx)).await.unwrap();
2285
2286        // Join that project as client B
2287        let project_b = Project::remote(
2288            project_id,
2289            client_b.clone(),
2290            client_b.user_store.clone(),
2291            lang_registry.clone(),
2292            fs.clone(),
2293            &mut cx_b.to_async(),
2294        )
2295        .await
2296        .unwrap();
2297
2298        // Open a buffer as client B
2299        let buffer_b = project_b
2300            .update(cx_b, |p, cx| p.open_buffer((worktree_id, "a.txt"), cx))
2301            .await
2302            .unwrap();
2303
2304        buffer_b.update(cx_b, |buf, cx| buf.edit([(0..0, "world ")], cx));
2305        buffer_b.read_with(cx_b, |buf, _| {
2306            assert!(buf.is_dirty());
2307            assert!(!buf.has_conflict());
2308        });
2309
2310        buffer_b.update(cx_b, |buf, cx| buf.save(cx)).await.unwrap();
2311        buffer_b
2312            .condition(&cx_b, |buffer_b, _| !buffer_b.is_dirty())
2313            .await;
2314        buffer_b.read_with(cx_b, |buf, _| {
2315            assert!(!buf.has_conflict());
2316        });
2317
2318        buffer_b.update(cx_b, |buf, cx| buf.edit([(0..0, "hello ")], cx));
2319        buffer_b.read_with(cx_b, |buf, _| {
2320            assert!(buf.is_dirty());
2321            assert!(!buf.has_conflict());
2322        });
2323    }
2324
2325    #[gpui::test(iterations = 10)]
2326    async fn test_buffer_reloading(cx_a: &mut TestAppContext, cx_b: &mut TestAppContext) {
2327        cx_a.foreground().forbid_parking();
2328        let lang_registry = Arc::new(LanguageRegistry::test());
2329        let fs = FakeFs::new(cx_a.background());
2330
2331        // Connect to a server as 2 clients.
2332        let mut server = TestServer::start(cx_a.foreground(), cx_a.background()).await;
2333        let client_a = server.create_client(cx_a, "user_a").await;
2334        let client_b = server.create_client(cx_b, "user_b").await;
2335        server
2336            .make_contacts(vec![(&client_a, cx_a), (&client_b, cx_b)])
2337            .await;
2338
2339        // Share a project as client A
2340        fs.insert_tree(
2341            "/dir",
2342            json!({
2343                "a.txt": "a-contents",
2344            }),
2345        )
2346        .await;
2347
2348        let project_a = cx_a.update(|cx| {
2349            Project::local(
2350                client_a.clone(),
2351                client_a.user_store.clone(),
2352                lang_registry.clone(),
2353                fs.clone(),
2354                cx,
2355            )
2356        });
2357        let (worktree_a, _) = project_a
2358            .update(cx_a, |p, cx| {
2359                p.find_or_create_local_worktree("/dir", true, cx)
2360            })
2361            .await
2362            .unwrap();
2363        worktree_a
2364            .read_with(cx_a, |tree, _| tree.as_local().unwrap().scan_complete())
2365            .await;
2366        let project_id = project_a.update(cx_a, |p, _| p.next_remote_id()).await;
2367        let worktree_id = worktree_a.read_with(cx_a, |tree, _| tree.id());
2368        project_a.update(cx_a, |p, cx| p.share(cx)).await.unwrap();
2369
2370        // Join that project as client B
2371        let project_b = Project::remote(
2372            project_id,
2373            client_b.clone(),
2374            client_b.user_store.clone(),
2375            lang_registry.clone(),
2376            fs.clone(),
2377            &mut cx_b.to_async(),
2378        )
2379        .await
2380        .unwrap();
2381        let _worktree_b = project_b.update(cx_b, |p, cx| p.worktrees(cx).next().unwrap());
2382
2383        // Open a buffer as client B
2384        let buffer_b = project_b
2385            .update(cx_b, |p, cx| p.open_buffer((worktree_id, "a.txt"), cx))
2386            .await
2387            .unwrap();
2388        buffer_b.read_with(cx_b, |buf, _| {
2389            assert!(!buf.is_dirty());
2390            assert!(!buf.has_conflict());
2391        });
2392
2393        fs.save(Path::new("/dir/a.txt"), &"new contents".into())
2394            .await
2395            .unwrap();
2396        buffer_b
2397            .condition(&cx_b, |buf, _| {
2398                buf.text() == "new contents" && !buf.is_dirty()
2399            })
2400            .await;
2401        buffer_b.read_with(cx_b, |buf, _| {
2402            assert!(!buf.has_conflict());
2403        });
2404    }
2405
2406    #[gpui::test(iterations = 10)]
2407    async fn test_editing_while_guest_opens_buffer(
2408        cx_a: &mut TestAppContext,
2409        cx_b: &mut TestAppContext,
2410    ) {
2411        cx_a.foreground().forbid_parking();
2412        let lang_registry = Arc::new(LanguageRegistry::test());
2413        let fs = FakeFs::new(cx_a.background());
2414
2415        // Connect to a server as 2 clients.
2416        let mut server = TestServer::start(cx_a.foreground(), cx_a.background()).await;
2417        let client_a = server.create_client(cx_a, "user_a").await;
2418        let client_b = server.create_client(cx_b, "user_b").await;
2419        server
2420            .make_contacts(vec![(&client_a, cx_a), (&client_b, cx_b)])
2421            .await;
2422
2423        // Share a project as client A
2424        fs.insert_tree(
2425            "/dir",
2426            json!({
2427                "a.txt": "a-contents",
2428            }),
2429        )
2430        .await;
2431        let project_a = cx_a.update(|cx| {
2432            Project::local(
2433                client_a.clone(),
2434                client_a.user_store.clone(),
2435                lang_registry.clone(),
2436                fs.clone(),
2437                cx,
2438            )
2439        });
2440        let (worktree_a, _) = project_a
2441            .update(cx_a, |p, cx| {
2442                p.find_or_create_local_worktree("/dir", true, cx)
2443            })
2444            .await
2445            .unwrap();
2446        worktree_a
2447            .read_with(cx_a, |tree, _| tree.as_local().unwrap().scan_complete())
2448            .await;
2449        let project_id = project_a.update(cx_a, |p, _| p.next_remote_id()).await;
2450        let worktree_id = worktree_a.read_with(cx_a, |tree, _| tree.id());
2451        project_a.update(cx_a, |p, cx| p.share(cx)).await.unwrap();
2452
2453        // Join that project as client B
2454        let project_b = Project::remote(
2455            project_id,
2456            client_b.clone(),
2457            client_b.user_store.clone(),
2458            lang_registry.clone(),
2459            fs.clone(),
2460            &mut cx_b.to_async(),
2461        )
2462        .await
2463        .unwrap();
2464
2465        // Open a buffer as client A
2466        let buffer_a = project_a
2467            .update(cx_a, |p, cx| p.open_buffer((worktree_id, "a.txt"), cx))
2468            .await
2469            .unwrap();
2470
2471        // Start opening the same buffer as client B
2472        let buffer_b = cx_b
2473            .background()
2474            .spawn(project_b.update(cx_b, |p, cx| p.open_buffer((worktree_id, "a.txt"), cx)));
2475
2476        // Edit the buffer as client A while client B is still opening it.
2477        cx_b.background().simulate_random_delay().await;
2478        buffer_a.update(cx_a, |buf, cx| buf.edit([(0..0, "X")], cx));
2479        cx_b.background().simulate_random_delay().await;
2480        buffer_a.update(cx_a, |buf, cx| buf.edit([(1..1, "Y")], cx));
2481
2482        let text = buffer_a.read_with(cx_a, |buf, _| buf.text());
2483        let buffer_b = buffer_b.await.unwrap();
2484        buffer_b.condition(&cx_b, |buf, _| buf.text() == text).await;
2485    }
2486
2487    #[gpui::test(iterations = 10)]
        // Checks that a guest who abandons a project while a buffer open is still
        // in flight is removed from the host's collaborator list.
        //
        // Client A shares a one-file project and client B joins it. B then starts
        // `open_buffer`, drops its project handle and the spawned task without
        // awaiting the result, and the test waits until A's collaborator count is
        // back to zero.
2488    async fn test_leaving_worktree_while_opening_buffer(
2489        cx_a: &mut TestAppContext,
2490        cx_b: &mut TestAppContext,
2491    ) {
            // NOTE(review): presumably makes the test fail instead of blocking when
            // the foreground executor has nothing left to run — confirm against gpui.
2492        cx_a.foreground().forbid_parking();
2493        let lang_registry = Arc::new(LanguageRegistry::test());
2494        let fs = FakeFs::new(cx_a.background());
2495
2496        // Connect to a server as 2 clients.
2497        let mut server = TestServer::start(cx_a.foreground(), cx_a.background()).await;
2498        let client_a = server.create_client(cx_a, "user_a").await;
2499        let client_b = server.create_client(cx_b, "user_b").await;
2500        server
2501            .make_contacts(vec![(&client_a, cx_a), (&client_b, cx_b)])
2502            .await;
2503
2504        // Share a project as client A
2505        fs.insert_tree(
2506            "/dir",
2507            json!({
2508                "a.txt": "a-contents",
2509            }),
2510        )
2511        .await;
2512        let project_a = cx_a.update(|cx| {
2513            Project::local(
2514                client_a.clone(),
2515                client_a.user_store.clone(),
2516                lang_registry.clone(),
2517                fs.clone(),
2518                cx,
2519            )
2520        });
2521        let (worktree_a, _) = project_a
2522            .update(cx_a, |p, cx| {
2523                p.find_or_create_local_worktree("/dir", true, cx)
2524            })
2525            .await
2526            .unwrap();
            // Wait for the worktree's scan to complete before sharing the project.
2527        worktree_a
2528            .read_with(cx_a, |tree, _| tree.as_local().unwrap().scan_complete())
2529            .await;
            // `project_id` is what client B passes to `Project::remote` below, and
            // `worktree_id` is used to address `a.txt` when B opens it.
2530        let project_id = project_a.update(cx_a, |p, _| p.next_remote_id()).await;
2531        let worktree_id = worktree_a.read_with(cx_a, |tree, _| tree.id());
2532        project_a.update(cx_a, |p, cx| p.share(cx)).await.unwrap();
2533
2534        // Join that project as client B
2535        let project_b = Project::remote(
2536            project_id,
2537            client_b.clone(),
2538            client_b.user_store.clone(),
2539            lang_registry.clone(),
2540            fs.clone(),
2541            &mut cx_b.to_async(),
2542        )
2543        .await
2544        .unwrap();
2545
2546        // See that a guest has joined as client A.
2547        project_a
2548            .condition(&cx_a, |p, _| p.collaborators().len() == 1)
2549            .await;
2550
2551        // Begin opening a buffer as client B, but leave the project before the open completes.
            // The spawned open is never awaited: the project handle is dropped inside
            // an `update` on B's context, then the handle returned by `spawn` is
            // dropped as well.
2552        let buffer_b = cx_b
2553            .background()
2554            .spawn(project_b.update(cx_b, |p, cx| p.open_buffer((worktree_id, "a.txt"), cx)));
2555        cx_b.update(|_| drop(project_b));
2556        drop(buffer_b);
2557
2558        // See that the guest has left.
2559        project_a
2560            .condition(&cx_a, |p, _| p.collaborators().len() == 0)
2561            .await;
2562    }
2563
2564    #[gpui::test(iterations = 10)]
        // Verifies that the host's collaborator list shrinks when a guest goes away,
        // covering two ways of leaving:
        //   1. client B calls `disconnect` on its own client, and
        //   2. the test server drops B's connection and the clock is advanced by
        //      `rpc::RECEIVE_TIMEOUT`.
        // Between the two, B rejoins the same project and A must see one
        // collaborator again.
2565    async fn test_leaving_project(cx_a: &mut TestAppContext, cx_b: &mut TestAppContext) {
            // NOTE(review): presumably makes the test fail instead of blocking when
            // the foreground executor has nothing left to run — confirm against gpui.
2566        cx_a.foreground().forbid_parking();
2567        let lang_registry = Arc::new(LanguageRegistry::test());
2568        let fs = FakeFs::new(cx_a.background());
2569
2570        // Connect to a server as 2 clients.
2571        let mut server = TestServer::start(cx_a.foreground(), cx_a.background()).await;
2572        let client_a = server.create_client(cx_a, "user_a").await;
2573        let client_b = server.create_client(cx_b, "user_b").await;
2574        server
2575            .make_contacts(vec![(&client_a, cx_a), (&client_b, cx_b)])
2576            .await;
2577
2578        // Share a project as client A
2579        fs.insert_tree(
2580            "/a",
2581            json!({
2582                "a.txt": "a-contents",
2583                "b.txt": "b-contents",
2584            }),
2585        )
2586        .await;
2587        let project_a = cx_a.update(|cx| {
2588            Project::local(
2589                client_a.clone(),
2590                client_a.user_store.clone(),
2591                lang_registry.clone(),
2592                fs.clone(),
2593                cx,
2594            )
2595        });
2596        let (worktree_a, _) = project_a
2597            .update(cx_a, |p, cx| {
2598                p.find_or_create_local_worktree("/a", true, cx)
2599            })
2600            .await
2601            .unwrap();
            // Wait for the worktree's scan to complete before sharing the project.
2602        worktree_a
2603            .read_with(cx_a, |tree, _| tree.as_local().unwrap().scan_complete())
2604            .await;
2605        let project_id = project_a
2606            .update(cx_a, |project, _| project.next_remote_id())
2607            .await;
2608        project_a
2609            .update(cx_a, |project, cx| project.share(cx))
2610            .await
2611            .unwrap();
2612
2613        // Join that project as client B
            // Bound to a named `_project_b` (not `_`) so the handle is not dropped
            // immediately; the test never reads it again.
2614        let _project_b = Project::remote(
2615            project_id,
2616            client_b.clone(),
2617            client_b.user_store.clone(),
2618            lang_registry.clone(),
2619            fs.clone(),
2620            &mut cx_b.to_async(),
2621        )
2622        .await
2623        .unwrap();
2624
2625        // Client A sees that a guest has joined.
2626        project_a
2627            .condition(cx_a, |p, _| p.collaborators().len() == 1)
2628            .await;
2629
2630        // Drop client B's connection and ensure client A observes client B leaving the project.
2631        client_b.disconnect(&cx_b.to_async()).unwrap();
2632        project_a
2633            .condition(cx_a, |p, _| p.collaborators().len() == 0)
2634            .await;
2635
2636        // Rejoin the project as client B
            // This shadows the earlier binding; the first handle is still only
            // dropped at the end of the function.
2637        let _project_b = Project::remote(
2638            project_id,
2639            client_b.clone(),
2640            client_b.user_store.clone(),
2641            lang_registry.clone(),
2642            fs.clone(),
2643            &mut cx_b.to_async(),
2644        )
2645        .await
2646        .unwrap();
2647
2648        // Client A sees that a guest has re-joined.
2649        project_a
2650            .condition(cx_a, |p, _| p.collaborators().len() == 1)
2651            .await;
2652
2653        // Simulate connection loss for client B and ensure client A observes client B leaving the project.
            // NOTE(review): the wait presumably ensures `current_user_id` has a user
            // to return after the reconnect — confirm. Advancing the clock by
            // `rpc::RECEIVE_TIMEOUT` presumably lets the server time out the dead
            // connection — confirm against the rpc crate.
2654        client_b.wait_for_current_user(cx_b).await;
2655        server.disconnect_client(client_b.current_user_id(cx_b));
2656        cx_a.foreground().advance_clock(rpc::RECEIVE_TIMEOUT);
2657        project_a
2658            .condition(cx_a, |p, _| p.collaborators().len() == 0)
2659            .await;
2660    }
2661
2662    #[gpui::test(iterations = 10)]
        // Verifies that diagnostics published by the host's language server reach
        // a guest, both as per-file summaries and as entries in an opened buffer.
        //
        // Sequence:
        //   1. Client A shares a project and opens `other.rs`, which starts the
        //      fake language server.
        //   2. The server publishes one error for `a.rs`; the test waits until the
        //      collaboration server's store has a diagnostic summary for the
        //      worktree.
        //   3. Client B joins and must immediately see the one-error summary.
        //   4. The server publishes an error plus a warning; B's summary must
        //      update, and opening `a.rs` on B must show both entries.
        //   5. The server publishes an empty list; both A and B must end up with
        //      no summaries.
2663    async fn test_collaborating_with_diagnostics(
2664        cx_a: &mut TestAppContext,
2665        cx_b: &mut TestAppContext,
2666    ) {
            // NOTE(review): presumably makes the test fail instead of blocking when
            // the foreground executor has nothing left to run — confirm against gpui.
2667        cx_a.foreground().forbid_parking();
2668        let lang_registry = Arc::new(LanguageRegistry::test());
2669        let fs = FakeFs::new(cx_a.background());
2670
2671        // Set up a fake language server.
2672        let mut language = Language::new(
2673            LanguageConfig {
2674                name: "Rust".into(),
2675                path_suffixes: vec!["rs".to_string()],
2676                ..Default::default()
2677            },
2678            Some(tree_sitter_rust::language()),
2679        );
2680        let mut fake_language_servers = language.set_fake_lsp_adapter(Default::default());
2681        lang_registry.add(Arc::new(language));
2682
2683        // Connect to a server as 2 clients.
2684        let mut server = TestServer::start(cx_a.foreground(), cx_a.background()).await;
2685        let client_a = server.create_client(cx_a, "user_a").await;
2686        let client_b = server.create_client(cx_b, "user_b").await;
2687        server
2688            .make_contacts(vec![(&client_a, cx_a), (&client_b, cx_b)])
2689            .await;
2690
2691        // Share a project as client A
2692        fs.insert_tree(
2693            "/a",
2694            json!({
2695                "a.rs": "let one = two",
2696                "other.rs": "",
2697            }),
2698        )
2699        .await;
2700        let project_a = cx_a.update(|cx| {
2701            Project::local(
2702                client_a.clone(),
2703                client_a.user_store.clone(),
2704                lang_registry.clone(),
2705                fs.clone(),
2706                cx,
2707            )
2708        });
2709        let (worktree_a, _) = project_a
2710            .update(cx_a, |p, cx| {
2711                p.find_or_create_local_worktree("/a", true, cx)
2712            })
2713            .await
2714            .unwrap();
            // Wait for the worktree's scan to complete before sharing the project.
2715        worktree_a
2716            .read_with(cx_a, |tree, _| tree.as_local().unwrap().scan_complete())
2717            .await;
2718        let project_id = project_a.update(cx_a, |p, _| p.next_remote_id()).await;
2719        let worktree_id = worktree_a.read_with(cx_a, |tree, _| tree.id());
2720        project_a.update(cx_a, |p, cx| p.share(cx)).await.unwrap();
2721
2722        // Cause the language server to start.
            // The opened buffer itself is discarded; only the side effect matters.
2723        let _ = cx_a
2724            .background()
2725            .spawn(project_a.update(cx_a, |project, cx| {
2726                project.open_buffer(
2727                    ProjectPath {
2728                        worktree_id,
2729                        path: Path::new("other.rs").into(),
2730                    },
2731                    cx,
2732                )
2733            }))
2734            .await
2735            .unwrap();
2736
2737        // Simulate a language server reporting errors for a file.
            // Columns 4..7 on line 0 cover `one` in "let one = two".
2738        let mut fake_language_server = fake_language_servers.next().await.unwrap();
2739        fake_language_server
2740            .receive_notification::<lsp::notification::DidOpenTextDocument>()
2741            .await;
2742        fake_language_server.notify::<lsp::notification::PublishDiagnostics>(
2743            lsp::PublishDiagnosticsParams {
2744                uri: lsp::Url::from_file_path("/a/a.rs").unwrap(),
2745                version: None,
2746                diagnostics: vec![lsp::Diagnostic {
2747                    severity: Some(lsp::DiagnosticSeverity::ERROR),
2748                    range: lsp::Range::new(lsp::Position::new(0, 4), lsp::Position::new(0, 7)),
2749                    message: "message 1".to_string(),
2750                    ..Default::default()
2751                }],
2752            },
2753        );
2754
2755        // Wait for server to see the diagnostics update.
            // This is checked on the collaboration server's store, before client B
            // joins, so that B's initial view already includes the summary.
2756        server
2757            .condition(|store| {
2758                let worktree = store
2759                    .project(project_id)
2760                    .unwrap()
2761                    .share
2762                    .as_ref()
2763                    .unwrap()
2764                    .worktrees
2765                    .get(&worktree_id.to_proto())
2766                    .unwrap();
2767
2768                !worktree.diagnostic_summaries.is_empty()
2769            })
2770            .await;
2771
2772        // Join the worktree as client B.
2773        let project_b = Project::remote(
2774            project_id,
2775            client_b.clone(),
2776            client_b.user_store.clone(),
2777            lang_registry.clone(),
2778            fs.clone(),
2779            &mut cx_b.to_async(),
2780        )
2781        .await
2782        .unwrap();
2783
            // Asserted right after joining, without waiting on a condition.
2784        project_b.read_with(cx_b, |project, cx| {
2785            assert_eq!(
2786                project.diagnostic_summaries(cx).collect::<Vec<_>>(),
2787                &[(
2788                    ProjectPath {
2789                        worktree_id,
2790                        path: Arc::from(Path::new("a.rs")),
2791                    },
2792                    DiagnosticSummary {
2793                        error_count: 1,
2794                        warning_count: 0,
2795                        ..Default::default()
2796                    },
2797                )]
2798            )
2799        });
2800
2801        // Simulate a language server reporting more errors for a file.
            // The added warning's columns 10..13 cover `two` in "let one = two".
2802        fake_language_server.notify::<lsp::notification::PublishDiagnostics>(
2803            lsp::PublishDiagnosticsParams {
2804                uri: lsp::Url::from_file_path("/a/a.rs").unwrap(),
2805                version: None,
2806                diagnostics: vec![
2807                    lsp::Diagnostic {
2808                        severity: Some(lsp::DiagnosticSeverity::ERROR),
2809                        range: lsp::Range::new(lsp::Position::new(0, 4), lsp::Position::new(0, 7)),
2810                        message: "message 1".to_string(),
2811                        ..Default::default()
2812                    },
2813                    lsp::Diagnostic {
2814                        severity: Some(lsp::DiagnosticSeverity::WARNING),
2815                        range: lsp::Range::new(
2816                            lsp::Position::new(0, 10),
2817                            lsp::Position::new(0, 13),
2818                        ),
2819                        message: "message 2".to_string(),
2820                        ..Default::default()
2821                    },
2822                ],
2823            },
2824        );
2825
2826        // Client b gets the updated summaries
2827        project_b
2828            .condition(cx_b, |project, cx| {
2829                project.diagnostic_summaries(cx).collect::<Vec<_>>()
2830                    == &[(
2831                        ProjectPath {
2832                            worktree_id,
2833                            path: Arc::from(Path::new("a.rs")),
2834                        },
2835                        DiagnosticSummary {
2836                            error_count: 1,
2837                            warning_count: 1,
2838                            ..Default::default()
2839                        },
2840                    )]
2841            })
2842            .await;
2843
2844        // Open the file with the errors on client B. They should be present.
2845        let buffer_b = cx_b
2846            .background()
2847            .spawn(project_b.update(cx_b, |p, cx| p.open_buffer((worktree_id, "a.rs"), cx)))
2848            .await
2849            .unwrap();
2850
            // Both entries are expected as primary diagnostics in separate groups
            // (group ids 0 and 1), at the same ranges the server reported.
2851        buffer_b.read_with(cx_b, |buffer, _| {
2852            assert_eq!(
2853                buffer
2854                    .snapshot()
2855                    .diagnostics_in_range::<_, Point>(0..buffer.len(), false)
2857                    .collect::<Vec<_>>(),
2858                &[
2859                    DiagnosticEntry {
2860                        range: Point::new(0, 4)..Point::new(0, 7),
2861                        diagnostic: Diagnostic {
2862                            group_id: 0,
2863                            message: "message 1".to_string(),
2864                            severity: lsp::DiagnosticSeverity::ERROR,
2865                            is_primary: true,
2866                            ..Default::default()
2867                        }
2868                    },
2869                    DiagnosticEntry {
2870                        range: Point::new(0, 10)..Point::new(0, 13),
2871                        diagnostic: Diagnostic {
2872                            group_id: 1,
2873                            severity: lsp::DiagnosticSeverity::WARNING,
2874                            message: "message 2".to_string(),
2875                            is_primary: true,
2876                            ..Default::default()
2877                        }
2878                    }
2879                ]
2880            );
2881        });
2882
2883        // Simulate a language server reporting no errors for a file.
2884        fake_language_server.notify::<lsp::notification::PublishDiagnostics>(
2885            lsp::PublishDiagnosticsParams {
2886                uri: lsp::Url::from_file_path("/a/a.rs").unwrap(),
2887                version: None,
2888                diagnostics: vec![],
2889            },
2890        );
            // Both the host and the guest must end up with no summaries at all.
2891        project_a
2892            .condition(cx_a, |project, cx| {
2893                project.diagnostic_summaries(cx).collect::<Vec<_>>() == &[]
2894            })
2895            .await;
2896        project_b
2897            .condition(cx_b, |project, cx| {
2898                project.diagnostic_summaries(cx).collect::<Vec<_>>() == &[]
2899            })
2900            .await;
2901    }
2902
2903    #[gpui::test(iterations = 10)]
        // Exercises completions requested by a guest and served by the host's
        // (fake) language server.
        //
        // Client B opens `main.rs` in an editor and types `.`. The fake language
        // server answers the completion request with two items, B confirms the
        // first one, and the server then resolves that item with an additional
        // text edit. The test ends once both the host's and the guest's buffers
        // contain the inserted method call and the extra `use` line.
2904    async fn test_collaborating_with_completion(
2905        cx_a: &mut TestAppContext,
2906        cx_b: &mut TestAppContext,
2907    ) {
            // NOTE(review): presumably makes the test fail instead of blocking when
            // the foreground executor has nothing left to run — confirm against gpui.
2908        cx_a.foreground().forbid_parking();
2909        let lang_registry = Arc::new(LanguageRegistry::test());
2910        let fs = FakeFs::new(cx_a.background());
2911
2912        // Set up a fake language server.
            // The adapter advertises `.` as its only completion trigger character.
2913        let mut language = Language::new(
2914            LanguageConfig {
2915                name: "Rust".into(),
2916                path_suffixes: vec!["rs".to_string()],
2917                ..Default::default()
2918            },
2919            Some(tree_sitter_rust::language()),
2920        );
2921        let mut fake_language_servers = language.set_fake_lsp_adapter(FakeLspAdapter {
2922            capabilities: lsp::ServerCapabilities {
2923                completion_provider: Some(lsp::CompletionOptions {
2924                    trigger_characters: Some(vec![".".to_string()]),
2925                    ..Default::default()
2926                }),
2927                ..Default::default()
2928            },
2929            ..Default::default()
2930        });
2931        lang_registry.add(Arc::new(language));
2932
2933        // Connect to a server as 2 clients.
2934        let mut server = TestServer::start(cx_a.foreground(), cx_a.background()).await;
2935        let client_a = server.create_client(cx_a, "user_a").await;
2936        let client_b = server.create_client(cx_b, "user_b").await;
2937        server
2938            .make_contacts(vec![(&client_a, cx_a), (&client_b, cx_b)])
2939            .await;
2940
2941        // Share a project as client A
2942        fs.insert_tree(
2943            "/a",
2944            json!({
2945                "main.rs": "fn main() { a }",
2946                "other.rs": "",
2947            }),
2948        )
2949        .await;
2950        let project_a = cx_a.update(|cx| {
2951            Project::local(
2952                client_a.clone(),
2953                client_a.user_store.clone(),
2954                lang_registry.clone(),
2955                fs.clone(),
2956                cx,
2957            )
2958        });
2959        let (worktree_a, _) = project_a
2960            .update(cx_a, |p, cx| {
2961                p.find_or_create_local_worktree("/a", true, cx)
2962            })
2963            .await
2964            .unwrap();
            // Wait for the worktree's scan to complete before sharing the project.
2965        worktree_a
2966            .read_with(cx_a, |tree, _| tree.as_local().unwrap().scan_complete())
2967            .await;
2968        let project_id = project_a.update(cx_a, |p, _| p.next_remote_id()).await;
2969        let worktree_id = worktree_a.read_with(cx_a, |tree, _| tree.id());
2970        project_a.update(cx_a, |p, cx| p.share(cx)).await.unwrap();
2971
2972        // Join the worktree as client B.
2973        let project_b = Project::remote(
2974            project_id,
2975            client_b.clone(),
2976            client_b.user_store.clone(),
2977            lang_registry.clone(),
2978            fs.clone(),
2979            &mut cx_b.to_async(),
2980        )
2981        .await
2982        .unwrap();
2983
2984        // Open a file in an editor as the guest.
2985        let buffer_b = project_b
2986            .update(cx_b, |p, cx| p.open_buffer((worktree_id, "main.rs"), cx))
2987            .await
2988            .unwrap();
2989        let (window_b, _) = cx_b.add_window(|_| EmptyView);
2990        let editor_b = cx_b.add_view(window_b, |cx| {
2991            Editor::for_buffer(buffer_b.clone(), Some(project_b.clone()), cx)
2992        });
2993
            // Take the fake language server instance, then wait until the guest's
            // buffer reports at least one completion trigger.
2994        let fake_language_server = fake_language_servers.next().await.unwrap();
2995        buffer_b
2996            .condition(&cx_b, |buffer, _| !buffer.completion_triggers().is_empty())
2997            .await;
2998
2999        // Type a completion trigger character as the guest.
3000        editor_b.update(cx_b, |editor, cx| {
                // Offset 13 is directly after `a` in "fn main() { a }", so typing
                // `.` produces "fn main() { a. }".
3001            editor.select_ranges([13..13], None, cx);
3002            editor.handle_input(&Input(".".into()), cx);
3003            cx.focus(&editor_b);
3004        });
3005
3006        // Receive a completion request as the host's language server.
3007        // Return some completions from the host's language server.
            // NOTE(review): `start_waiting`/`finish_waiting` bracket the await on the
            // handler's first request; their exact executor semantics are not
            // visible here — confirm against gpui.
3008        cx_a.foreground().start_waiting();
3009        fake_language_server
3010            .handle_request::<lsp::request::Completion, _, _>(|params, _| async move {
3011                assert_eq!(
3012                    params.text_document_position.text_document.uri,
3013                    lsp::Url::from_file_path("/a/main.rs").unwrap(),
3014                );
                    // Column 14 is just past the `.` the guest typed.
3015                assert_eq!(
3016                    params.text_document_position.position,
3017                    lsp::Position::new(0, 14),
3018                );
3019
3020                Ok(Some(lsp::CompletionResponse::Array(vec![
3021                    lsp::CompletionItem {
3022                        label: "first_method(…)".into(),
3023                        detail: Some("fn(&mut self, B) -> C".into()),
3024                        text_edit: Some(lsp::CompletionTextEdit::Edit(lsp::TextEdit {
3025                            new_text: "first_method($1)".to_string(),
3026                            range: lsp::Range::new(
3027                                lsp::Position::new(0, 14),
3028                                lsp::Position::new(0, 14),
3029                            ),
3030                        })),
3031                        insert_text_format: Some(lsp::InsertTextFormat::SNIPPET),
3032                        ..Default::default()
3033                    },
3034                    lsp::CompletionItem {
3035                        label: "second_method(…)".into(),
3036                        detail: Some("fn(&mut self, C) -> D<E>".into()),
3037                        text_edit: Some(lsp::CompletionTextEdit::Edit(lsp::TextEdit {
3038                            new_text: "second_method()".to_string(),
3039                            range: lsp::Range::new(
3040                                lsp::Position::new(0, 14),
3041                                lsp::Position::new(0, 14),
3042                            ),
3043                        })),
3044                        insert_text_format: Some(lsp::InsertTextFormat::SNIPPET),
3045                        ..Default::default()
3046                    },
3047                ])))
3048            })
3049            .next()
3050            .await
3051            .unwrap();
3052        cx_a.foreground().finish_waiting();
3053
3054        // Open the buffer on the host.
            // The host's copy must already reflect the `.` typed by the guest.
3055        let buffer_a = project_a
3056            .update(cx_a, |p, cx| p.open_buffer((worktree_id, "main.rs"), cx))
3057            .await
3058            .unwrap();
3059        buffer_a
3060            .condition(&cx_a, |buffer, _| buffer.text() == "fn main() { a. }")
3061            .await;
3062
3063        // Confirm a completion on the guest.
3064        editor_b
3065            .condition(&cx_b, |editor, _| editor.context_menu_visible())
3066            .await;
3067        editor_b.update(cx_b, |editor, cx| {
                // Item 0 is `first_method(…)`; the `$1` placeholder in its new_text
                // does not appear in the text asserted below.
3068            editor.confirm_completion(&ConfirmCompletion { item_ix: Some(0) }, cx);
3069            assert_eq!(editor.text(cx), "fn main() { a.first_method() }");
3070        });
3071
3072        // Return a resolved completion from the host's language server.
3073        // The resolved completion has an additional text edit.
            // Unlike the completion handler above, the value returned by
            // `handle_request` is not awaited here; the conditions at the end of
            // the test wait for the handler's effect instead.
3074        fake_language_server.handle_request::<lsp::request::ResolveCompletionItem, _, _>(
3075            |params, _| async move {
3076                assert_eq!(params.label, "first_method(…)");
3077                Ok(lsp::CompletionItem {
3078                    label: "first_method(…)".into(),
3079                    detail: Some("fn(&mut self, B) -> C".into()),
3080                    text_edit: Some(lsp::CompletionTextEdit::Edit(lsp::TextEdit {
3081                        new_text: "first_method($1)".to_string(),
3082                        range: lsp::Range::new(
3083                            lsp::Position::new(0, 14),
3084                            lsp::Position::new(0, 14),
3085                        ),
3086                    })),
3087                    additional_text_edits: Some(vec![lsp::TextEdit {
3088                        new_text: "use d::SomeTrait;\n".to_string(),
3089                        range: lsp::Range::new(lsp::Position::new(0, 0), lsp::Position::new(0, 0)),
3090                    }]),
3091                    insert_text_format: Some(lsp::InsertTextFormat::SNIPPET),
3092                    ..Default::default()
3093                })
3094            },
3095        );
3096
3097        // The additional edit is applied.
            // Checked on both the host's and the guest's copy of the buffer.
3098        buffer_a
3099            .condition(&cx_a, |buffer, _| {
3100                buffer.text() == "use d::SomeTrait;\nfn main() { a.first_method() }"
3101            })
3102            .await;
3103        buffer_b
3104            .condition(&cx_b, |buffer, _| {
3105                buffer.text() == "use d::SomeTrait;\nfn main() { a.first_method() }"
3106            })
3107            .await;
3108    }
3109
3110    #[gpui::test(iterations = 10)]
        // Verifies a reload requested by a guest on a buffer that is in conflict
        // with the file on disk.
        //
        // Client B edits the shared buffer, then the file is overwritten through
        // the fake filesystem so that both copies report a conflict. B calls
        // `reload_buffers`; afterwards both copies must show the on-disk text and
        // be neither dirty nor in conflict. Finally, undo is checked on each side:
        // it changes nothing on the host, and restores B's edits on the guest.
3111    async fn test_reloading_buffer_manually(cx_a: &mut TestAppContext, cx_b: &mut TestAppContext) {
            // NOTE(review): presumably makes the test fail instead of blocking when
            // the foreground executor has nothing left to run — confirm against gpui.
3112        cx_a.foreground().forbid_parking();
3113        let lang_registry = Arc::new(LanguageRegistry::test());
3114        let fs = FakeFs::new(cx_a.background());
3115
3116        // Connect to a server as 2 clients.
3117        let mut server = TestServer::start(cx_a.foreground(), cx_a.background()).await;
3118        let client_a = server.create_client(cx_a, "user_a").await;
3119        let client_b = server.create_client(cx_b, "user_b").await;
3120        server
3121            .make_contacts(vec![(&client_a, cx_a), (&client_b, cx_b)])
3122            .await;
3123
3124        // Share a project as client A
3125        fs.insert_tree(
3126            "/a",
3127            json!({
3128                "a.rs": "let one = 1;",
3129            }),
3130        )
3131        .await;
3132        let project_a = cx_a.update(|cx| {
3133            Project::local(
3134                client_a.clone(),
3135                client_a.user_store.clone(),
3136                lang_registry.clone(),
3137                fs.clone(),
3138                cx,
3139            )
3140        });
3141        let (worktree_a, _) = project_a
3142            .update(cx_a, |p, cx| {
3143                p.find_or_create_local_worktree("/a", true, cx)
3144            })
3145            .await
3146            .unwrap();
            // Wait for the worktree's scan to complete before sharing the project.
3147        worktree_a
3148            .read_with(cx_a, |tree, _| tree.as_local().unwrap().scan_complete())
3149            .await;
3150        let project_id = project_a.update(cx_a, |p, _| p.next_remote_id()).await;
3151        let worktree_id = worktree_a.read_with(cx_a, |tree, _| tree.id());
3152        project_a.update(cx_a, |p, cx| p.share(cx)).await.unwrap();
            // The host opens the buffer before the guest joins.
3153        let buffer_a = project_a
3154            .update(cx_a, |p, cx| p.open_buffer((worktree_id, "a.rs"), cx))
3155            .await
3156            .unwrap();
3157
3158        // Join the worktree as client B.
3159        let project_b = Project::remote(
3160            project_id,
3161            client_b.clone(),
3162            client_b.user_store.clone(),
3163            lang_registry.clone(),
3164            fs.clone(),
3165            &mut cx_b.to_async(),
3166        )
3167        .await
3168        .unwrap();
3169
3170        let buffer_b = cx_b
3171            .background()
3172            .spawn(project_b.update(cx_b, |p, cx| p.open_buffer((worktree_id, "a.rs"), cx)))
3173            .await
3174            .unwrap();
            // Edit as the guest: in "let one = 1;", 4..7 is `one` and 10..11 is `1`.
            // The buffer becomes dirty but is not yet in conflict.
3175        buffer_b.update(cx_b, |buffer, cx| {
3176            buffer.edit([(4..7, "six")], cx);
3177            buffer.edit([(10..11, "6")], cx);
3178            assert_eq!(buffer.text(), "let six = 6;");
3179            assert!(buffer.is_dirty());
3180            assert!(!buffer.has_conflict());
3181        });
            // Wait for the guest's edits to reach the host's copy.
3182        buffer_a
3183            .condition(cx_a, |buffer, _| buffer.text() == "let six = 6;")
3184            .await;
3185
            // Overwrite the file through the fake filesystem rather than through a
            // buffer, then wait for both copies to report a conflict.
3186        fs.save(Path::new("/a/a.rs"), &Rope::from("let seven = 7;"))
3187            .await
3188            .unwrap();
3189        buffer_a
3190            .condition(cx_a, |buffer, _| buffer.has_conflict())
3191            .await;
3192        buffer_b
3193            .condition(cx_b, |buffer, _| buffer.has_conflict())
3194            .await;
3195
            // Reload the buffer from the guest's side.
3196        project_b
3197            .update(cx_b, |project, cx| {
                    // NOTE(review): the meaning of the `true` argument is not visible
                    // here; the guest-side undo below suggests it records the reload
                    // in the undo history — confirm against `Project::reload_buffers`.
3198                project.reload_buffers(HashSet::from_iter([buffer_b.clone()]), true, cx)
3199            })
3200            .await
3201            .unwrap();
            // Both copies now hold the on-disk text and are clean. These are plain
            // assertions, not awaited conditions.
3202        buffer_a.read_with(cx_a, |buffer, _| {
3203            assert_eq!(buffer.text(), "let seven = 7;");
3204            assert!(!buffer.is_dirty());
3205            assert!(!buffer.has_conflict());
3206        });
3207        buffer_b.read_with(cx_b, |buffer, _| {
3208            assert_eq!(buffer.text(), "let seven = 7;");
3209            assert!(!buffer.is_dirty());
3210            assert!(!buffer.has_conflict());
3211        });
3212
3213        buffer_a.update(cx_a, |buffer, cx| {
3214            // Undoing on the host is a no-op when the reload was initiated by the guest.
3215            buffer.undo(cx);
3216            assert_eq!(buffer.text(), "let seven = 7;");
3217            assert!(!buffer.is_dirty());
3218            assert!(!buffer.has_conflict());
3219        });
3220        buffer_b.update(cx_b, |buffer, cx| {
3221            // Undoing on the guest rolls back the buffer to before it was reloaded but the conflict gets cleared.
3222            buffer.undo(cx);
3223            assert_eq!(buffer.text(), "let six = 6;");
3224            assert!(buffer.is_dirty());
3225            assert!(!buffer.has_conflict());
3226        });
3227    }
3228
3229    #[gpui::test(iterations = 10)]
3230    async fn test_formatting_buffer(cx_a: &mut TestAppContext, cx_b: &mut TestAppContext) {
3231        cx_a.foreground().forbid_parking();
3232        let lang_registry = Arc::new(LanguageRegistry::test());
3233        let fs = FakeFs::new(cx_a.background());
3234
3235        // Set up a fake language server.
3236        let mut language = Language::new(
3237            LanguageConfig {
3238                name: "Rust".into(),
3239                path_suffixes: vec!["rs".to_string()],
3240                ..Default::default()
3241            },
3242            Some(tree_sitter_rust::language()),
3243        );
3244        let mut fake_language_servers = language.set_fake_lsp_adapter(Default::default());
3245        lang_registry.add(Arc::new(language));
3246
3247        // Connect to a server as 2 clients.
3248        let mut server = TestServer::start(cx_a.foreground(), cx_a.background()).await;
3249        let client_a = server.create_client(cx_a, "user_a").await;
3250        let client_b = server.create_client(cx_b, "user_b").await;
3251        server
3252            .make_contacts(vec![(&client_a, cx_a), (&client_b, cx_b)])
3253            .await;
3254
3255        // Share a project as client A
3256        fs.insert_tree(
3257            "/a",
3258            json!({
3259                "a.rs": "let one = two",
3260            }),
3261        )
3262        .await;
3263        let project_a = cx_a.update(|cx| {
3264            Project::local(
3265                client_a.clone(),
3266                client_a.user_store.clone(),
3267                lang_registry.clone(),
3268                fs.clone(),
3269                cx,
3270            )
3271        });
3272        let (worktree_a, _) = project_a
3273            .update(cx_a, |p, cx| {
3274                p.find_or_create_local_worktree("/a", true, cx)
3275            })
3276            .await
3277            .unwrap();
3278        worktree_a
3279            .read_with(cx_a, |tree, _| tree.as_local().unwrap().scan_complete())
3280            .await;
3281        let project_id = project_a.update(cx_a, |p, _| p.next_remote_id()).await;
3282        let worktree_id = worktree_a.read_with(cx_a, |tree, _| tree.id());
3283        project_a.update(cx_a, |p, cx| p.share(cx)).await.unwrap();
3284
3285        // Join the worktree as client B.
3286        let project_b = Project::remote(
3287            project_id,
3288            client_b.clone(),
3289            client_b.user_store.clone(),
3290            lang_registry.clone(),
3291            fs.clone(),
3292            &mut cx_b.to_async(),
3293        )
3294        .await
3295        .unwrap();
3296
3297        let buffer_b = cx_b
3298            .background()
3299            .spawn(project_b.update(cx_b, |p, cx| p.open_buffer((worktree_id, "a.rs"), cx)))
3300            .await
3301            .unwrap();
3302
3303        let fake_language_server = fake_language_servers.next().await.unwrap();
3304        fake_language_server.handle_request::<lsp::request::Formatting, _, _>(|_, _| async move {
3305            Ok(Some(vec![
3306                lsp::TextEdit {
3307                    range: lsp::Range::new(lsp::Position::new(0, 4), lsp::Position::new(0, 4)),
3308                    new_text: "h".to_string(),
3309                },
3310                lsp::TextEdit {
3311                    range: lsp::Range::new(lsp::Position::new(0, 7), lsp::Position::new(0, 7)),
3312                    new_text: "y".to_string(),
3313                },
3314            ]))
3315        });
3316
3317        project_b
3318            .update(cx_b, |project, cx| {
3319                project.format(HashSet::from_iter([buffer_b.clone()]), true, cx)
3320            })
3321            .await
3322            .unwrap();
3323        assert_eq!(
3324            buffer_b.read_with(cx_b, |buffer, _| buffer.text()),
3325            "let honey = two"
3326        );
3327    }
3328
3329    #[gpui::test(iterations = 10)]
3330    async fn test_definition(cx_a: &mut TestAppContext, cx_b: &mut TestAppContext) {
3331        cx_a.foreground().forbid_parking();
3332        let lang_registry = Arc::new(LanguageRegistry::test());
3333        let fs = FakeFs::new(cx_a.background());
3334        fs.insert_tree(
3335            "/root-1",
3336            json!({
3337                "a.rs": "const ONE: usize = b::TWO + b::THREE;",
3338            }),
3339        )
3340        .await;
3341        fs.insert_tree(
3342            "/root-2",
3343            json!({
3344                "b.rs": "const TWO: usize = 2;\nconst THREE: usize = 3;",
3345            }),
3346        )
3347        .await;
3348
3349        // Set up a fake language server.
3350        let mut language = Language::new(
3351            LanguageConfig {
3352                name: "Rust".into(),
3353                path_suffixes: vec!["rs".to_string()],
3354                ..Default::default()
3355            },
3356            Some(tree_sitter_rust::language()),
3357        );
3358        let mut fake_language_servers = language.set_fake_lsp_adapter(Default::default());
3359        lang_registry.add(Arc::new(language));
3360
3361        // Connect to a server as 2 clients.
3362        let mut server = TestServer::start(cx_a.foreground(), cx_a.background()).await;
3363        let client_a = server.create_client(cx_a, "user_a").await;
3364        let client_b = server.create_client(cx_b, "user_b").await;
3365        server
3366            .make_contacts(vec![(&client_a, cx_a), (&client_b, cx_b)])
3367            .await;
3368
3369        // Share a project as client A
3370        let project_a = cx_a.update(|cx| {
3371            Project::local(
3372                client_a.clone(),
3373                client_a.user_store.clone(),
3374                lang_registry.clone(),
3375                fs.clone(),
3376                cx,
3377            )
3378        });
3379        let (worktree_a, _) = project_a
3380            .update(cx_a, |p, cx| {
3381                p.find_or_create_local_worktree("/root-1", true, cx)
3382            })
3383            .await
3384            .unwrap();
3385        worktree_a
3386            .read_with(cx_a, |tree, _| tree.as_local().unwrap().scan_complete())
3387            .await;
3388        let project_id = project_a.update(cx_a, |p, _| p.next_remote_id()).await;
3389        let worktree_id = worktree_a.read_with(cx_a, |tree, _| tree.id());
3390        project_a.update(cx_a, |p, cx| p.share(cx)).await.unwrap();
3391
3392        // Join the worktree as client B.
3393        let project_b = Project::remote(
3394            project_id,
3395            client_b.clone(),
3396            client_b.user_store.clone(),
3397            lang_registry.clone(),
3398            fs.clone(),
3399            &mut cx_b.to_async(),
3400        )
3401        .await
3402        .unwrap();
3403
3404        // Open the file on client B.
3405        let buffer_b = cx_b
3406            .background()
3407            .spawn(project_b.update(cx_b, |p, cx| p.open_buffer((worktree_id, "a.rs"), cx)))
3408            .await
3409            .unwrap();
3410
3411        // Request the definition of a symbol as the guest.
3412        let fake_language_server = fake_language_servers.next().await.unwrap();
3413        fake_language_server.handle_request::<lsp::request::GotoDefinition, _, _>(
3414            |_, _| async move {
3415                Ok(Some(lsp::GotoDefinitionResponse::Scalar(
3416                    lsp::Location::new(
3417                        lsp::Url::from_file_path("/root-2/b.rs").unwrap(),
3418                        lsp::Range::new(lsp::Position::new(0, 6), lsp::Position::new(0, 9)),
3419                    ),
3420                )))
3421            },
3422        );
3423
3424        let definitions_1 = project_b
3425            .update(cx_b, |p, cx| p.definition(&buffer_b, 23, cx))
3426            .await
3427            .unwrap();
3428        cx_b.read(|cx| {
3429            assert_eq!(definitions_1.len(), 1);
3430            assert_eq!(project_b.read(cx).worktrees(cx).count(), 2);
3431            let target_buffer = definitions_1[0].buffer.read(cx);
3432            assert_eq!(
3433                target_buffer.text(),
3434                "const TWO: usize = 2;\nconst THREE: usize = 3;"
3435            );
3436            assert_eq!(
3437                definitions_1[0].range.to_point(target_buffer),
3438                Point::new(0, 6)..Point::new(0, 9)
3439            );
3440        });
3441
3442        // Try getting more definitions for the same buffer, ensuring the buffer gets reused from
3443        // the previous call to `definition`.
3444        fake_language_server.handle_request::<lsp::request::GotoDefinition, _, _>(
3445            |_, _| async move {
3446                Ok(Some(lsp::GotoDefinitionResponse::Scalar(
3447                    lsp::Location::new(
3448                        lsp::Url::from_file_path("/root-2/b.rs").unwrap(),
3449                        lsp::Range::new(lsp::Position::new(1, 6), lsp::Position::new(1, 11)),
3450                    ),
3451                )))
3452            },
3453        );
3454
3455        let definitions_2 = project_b
3456            .update(cx_b, |p, cx| p.definition(&buffer_b, 33, cx))
3457            .await
3458            .unwrap();
3459        cx_b.read(|cx| {
3460            assert_eq!(definitions_2.len(), 1);
3461            assert_eq!(project_b.read(cx).worktrees(cx).count(), 2);
3462            let target_buffer = definitions_2[0].buffer.read(cx);
3463            assert_eq!(
3464                target_buffer.text(),
3465                "const TWO: usize = 2;\nconst THREE: usize = 3;"
3466            );
3467            assert_eq!(
3468                definitions_2[0].range.to_point(target_buffer),
3469                Point::new(1, 6)..Point::new(1, 11)
3470            );
3471        });
3472        assert_eq!(definitions_1[0].buffer, definitions_2[0].buffer);
3473    }
3474
3475    #[gpui::test(iterations = 10)]
3476    async fn test_references(cx_a: &mut TestAppContext, cx_b: &mut TestAppContext) {
3477        cx_a.foreground().forbid_parking();
3478        let lang_registry = Arc::new(LanguageRegistry::test());
3479        let fs = FakeFs::new(cx_a.background());
3480        fs.insert_tree(
3481            "/root-1",
3482            json!({
3483                "one.rs": "const ONE: usize = 1;",
3484                "two.rs": "const TWO: usize = one::ONE + one::ONE;",
3485            }),
3486        )
3487        .await;
3488        fs.insert_tree(
3489            "/root-2",
3490            json!({
3491                "three.rs": "const THREE: usize = two::TWO + one::ONE;",
3492            }),
3493        )
3494        .await;
3495
3496        // Set up a fake language server.
3497        let mut language = Language::new(
3498            LanguageConfig {
3499                name: "Rust".into(),
3500                path_suffixes: vec!["rs".to_string()],
3501                ..Default::default()
3502            },
3503            Some(tree_sitter_rust::language()),
3504        );
3505        let mut fake_language_servers = language.set_fake_lsp_adapter(Default::default());
3506        lang_registry.add(Arc::new(language));
3507
3508        // Connect to a server as 2 clients.
3509        let mut server = TestServer::start(cx_a.foreground(), cx_a.background()).await;
3510        let client_a = server.create_client(cx_a, "user_a").await;
3511        let client_b = server.create_client(cx_b, "user_b").await;
3512        server
3513            .make_contacts(vec![(&client_a, cx_a), (&client_b, cx_b)])
3514            .await;
3515
3516        // Share a project as client A
3517        let project_a = cx_a.update(|cx| {
3518            Project::local(
3519                client_a.clone(),
3520                client_a.user_store.clone(),
3521                lang_registry.clone(),
3522                fs.clone(),
3523                cx,
3524            )
3525        });
3526        let (worktree_a, _) = project_a
3527            .update(cx_a, |p, cx| {
3528                p.find_or_create_local_worktree("/root-1", true, cx)
3529            })
3530            .await
3531            .unwrap();
3532        worktree_a
3533            .read_with(cx_a, |tree, _| tree.as_local().unwrap().scan_complete())
3534            .await;
3535        let project_id = project_a.update(cx_a, |p, _| p.next_remote_id()).await;
3536        let worktree_id = worktree_a.read_with(cx_a, |tree, _| tree.id());
3537        project_a.update(cx_a, |p, cx| p.share(cx)).await.unwrap();
3538
3539        // Join the worktree as client B.
3540        let project_b = Project::remote(
3541            project_id,
3542            client_b.clone(),
3543            client_b.user_store.clone(),
3544            lang_registry.clone(),
3545            fs.clone(),
3546            &mut cx_b.to_async(),
3547        )
3548        .await
3549        .unwrap();
3550
3551        // Open the file on client B.
3552        let buffer_b = cx_b
3553            .background()
3554            .spawn(project_b.update(cx_b, |p, cx| p.open_buffer((worktree_id, "one.rs"), cx)))
3555            .await
3556            .unwrap();
3557
3558        // Request references to a symbol as the guest.
3559        let fake_language_server = fake_language_servers.next().await.unwrap();
3560        fake_language_server.handle_request::<lsp::request::References, _, _>(
3561            |params, _| async move {
3562                assert_eq!(
3563                    params.text_document_position.text_document.uri.as_str(),
3564                    "file:///root-1/one.rs"
3565                );
3566                Ok(Some(vec![
3567                    lsp::Location {
3568                        uri: lsp::Url::from_file_path("/root-1/two.rs").unwrap(),
3569                        range: lsp::Range::new(
3570                            lsp::Position::new(0, 24),
3571                            lsp::Position::new(0, 27),
3572                        ),
3573                    },
3574                    lsp::Location {
3575                        uri: lsp::Url::from_file_path("/root-1/two.rs").unwrap(),
3576                        range: lsp::Range::new(
3577                            lsp::Position::new(0, 35),
3578                            lsp::Position::new(0, 38),
3579                        ),
3580                    },
3581                    lsp::Location {
3582                        uri: lsp::Url::from_file_path("/root-2/three.rs").unwrap(),
3583                        range: lsp::Range::new(
3584                            lsp::Position::new(0, 37),
3585                            lsp::Position::new(0, 40),
3586                        ),
3587                    },
3588                ]))
3589            },
3590        );
3591
3592        let references = project_b
3593            .update(cx_b, |p, cx| p.references(&buffer_b, 7, cx))
3594            .await
3595            .unwrap();
3596        cx_b.read(|cx| {
3597            assert_eq!(references.len(), 3);
3598            assert_eq!(project_b.read(cx).worktrees(cx).count(), 2);
3599
3600            let two_buffer = references[0].buffer.read(cx);
3601            let three_buffer = references[2].buffer.read(cx);
3602            assert_eq!(
3603                two_buffer.file().unwrap().path().as_ref(),
3604                Path::new("two.rs")
3605            );
3606            assert_eq!(references[1].buffer, references[0].buffer);
3607            assert_eq!(
3608                three_buffer.file().unwrap().full_path(cx),
3609                Path::new("three.rs")
3610            );
3611
3612            assert_eq!(references[0].range.to_offset(&two_buffer), 24..27);
3613            assert_eq!(references[1].range.to_offset(&two_buffer), 35..38);
3614            assert_eq!(references[2].range.to_offset(&three_buffer), 37..40);
3615        });
3616    }
3617
3618    #[gpui::test(iterations = 10)]
3619    async fn test_project_search(cx_a: &mut TestAppContext, cx_b: &mut TestAppContext) {
3620        cx_a.foreground().forbid_parking();
3621        let lang_registry = Arc::new(LanguageRegistry::test());
3622        let fs = FakeFs::new(cx_a.background());
3623        fs.insert_tree(
3624            "/root-1",
3625            json!({
3626                "a": "hello world",
3627                "b": "goodnight moon",
3628                "c": "a world of goo",
3629                "d": "world champion of clown world",
3630            }),
3631        )
3632        .await;
3633        fs.insert_tree(
3634            "/root-2",
3635            json!({
3636                "e": "disney world is fun",
3637            }),
3638        )
3639        .await;
3640
3641        // Connect to a server as 2 clients.
3642        let mut server = TestServer::start(cx_a.foreground(), cx_a.background()).await;
3643        let client_a = server.create_client(cx_a, "user_a").await;
3644        let client_b = server.create_client(cx_b, "user_b").await;
3645        server
3646            .make_contacts(vec![(&client_a, cx_a), (&client_b, cx_b)])
3647            .await;
3648
3649        // Share a project as client A
3650        let project_a = cx_a.update(|cx| {
3651            Project::local(
3652                client_a.clone(),
3653                client_a.user_store.clone(),
3654                lang_registry.clone(),
3655                fs.clone(),
3656                cx,
3657            )
3658        });
3659        let project_id = project_a.update(cx_a, |p, _| p.next_remote_id()).await;
3660
3661        let (worktree_1, _) = project_a
3662            .update(cx_a, |p, cx| {
3663                p.find_or_create_local_worktree("/root-1", true, cx)
3664            })
3665            .await
3666            .unwrap();
3667        worktree_1
3668            .read_with(cx_a, |tree, _| tree.as_local().unwrap().scan_complete())
3669            .await;
3670        let (worktree_2, _) = project_a
3671            .update(cx_a, |p, cx| {
3672                p.find_or_create_local_worktree("/root-2", true, cx)
3673            })
3674            .await
3675            .unwrap();
3676        worktree_2
3677            .read_with(cx_a, |tree, _| tree.as_local().unwrap().scan_complete())
3678            .await;
3679
3680        project_a.update(cx_a, |p, cx| p.share(cx)).await.unwrap();
3681
3682        // Join the worktree as client B.
3683        let project_b = Project::remote(
3684            project_id,
3685            client_b.clone(),
3686            client_b.user_store.clone(),
3687            lang_registry.clone(),
3688            fs.clone(),
3689            &mut cx_b.to_async(),
3690        )
3691        .await
3692        .unwrap();
3693
3694        let results = project_b
3695            .update(cx_b, |project, cx| {
3696                project.search(SearchQuery::text("world", false, false), cx)
3697            })
3698            .await
3699            .unwrap();
3700
3701        let mut ranges_by_path = results
3702            .into_iter()
3703            .map(|(buffer, ranges)| {
3704                buffer.read_with(cx_b, |buffer, cx| {
3705                    let path = buffer.file().unwrap().full_path(cx);
3706                    let offset_ranges = ranges
3707                        .into_iter()
3708                        .map(|range| range.to_offset(buffer))
3709                        .collect::<Vec<_>>();
3710                    (path, offset_ranges)
3711                })
3712            })
3713            .collect::<Vec<_>>();
3714        ranges_by_path.sort_by_key(|(path, _)| path.clone());
3715
3716        assert_eq!(
3717            ranges_by_path,
3718            &[
3719                (PathBuf::from("root-1/a"), vec![6..11]),
3720                (PathBuf::from("root-1/c"), vec![2..7]),
3721                (PathBuf::from("root-1/d"), vec![0..5, 24..29]),
3722                (PathBuf::from("root-2/e"), vec![7..12]),
3723            ]
3724        );
3725    }
3726
3727    #[gpui::test(iterations = 10)]
3728    async fn test_document_highlights(cx_a: &mut TestAppContext, cx_b: &mut TestAppContext) {
3729        cx_a.foreground().forbid_parking();
3730        let lang_registry = Arc::new(LanguageRegistry::test());
3731        let fs = FakeFs::new(cx_a.background());
3732        fs.insert_tree(
3733            "/root-1",
3734            json!({
3735                "main.rs": "fn double(number: i32) -> i32 { number + number }",
3736            }),
3737        )
3738        .await;
3739
3740        // Set up a fake language server.
3741        let mut language = Language::new(
3742            LanguageConfig {
3743                name: "Rust".into(),
3744                path_suffixes: vec!["rs".to_string()],
3745                ..Default::default()
3746            },
3747            Some(tree_sitter_rust::language()),
3748        );
3749        let mut fake_language_servers = language.set_fake_lsp_adapter(Default::default());
3750        lang_registry.add(Arc::new(language));
3751
3752        // Connect to a server as 2 clients.
3753        let mut server = TestServer::start(cx_a.foreground(), cx_a.background()).await;
3754        let client_a = server.create_client(cx_a, "user_a").await;
3755        let client_b = server.create_client(cx_b, "user_b").await;
3756        server
3757            .make_contacts(vec![(&client_a, cx_a), (&client_b, cx_b)])
3758            .await;
3759
3760        // Share a project as client A
3761        let project_a = cx_a.update(|cx| {
3762            Project::local(
3763                client_a.clone(),
3764                client_a.user_store.clone(),
3765                lang_registry.clone(),
3766                fs.clone(),
3767                cx,
3768            )
3769        });
3770        let (worktree_a, _) = project_a
3771            .update(cx_a, |p, cx| {
3772                p.find_or_create_local_worktree("/root-1", true, cx)
3773            })
3774            .await
3775            .unwrap();
3776        worktree_a
3777            .read_with(cx_a, |tree, _| tree.as_local().unwrap().scan_complete())
3778            .await;
3779        let project_id = project_a.update(cx_a, |p, _| p.next_remote_id()).await;
3780        let worktree_id = worktree_a.read_with(cx_a, |tree, _| tree.id());
3781        project_a.update(cx_a, |p, cx| p.share(cx)).await.unwrap();
3782
3783        // Join the worktree as client B.
3784        let project_b = Project::remote(
3785            project_id,
3786            client_b.clone(),
3787            client_b.user_store.clone(),
3788            lang_registry.clone(),
3789            fs.clone(),
3790            &mut cx_b.to_async(),
3791        )
3792        .await
3793        .unwrap();
3794
3795        // Open the file on client B.
3796        let buffer_b = cx_b
3797            .background()
3798            .spawn(project_b.update(cx_b, |p, cx| p.open_buffer((worktree_id, "main.rs"), cx)))
3799            .await
3800            .unwrap();
3801
3802        // Request document highlights as the guest.
3803        let fake_language_server = fake_language_servers.next().await.unwrap();
3804        fake_language_server.handle_request::<lsp::request::DocumentHighlightRequest, _, _>(
3805            |params, _| async move {
3806                assert_eq!(
3807                    params
3808                        .text_document_position_params
3809                        .text_document
3810                        .uri
3811                        .as_str(),
3812                    "file:///root-1/main.rs"
3813                );
3814                assert_eq!(
3815                    params.text_document_position_params.position,
3816                    lsp::Position::new(0, 34)
3817                );
3818                Ok(Some(vec![
3819                    lsp::DocumentHighlight {
3820                        kind: Some(lsp::DocumentHighlightKind::WRITE),
3821                        range: lsp::Range::new(
3822                            lsp::Position::new(0, 10),
3823                            lsp::Position::new(0, 16),
3824                        ),
3825                    },
3826                    lsp::DocumentHighlight {
3827                        kind: Some(lsp::DocumentHighlightKind::READ),
3828                        range: lsp::Range::new(
3829                            lsp::Position::new(0, 32),
3830                            lsp::Position::new(0, 38),
3831                        ),
3832                    },
3833                    lsp::DocumentHighlight {
3834                        kind: Some(lsp::DocumentHighlightKind::READ),
3835                        range: lsp::Range::new(
3836                            lsp::Position::new(0, 41),
3837                            lsp::Position::new(0, 47),
3838                        ),
3839                    },
3840                ]))
3841            },
3842        );
3843
3844        let highlights = project_b
3845            .update(cx_b, |p, cx| p.document_highlights(&buffer_b, 34, cx))
3846            .await
3847            .unwrap();
3848        buffer_b.read_with(cx_b, |buffer, _| {
3849            let snapshot = buffer.snapshot();
3850
3851            let highlights = highlights
3852                .into_iter()
3853                .map(|highlight| (highlight.kind, highlight.range.to_offset(&snapshot)))
3854                .collect::<Vec<_>>();
3855            assert_eq!(
3856                highlights,
3857                &[
3858                    (lsp::DocumentHighlightKind::WRITE, 10..16),
3859                    (lsp::DocumentHighlightKind::READ, 32..38),
3860                    (lsp::DocumentHighlightKind::READ, 41..47)
3861                ]
3862            )
3863        });
3864    }
3865
3866    #[gpui::test(iterations = 10)]
3867    async fn test_project_symbols(cx_a: &mut TestAppContext, cx_b: &mut TestAppContext) {
3868        cx_a.foreground().forbid_parking();
3869        let lang_registry = Arc::new(LanguageRegistry::test());
3870        let fs = FakeFs::new(cx_a.background());
3871        fs.insert_tree(
3872            "/code",
3873            json!({
3874                "crate-1": {
3875                    "one.rs": "const ONE: usize = 1;",
3876                },
3877                "crate-2": {
3878                    "two.rs": "const TWO: usize = 2; const THREE: usize = 3;",
3879                },
3880                "private": {
3881                    "passwords.txt": "the-password",
3882                }
3883            }),
3884        )
3885        .await;
3886
3887        // Set up a fake language server.
3888        let mut language = Language::new(
3889            LanguageConfig {
3890                name: "Rust".into(),
3891                path_suffixes: vec!["rs".to_string()],
3892                ..Default::default()
3893            },
3894            Some(tree_sitter_rust::language()),
3895        );
3896        let mut fake_language_servers = language.set_fake_lsp_adapter(Default::default());
3897        lang_registry.add(Arc::new(language));
3898
3899        // Connect to a server as 2 clients.
3900        let mut server = TestServer::start(cx_a.foreground(), cx_a.background()).await;
3901        let client_a = server.create_client(cx_a, "user_a").await;
3902        let client_b = server.create_client(cx_b, "user_b").await;
3903        server
3904            .make_contacts(vec![(&client_a, cx_a), (&client_b, cx_b)])
3905            .await;
3906
3907        // Share a project as client A
3908        let project_a = cx_a.update(|cx| {
3909            Project::local(
3910                client_a.clone(),
3911                client_a.user_store.clone(),
3912                lang_registry.clone(),
3913                fs.clone(),
3914                cx,
3915            )
3916        });
3917        let (worktree_a, _) = project_a
3918            .update(cx_a, |p, cx| {
3919                p.find_or_create_local_worktree("/code/crate-1", true, cx)
3920            })
3921            .await
3922            .unwrap();
3923        worktree_a
3924            .read_with(cx_a, |tree, _| tree.as_local().unwrap().scan_complete())
3925            .await;
3926        let project_id = project_a.update(cx_a, |p, _| p.next_remote_id()).await;
3927        let worktree_id = worktree_a.read_with(cx_a, |tree, _| tree.id());
3928        project_a.update(cx_a, |p, cx| p.share(cx)).await.unwrap();
3929
3930        // Join the worktree as client B.
3931        let project_b = Project::remote(
3932            project_id,
3933            client_b.clone(),
3934            client_b.user_store.clone(),
3935            lang_registry.clone(),
3936            fs.clone(),
3937            &mut cx_b.to_async(),
3938        )
3939        .await
3940        .unwrap();
3941
3942        // Cause the language server to start.
3943        let _buffer = cx_b
3944            .background()
3945            .spawn(project_b.update(cx_b, |p, cx| p.open_buffer((worktree_id, "one.rs"), cx)))
3946            .await
3947            .unwrap();
3948
3949        let fake_language_server = fake_language_servers.next().await.unwrap();
3950        fake_language_server.handle_request::<lsp::request::WorkspaceSymbol, _, _>(
3951            |_, _| async move {
3952                #[allow(deprecated)]
3953                Ok(Some(vec![lsp::SymbolInformation {
3954                    name: "TWO".into(),
3955                    location: lsp::Location {
3956                        uri: lsp::Url::from_file_path("/code/crate-2/two.rs").unwrap(),
3957                        range: lsp::Range::new(lsp::Position::new(0, 6), lsp::Position::new(0, 9)),
3958                    },
3959                    kind: lsp::SymbolKind::CONSTANT,
3960                    tags: None,
3961                    container_name: None,
3962                    deprecated: None,
3963                }]))
3964            },
3965        );
3966
3967        // Request the definition of a symbol as the guest.
3968        let symbols = project_b
3969            .update(cx_b, |p, cx| p.symbols("two", cx))
3970            .await
3971            .unwrap();
3972        assert_eq!(symbols.len(), 1);
3973        assert_eq!(symbols[0].name, "TWO");
3974
3975        // Open one of the returned symbols.
3976        let buffer_b_2 = project_b
3977            .update(cx_b, |project, cx| {
3978                project.open_buffer_for_symbol(&symbols[0], cx)
3979            })
3980            .await
3981            .unwrap();
3982        buffer_b_2.read_with(cx_b, |buffer, _| {
3983            assert_eq!(
3984                buffer.file().unwrap().path().as_ref(),
3985                Path::new("../crate-2/two.rs")
3986            );
3987        });
3988
3989        // Attempt to craft a symbol and violate host's privacy by opening an arbitrary file.
3990        let mut fake_symbol = symbols[0].clone();
3991        fake_symbol.path = Path::new("/code/secrets").into();
3992        let error = project_b
3993            .update(cx_b, |project, cx| {
3994                project.open_buffer_for_symbol(&fake_symbol, cx)
3995            })
3996            .await
3997            .unwrap_err();
3998        assert!(error.to_string().contains("invalid symbol signature"));
3999    }
4000
4001    #[gpui::test(iterations = 10)]
        // Guest B asks for the definition of a symbol in `a.rs` whose target lives in `b.rs`
        // while also opening `b.rs` directly. `rng` decides which of the two requests is
        // issued first; in both cases the definition must point at the very same buffer
        // that was opened directly.
4002    async fn test_open_buffer_while_getting_definition_pointing_to_it(
4003        cx_a: &mut TestAppContext,
4004        cx_b: &mut TestAppContext,
4005        mut rng: StdRng,
4006    ) {
4007        cx_a.foreground().forbid_parking();
4008        let lang_registry = Arc::new(LanguageRegistry::test());
4009        let fs = FakeFs::new(cx_a.background());
            // `a.rs` references `b::TWO`, which is declared in `b.rs`.
4010        fs.insert_tree(
4011            "/root",
4012            json!({
4013                "a.rs": "const ONE: usize = b::TWO;",
4014                "b.rs": "const TWO: usize = 2",
4015            }),
4016        )
4017        .await;
4018
4019        // Set up a fake language server.
4020        let mut language = Language::new(
4021            LanguageConfig {
4022                name: "Rust".into(),
4023                path_suffixes: vec!["rs".to_string()],
4024                ..Default::default()
4025            },
4026            Some(tree_sitter_rust::language()),
4027        );
4028        let mut fake_language_servers = language.set_fake_lsp_adapter(Default::default());
4029        lang_registry.add(Arc::new(language));
4030
4031        // Connect to a server as 2 clients.
4032        let mut server = TestServer::start(cx_a.foreground(), cx_a.background()).await;
4033        let client_a = server.create_client(cx_a, "user_a").await;
4034        let client_b = server.create_client(cx_b, "user_b").await;
4035        server
4036            .make_contacts(vec![(&client_a, cx_a), (&client_b, cx_b)])
4037            .await;
4038
4039        // Share a project as client A
4040        let project_a = cx_a.update(|cx| {
4041            Project::local(
4042                client_a.clone(),
4043                client_a.user_store.clone(),
4044                lang_registry.clone(),
4045                fs.clone(),
4046                cx,
4047            )
4048        });
4049
4050        let (worktree_a, _) = project_a
4051            .update(cx_a, |p, cx| {
4052                p.find_or_create_local_worktree("/root", true, cx)
4053            })
4054            .await
4055            .unwrap();
            // Wait for the initial scan of "/root" before sharing the project.
4056        worktree_a
4057            .read_with(cx_a, |tree, _| tree.as_local().unwrap().scan_complete())
4058            .await;
4059        let project_id = project_a.update(cx_a, |p, _| p.next_remote_id()).await;
4060        let worktree_id = worktree_a.read_with(cx_a, |tree, _| tree.id());
4061        project_a.update(cx_a, |p, cx| p.share(cx)).await.unwrap();
4062
4063        // Join the project as client B.
4064        let project_b = Project::remote(
4065            project_id,
4066            client_b.clone(),
4067            client_b.user_store.clone(),
4068            lang_registry.clone(),
4069            fs.clone(),
4070            &mut cx_b.to_async(),
4071        )
4072        .await
4073        .unwrap();
4074
            // Open `a.rs` as client B; the open-buffer future is driven on B's background
            // executor.
4075        let buffer_b1 = cx_b
4076            .background()
4077            .spawn(project_b.update(cx_b, |p, cx| p.open_buffer((worktree_id, "a.rs"), cx)))
4078            .await
4079            .unwrap();
4080
            // Answer every go-to-definition request with line 0, columns 6..9 of `b.rs`,
            // which is the `TWO` identifier in "const TWO: usize = 2".
4081        let fake_language_server = fake_language_servers.next().await.unwrap();
4082        fake_language_server.handle_request::<lsp::request::GotoDefinition, _, _>(
4083            |_, _| async move {
4084                Ok(Some(lsp::GotoDefinitionResponse::Scalar(
4085                    lsp::Location::new(
4086                        lsp::Url::from_file_path("/root/b.rs").unwrap(),
4087                        lsp::Range::new(lsp::Position::new(0, 6), lsp::Position::new(0, 9)),
4088                    ),
4089                )))
4090            },
4091        );
4092
            // Create both requests before awaiting either one, in an order picked by `rng`.
            // Offset 23 lies inside the `TWO` reference in "const ONE: usize = b::TWO;".
4093        let definitions;
4094        let buffer_b2;
4095        if rng.gen() {
4096            definitions = project_b.update(cx_b, |p, cx| p.definition(&buffer_b1, 23, cx));
4097            buffer_b2 = project_b.update(cx_b, |p, cx| p.open_buffer((worktree_id, "b.rs"), cx));
4098        } else {
4099            buffer_b2 = project_b.update(cx_b, |p, cx| p.open_buffer((worktree_id, "b.rs"), cx));
4100            definitions = project_b.update(cx_b, |p, cx| p.definition(&buffer_b1, 23, cx));
4101        }
4102
            // Exactly one definition is expected, and its buffer must be equal to the
            // buffer obtained by opening `b.rs` directly.
4103        let buffer_b2 = buffer_b2.await.unwrap();
4104        let definitions = definitions.await.unwrap();
4105        assert_eq!(definitions.len(), 1);
4106        assert_eq!(definitions[0].buffer, buffer_b2);
4107    }
4108
4109    #[gpui::test(iterations = 10)]
        // Guest B opens `main.rs` from A's shared project in a workspace editor, requests
        // code actions at the cursor, confirms one whose workspace edit touches both
        // `main.rs` and `other.rs`, and then checks the resulting editor text across
        // undo and redo.
4110    async fn test_collaborating_with_code_actions(
4111        cx_a: &mut TestAppContext,
4112        cx_b: &mut TestAppContext,
4113    ) {
4114        cx_a.foreground().forbid_parking();
4115        let lang_registry = Arc::new(LanguageRegistry::test());
4116        let fs = FakeFs::new(cx_a.background());
            // NOTE(review): presumably registers the editor's actions in B's app so that
            // the workspace below can open editors — confirm against `editor::init`.
4117        cx_b.update(|cx| editor::init(cx));
4118
4119        // Set up a fake language server.
4120        let mut language = Language::new(
4121            LanguageConfig {
4122                name: "Rust".into(),
4123                path_suffixes: vec!["rs".to_string()],
4124                ..Default::default()
4125            },
4126            Some(tree_sitter_rust::language()),
4127        );
4128        let mut fake_language_servers = language.set_fake_lsp_adapter(Default::default());
4129        lang_registry.add(Arc::new(language));
4130
4131        // Connect to a server as 2 clients.
4132        let mut server = TestServer::start(cx_a.foreground(), cx_a.background()).await;
4133        let client_a = server.create_client(cx_a, "user_a").await;
4134        let client_b = server.create_client(cx_b, "user_b").await;
4135        server
4136            .make_contacts(vec![(&client_a, cx_a), (&client_b, cx_b)])
4137            .await;
4138
4139        // Populate the fake file system, then share a project as client A
4140        fs.insert_tree(
4141            "/a",
4142            json!({
4143                "main.rs": "mod other;\nfn main() { let foo = other::foo(); }",
4144                "other.rs": "pub fn foo() -> usize { 4 }",
4145            }),
4146        )
4147        .await;
4148        let project_a = cx_a.update(|cx| {
4149            Project::local(
4150                client_a.clone(),
4151                client_a.user_store.clone(),
4152                lang_registry.clone(),
4153                fs.clone(),
4154                cx,
4155            )
4156        });
4157        let (worktree_a, _) = project_a
4158            .update(cx_a, |p, cx| {
4159                p.find_or_create_local_worktree("/a", true, cx)
4160            })
4161            .await
4162            .unwrap();
4163        worktree_a
4164            .read_with(cx_a, |tree, _| tree.as_local().unwrap().scan_complete())
4165            .await;
4166        let project_id = project_a.update(cx_a, |p, _| p.next_remote_id()).await;
4167        let worktree_id = worktree_a.read_with(cx_a, |tree, _| tree.id());
4168        project_a.update(cx_a, |p, cx| p.share(cx)).await.unwrap();
4169
4170        // Join the project as client B.
4171        let project_b = Project::remote(
4172            project_id,
4173            client_b.clone(),
4174            client_b.user_store.clone(),
4175            lang_registry.clone(),
4176            fs.clone(),
4177            &mut cx_b.to_async(),
4178        )
4179        .await
4180        .unwrap();
            // Build a workspace for B around the remote project, reusing B's client,
            // user store and the shared language registry.
4181        let mut params = cx_b.update(WorkspaceParams::test);
4182        params.languages = lang_registry.clone();
4183        params.client = client_b.client.clone();
4184        params.user_store = client_b.user_store.clone();
4185        params.project = project_b;
4186
4187        let (_window_b, workspace_b) = cx_b.add_window(|cx| Workspace::new(&params, cx));
4188        let editor_b = workspace_b
4189            .update(cx_b, |workspace, cx| {
4190                workspace.open_path((worktree_id, "main.rs"), true, cx)
4191            })
4192            .await
4193            .unwrap()
4194            .downcast::<Editor>()
4195            .unwrap();
4196
            // The first code action request is expected for position 0:0 of `main.rs`
            // and is answered with no actions.
            // NOTE(review): `.next().await` presumably waits until one such request has
            // been handled — confirm against the fake language server API.
4197        let mut fake_language_server = fake_language_servers.next().await.unwrap();
4198        fake_language_server
4199            .handle_request::<lsp::request::CodeActionRequest, _, _>(|params, _| async move {
4200                assert_eq!(
4201                    params.text_document.uri,
4202                    lsp::Url::from_file_path("/a/main.rs").unwrap(),
4203                );
4204                assert_eq!(params.range.start, lsp::Position::new(0, 0));
4205                assert_eq!(params.range.end, lsp::Position::new(0, 0));
4206                Ok(None)
4207            })
4208            .next()
4209            .await;
4210
4211        // Move cursor to a location that contains code actions.
            // Row 1, column 31 falls inside the `foo` of the `other::foo()` call.
4212        editor_b.update(cx_b, |editor, cx| {
4213            editor.select_ranges([Point::new(1, 31)..Point::new(1, 31)], None, cx);
4214            cx.focus(&editor_b);
4215        });
4216
            // The request for the new cursor position is answered with a single action.
            // Its workspace edit replaces columns 22..34 of row 1 in `main.rs` (the text
            // `other::foo()`) with `4` and deletes columns 0..27 of row 0 in `other.rs`,
            // which is that file's entire content.
4217        fake_language_server
4218            .handle_request::<lsp::request::CodeActionRequest, _, _>(|params, _| async move {
4219                assert_eq!(
4220                    params.text_document.uri,
4221                    lsp::Url::from_file_path("/a/main.rs").unwrap(),
4222                );
4223                assert_eq!(params.range.start, lsp::Position::new(1, 31));
4224                assert_eq!(params.range.end, lsp::Position::new(1, 31));
4225
4226                Ok(Some(vec![lsp::CodeActionOrCommand::CodeAction(
4227                    lsp::CodeAction {
4228                        title: "Inline into all callers".to_string(),
4229                        edit: Some(lsp::WorkspaceEdit {
4230                            changes: Some(
4231                                [
4232                                    (
4233                                        lsp::Url::from_file_path("/a/main.rs").unwrap(),
4234                                        vec![lsp::TextEdit::new(
4235                                            lsp::Range::new(
4236                                                lsp::Position::new(1, 22),
4237                                                lsp::Position::new(1, 34),
4238                                            ),
4239                                            "4".to_string(),
4240                                        )],
4241                                    ),
4242                                    (
4243                                        lsp::Url::from_file_path("/a/other.rs").unwrap(),
4244                                        vec![lsp::TextEdit::new(
4245                                            lsp::Range::new(
4246                                                lsp::Position::new(0, 0),
4247                                                lsp::Position::new(0, 27),
4248                                            ),
4249                                            "".to_string(),
4250                                        )],
4251                                    ),
4252                                ]
4253                                .into_iter()
4254                                .collect(),
4255                            ),
4256                            ..Default::default()
4257                        }),
                            // NOTE(review): `data` looks like an opaque payload attached to
                            // the action; nothing in this test inspects it — confirm.
4258                        data: Some(json!({
4259                            "codeActionParams": {
4260                                "range": {
4261                                    "start": {"line": 1, "column": 31},
4262                                    "end": {"line": 1, "column": 31},
4263                                }
4264                            }
4265                        })),
4266                        ..Default::default()
4267                    },
4268                )]))
4269            })
4270            .next()
4271            .await;
4272
4273        // Toggle code actions and wait for them to display.
4274        editor_b.update(cx_b, |editor, cx| {
4275            editor.toggle_code_actions(
4276                &ToggleCodeActions {
4277                    deployed_from_indicator: false,
4278                },
4279                cx,
4280            );
4281        });
4282        editor_b
4283            .condition(&cx_b, |editor, _| editor.context_menu_visible())
4284            .await;
4285
            // From here on the code action handler installed above is removed.
4286        fake_language_server.remove_request_handler::<lsp::request::CodeActionRequest>();
4287
4288        // Confirming the code action will trigger a resolve request.
            // The confirm task is created first; the resolve handler is installed before
            // the task is awaited and returns the same two-file edit as above.
4289        let confirm_action = workspace_b
4290            .update(cx_b, |workspace, cx| {
4291                Editor::confirm_code_action(workspace, &ConfirmCodeAction { item_ix: Some(0) }, cx)
4292            })
4293            .unwrap();
4294        fake_language_server.handle_request::<lsp::request::CodeActionResolveRequest, _, _>(
4295            |_, _| async move {
4296                Ok(lsp::CodeAction {
4297                    title: "Inline into all callers".to_string(),
4298                    edit: Some(lsp::WorkspaceEdit {
4299                        changes: Some(
4300                            [
4301                                (
4302                                    lsp::Url::from_file_path("/a/main.rs").unwrap(),
4303                                    vec![lsp::TextEdit::new(
4304                                        lsp::Range::new(
4305                                            lsp::Position::new(1, 22),
4306                                            lsp::Position::new(1, 34),
4307                                        ),
4308                                        "4".to_string(),
4309                                    )],
4310                                ),
4311                                (
4312                                    lsp::Url::from_file_path("/a/other.rs").unwrap(),
4313                                    vec![lsp::TextEdit::new(
4314                                        lsp::Range::new(
4315                                            lsp::Position::new(0, 0),
4316                                            lsp::Position::new(0, 27),
4317                                        ),
4318                                        "".to_string(),
4319                                    )],
4320                                ),
4321                            ]
4322                            .into_iter()
4323                            .collect(),
4324                        ),
4325                        ..Default::default()
4326                    }),
4327                    ..Default::default()
4328                })
4329            },
4330        );
4331
4332        // After the action is confirmed, an editor containing both modified files is opened.
4333        confirm_action.await.unwrap();
4334        let code_action_editor = workspace_b.read_with(cx_b, |workspace, cx| {
4335            workspace
4336                .active_item(cx)
4337                .unwrap()
4338                .downcast::<Editor>()
4339                .unwrap()
4340        });
            // The combined text is the edited `main.rs`, a newline, and the now-empty
            // `other.rs`. Undo restores both original files and redo reapplies the edit.
4341        code_action_editor.update(cx_b, |editor, cx| {
4342            assert_eq!(editor.text(cx), "mod other;\nfn main() { let foo = 4; }\n");
4343            editor.undo(&Undo, cx);
4344            assert_eq!(
4345                editor.text(cx),
4346                "mod other;\nfn main() { let foo = other::foo(); }\npub fn foo() -> usize { 4 }"
4347            );
4348            editor.redo(&Redo, cx);
4349            assert_eq!(editor.text(cx), "mod other;\nfn main() { let foo = 4; }\n");
4350        });
4351    }
4352
4353    #[gpui::test(iterations = 10)]
        // Guest B renames the constant `ONE` to `THREE` from an editor on A's shared
        // project. The fake language server answers the prepare-rename and rename
        // requests, and the test checks the resulting text across undo and redo, both
        // in the editor that shows the rename result and in the original `one.rs` editor.
4354    async fn test_collaborating_with_renames(cx_a: &mut TestAppContext, cx_b: &mut TestAppContext) {
4355        cx_a.foreground().forbid_parking();
4356        let lang_registry = Arc::new(LanguageRegistry::test());
4357        let fs = FakeFs::new(cx_a.background());
4358        cx_b.update(|cx| editor::init(cx));
4359
4360        // Set up a fake language server.
4361        let mut language = Language::new(
4362            LanguageConfig {
4363                name: "Rust".into(),
4364                path_suffixes: vec!["rs".to_string()],
4365                ..Default::default()
4366            },
4367            Some(tree_sitter_rust::language()),
4368        );
            // The fake server advertises rename support with prepare-rename enabled.
4369        let mut fake_language_servers = language.set_fake_lsp_adapter(FakeLspAdapter {
4370            capabilities: lsp::ServerCapabilities {
4371                rename_provider: Some(lsp::OneOf::Right(lsp::RenameOptions {
4372                    prepare_provider: Some(true),
4373                    work_done_progress_options: Default::default(),
4374                })),
4375                ..Default::default()
4376            },
4377            ..Default::default()
4378        });
4379        lang_registry.add(Arc::new(language));
4380
4381        // Connect to a server as 2 clients.
4382        let mut server = TestServer::start(cx_a.foreground(), cx_a.background()).await;
4383        let client_a = server.create_client(cx_a, "user_a").await;
4384        let client_b = server.create_client(cx_b, "user_b").await;
4385        server
4386            .make_contacts(vec![(&client_a, cx_a), (&client_b, cx_b)])
4387            .await;
4388
4389        // Populate the fake file system, then share a project as client A
4390        fs.insert_tree(
4391            "/dir",
4392            json!({
4393                "one.rs": "const ONE: usize = 1;",
4394                "two.rs": "const TWO: usize = one::ONE + one::ONE;"
4395            }),
4396        )
4397        .await;
4398        let project_a = cx_a.update(|cx| {
4399            Project::local(
4400                client_a.clone(),
4401                client_a.user_store.clone(),
4402                lang_registry.clone(),
4403                fs.clone(),
4404                cx,
4405            )
4406        });
4407        let (worktree_a, _) = project_a
4408            .update(cx_a, |p, cx| {
4409                p.find_or_create_local_worktree("/dir", true, cx)
4410            })
4411            .await
4412            .unwrap();
4413        worktree_a
4414            .read_with(cx_a, |tree, _| tree.as_local().unwrap().scan_complete())
4415            .await;
4416        let project_id = project_a.update(cx_a, |p, _| p.next_remote_id()).await;
4417        let worktree_id = worktree_a.read_with(cx_a, |tree, _| tree.id());
4418        project_a.update(cx_a, |p, cx| p.share(cx)).await.unwrap();
4419
4420        // Join the project as client B.
4421        let project_b = Project::remote(
4422            project_id,
4423            client_b.clone(),
4424            client_b.user_store.clone(),
4425            lang_registry.clone(),
4426            fs.clone(),
4427            &mut cx_b.to_async(),
4428        )
4429        .await
4430        .unwrap();
4431        let mut params = cx_b.update(WorkspaceParams::test);
4432        params.languages = lang_registry.clone();
4433        params.client = client_b.client.clone();
4434        params.user_store = client_b.user_store.clone();
4435        params.project = project_b;
4436
4437        let (_window_b, workspace_b) = cx_b.add_window(|cx| Workspace::new(&params, cx));
4438        let editor_b = workspace_b
4439            .update(cx_b, |workspace, cx| {
4440                workspace.open_path((worktree_id, "one.rs"), true, cx)
4441            })
4442            .await
4443            .unwrap()
4444            .downcast::<Editor>()
4445            .unwrap();
4446        let fake_language_server = fake_language_servers.next().await.unwrap();
4447
4448        // Move cursor to a location that can be renamed.
            // Offset 7 is inside `ONE`, which spans offsets 6..9 of "const ONE: usize = 1;".
4449        let prepare_rename = editor_b.update(cx_b, |editor, cx| {
4450            editor.select_ranges([7..7], None, cx);
4451            editor.rename(&Rename, cx).unwrap()
4452        });
4453
            // The prepare-rename request must target position 0:7 of `one.rs`; the server
            // answers with the range 0:6..0:9.
4454        fake_language_server
4455            .handle_request::<lsp::request::PrepareRenameRequest, _, _>(|params, _| async move {
4456                assert_eq!(params.text_document.uri.as_str(), "file:///dir/one.rs");
4457                assert_eq!(params.position, lsp::Position::new(0, 7));
4458                Ok(Some(lsp::PrepareRenameResponse::Range(lsp::Range::new(
4459                    lsp::Position::new(0, 6),
4460                    lsp::Position::new(0, 9),
4461                ))))
4462            })
4463            .next()
4464            .await
4465            .unwrap();
4466        prepare_rename.await.unwrap();
            // The pending rename must cover offsets 6..9. The first three characters of
            // the rename editor's buffer are then replaced with "THREE".
4467        editor_b.update(cx_b, |editor, cx| {
4468            let rename = editor.pending_rename().unwrap();
4469            let buffer = editor.buffer().read(cx).snapshot(cx);
4470            assert_eq!(
4471                rename.range.start.to_offset(&buffer)..rename.range.end.to_offset(&buffer),
4472                6..9
4473            );
4474            rename.editor.update(cx, |rename_editor, cx| {
4475                rename_editor.buffer().update(cx, |rename_buffer, cx| {
4476                    rename_buffer.edit([(0..3, "THREE")], cx);
4477                });
4478            });
4479        });
4480
            // Confirm the rename. The request must carry position 0:6 and the new name
            // "THREE"; the server replies with edits for `one.rs` (columns 6..9) and for
            // both `ONE` occurrences in `two.rs` (columns 24..27 and 35..38).
4481        let confirm_rename = workspace_b.update(cx_b, |workspace, cx| {
4482            Editor::confirm_rename(workspace, &ConfirmRename, cx).unwrap()
4483        });
4484        fake_language_server
4485            .handle_request::<lsp::request::Rename, _, _>(|params, _| async move {
4486                assert_eq!(
4487                    params.text_document_position.text_document.uri.as_str(),
4488                    "file:///dir/one.rs"
4489                );
4490                assert_eq!(
4491                    params.text_document_position.position,
4492                    lsp::Position::new(0, 6)
4493                );
4494                assert_eq!(params.new_name, "THREE");
4495                Ok(Some(lsp::WorkspaceEdit {
4496                    changes: Some(
4497                        [
4498                            (
4499                                lsp::Url::from_file_path("/dir/one.rs").unwrap(),
4500                                vec![lsp::TextEdit::new(
4501                                    lsp::Range::new(
4502                                        lsp::Position::new(0, 6),
4503                                        lsp::Position::new(0, 9),
4504                                    ),
4505                                    "THREE".to_string(),
4506                                )],
4507                            ),
4508                            (
4509                                lsp::Url::from_file_path("/dir/two.rs").unwrap(),
4510                                vec![
4511                                    lsp::TextEdit::new(
4512                                        lsp::Range::new(
4513                                            lsp::Position::new(0, 24),
4514                                            lsp::Position::new(0, 27),
4515                                        ),
4516                                        "THREE".to_string(),
4517                                    ),
4518                                    lsp::TextEdit::new(
4519                                        lsp::Range::new(
4520                                            lsp::Position::new(0, 35),
4521                                            lsp::Position::new(0, 38),
4522                                        ),
4523                                        "THREE".to_string(),
4524                                    ),
4525                                ],
4526                            ),
4527                        ]
4528                        .into_iter()
4529                        .collect(),
4530                    ),
4531                    ..Default::default()
4532                }))
4533            })
4534            .next()
4535            .await
4536            .unwrap();
4537        confirm_rename.await.unwrap();
4538
            // The workspace's active item is now an editor whose text is the renamed
            // `one.rs` and `two.rs` joined by a newline. Undo and redo act on both files.
4539        let rename_editor = workspace_b.read_with(cx_b, |workspace, cx| {
4540            workspace
4541                .active_item(cx)
4542                .unwrap()
4543                .downcast::<Editor>()
4544                .unwrap()
4545        });
4546        rename_editor.update(cx_b, |editor, cx| {
4547            assert_eq!(
4548                editor.text(cx),
4549                "const THREE: usize = 1;\nconst TWO: usize = one::THREE + one::THREE;"
4550            );
4551            editor.undo(&Undo, cx);
4552            assert_eq!(
4553                editor.text(cx),
4554                "const ONE: usize = 1;\nconst TWO: usize = one::ONE + one::ONE;"
4555            );
4556            editor.redo(&Redo, cx);
4557            assert_eq!(
4558                editor.text(cx),
4559                "const THREE: usize = 1;\nconst TWO: usize = one::THREE + one::THREE;"
4560            );
4561        });
4562
4563        // Ensure temporary rename edits cannot be undone/redone.
            // In the original editor, one undo reverts the rename, a second undo changes
            // nothing further, and a single redo brings the renamed text back.
4564        editor_b.update(cx_b, |editor, cx| {
4565            editor.undo(&Undo, cx);
4566            assert_eq!(editor.text(cx), "const ONE: usize = 1;");
4567            editor.undo(&Undo, cx);
4568            assert_eq!(editor.text(cx), "const ONE: usize = 1;");
4569            editor.redo(&Redo, cx);
4570            assert_eq!(editor.text(cx), "const THREE: usize = 1;");
4571        })
4572    }
4573
4574    #[gpui::test(iterations = 10)]
        // Two users in the same org channel exchange chat messages. Both clients first
        // see a message already stored in the database, then A sends two messages that
        // B receives, and finally the server's record of the channel is checked as each
        // client drops its channel handle.
4575    async fn test_basic_chat(cx_a: &mut TestAppContext, cx_b: &mut TestAppContext) {
4576        cx_a.foreground().forbid_parking();
4577
4578        // Connect to a server as 2 clients.
4579        let mut server = TestServer::start(cx_a.foreground(), cx_a.background()).await;
4580        let client_a = server.create_client(cx_a, "user_a").await;
4581        let client_b = server.create_client(cx_b, "user_b").await;
4582
4583        // Create an org that includes these 2 users.
4584        let db = &server.app_state.db;
4585        let org_id = db.create_org("Test Org", "test-org").await.unwrap();
4586        db.add_org_member(org_id, client_a.current_user_id(&cx_a), false)
4587            .await
4588            .unwrap();
4589        db.add_org_member(org_id, client_b.current_user_id(&cx_b), false)
4590            .await
4591            .unwrap();
4592
4593        // Create a channel that includes all the users.
4594        let channel_id = db.create_org_channel(org_id, "test-channel").await.unwrap();
4595        db.add_channel_member(channel_id, client_a.current_user_id(&cx_a), false)
4596            .await
4597            .unwrap();
4598        db.add_channel_member(channel_id, client_b.current_user_id(&cx_b), false)
4599            .await
4600            .unwrap();
            // Store one message from B directly in the database before any client opens
            // the channel.
            // NOTE(review): the trailing `1` is presumably a nonce for the message —
            // confirm against `create_channel_message`.
4601        db.create_channel_message(
4602            channel_id,
4603            client_b.current_user_id(&cx_b),
4604            "hello A, it's B.",
4605            OffsetDateTime::now_utc(),
4606            1,
4607        )
4608        .await
4609        .unwrap();
4610
            // Client A: wait for the channel list, check it contains exactly the test
            // channel, then open that channel.
4611        let channels_a = cx_a
4612            .add_model(|cx| ChannelList::new(client_a.user_store.clone(), client_a.clone(), cx));
4613        channels_a
4614            .condition(cx_a, |list, _| list.available_channels().is_some())
4615            .await;
4616        channels_a.read_with(cx_a, |list, _| {
4617            assert_eq!(
4618                list.available_channels().unwrap(),
4619                &[ChannelDetails {
4620                    id: channel_id.to_proto(),
4621                    name: "test-channel".to_string()
4622                }]
4623            )
4624        });
4625        let channel_a = channels_a.update(cx_a, |this, cx| {
4626            this.get_channel(channel_id.to_proto(), cx).unwrap()
4627        });
            // A freshly opened channel has no messages; the stored one shows up later.
            // NOTE(review): the tuples compared below look like (sender login, body,
            // pending flag) — confirm against the `channel_messages` helper.
4628        channel_a.read_with(cx_a, |channel, _| assert!(channel.messages().is_empty()));
4629        channel_a
4630            .condition(&cx_a, |channel, _| {
4631                channel_messages(channel)
4632                    == [("user_b".to_string(), "hello A, it's B.".to_string(), false)]
4633            })
4634            .await;
4635
            // Client B goes through the same steps and sees the same stored message.
4636        let channels_b = cx_b
4637            .add_model(|cx| ChannelList::new(client_b.user_store.clone(), client_b.clone(), cx));
4638        channels_b
4639            .condition(cx_b, |list, _| list.available_channels().is_some())
4640            .await;
4641        channels_b.read_with(cx_b, |list, _| {
4642            assert_eq!(
4643                list.available_channels().unwrap(),
4644                &[ChannelDetails {
4645                    id: channel_id.to_proto(),
4646                    name: "test-channel".to_string()
4647                }]
4648            )
4649        });
4650
4651        let channel_b = channels_b.update(cx_b, |this, cx| {
4652            this.get_channel(channel_id.to_proto(), cx).unwrap()
4653        });
4654        channel_b.read_with(cx_b, |channel, _| assert!(channel.messages().is_empty()));
4655        channel_b
4656            .condition(&cx_b, |channel, _| {
4657                channel_messages(channel)
4658                    == [("user_b".to_string(), "hello A, it's B.".to_string(), false)]
4659            })
4660            .await;
4661
            // A sends two messages back to back. The first send is detached; only the
            // second send's task is returned and awaited. Right after sending, both new
            // messages are listed locally with the third tuple element set to `true`.
4662        channel_a
4663            .update(cx_a, |channel, cx| {
4664                channel
4665                    .send_message("oh, hi B.".to_string(), cx)
4666                    .unwrap()
4667                    .detach();
4668                let task = channel.send_message("sup".to_string(), cx).unwrap();
4669                assert_eq!(
4670                    channel_messages(channel),
4671                    &[
4672                        ("user_b".to_string(), "hello A, it's B.".to_string(), false),
4673                        ("user_a".to_string(), "oh, hi B.".to_string(), true),
4674                        ("user_a".to_string(), "sup".to_string(), true)
4675                    ]
4676                );
4677                task
4678            })
4679            .await
4680            .unwrap();
4681
            // B eventually sees both of A's messages, in order, with the flag `false`.
4682        channel_b
4683            .condition(&cx_b, |channel, _| {
4684                channel_messages(channel)
4685                    == [
4686                        ("user_b".to_string(), "hello A, it's B.".to_string(), false),
4687                        ("user_a".to_string(), "oh, hi B.".to_string(), false),
4688                        ("user_a".to_string(), "sup".to_string(), false),
4689                    ]
4690            })
4691            .await;
4692
            // The server tracks two connections for the channel. Dropping B's handle
            // leaves one, and dropping A's handle removes the channel from server state.
4693        assert_eq!(
4694            server
4695                .state()
4696                .await
4697                .channel(channel_id)
4698                .unwrap()
4699                .connection_ids
4700                .len(),
4701            2
4702        );
4703        cx_b.update(|_| drop(channel_b));
4704        server
4705            .condition(|state| state.channel(channel_id).unwrap().connection_ids.len() == 1)
4706            .await;
4707
4708        cx_a.update(|_| drop(channel_a));
4709        server
4710            .condition(|state| state.channel(channel_id).is_none())
4711            .await;
4712    }
4713
4714    #[gpui::test(iterations = 10)]
        // Checks how chat message bodies are validated: an over-long message is
        // rejected once the send task is awaited, a blank message is rejected
        // immediately by `send_message`, and surrounding whitespace is stripped from
        // the body that ends up in the database.
4715    async fn test_chat_message_validation(cx_a: &mut TestAppContext) {
4716        cx_a.foreground().forbid_parking();
4717
4718        let mut server = TestServer::start(cx_a.foreground(), cx_a.background()).await;
4719        let client_a = server.create_client(cx_a, "user_a").await;
4720
            // Create an org and a channel, and make user A a member of both.
4721        let db = &server.app_state.db;
4722        let org_id = db.create_org("Test Org", "test-org").await.unwrap();
4723        let channel_id = db.create_org_channel(org_id, "test-channel").await.unwrap();
4724        db.add_org_member(org_id, client_a.current_user_id(&cx_a), false)
4725            .await
4726            .unwrap();
4727        db.add_channel_member(channel_id, client_a.current_user_id(&cx_a), false)
4728            .await
4729            .unwrap();
4730
            // Wait for A's channel list to load, then open the test channel.
4731        let channels_a = cx_a
4732            .add_model(|cx| ChannelList::new(client_a.user_store.clone(), client_a.clone(), cx));
4733        channels_a
4734            .condition(cx_a, |list, _| list.available_channels().is_some())
4735            .await;
4736        let channel_a = channels_a.update(cx_a, |this, cx| {
4737            this.get_channel(channel_id.to_proto(), cx).unwrap()
4738        });
4739
4740        // Messages aren't allowed to be too long.
            // The body is 14 bytes repeated 1024 times (14336 bytes). `send_message`
            // itself succeeds here; the error only surfaces when the task is awaited.
4741        channel_a
4742            .update(cx_a, |channel, cx| {
4743                let long_body = "this is long.\n".repeat(1024);
4744                channel.send_message(long_body, cx).unwrap()
4745            })
4746            .await
4747            .unwrap_err();
4748
4749        // Messages aren't allowed to be blank.
            // In this case `send_message` returns the error directly, without a task.
4750        channel_a.update(cx_a, |channel, cx| {
4751            channel.send_message(String::new(), cx).unwrap_err()
4752        });
4753
4754        // Leading and trailing whitespace are trimmed.
4755        channel_a
4756            .update(cx_a, |channel, cx| {
4757                channel
4758                    .send_message("\n surrounded by whitespace  \n".to_string(), cx)
4759                    .unwrap()
4760            })
4761            .await
4762            .unwrap();
            // Only the trimmed message was stored; the two rejected sends left nothing
            // in the database.
4763        assert_eq!(
4764            db.get_channel_messages(channel_id, 10, None)
4765                .await
4766                .unwrap()
4767                .iter()
4768                .map(|m| &m.body)
4769                .collect::<Vec<_>>(),
4770            &["surrounded by whitespace"]
4771        );
4772    }
4773
4774    #[gpui::test(iterations = 10)]
        // Checks that chat channels keep working across a client disconnect and reconnect.
        //
        // Client B is cut off from the server, still sees its cached channel list and messages,
        // and fails to send a message while offline. Client A sends two messages in the meantime.
        // Once connections are allowed again, B's channel must show A's messages followed by B's
        // own offline message, and both clients must be able to chat normally afterwards.
        //
        // `channel_messages` (defined elsewhere in this file) is compared against
        // `(sender login, body, flag)` tuples. The flag is `true` right after `send_message` and
        // `false` for messages received from the server -- presumably an "is pending" marker
        // (TODO confirm against the helper).
4775    async fn test_chat_reconnection(cx_a: &mut TestAppContext, cx_b: &mut TestAppContext) {
4776        cx_a.foreground().forbid_parking();
4777
4778        // Connect to a server as 2 clients.
4779        let mut server = TestServer::start(cx_a.foreground(), cx_a.background()).await;
4780        let client_a = server.create_client(cx_a, "user_a").await;
4781        let client_b = server.create_client(cx_b, "user_b").await;
4782        let mut status_b = client_b.status();
4783
4784        // Create an org that includes these 2 users.
4785        let db = &server.app_state.db;
4786        let org_id = db.create_org("Test Org", "test-org").await.unwrap();
4787        db.add_org_member(org_id, client_a.current_user_id(&cx_a), false)
4788            .await
4789            .unwrap();
4790        db.add_org_member(org_id, client_b.current_user_id(&cx_b), false)
4791            .await
4792            .unwrap();
4793
4794        // Create a channel that includes all the users.
4795        let channel_id = db.create_org_channel(org_id, "test-channel").await.unwrap();
4796        db.add_channel_member(channel_id, client_a.current_user_id(&cx_a), false)
4797            .await
4798            .unwrap();
4799        db.add_channel_member(channel_id, client_b.current_user_id(&cx_b), false)
4800            .await
4801            .unwrap();
            // Seed the channel with one message from B, written directly to the database.
            // NOTE(review): the trailing `2` is presumably the message nonce -- confirm against
            // `create_channel_message`.
4802        db.create_channel_message(
4803            channel_id,
4804            client_b.current_user_id(&cx_b),
4805            "hello A, it's B.",
4806            OffsetDateTime::now_utc(),
4807            2,
4808        )
4809        .await
4810        .unwrap();
4811
            // Client A loads the channel list and opens the channel.
4812        let channels_a = cx_a
4813            .add_model(|cx| ChannelList::new(client_a.user_store.clone(), client_a.clone(), cx));
4814        channels_a
4815            .condition(cx_a, |list, _| list.available_channels().is_some())
4816            .await;
4817
4818        channels_a.read_with(cx_a, |list, _| {
4819            assert_eq!(
4820                list.available_channels().unwrap(),
4821                &[ChannelDetails {
4822                    id: channel_id.to_proto(),
4823                    name: "test-channel".to_string()
4824                }]
4825            )
4826        });
4827        let channel_a = channels_a.update(cx_a, |this, cx| {
4828            this.get_channel(channel_id.to_proto(), cx).unwrap()
4829        });
            // The channel starts out empty; the seeded message shows up once the condition
            // below resolves.
4830        channel_a.read_with(cx_a, |channel, _| assert!(channel.messages().is_empty()));
4831        channel_a
4832            .condition(&cx_a, |channel, _| {
4833                channel_messages(channel)
4834                    == [("user_b".to_string(), "hello A, it's B.".to_string(), false)]
4835            })
4836            .await;
4837
            // Client B does the same and ends up with the same channel list and history.
4838        let channels_b = cx_b
4839            .add_model(|cx| ChannelList::new(client_b.user_store.clone(), client_b.clone(), cx));
4840        channels_b
4841            .condition(cx_b, |list, _| list.available_channels().is_some())
4842            .await;
4843        channels_b.read_with(cx_b, |list, _| {
4844            assert_eq!(
4845                list.available_channels().unwrap(),
4846                &[ChannelDetails {
4847                    id: channel_id.to_proto(),
4848                    name: "test-channel".to_string()
4849                }]
4850            )
4851        });
4852
4853        let channel_b = channels_b.update(cx_b, |this, cx| {
4854            this.get_channel(channel_id.to_proto(), cx).unwrap()
4855        });
4856        channel_b.read_with(cx_b, |channel, _| assert!(channel.messages().is_empty()));
4857        channel_b
4858            .condition(&cx_b, |channel, _| {
4859                channel_messages(channel)
4860                    == [("user_b".to_string(), "hello A, it's B.".to_string(), false)]
4861            })
4862            .await;
4863
4864        // Disconnect client B, ensuring we can still access its cached channel data.
4865        server.forbid_connections();
4866        server.disconnect_client(client_b.current_user_id(&cx_b));
            // Advance the executor's clock by the RPC receive timeout, then drain B's status
            // updates until it reports a failed reconnection attempt.
4867        cx_b.foreground().advance_clock(rpc::RECEIVE_TIMEOUT);
4868        while !matches!(
4869            status_b.next().await,
4870            Some(client::Status::ReconnectionError { .. })
4871        ) {}
4872
4873        channels_b.read_with(cx_b, |channels, _| {
4874            assert_eq!(
4875                channels.available_channels().unwrap(),
4876                [ChannelDetails {
4877                    id: channel_id.to_proto(),
4878                    name: "test-channel".to_string()
4879                }]
4880            )
4881        });
4882        channel_b.read_with(cx_b, |channel, _| {
4883            assert_eq!(
4884                channel_messages(channel),
4885                [("user_b".to_string(), "hello A, it's B.".to_string(), false)]
4886            )
4887        });
4888
4889        // Send a message from client B while it is disconnected.
            // The message is listed locally right away (flag `true`), but the send task itself
            // resolves to an error.
4890        channel_b
4891            .update(cx_b, |channel, cx| {
4892                let task = channel
4893                    .send_message("can you see this?".to_string(), cx)
4894                    .unwrap();
4895                assert_eq!(
4896                    channel_messages(channel),
4897                    &[
4898                        ("user_b".to_string(), "hello A, it's B.".to_string(), false),
4899                        ("user_b".to_string(), "can you see this?".to_string(), true)
4900                    ]
4901                );
4902                task
4903            })
4904            .await
4905            .unwrap_err();
4906
4907        // Send a message from client A while B is disconnected.
            // The first send is detached; only the second send's task is awaited.
4908        channel_a
4909            .update(cx_a, |channel, cx| {
4910                channel
4911                    .send_message("oh, hi B.".to_string(), cx)
4912                    .unwrap()
4913                    .detach();
4914                let task = channel.send_message("sup".to_string(), cx).unwrap();
4915                assert_eq!(
4916                    channel_messages(channel),
4917                    &[
4918                        ("user_b".to_string(), "hello A, it's B.".to_string(), false),
4919                        ("user_a".to_string(), "oh, hi B.".to_string(), true),
4920                        ("user_a".to_string(), "sup".to_string(), true)
4921                    ]
4922                );
4923                task
4924            })
4925            .await
4926            .unwrap();
4927
4928        // Give client B a chance to reconnect.
            // NOTE(review): 10 seconds is presumably long enough to cover the client's
            // reconnection delay -- confirm against the client's retry logic.
4929        server.allow_connections();
4930        cx_b.foreground().advance_clock(Duration::from_secs(10));
4931
4932        // Verify that B sees the new messages upon reconnection, as well as the message client B
4933        // sent while offline.
4934        channel_b
4935            .condition(&cx_b, |channel, _| {
4936                channel_messages(channel)
4937                    == [
4938                        ("user_b".to_string(), "hello A, it's B.".to_string(), false),
4939                        ("user_a".to_string(), "oh, hi B.".to_string(), false),
4940                        ("user_a".to_string(), "sup".to_string(), false),
4941                        ("user_b".to_string(), "can you see this?".to_string(), false),
4942                    ]
4943            })
4944            .await;
4945
4946        // Ensure client A and B can communicate normally after reconnection.
4947        channel_a
4948            .update(cx_a, |channel, cx| {
4949                channel.send_message("you online?".to_string(), cx).unwrap()
4950            })
4951            .await
4952            .unwrap();
4953        channel_b
4954            .condition(&cx_b, |channel, _| {
4955                channel_messages(channel)
4956                    == [
4957                        ("user_b".to_string(), "hello A, it's B.".to_string(), false),
4958                        ("user_a".to_string(), "oh, hi B.".to_string(), false),
4959                        ("user_a".to_string(), "sup".to_string(), false),
4960                        ("user_b".to_string(), "can you see this?".to_string(), false),
4961                        ("user_a".to_string(), "you online?".to_string(), false),
4962                    ]
4963            })
4964            .await;
4965
4966        channel_b
4967            .update(cx_b, |channel, cx| {
4968                channel.send_message("yep".to_string(), cx).unwrap()
4969            })
4970            .await
4971            .unwrap();
4972        channel_a
4973            .condition(&cx_a, |channel, _| {
4974                channel_messages(channel)
4975                    == [
4976                        ("user_b".to_string(), "hello A, it's B.".to_string(), false),
4977                        ("user_a".to_string(), "oh, hi B.".to_string(), false),
4978                        ("user_a".to_string(), "sup".to_string(), false),
4979                        ("user_b".to_string(), "can you see this?".to_string(), false),
4980                        ("user_a".to_string(), "you online?".to_string(), false),
4981                        ("user_b".to_string(), "yep".to_string(), false),
4982                    ]
4983            })
4984            .await;
4985    }
4986
4987    #[gpui::test(iterations = 10)]
        // Checks that every user's contact list tracks their contacts' online status and projects.
        //
        // The test walks through these steps: A, B and C become mutual contacts; A creates a local
        // project; A shares it; B joins it; A drops it; C disconnects; C reconnects. After each
        // step the contact lists of all three users are compared against
        // `(login, online, projects)` tuples built by the nested `contacts` helper.
4988    async fn test_contacts(
4989        deterministic: Arc<Deterministic>,
4990        cx_a: &mut TestAppContext,
4991        cx_b: &mut TestAppContext,
4992        cx_c: &mut TestAppContext,
4993    ) {
4994        cx_a.foreground().forbid_parking();
4995        let lang_registry = Arc::new(LanguageRegistry::test());
4996        let fs = FakeFs::new(cx_a.background());
4997
4998        // Connect to a server as 3 clients.
4999        let mut server = TestServer::start(cx_a.foreground(), cx_a.background()).await;
5000        let client_a = server.create_client(cx_a, "user_a").await;
5001        let client_b = server.create_client(cx_b, "user_b").await;
5002        let client_c = server.create_client(cx_c, "user_c").await;
5003        server
5004            .make_contacts(vec![
5005                (&client_a, cx_a),
5006                (&client_b, cx_b),
5007                (&client_c, cx_c),
5008            ])
5009            .await;
            // Initially every user sees the other two as online contacts without projects.
5010        deterministic.run_until_parked();
5011        client_a.user_store.read_with(cx_a, |store, _| {
5012            assert_eq!(
5013                contacts(store),
5014                [("user_b", true, vec![]), ("user_c", true, vec![])]
5015            )
5016        });
5017        client_b.user_store.read_with(cx_b, |store, _| {
5018            assert_eq!(
5019                contacts(store),
5020                [("user_a", true, vec![]), ("user_c", true, vec![])]
5021            )
5022        });
5023        client_c.user_store.read_with(cx_c, |store, _| {
5024            assert_eq!(
5025                contacts(store),
5026                [("user_a", true, vec![]), ("user_b", true, vec![])]
5027            )
5028        });
5029
5030        // Create a local project with a worktree at "/a" as client A. It is not shared yet.
5031        fs.create_dir(Path::new("/a")).await.unwrap();
5032
5033        let project_a = cx_a.update(|cx| {
5034            Project::local(
5035                client_a.clone(),
5036                client_a.user_store.clone(),
5037                lang_registry.clone(),
5038                fs.clone(),
5039                cx,
5040            )
5041        });
5042        let (worktree_a, _) = project_a
5043            .update(cx_a, |p, cx| {
5044                p.find_or_create_local_worktree("/a", true, cx)
5045            })
5046            .await
5047            .unwrap();
5048        worktree_a
5049            .read_with(cx_a, |tree, _| tree.as_local().unwrap().scan_complete())
5050            .await;
5051
            // B and C now see A's project "a" as unshared (`false`) with no guests.
            // A's own contact list is unchanged.
5052        deterministic.run_until_parked();
5053        client_a.user_store.read_with(cx_a, |store, _| {
5054            assert_eq!(
5055                contacts(store),
5056                [("user_b", true, vec![]), ("user_c", true, vec![])]
5057            )
5058        });
5059        client_b.user_store.read_with(cx_b, |store, _| {
5060            assert_eq!(
5061                contacts(store),
5062                [
5063                    ("user_a", true, vec![("a", false, vec![])]),
5064                    ("user_c", true, vec![])
5065                ]
5066            )
5067        });
5068        client_c.user_store.read_with(cx_c, |store, _| {
5069            assert_eq!(
5070                contacts(store),
5071                [
5072                    ("user_a", true, vec![("a", false, vec![])]),
5073                    ("user_b", true, vec![])
5074                ]
5075            )
5076        });
5077
            // Wait for the project's remote id (B needs it to join below), then share the
            // project: B and C see it flip to shared (`true`).
5078        let project_id = project_a
5079            .update(cx_a, |project, _| project.next_remote_id())
5080            .await;
5081        project_a
5082            .update(cx_a, |project, cx| project.share(cx))
5083            .await
5084            .unwrap();
5085        deterministic.run_until_parked();
5086        client_a.user_store.read_with(cx_a, |store, _| {
5087            assert_eq!(
5088                contacts(store),
5089                [("user_b", true, vec![]), ("user_c", true, vec![])]
5090            )
5091        });
5092        client_b.user_store.read_with(cx_b, |store, _| {
5093            assert_eq!(
5094                contacts(store),
5095                [
5096                    ("user_a", true, vec![("a", true, vec![])]),
5097                    ("user_c", true, vec![])
5098                ]
5099            )
5100        });
5101        client_c.user_store.read_with(cx_c, |store, _| {
5102            assert_eq!(
5103                contacts(store),
5104                [
5105                    ("user_a", true, vec![("a", true, vec![])]),
5106                    ("user_b", true, vec![])
5107                ]
5108            )
5109        });
5110
            // B joins the project and is listed as a guest in both B's and C's view of A.
5111        let _project_b = Project::remote(
5112            project_id,
5113            client_b.clone(),
5114            client_b.user_store.clone(),
5115            lang_registry.clone(),
5116            fs.clone(),
5117            &mut cx_b.to_async(),
5118        )
5119        .await
5120        .unwrap();
5121        deterministic.run_until_parked();
5122        client_a.user_store.read_with(cx_a, |store, _| {
5123            assert_eq!(
5124                contacts(store),
5125                [("user_b", true, vec![]), ("user_c", true, vec![])]
5126            )
5127        });
5128        client_b.user_store.read_with(cx_b, |store, _| {
5129            assert_eq!(
5130                contacts(store),
5131                [
5132                    ("user_a", true, vec![("a", true, vec!["user_b"])]),
5133                    ("user_c", true, vec![])
5134                ]
5135            )
5136        });
5137        client_c.user_store.read_with(cx_c, |store, _| {
5138            assert_eq!(
5139                contacts(store),
5140                [
5141                    ("user_a", true, vec![("a", true, vec!["user_b"])]),
5142                    ("user_b", true, vec![])
5143                ]
5144            )
5145        });
5146
            // Wait until A's project lists B as a collaborator before dropping the project.
5147        project_a
5148            .condition(&cx_a, |project, _| {
5149                project.collaborators().contains_key(&client_b.peer_id)
5150            })
5151            .await;
5152
            // Dropping A's project removes it from B's and C's contact lists.
5153        cx_a.update(move |_| drop(project_a));
5154        deterministic.run_until_parked();
5155        client_a.user_store.read_with(cx_a, |store, _| {
5156            assert_eq!(
5157                contacts(store),
5158                [("user_b", true, vec![]), ("user_c", true, vec![])]
5159            )
5160        });
5161        client_b.user_store.read_with(cx_b, |store, _| {
5162            assert_eq!(
5163                contacts(store),
5164                [("user_a", true, vec![]), ("user_c", true, vec![])]
5165            )
5166        });
5167        client_c.user_store.read_with(cx_c, |store, _| {
5168            assert_eq!(
5169                contacts(store),
5170                [("user_a", true, vec![]), ("user_b", true, vec![])]
5171            )
5172        });
5173
            // Disconnect C and block reconnection: A and B see C as offline, and C's own contact
            // list becomes empty.
5174        server.disconnect_client(client_c.current_user_id(cx_c));
5175        server.forbid_connections();
5176        deterministic.advance_clock(rpc::RECEIVE_TIMEOUT);
5177
5178        client_a.user_store.read_with(cx_a, |store, _| {
5179            assert_eq!(
5180                contacts(store),
5181                [("user_b", true, vec![]), ("user_c", false, vec![])]
5182            )
5183        });
5184        client_b.user_store.read_with(cx_b, |store, _| {
5185            assert_eq!(
5186                contacts(store),
5187                [("user_a", true, vec![]), ("user_c", false, vec![])]
5188            )
5189        });
5190        client_c
5191            .user_store
5192            .read_with(cx_c, |store, _| assert_eq!(contacts(store), []));
5193
            // Once C reconnects, all three contact lists are back to their initial state.
5194        server.allow_connections();
5195        client_c
5196            .authenticate_and_connect(false, &cx_c.to_async())
5197            .await
5198            .unwrap();
5199        deterministic.run_until_parked();
5200        client_a.user_store.read_with(cx_a, |store, _| {
5201            assert_eq!(
5202                contacts(store),
5203                [("user_b", true, vec![]), ("user_c", true, vec![])]
5204            )
5205        });
5206        client_b.user_store.read_with(cx_b, |store, _| {
5207            assert_eq!(
5208                contacts(store),
5209                [("user_a", true, vec![]), ("user_c", true, vec![])]
5210            )
5211        });
5212        client_c.user_store.read_with(cx_c, |store, _| {
5213            assert_eq!(
5214                contacts(store),
5215                [("user_a", true, vec![]), ("user_b", true, vec![])]
5216            )
5217        });
5218
            // Summarizes the store's contacts as `(github login, online, projects)` tuples, where
            // each project is `(first worktree root name, is shared, guest github logins)`.
            // Indexing `worktree_root_names[0]` panics for a project without any worktree root.
5219        fn contacts(user_store: &UserStore) -> Vec<(&str, bool, Vec<(&str, bool, Vec<&str>)>)> {
5220            user_store
5221                .contacts()
5222                .iter()
5223                .map(|contact| {
5224                    let worktrees = contact
5225                        .projects
5226                        .iter()
5227                        .map(|p| {
5228                            (
5229                                p.worktree_root_names[0].as_str(),
5230                                p.is_shared,
5231                                p.guests.iter().map(|p| p.github_login.as_str()).collect(),
5232                            )
5233                        })
5234                        .collect();
5235                    (
5236                        contact.user.github_login.as_str(),
5237                        contact.online,
5238                        worktrees,
5239                    )
5240                })
5241                .collect()
5242        }
5243    }
5244
5245    #[gpui::test(iterations = 10)]
        // Checks the contact request lifecycle: request, accept and reject.
        //
        // Each of the three users is signed in on two clients, so the test can check that updates
        // reach every client of a user. After each phase the first client of every user is also
        // disconnected and reconnected, to check that the same state is delivered on connect.
5246    async fn test_contact_requests(
5247        executor: Arc<Deterministic>,
5248        cx_a: &mut TestAppContext,
5249        cx_a2: &mut TestAppContext,
5250        cx_b: &mut TestAppContext,
5251        cx_b2: &mut TestAppContext,
5252        cx_c: &mut TestAppContext,
5253        cx_c2: &mut TestAppContext,
5254    ) {
5255        cx_a.foreground().forbid_parking();
5256
5257        // Connect to a server as 3 users, each with 2 clients.
5258        let mut server = TestServer::start(cx_a.foreground(), cx_a.background()).await;
5259        let client_a = server.create_client(cx_a, "user_a").await;
5260        let client_a2 = server.create_client(cx_a2, "user_a").await;
5261        let client_b = server.create_client(cx_b, "user_b").await;
5262        let client_b2 = server.create_client(cx_b2, "user_b").await;
5263        let client_c = server.create_client(cx_c, "user_c").await;
5264        let client_c2 = server.create_client(cx_c2, "user_c").await;
5265
            // Both clients created with the same login resolve to the same user id.
5266        assert_eq!(client_a.user_id().unwrap(), client_a2.user_id().unwrap());
5267        assert_eq!(client_b.user_id().unwrap(), client_b2.user_id().unwrap());
5268        assert_eq!(client_c.user_id().unwrap(), client_c2.user_id().unwrap());
5269
5270        // User A and User C request that user B become their contact.
5271        client_a
5272            .user_store
5273            .update(cx_a, |store, cx| {
5274                store.request_contact(client_b.user_id().unwrap(), cx)
5275            })
5276            .await
5277            .unwrap();
5278        client_c
5279            .user_store
5280            .update(cx_c, |store, cx| {
5281                store.request_contact(client_b.user_id().unwrap(), cx)
5282            })
5283            .await
5284            .unwrap();
5285        executor.run_until_parked();
5286
5287        // All users see the pending request appear in all their clients.
5288        assert_eq!(
5289            client_a.summarize_contacts(&cx_a).outgoing_requests,
5290            &["user_b"]
5291        );
5292        assert_eq!(
5293            client_a2.summarize_contacts(&cx_a2).outgoing_requests,
5294            &["user_b"]
5295        );
5296        assert_eq!(
5297            client_b.summarize_contacts(&cx_b).incoming_requests,
5298            &["user_a", "user_c"]
5299        );
5300        assert_eq!(
5301            client_b2.summarize_contacts(&cx_b2).incoming_requests,
5302            &["user_a", "user_c"]
5303        );
5304        assert_eq!(
5305            client_c.summarize_contacts(&cx_c).outgoing_requests,
5306            &["user_b"]
5307        );
5308        assert_eq!(
5309            client_c2.summarize_contacts(&cx_c2).outgoing_requests,
5310            &["user_b"]
5311        );
5312
5313        // Contact requests are present upon connecting (tested here via disconnect/reconnect)
5314        disconnect_and_reconnect(&client_a, cx_a).await;
5315        disconnect_and_reconnect(&client_b, cx_b).await;
5316        disconnect_and_reconnect(&client_c, cx_c).await;
5317        executor.run_until_parked();
5318        assert_eq!(
5319            client_a.summarize_contacts(&cx_a).outgoing_requests,
5320            &["user_b"]
5321        );
5322        assert_eq!(
5323            client_b.summarize_contacts(&cx_b).incoming_requests,
5324            &["user_a", "user_c"]
5325        );
5326        assert_eq!(
5327            client_c.summarize_contacts(&cx_c).outgoing_requests,
5328            &["user_b"]
5329        );
5330
5331        // User B accepts the request from user A.
5332        client_b
5333            .user_store
5334            .update(cx_b, |store, cx| {
5335                store.respond_to_contact_request(client_a.user_id().unwrap(), true, cx)
5336            })
5337            .await
5338            .unwrap();
5339
5340        executor.run_until_parked();
5341
5342        // User B sees user A as their contact now in all clients, and the incoming request from them is removed.
5343        let contacts_b = client_b.summarize_contacts(&cx_b);
5344        assert_eq!(contacts_b.current, &["user_a"]);
5345        assert_eq!(contacts_b.incoming_requests, &["user_c"]);
5346        let contacts_b2 = client_b2.summarize_contacts(&cx_b2);
5347        assert_eq!(contacts_b2.current, &["user_a"]);
5348        assert_eq!(contacts_b2.incoming_requests, &["user_c"]);
5349
5350        // User A sees user B as their contact now in all clients, and the outgoing request to them is removed.
5351        let contacts_a = client_a.summarize_contacts(&cx_a);
5352        assert_eq!(contacts_a.current, &["user_b"]);
5353        assert!(contacts_a.outgoing_requests.is_empty());
5354        let contacts_a2 = client_a2.summarize_contacts(&cx_a2);
5355        assert_eq!(contacts_a2.current, &["user_b"]);
5356        assert!(contacts_a2.outgoing_requests.is_empty());
5357
5358        // Contacts are present upon connecting (tested here via disconnect/reconnect)
5359        disconnect_and_reconnect(&client_a, cx_a).await;
5360        disconnect_and_reconnect(&client_b, cx_b).await;
5361        disconnect_and_reconnect(&client_c, cx_c).await;
5362        executor.run_until_parked();
5363        assert_eq!(client_a.summarize_contacts(&cx_a).current, &["user_b"]);
5364        assert_eq!(client_b.summarize_contacts(&cx_b).current, &["user_a"]);
5365        assert_eq!(
5366            client_b.summarize_contacts(&cx_b).incoming_requests,
5367            &["user_c"]
5368        );
5369        assert!(client_c.summarize_contacts(&cx_c).current.is_empty());
5370        assert_eq!(
5371            client_c.summarize_contacts(&cx_c).outgoing_requests,
5372            &["user_b"]
5373        );
5374
5375        // User B rejects the request from user C.
5376        client_b
5377            .user_store
5378            .update(cx_b, |store, cx| {
5379                store.respond_to_contact_request(client_c.user_id().unwrap(), false, cx)
5380            })
5381            .await
5382            .unwrap();
5383
5384        executor.run_until_parked();
5385
5386        // User B doesn't see user C as their contact, and the incoming request from them is removed.
5387        let contacts_b = client_b.summarize_contacts(&cx_b);
5388        assert_eq!(contacts_b.current, &["user_a"]);
5389        assert!(contacts_b.incoming_requests.is_empty());
5390        let contacts_b2 = client_b2.summarize_contacts(&cx_b2);
5391        assert_eq!(contacts_b2.current, &["user_a"]);
5392        assert!(contacts_b2.incoming_requests.is_empty());
5393
5394        // User C doesn't see user B as their contact, and the outgoing request to them is removed.
5395        let contacts_c = client_c.summarize_contacts(&cx_c);
5396        assert!(contacts_c.current.is_empty());
5397        assert!(contacts_c.outgoing_requests.is_empty());
5398        let contacts_c2 = client_c2.summarize_contacts(&cx_c2);
5399        assert!(contacts_c2.current.is_empty());
5400        assert!(contacts_c2.outgoing_requests.is_empty());
5401
5402        // Incoming/outgoing requests are not present upon connecting (tested here via disconnect/reconnect)
5403        disconnect_and_reconnect(&client_a, cx_a).await;
5404        disconnect_and_reconnect(&client_b, cx_b).await;
5405        disconnect_and_reconnect(&client_c, cx_c).await;
5406        executor.run_until_parked();
5407        assert_eq!(client_a.summarize_contacts(&cx_a).current, &["user_b"]);
5408        assert_eq!(client_b.summarize_contacts(&cx_b).current, &["user_a"]);
5409        assert!(client_b
5410            .summarize_contacts(&cx_b)
5411            .incoming_requests
5412            .is_empty());
5413        assert!(client_c.summarize_contacts(&cx_c).current.is_empty());
5414        assert!(client_c
5415            .summarize_contacts(&cx_c)
5416            .outgoing_requests
5417            .is_empty());
5418
            // Disconnects `client`, clears its contacts via `clear_contacts`, then authenticates
            // and connects again.
            // NOTE(review): clearing presumably guarantees that any contacts observed afterwards
            // were received after the reconnect rather than left over from before -- confirm.
5419        async fn disconnect_and_reconnect(client: &TestClient, cx: &mut TestAppContext) {
5420            client.disconnect(&cx.to_async()).unwrap();
5421            client.clear_contacts(cx).await;
5422            client
5423                .authenticate_and_connect(false, &cx.to_async())
5424                .await
5425                .unwrap();
5426        }
5427    }
5428
5429    #[gpui::test(iterations = 10)]
5430    async fn test_following(cx_a: &mut TestAppContext, cx_b: &mut TestAppContext) {
5431        cx_a.foreground().forbid_parking();
5432        let fs = FakeFs::new(cx_a.background());
5433
5434        // 2 clients connect to a server.
5435        let mut server = TestServer::start(cx_a.foreground(), cx_a.background()).await;
5436        let mut client_a = server.create_client(cx_a, "user_a").await;
5437        let mut client_b = server.create_client(cx_b, "user_b").await;
5438        server
5439            .make_contacts(vec![(&client_a, cx_a), (&client_b, cx_b)])
5440            .await;
5441        cx_a.update(editor::init);
5442        cx_b.update(editor::init);
5443
5444        // Client A shares a project.
5445        fs.insert_tree(
5446            "/a",
5447            json!({
5448                "1.txt": "one",
5449                "2.txt": "two",
5450                "3.txt": "three",
5451            }),
5452        )
5453        .await;
5454        let (project_a, worktree_id) = client_a.build_local_project(fs.clone(), "/a", cx_a).await;
5455        project_a
5456            .update(cx_a, |project, cx| project.share(cx))
5457            .await
5458            .unwrap();
5459
5460        // Client B joins the project.
5461        let project_b = client_b
5462            .build_remote_project(
5463                project_a
5464                    .read_with(cx_a, |project, _| project.remote_id())
5465                    .unwrap(),
5466                cx_b,
5467            )
5468            .await;
5469
5470        // Client A opens some editors.
5471        let workspace_a = client_a.build_workspace(&project_a, cx_a);
5472        let pane_a = workspace_a.read_with(cx_a, |workspace, _| workspace.active_pane().clone());
5473        let editor_a1 = workspace_a
5474            .update(cx_a, |workspace, cx| {
5475                workspace.open_path((worktree_id, "1.txt"), true, cx)
5476            })
5477            .await
5478            .unwrap()
5479            .downcast::<Editor>()
5480            .unwrap();
5481        let editor_a2 = workspace_a
5482            .update(cx_a, |workspace, cx| {
5483                workspace.open_path((worktree_id, "2.txt"), true, cx)
5484            })
5485            .await
5486            .unwrap()
5487            .downcast::<Editor>()
5488            .unwrap();
5489
5490        // Client B opens an editor.
5491        let workspace_b = client_b.build_workspace(&project_b, cx_b);
5492        let editor_b1 = workspace_b
5493            .update(cx_b, |workspace, cx| {
5494                workspace.open_path((worktree_id, "1.txt"), true, cx)
5495            })
5496            .await
5497            .unwrap()
5498            .downcast::<Editor>()
5499            .unwrap();
5500
5501        let client_a_id = project_b.read_with(cx_b, |project, _| {
5502            project.collaborators().values().next().unwrap().peer_id
5503        });
5504        let client_b_id = project_a.read_with(cx_a, |project, _| {
5505            project.collaborators().values().next().unwrap().peer_id
5506        });
5507
5508        // When client B starts following client A, all visible view states are replicated to client B.
5509        editor_a1.update(cx_a, |editor, cx| editor.select_ranges([0..1], None, cx));
5510        editor_a2.update(cx_a, |editor, cx| editor.select_ranges([2..3], None, cx));
5511        workspace_b
5512            .update(cx_b, |workspace, cx| {
5513                workspace
5514                    .toggle_follow(&ToggleFollow(client_a_id), cx)
5515                    .unwrap()
5516            })
5517            .await
5518            .unwrap();
5519        let editor_b2 = workspace_b.read_with(cx_b, |workspace, cx| {
5520            workspace
5521                .active_item(cx)
5522                .unwrap()
5523                .downcast::<Editor>()
5524                .unwrap()
5525        });
5526        assert!(cx_b.read(|cx| editor_b2.is_focused(cx)));
5527        assert_eq!(
5528            editor_b2.read_with(cx_b, |editor, cx| editor.project_path(cx)),
5529            Some((worktree_id, "2.txt").into())
5530        );
5531        assert_eq!(
5532            editor_b2.read_with(cx_b, |editor, cx| editor.selected_ranges(cx)),
5533            vec![2..3]
5534        );
5535        assert_eq!(
5536            editor_b1.read_with(cx_b, |editor, cx| editor.selected_ranges(cx)),
5537            vec![0..1]
5538        );
5539
5540        // When client A activates a different editor, client B does so as well.
5541        workspace_a.update(cx_a, |workspace, cx| {
5542            workspace.activate_item(&editor_a1, cx)
5543        });
5544        workspace_b
5545            .condition(cx_b, |workspace, cx| {
5546                workspace.active_item(cx).unwrap().id() == editor_b1.id()
5547            })
5548            .await;
5549
5550        // When client A navigates back and forth, client B does so as well.
5551        workspace_a
5552            .update(cx_a, |workspace, cx| {
5553                workspace::Pane::go_back(workspace, None, cx)
5554            })
5555            .await;
5556        workspace_b
5557            .condition(cx_b, |workspace, cx| {
5558                workspace.active_item(cx).unwrap().id() == editor_b2.id()
5559            })
5560            .await;
5561
5562        workspace_a
5563            .update(cx_a, |workspace, cx| {
5564                workspace::Pane::go_forward(workspace, None, cx)
5565            })
5566            .await;
5567        workspace_b
5568            .condition(cx_b, |workspace, cx| {
5569                workspace.active_item(cx).unwrap().id() == editor_b1.id()
5570            })
5571            .await;
5572
5573        // Changes to client A's editor are reflected on client B.
5574        editor_a1.update(cx_a, |editor, cx| {
5575            editor.select_ranges([1..1, 2..2], None, cx);
5576        });
5577        editor_b1
5578            .condition(cx_b, |editor, cx| {
5579                editor.selected_ranges(cx) == vec![1..1, 2..2]
5580            })
5581            .await;
5582
5583        editor_a1.update(cx_a, |editor, cx| editor.set_text("TWO", cx));
5584        editor_b1
5585            .condition(cx_b, |editor, cx| editor.text(cx) == "TWO")
5586            .await;
5587
5588        editor_a1.update(cx_a, |editor, cx| {
5589            editor.select_ranges([3..3], None, cx);
5590            editor.set_scroll_position(vec2f(0., 100.), cx);
5591        });
5592        editor_b1
5593            .condition(cx_b, |editor, cx| editor.selected_ranges(cx) == vec![3..3])
5594            .await;
5595
5596        // After unfollowing, client B stops receiving updates from client A.
5597        workspace_b.update(cx_b, |workspace, cx| {
5598            workspace.unfollow(&workspace.active_pane().clone(), cx)
5599        });
5600        workspace_a.update(cx_a, |workspace, cx| {
5601            workspace.activate_item(&editor_a2, cx)
5602        });
5603        cx_a.foreground().run_until_parked();
5604        assert_eq!(
5605            workspace_b.read_with(cx_b, |workspace, cx| workspace
5606                .active_item(cx)
5607                .unwrap()
5608                .id()),
5609            editor_b1.id()
5610        );
5611
5612        // Client A starts following client B.
5613        workspace_a
5614            .update(cx_a, |workspace, cx| {
5615                workspace
5616                    .toggle_follow(&ToggleFollow(client_b_id), cx)
5617                    .unwrap()
5618            })
5619            .await
5620            .unwrap();
5621        assert_eq!(
5622            workspace_a.read_with(cx_a, |workspace, _| workspace.leader_for_pane(&pane_a)),
5623            Some(client_b_id)
5624        );
5625        assert_eq!(
5626            workspace_a.read_with(cx_a, |workspace, cx| workspace
5627                .active_item(cx)
5628                .unwrap()
5629                .id()),
5630            editor_a1.id()
5631        );
5632
5633        // Following interrupts when client B disconnects.
5634        client_b.disconnect(&cx_b.to_async()).unwrap();
5635        cx_a.foreground().run_until_parked();
5636        assert_eq!(
5637            workspace_a.read_with(cx_a, |workspace, _| workspace.leader_for_pane(&pane_a)),
5638            None
5639        );
5640    }
5641
5642    #[gpui::test(iterations = 10)]
5643    async fn test_peers_following_each_other(cx_a: &mut TestAppContext, cx_b: &mut TestAppContext) {
            // Two clients share a project, split their workspaces, and follow each other
            // from the newly created panes. The assertions check that opening a file in
            // the original pane leaves that pane active on both sides, and that each
            // client's other pane ends up showing the file its peer opened.
5644        cx_a.foreground().forbid_parking();
5645        let fs = FakeFs::new(cx_a.background());
5646
5647        // 2 clients connect to a server.
5648        let mut server = TestServer::start(cx_a.foreground(), cx_a.background()).await;
5649        let mut client_a = server.create_client(cx_a, "user_a").await;
5650        let mut client_b = server.create_client(cx_b, "user_b").await;
5651        server
5652            .make_contacts(vec![(&client_a, cx_a), (&client_b, cx_b)])
5653            .await;
5654        cx_a.update(editor::init);
5655        cx_b.update(editor::init);
5656
5657        // Client A shares a project rooted at "/a" that contains four text files.
5658        fs.insert_tree(
5659            "/a",
5660            json!({
5661                "1.txt": "one",
5662                "2.txt": "two",
5663                "3.txt": "three",
5664                "4.txt": "four",
5665            }),
5666        )
5667        .await;
5668        let (project_a, worktree_id) = client_a.build_local_project(fs.clone(), "/a", cx_a).await;
5669        project_a
5670            .update(cx_a, |project, cx| project.share(cx))
5671            .await
5672            .unwrap();
5673
5674        // Client B joins the project, identified by the remote id of client A's project.
5675        let project_b = client_b
5676            .build_remote_project(
5677                project_a
5678                    .read_with(cx_a, |project, _| project.remote_id())
5679                    .unwrap(),
5680                cx_b,
5681            )
5682            .await;
5683
5684        // Client A opens an editor for 1.txt; `pane_a1` records the pane that is active
            // before any split so later assertions can refer back to it.
5685        let workspace_a = client_a.build_workspace(&project_a, cx_a);
5686        let pane_a1 = workspace_a.read_with(cx_a, |workspace, _| workspace.active_pane().clone());
5687        let _editor_a1 = workspace_a
5688            .update(cx_a, |workspace, cx| {
5689                workspace.open_path((worktree_id, "1.txt"), true, cx)
5690            })
5691            .await
5692            .unwrap()
5693            .downcast::<Editor>()
5694            .unwrap();
5695
5696        // Client B opens an editor for 2.txt; `pane_b1` plays the same role as `pane_a1`.
5697        let workspace_b = client_b.build_workspace(&project_b, cx_b);
5698        let pane_b1 = workspace_b.read_with(cx_b, |workspace, _| workspace.active_pane().clone());
5699        let _editor_b1 = workspace_b
5700            .update(cx_b, |workspace, cx| {
5701                workspace.open_path((worktree_id, "2.txt"), true, cx)
5702            })
5703            .await
5704            .unwrap()
5705            .downcast::<Editor>()
5706            .unwrap();
5707
5708        // Clients A and B follow each other in split panes. Each side splits to the
            // right, asserts that the active pane is no longer the original one, and then
            // follows the first collaborator listed by its project.
            // NOTE(review): with only two clients the first collaborator key is presumably
            // the other peer - confirm against `Project::collaborators`.
5709        workspace_a
5710            .update(cx_a, |workspace, cx| {
5711                workspace.split_pane(workspace.active_pane().clone(), SplitDirection::Right, cx);
5712                assert_ne!(*workspace.active_pane(), pane_a1);
5713                let leader_id = *project_a.read(cx).collaborators().keys().next().unwrap();
5714                workspace
5715                    .toggle_follow(&workspace::ToggleFollow(leader_id), cx)
5716                    .unwrap()
5717            })
5718            .await
5719            .unwrap();
5720        workspace_b
5721            .update(cx_b, |workspace, cx| {
5722                workspace.split_pane(workspace.active_pane().clone(), SplitDirection::Right, cx);
5723                assert_ne!(*workspace.active_pane(), pane_b1);
5724                let leader_id = *project_b.read(cx).collaborators().keys().next().unwrap();
5725                workspace
5726                    .toggle_follow(&workspace::ToggleFollow(leader_id), cx)
5727                    .unwrap()
5728            })
5729            .await
5730            .unwrap();
5731
            // Each client switches back to its original pane (asserted below) and opens a
            // new file there: 3.txt on client A, 4.txt on client B.
5732        workspace_a
5733            .update(cx_a, |workspace, cx| {
5734                workspace.activate_next_pane(cx);
5735                assert_eq!(*workspace.active_pane(), pane_a1);
5736                workspace.open_path((worktree_id, "3.txt"), true, cx)
5737            })
5738            .await
5739            .unwrap();
5740        workspace_b
5741            .update(cx_b, |workspace, cx| {
5742                workspace.activate_next_pane(cx);
5743                assert_eq!(*workspace.active_pane(), pane_b1);
5744                workspace.open_path((worktree_id, "4.txt"), true, cx)
5745            })
5746            .await
5747            .unwrap();
            // NOTE(review): presumably lets all pending follow updates be delivered before
            // the assertions below - confirm against the gpui executor.
5748        cx_a.foreground().run_until_parked();
5749
5750        // Ensure leader updates don't change the active pane of followers
5751        workspace_a.read_with(cx_a, |workspace, _| {
5752            assert_eq!(*workspace.active_pane(), pane_a1);
5753        });
5754        workspace_b.read_with(cx_b, |workspace, _| {
5755            assert_eq!(*workspace.active_pane(), pane_b1);
5756        });
5757
5758        // Ensure peers following each other doesn't cause an infinite loop.
            // Client A's original pane still shows 3.txt, and after moving to its other
            // pane the active item is 4.txt, the file client B opened.
5759        assert_eq!(
5760            workspace_a.read_with(cx_a, |workspace, cx| workspace
5761                .active_item(cx)
5762                .unwrap()
5763                .project_path(cx)),
5764            Some((worktree_id, "3.txt").into())
5765        );
5766        workspace_a.update(cx_a, |workspace, cx| {
5767            assert_eq!(
5768                workspace.active_item(cx).unwrap().project_path(cx),
5769                Some((worktree_id, "3.txt").into())
5770            );
5771            workspace.activate_next_pane(cx);
5772            assert_eq!(
5773                workspace.active_item(cx).unwrap().project_path(cx),
5774                Some((worktree_id, "4.txt").into())
5775            );
5776        });
            // Mirror image on client B: 4.txt in its original pane, 3.txt (opened by
            // client A) in its other pane.
5777        workspace_b.update(cx_b, |workspace, cx| {
5778            assert_eq!(
5779                workspace.active_item(cx).unwrap().project_path(cx),
5780                Some((worktree_id, "4.txt").into())
5781            );
5782            workspace.activate_next_pane(cx);
5783            assert_eq!(
5784                workspace.active_item(cx).unwrap().project_path(cx),
5785                Some((worktree_id, "3.txt").into())
5786            );
5787        });
5788    }
5789
5790    #[gpui::test(iterations = 10)]
5791    async fn test_auto_unfollowing(cx_a: &mut TestAppContext, cx_b: &mut TestAppContext) {
            // Checks which local actions make a follower stop following. After client B
            // moves, edits, scrolls, or opens a different item, `leader_for_pane(&pane_b)`
            // is expected to be `None`; after client B merely splits the pane or activates
            // the next pane it is expected to still be `Some(leader_id)`.
5792        cx_a.foreground().forbid_parking();
5793        let fs = FakeFs::new(cx_a.background());
5794
5795        // 2 clients connect to a server.
5796        let mut server = TestServer::start(cx_a.foreground(), cx_a.background()).await;
5797        let mut client_a = server.create_client(cx_a, "user_a").await;
5798        let mut client_b = server.create_client(cx_b, "user_b").await;
5799        server
5800            .make_contacts(vec![(&client_a, cx_a), (&client_b, cx_b)])
5801            .await;
5802        cx_a.update(editor::init);
5803        cx_b.update(editor::init);
5804
5805        // Client A shares a project rooted at "/a" that contains three text files.
5806        fs.insert_tree(
5807            "/a",
5808            json!({
5809                "1.txt": "one",
5810                "2.txt": "two",
5811                "3.txt": "three",
5812            }),
5813        )
5814        .await;
5815        let (project_a, worktree_id) = client_a.build_local_project(fs.clone(), "/a", cx_a).await;
5816        project_a
5817            .update(cx_a, |project, cx| project.share(cx))
5818            .await
5819            .unwrap();
5820
5821        // Client B joins the project, identified by the remote id of client A's project.
5822        let project_b = client_b
5823            .build_remote_project(
5824                project_a
5825                    .read_with(cx_a, |project, _| project.remote_id())
5826                    .unwrap(),
5827                cx_b,
5828            )
5829            .await;
5830
5831        // Client A opens an editor for 1.txt.
5832        let workspace_a = client_a.build_workspace(&project_a, cx_a);
5833        let _editor_a1 = workspace_a
5834            .update(cx_a, |workspace, cx| {
5835                workspace.open_path((worktree_id, "1.txt"), true, cx)
5836            })
5837            .await
5838            .unwrap()
5839            .downcast::<Editor>()
5840            .unwrap();
5841
5842        // Client B starts following client A. The leader is the peer id of the first
            // collaborator that client B's project reports.
            // NOTE(review): with only two clients that is presumably client A - confirm.
5843        let workspace_b = client_b.build_workspace(&project_b, cx_b);
5844        let pane_b = workspace_b.read_with(cx_b, |workspace, _| workspace.active_pane().clone());
5845        let leader_id = project_b.read_with(cx_b, |project, _| {
5846            project.collaborators().values().next().unwrap().peer_id
5847        });
5848        workspace_b
5849            .update(cx_b, |workspace, cx| {
5850                workspace
5851                    .toggle_follow(&ToggleFollow(leader_id), cx)
5852                    .unwrap()
5853            })
5854            .await
5855            .unwrap();
5856        assert_eq!(
5857            workspace_b.read_with(cx_b, |workspace, _| workspace.leader_for_pane(&pane_b)),
5858            Some(leader_id)
5859        );
            // Client B opened nothing itself, so the active item must be an editor that
            // appeared as a result of following; the unwraps fail the test otherwise.
5860        let editor_b2 = workspace_b.read_with(cx_b, |workspace, cx| {
5861            workspace
5862                .active_item(cx)
5863                .unwrap()
5864                .downcast::<Editor>()
5865                .unwrap()
5866        });
5867
5868        // When client B moves, it automatically stops following client A.
5869        editor_b2.update(cx_b, |editor, cx| editor.move_right(&editor::MoveRight, cx));
5870        assert_eq!(
5871            workspace_b.read_with(cx_b, |workspace, _| workspace.leader_for_pane(&pane_b)),
5872            None
5873        );
5874
            // Follow again so the next scenario starts from a following state.
5875        workspace_b
5876            .update(cx_b, |workspace, cx| {
5877                workspace
5878                    .toggle_follow(&ToggleFollow(leader_id), cx)
5879                    .unwrap()
5880            })
5881            .await
5882            .unwrap();
5883        assert_eq!(
5884            workspace_b.read_with(cx_b, |workspace, _| workspace.leader_for_pane(&pane_b)),
5885            Some(leader_id)
5886        );
5887
5888        // When client B edits, it automatically stops following client A.
5889        editor_b2.update(cx_b, |editor, cx| editor.insert("X", cx));
5890        assert_eq!(
5891            workspace_b.read_with(cx_b, |workspace, _| workspace.leader_for_pane(&pane_b)),
5892            None
5893        );
5894
            // Follow again so the next scenario starts from a following state.
5895        workspace_b
5896            .update(cx_b, |workspace, cx| {
5897                workspace
5898                    .toggle_follow(&ToggleFollow(leader_id), cx)
5899                    .unwrap()
5900            })
5901            .await
5902            .unwrap();
5903        assert_eq!(
5904            workspace_b.read_with(cx_b, |workspace, _| workspace.leader_for_pane(&pane_b)),
5905            Some(leader_id)
5906        );
5907
5908        // When client B scrolls, it automatically stops following client A.
5909        editor_b2.update(cx_b, |editor, cx| {
5910            editor.set_scroll_position(vec2f(0., 3.), cx)
5911        });
5912        assert_eq!(
5913            workspace_b.read_with(cx_b, |workspace, _| workspace.leader_for_pane(&pane_b)),
5914            None
5915        );
5916
            // Follow again so the next scenario starts from a following state.
5917        workspace_b
5918            .update(cx_b, |workspace, cx| {
5919                workspace
5920                    .toggle_follow(&ToggleFollow(leader_id), cx)
5921                    .unwrap()
5922            })
5923            .await
5924            .unwrap();
5925        assert_eq!(
5926            workspace_b.read_with(cx_b, |workspace, _| workspace.leader_for_pane(&pane_b)),
5927            Some(leader_id)
5928        );
5929
5930        // When client B activates a different pane, it continues following client A in the original pane.
            // Both splitting `pane_b` and activating the next pane are checked separately.
5931        workspace_b.update(cx_b, |workspace, cx| {
5932            workspace.split_pane(pane_b.clone(), SplitDirection::Right, cx)
5933        });
5934        assert_eq!(
5935            workspace_b.read_with(cx_b, |workspace, _| workspace.leader_for_pane(&pane_b)),
5936            Some(leader_id)
5937        );
5938
5939        workspace_b.update(cx_b, |workspace, cx| workspace.activate_next_pane(cx));
5940        assert_eq!(
5941            workspace_b.read_with(cx_b, |workspace, _| workspace.leader_for_pane(&pane_b)),
5942            Some(leader_id)
5943        );
5944
5945        // When client B activates a different item in the original pane, it automatically stops following client A.
            // No re-follow is needed first: the previous assertion shows `pane_b` is still
            // following at this point.
5946        workspace_b
5947            .update(cx_b, |workspace, cx| {
5948                workspace.open_path((worktree_id, "2.txt"), true, cx)
5949            })
5950            .await
5951            .unwrap();
5952        assert_eq!(
5953            workspace_b.read_with(cx_b, |workspace, _| workspace.leader_for_pane(&pane_b)),
5954            None
5955        );
5956    }
5957
5958    #[gpui::test(iterations = 100)]
5959    async fn test_random_collaboration(
5960        cx: &mut TestAppContext,
5961        deterministic: Arc<Deterministic>,
5962        rng: StdRng,
5963    ) {
5964        cx.foreground().forbid_parking();
5965        let max_peers = env::var("MAX_PEERS")
5966            .map(|i| i.parse().expect("invalid `MAX_PEERS` variable"))
5967            .unwrap_or(5);
5968        assert!(max_peers <= 5);
5969
5970        let max_operations = env::var("OPERATIONS")
5971            .map(|i| i.parse().expect("invalid `OPERATIONS` variable"))
5972            .unwrap_or(10);
5973
5974        let rng = Arc::new(Mutex::new(rng));
5975
5976        let guest_lang_registry = Arc::new(LanguageRegistry::test());
5977        let host_language_registry = Arc::new(LanguageRegistry::test());
5978
5979        let fs = FakeFs::new(cx.background());
5980        fs.insert_tree("/_collab", json!({"init": ""})).await;
5981
5982        let mut server = TestServer::start(cx.foreground(), cx.background()).await;
5983        let db = server.app_state.db.clone();
5984        let host_user_id = db.create_user("host", false).await.unwrap();
5985        for username in ["guest-1", "guest-2", "guest-3", "guest-4"] {
5986            let guest_user_id = db.create_user(username, false).await.unwrap();
5987            server
5988                .app_state
5989                .db
5990                .send_contact_request(guest_user_id, host_user_id)
5991                .await
5992                .unwrap();
5993            server
5994                .app_state
5995                .db
5996                .respond_to_contact_request(host_user_id, guest_user_id, true)
5997                .await
5998                .unwrap();
5999        }
6000
6001        let mut clients = Vec::new();
6002        let mut user_ids = Vec::new();
6003        let mut op_start_signals = Vec::new();
6004
6005        let mut next_entity_id = 100000;
6006        let mut host_cx = TestAppContext::new(
6007            cx.foreground_platform(),
6008            cx.platform(),
6009            deterministic.build_foreground(next_entity_id),
6010            deterministic.build_background(),
6011            cx.font_cache(),
6012            cx.leak_detector(),
6013            next_entity_id,
6014        );
6015        let host = server.create_client(&mut host_cx, "host").await;
6016        let host_project = host_cx.update(|cx| {
6017            Project::local(
6018                host.client.clone(),
6019                host.user_store.clone(),
6020                host_language_registry.clone(),
6021                fs.clone(),
6022                cx,
6023            )
6024        });
6025        let host_project_id = host_project
6026            .update(&mut host_cx, |p, _| p.next_remote_id())
6027            .await;
6028
6029        let (collab_worktree, _) = host_project
6030            .update(&mut host_cx, |project, cx| {
6031                project.find_or_create_local_worktree("/_collab", true, cx)
6032            })
6033            .await
6034            .unwrap();
6035        collab_worktree
6036            .read_with(&host_cx, |tree, _| tree.as_local().unwrap().scan_complete())
6037            .await;
6038        host_project
6039            .update(&mut host_cx, |project, cx| project.share(cx))
6040            .await
6041            .unwrap();
6042
6043        // Set up fake language servers.
6044        let mut language = Language::new(
6045            LanguageConfig {
6046                name: "Rust".into(),
6047                path_suffixes: vec!["rs".to_string()],
6048                ..Default::default()
6049            },
6050            None,
6051        );
6052        let _fake_servers = language.set_fake_lsp_adapter(FakeLspAdapter {
6053            name: "the-fake-language-server",
6054            capabilities: lsp::LanguageServer::full_capabilities(),
6055            initializer: Some(Box::new({
6056                let rng = rng.clone();
6057                let fs = fs.clone();
6058                let project = host_project.downgrade();
6059                move |fake_server: &mut FakeLanguageServer| {
6060                    fake_server.handle_request::<lsp::request::Completion, _, _>(
6061                        |_, _| async move {
6062                            Ok(Some(lsp::CompletionResponse::Array(vec![
6063                                lsp::CompletionItem {
6064                                    text_edit: Some(lsp::CompletionTextEdit::Edit(lsp::TextEdit {
6065                                        range: lsp::Range::new(
6066                                            lsp::Position::new(0, 0),
6067                                            lsp::Position::new(0, 0),
6068                                        ),
6069                                        new_text: "the-new-text".to_string(),
6070                                    })),
6071                                    ..Default::default()
6072                                },
6073                            ])))
6074                        },
6075                    );
6076
6077                    fake_server.handle_request::<lsp::request::CodeActionRequest, _, _>(
6078                        |_, _| async move {
6079                            Ok(Some(vec![lsp::CodeActionOrCommand::CodeAction(
6080                                lsp::CodeAction {
6081                                    title: "the-code-action".to_string(),
6082                                    ..Default::default()
6083                                },
6084                            )]))
6085                        },
6086                    );
6087
6088                    fake_server.handle_request::<lsp::request::PrepareRenameRequest, _, _>(
6089                        |params, _| async move {
6090                            Ok(Some(lsp::PrepareRenameResponse::Range(lsp::Range::new(
6091                                params.position,
6092                                params.position,
6093                            ))))
6094                        },
6095                    );
6096
6097                    fake_server.handle_request::<lsp::request::GotoDefinition, _, _>({
6098                        let fs = fs.clone();
6099                        let rng = rng.clone();
6100                        move |_, _| {
6101                            let fs = fs.clone();
6102                            let rng = rng.clone();
6103                            async move {
6104                                let files = fs.files().await;
6105                                let mut rng = rng.lock();
6106                                let count = rng.gen_range::<usize, _>(1..3);
6107                                let files = (0..count)
6108                                    .map(|_| files.choose(&mut *rng).unwrap())
6109                                    .collect::<Vec<_>>();
6110                                log::info!("LSP: Returning definitions in files {:?}", &files);
6111                                Ok(Some(lsp::GotoDefinitionResponse::Array(
6112                                    files
6113                                        .into_iter()
6114                                        .map(|file| lsp::Location {
6115                                            uri: lsp::Url::from_file_path(file).unwrap(),
6116                                            range: Default::default(),
6117                                        })
6118                                        .collect(),
6119                                )))
6120                            }
6121                        }
6122                    });
6123
6124                    fake_server.handle_request::<lsp::request::DocumentHighlightRequest, _, _>({
6125                        let rng = rng.clone();
6126                        let project = project.clone();
6127                        move |params, mut cx| {
6128                            let highlights = if let Some(project) = project.upgrade(&cx) {
6129                                project.update(&mut cx, |project, cx| {
6130                                    let path = params
6131                                        .text_document_position_params
6132                                        .text_document
6133                                        .uri
6134                                        .to_file_path()
6135                                        .unwrap();
6136                                    let (worktree, relative_path) =
6137                                        project.find_local_worktree(&path, cx)?;
6138                                    let project_path =
6139                                        ProjectPath::from((worktree.read(cx).id(), relative_path));
6140                                    let buffer =
6141                                        project.get_open_buffer(&project_path, cx)?.read(cx);
6142
6143                                    let mut highlights = Vec::new();
6144                                    let highlight_count = rng.lock().gen_range(1..=5);
6145                                    let mut prev_end = 0;
6146                                    for _ in 0..highlight_count {
6147                                        let range =
6148                                            buffer.random_byte_range(prev_end, &mut *rng.lock());
6149
6150                                        highlights.push(lsp::DocumentHighlight {
6151                                            range: range_to_lsp(range.to_point_utf16(buffer)),
6152                                            kind: Some(lsp::DocumentHighlightKind::READ),
6153                                        });
6154                                        prev_end = range.end;
6155                                    }
6156                                    Some(highlights)
6157                                })
6158                            } else {
6159                                None
6160                            };
6161                            async move { Ok(highlights) }
6162                        }
6163                    });
6164                }
6165            })),
6166            ..Default::default()
6167        });
6168        host_language_registry.add(Arc::new(language));
6169
6170        let op_start_signal = futures::channel::mpsc::unbounded();
6171        user_ids.push(host.current_user_id(&host_cx));
6172        op_start_signals.push(op_start_signal.0);
6173        clients.push(host_cx.foreground().spawn(host.simulate_host(
6174            host_project,
6175            op_start_signal.1,
6176            rng.clone(),
6177            host_cx,
6178        )));
6179
6180        let disconnect_host_at = if rng.lock().gen_bool(0.2) {
6181            rng.lock().gen_range(0..max_operations)
6182        } else {
6183            max_operations
6184        };
6185        let mut available_guests = vec![
6186            "guest-1".to_string(),
6187            "guest-2".to_string(),
6188            "guest-3".to_string(),
6189            "guest-4".to_string(),
6190        ];
6191        let mut operations = 0;
6192        while operations < max_operations {
6193            if operations == disconnect_host_at {
6194                server.disconnect_client(user_ids[0]);
6195                cx.foreground().advance_clock(RECEIVE_TIMEOUT);
6196                drop(op_start_signals);
6197                let mut clients = futures::future::join_all(clients).await;
6198                cx.foreground().run_until_parked();
6199
6200                let (host, mut host_cx, host_err) = clients.remove(0);
6201                if let Some(host_err) = host_err {
6202                    log::error!("host error - {}", host_err);
6203                }
6204                host.project
6205                    .as_ref()
6206                    .unwrap()
6207                    .read_with(&host_cx, |project, _| assert!(!project.is_shared()));
6208                for (guest, mut guest_cx, guest_err) in clients {
6209                    if let Some(guest_err) = guest_err {
6210                        log::error!("{} error - {}", guest.username, guest_err);
6211                    }
6212                    // TODO
6213                    // let contacts = server
6214                    //     .store
6215                    //     .read()
6216                    //     .await
6217                    //     .contacts_for_user(guest.current_user_id(&guest_cx));
6218                    // assert!(!contacts
6219                    //     .iter()
6220                    //     .flat_map(|contact| &contact.projects)
6221                    //     .any(|project| project.id == host_project_id));
6222                    guest
6223                        .project
6224                        .as_ref()
6225                        .unwrap()
6226                        .read_with(&guest_cx, |project, _| assert!(project.is_read_only()));
6227                    guest_cx.update(|_| drop(guest));
6228                }
6229                host_cx.update(|_| drop(host));
6230
6231                return;
6232            }
6233
6234            let distribution = rng.lock().gen_range(0..100);
6235            match distribution {
6236                0..=19 if !available_guests.is_empty() => {
6237                    let guest_ix = rng.lock().gen_range(0..available_guests.len());
6238                    let guest_username = available_guests.remove(guest_ix);
6239                    log::info!("Adding new connection for {}", guest_username);
6240                    next_entity_id += 100000;
6241                    let mut guest_cx = TestAppContext::new(
6242                        cx.foreground_platform(),
6243                        cx.platform(),
6244                        deterministic.build_foreground(next_entity_id),
6245                        deterministic.build_background(),
6246                        cx.font_cache(),
6247                        cx.leak_detector(),
6248                        next_entity_id,
6249                    );
6250                    let guest = server.create_client(&mut guest_cx, &guest_username).await;
6251                    let guest_project = Project::remote(
6252                        host_project_id,
6253                        guest.client.clone(),
6254                        guest.user_store.clone(),
6255                        guest_lang_registry.clone(),
6256                        FakeFs::new(cx.background()),
6257                        &mut guest_cx.to_async(),
6258                    )
6259                    .await
6260                    .unwrap();
6261                    let op_start_signal = futures::channel::mpsc::unbounded();
6262                    user_ids.push(guest.current_user_id(&guest_cx));
6263                    op_start_signals.push(op_start_signal.0);
6264                    clients.push(guest_cx.foreground().spawn(guest.simulate_guest(
6265                        guest_username.clone(),
6266                        guest_project,
6267                        op_start_signal.1,
6268                        rng.clone(),
6269                        guest_cx,
6270                    )));
6271
6272                    log::info!("Added connection for {}", guest_username);
6273                    operations += 1;
6274                }
6275                20..=29 if clients.len() > 1 => {
6276                    let guest_ix = rng.lock().gen_range(1..clients.len());
6277                    log::info!("Removing guest {}", user_ids[guest_ix]);
6278                    let removed_guest_id = user_ids.remove(guest_ix);
6279                    let guest = clients.remove(guest_ix);
6280                    op_start_signals.remove(guest_ix);
6281                    server.disconnect_client(removed_guest_id);
6282                    cx.foreground().advance_clock(RECEIVE_TIMEOUT);
6283                    let (guest, mut guest_cx, guest_err) = guest.await;
6284                    if let Some(guest_err) = guest_err {
6285                        log::error!("{} error - {}", guest.username, guest_err);
6286                    }
6287                    guest
6288                        .project
6289                        .as_ref()
6290                        .unwrap()
6291                        .read_with(&guest_cx, |project, _| assert!(project.is_read_only()));
6292                    // TODO
6293                    // for user_id in &user_ids {
6294                    //     for contact in server.store.read().await.contacts_for_user(*user_id) {
6295                    //         assert_ne!(
6296                    //             contact.user_id, removed_guest_id.0 as u64,
6297                    //             "removed guest is still a contact of another peer"
6298                    //         );
6299                    //         for project in contact.projects {
6300                    //             for project_guest_id in project.guests {
6301                    //                 assert_ne!(
6302                    //                     project_guest_id, removed_guest_id.0 as u64,
6303                    //                     "removed guest appears as still participating on a project"
6304                    //                 );
6305                    //             }
6306                    //         }
6307                    //     }
6308                    // }
6309
6310                    log::info!("{} removed", guest.username);
6311                    available_guests.push(guest.username.clone());
6312                    guest_cx.update(|_| drop(guest));
6313
6314                    operations += 1;
6315                }
6316                _ => {
6317                    while operations < max_operations && rng.lock().gen_bool(0.7) {
6318                        op_start_signals
6319                            .choose(&mut *rng.lock())
6320                            .unwrap()
6321                            .unbounded_send(())
6322                            .unwrap();
6323                        operations += 1;
6324                    }
6325
6326                    if rng.lock().gen_bool(0.8) {
6327                        cx.foreground().run_until_parked();
6328                    }
6329                }
6330            }
6331        }
6332
6333        drop(op_start_signals);
6334        let mut clients = futures::future::join_all(clients).await;
6335        cx.foreground().run_until_parked();
6336
6337        let (host_client, mut host_cx, host_err) = clients.remove(0);
6338        if let Some(host_err) = host_err {
6339            panic!("host error - {}", host_err);
6340        }
6341        let host_project = host_client.project.as_ref().unwrap();
6342        let host_worktree_snapshots = host_project.read_with(&host_cx, |project, cx| {
6343            project
6344                .worktrees(cx)
6345                .map(|worktree| {
6346                    let snapshot = worktree.read(cx).snapshot();
6347                    (snapshot.id(), snapshot)
6348                })
6349                .collect::<BTreeMap<_, _>>()
6350        });
6351
6352        host_client
6353            .project
6354            .as_ref()
6355            .unwrap()
6356            .read_with(&host_cx, |project, cx| project.check_invariants(cx));
6357
6358        for (guest_client, mut guest_cx, guest_err) in clients.into_iter() {
6359            if let Some(guest_err) = guest_err {
6360                panic!("{} error - {}", guest_client.username, guest_err);
6361            }
6362            let worktree_snapshots =
6363                guest_client
6364                    .project
6365                    .as_ref()
6366                    .unwrap()
6367                    .read_with(&guest_cx, |project, cx| {
6368                        project
6369                            .worktrees(cx)
6370                            .map(|worktree| {
6371                                let worktree = worktree.read(cx);
6372                                (worktree.id(), worktree.snapshot())
6373                            })
6374                            .collect::<BTreeMap<_, _>>()
6375                    });
6376
6377            assert_eq!(
6378                worktree_snapshots.keys().collect::<Vec<_>>(),
6379                host_worktree_snapshots.keys().collect::<Vec<_>>(),
6380                "{} has different worktrees than the host",
6381                guest_client.username
6382            );
6383            for (id, host_snapshot) in &host_worktree_snapshots {
6384                let guest_snapshot = &worktree_snapshots[id];
6385                assert_eq!(
6386                    guest_snapshot.root_name(),
6387                    host_snapshot.root_name(),
6388                    "{} has different root name than the host for worktree {}",
6389                    guest_client.username,
6390                    id
6391                );
6392                assert_eq!(
6393                    guest_snapshot.entries(false).collect::<Vec<_>>(),
6394                    host_snapshot.entries(false).collect::<Vec<_>>(),
6395                    "{} has different snapshot than the host for worktree {}",
6396                    guest_client.username,
6397                    id
6398                );
6399                assert_eq!(guest_snapshot.scan_id(), host_snapshot.scan_id());
6400            }
6401
6402            guest_client
6403                .project
6404                .as_ref()
6405                .unwrap()
6406                .read_with(&guest_cx, |project, cx| project.check_invariants(cx));
6407
6408            for guest_buffer in &guest_client.buffers {
6409                let buffer_id = guest_buffer.read_with(&guest_cx, |buffer, _| buffer.remote_id());
6410                let host_buffer = host_project.read_with(&host_cx, |project, cx| {
6411                    project.buffer_for_id(buffer_id, cx).expect(&format!(
6412                        "host does not have buffer for guest:{}, peer:{}, id:{}",
6413                        guest_client.username, guest_client.peer_id, buffer_id
6414                    ))
6415                });
6416                let path = host_buffer
6417                    .read_with(&host_cx, |buffer, cx| buffer.file().unwrap().full_path(cx));
6418
6419                assert_eq!(
6420                    guest_buffer.read_with(&guest_cx, |buffer, _| buffer.deferred_ops_len()),
6421                    0,
6422                    "{}, buffer {}, path {:?} has deferred operations",
6423                    guest_client.username,
6424                    buffer_id,
6425                    path,
6426                );
6427                assert_eq!(
6428                    guest_buffer.read_with(&guest_cx, |buffer, _| buffer.text()),
6429                    host_buffer.read_with(&host_cx, |buffer, _| buffer.text()),
6430                    "{}, buffer {}, path {:?}, differs from the host's buffer",
6431                    guest_client.username,
6432                    buffer_id,
6433                    path
6434                );
6435            }
6436
6437            guest_cx.update(|_| drop(guest_client));
6438        }
6439
6440        host_cx.update(|_| drop(host_client));
6441    }
6442
6443    struct TestServer {
6444        peer: Arc<Peer>,
6445        app_state: Arc<AppState>,
6446        server: Arc<Server>,
6447        foreground: Rc<executor::Foreground>,
6448        notifications: mpsc::UnboundedReceiver<()>,
6449        connection_killers: Arc<Mutex<HashMap<UserId, Arc<AtomicBool>>>>,
6450        forbid_connections: Arc<AtomicBool>,
6451        _test_db: TestDb,
6452    }
6453
6454    impl TestServer {
6455        async fn start(
6456            foreground: Rc<executor::Foreground>,
6457            background: Arc<executor::Background>,
6458        ) -> Self {
6459            let test_db = TestDb::fake(background);
6460            let app_state = Self::build_app_state(&test_db).await;
6461            let peer = Peer::new();
6462            let notifications = mpsc::unbounded();
6463            let server = Server::new(app_state.clone(), Some(notifications.0));
6464            Self {
6465                peer,
6466                app_state,
6467                server,
6468                foreground,
6469                notifications: notifications.1,
6470                connection_killers: Default::default(),
6471                forbid_connections: Default::default(),
6472                _test_db: test_db,
6473            }
6474        }
6475
6476        async fn create_client(&mut self, cx: &mut TestAppContext, name: &str) -> TestClient {
6477            cx.update(|cx| {
6478                let settings = Settings::test(cx);
6479                cx.set_global(settings);
6480            });
6481
6482            let http = FakeHttpClient::with_404_response();
6483            let user_id =
6484                if let Ok(Some(user)) = self.app_state.db.get_user_by_github_login(name).await {
6485                    user.id
6486                } else {
6487                    self.app_state.db.create_user(name, false).await.unwrap()
6488                };
6489            let client_name = name.to_string();
6490            let mut client = Client::new(http.clone());
6491            let server = self.server.clone();
6492            let connection_killers = self.connection_killers.clone();
6493            let forbid_connections = self.forbid_connections.clone();
6494            let (connection_id_tx, mut connection_id_rx) = mpsc::channel(16);
6495
6496            Arc::get_mut(&mut client)
6497                .unwrap()
6498                .override_authenticate(move |cx| {
6499                    cx.spawn(|_| async move {
6500                        let access_token = "the-token".to_string();
6501                        Ok(Credentials {
6502                            user_id: user_id.0 as u64,
6503                            access_token,
6504                        })
6505                    })
6506                })
6507                .override_establish_connection(move |credentials, cx| {
6508                    assert_eq!(credentials.user_id, user_id.0 as u64);
6509                    assert_eq!(credentials.access_token, "the-token");
6510
6511                    let server = server.clone();
6512                    let connection_killers = connection_killers.clone();
6513                    let forbid_connections = forbid_connections.clone();
6514                    let client_name = client_name.clone();
6515                    let connection_id_tx = connection_id_tx.clone();
6516                    cx.spawn(move |cx| async move {
6517                        if forbid_connections.load(SeqCst) {
6518                            Err(EstablishConnectionError::other(anyhow!(
6519                                "server is forbidding connections"
6520                            )))
6521                        } else {
6522                            let (client_conn, server_conn, killed) =
6523                                Connection::in_memory(cx.background());
6524                            connection_killers.lock().insert(user_id, killed);
6525                            cx.background()
6526                                .spawn(server.handle_connection(
6527                                    server_conn,
6528                                    client_name,
6529                                    user_id,
6530                                    Some(connection_id_tx),
6531                                    cx.background(),
6532                                ))
6533                                .detach();
6534                            Ok(client_conn)
6535                        }
6536                    })
6537                });
6538
6539            client
6540                .authenticate_and_connect(false, &cx.to_async())
6541                .await
6542                .unwrap();
6543
6544            Channel::init(&client);
6545            Project::init(&client);
6546            cx.update(|cx| {
6547                workspace::init(&client, cx);
6548            });
6549
6550            let peer_id = PeerId(connection_id_rx.next().await.unwrap().0);
6551            let user_store = cx.add_model(|cx| UserStore::new(client.clone(), http, cx));
6552
6553            let client = TestClient {
6554                client,
6555                peer_id,
6556                username: name.to_string(),
6557                user_store,
6558                language_registry: Arc::new(LanguageRegistry::test()),
6559                project: Default::default(),
6560                buffers: Default::default(),
6561            };
6562            client.wait_for_current_user(cx).await;
6563            client
6564        }
6565
6566        fn disconnect_client(&self, user_id: UserId) {
6567            self.connection_killers
6568                .lock()
6569                .remove(&user_id)
6570                .unwrap()
6571                .store(true, SeqCst);
6572        }
6573
6574        fn forbid_connections(&self) {
6575            self.forbid_connections.store(true, SeqCst);
6576        }
6577
6578        fn allow_connections(&self) {
6579            self.forbid_connections.store(false, SeqCst);
6580        }
6581
6582        async fn make_contacts(&self, mut clients: Vec<(&TestClient, &mut TestAppContext)>) {
6583            while let Some((client_a, cx_a)) = clients.pop() {
6584                for (client_b, cx_b) in &mut clients {
6585                    client_a
6586                        .user_store
6587                        .update(cx_a, |store, cx| {
6588                            store.request_contact(client_b.user_id().unwrap(), cx)
6589                        })
6590                        .await
6591                        .unwrap();
6592                    cx_a.foreground().run_until_parked();
6593                    client_b
6594                        .user_store
6595                        .update(*cx_b, |store, cx| {
6596                            store.respond_to_contact_request(client_a.user_id().unwrap(), true, cx)
6597                        })
6598                        .await
6599                        .unwrap();
6600                }
6601            }
6602        }
6603
6604        async fn build_app_state(test_db: &TestDb) -> Arc<AppState> {
6605            Arc::new(AppState {
6606                db: test_db.db().clone(),
6607                api_token: Default::default(),
6608            })
6609        }
6610
6611        async fn state<'a>(&'a self) -> RwLockReadGuard<'a, Store> {
6612            self.server.store.read().await
6613        }
6614
6615        async fn condition<F>(&mut self, mut predicate: F)
6616        where
6617            F: FnMut(&Store) -> bool,
6618        {
6619            assert!(
6620                self.foreground.parking_forbidden(),
6621                "you must call forbid_parking to use server conditions so we don't block indefinitely"
6622            );
6623            while !(predicate)(&*self.server.store.read().await) {
6624                self.foreground.start_waiting();
6625                self.notifications.next().await;
6626                self.foreground.finish_waiting();
6627            }
6628        }
6629    }
6630
6631    impl Deref for TestServer {
6632        type Target = Server;
6633
6634        fn deref(&self) -> &Self::Target {
6635            &self.server
6636        }
6637    }
6638
6639    impl Drop for TestServer {
6640        fn drop(&mut self) {
6641            self.peer.reset();
6642        }
6643    }
6644
6645    struct TestClient {
6646        client: Arc<Client>,
6647        username: String,
6648        pub peer_id: PeerId,
6649        pub user_store: ModelHandle<UserStore>,
6650        language_registry: Arc<LanguageRegistry>,
6651        project: Option<ModelHandle<Project>>,
6652        buffers: HashSet<ModelHandle<language::Buffer>>,
6653    }
6654
6655    impl Deref for TestClient {
6656        type Target = Arc<Client>;
6657
6658        fn deref(&self) -> &Self::Target {
6659            &self.client
6660        }
6661    }
6662
6663    struct ContactsSummary {
6664        pub current: Vec<String>,
6665        pub outgoing_requests: Vec<String>,
6666        pub incoming_requests: Vec<String>,
6667    }
6668
6669    impl TestClient {
6670        pub fn current_user_id(&self, cx: &TestAppContext) -> UserId {
6671            UserId::from_proto(
6672                self.user_store
6673                    .read_with(cx, |user_store, _| user_store.current_user().unwrap().id),
6674            )
6675        }
6676
6677        async fn wait_for_current_user(&self, cx: &TestAppContext) {
6678            let mut authed_user = self
6679                .user_store
6680                .read_with(cx, |user_store, _| user_store.watch_current_user());
6681            while authed_user.next().await.unwrap().is_none() {}
6682        }
6683
6684        async fn clear_contacts(&self, cx: &mut TestAppContext) {
6685            self.user_store
6686                .update(cx, |store, _| store.clear_contacts())
6687                .await;
6688        }
6689
6690        fn summarize_contacts(&self, cx: &TestAppContext) -> ContactsSummary {
6691            self.user_store.read_with(cx, |store, _| ContactsSummary {
6692                current: store
6693                    .contacts()
6694                    .iter()
6695                    .map(|contact| contact.user.github_login.clone())
6696                    .collect(),
6697                outgoing_requests: store
6698                    .outgoing_contact_requests()
6699                    .iter()
6700                    .map(|user| user.github_login.clone())
6701                    .collect(),
6702                incoming_requests: store
6703                    .incoming_contact_requests()
6704                    .iter()
6705                    .map(|user| user.github_login.clone())
6706                    .collect(),
6707            })
6708        }
6709
6710        async fn build_local_project(
6711            &mut self,
6712            fs: Arc<FakeFs>,
6713            root_path: impl AsRef<Path>,
6714            cx: &mut TestAppContext,
6715        ) -> (ModelHandle<Project>, WorktreeId) {
6716            let project = cx.update(|cx| {
6717                Project::local(
6718                    self.client.clone(),
6719                    self.user_store.clone(),
6720                    self.language_registry.clone(),
6721                    fs,
6722                    cx,
6723                )
6724            });
6725            self.project = Some(project.clone());
6726            let (worktree, _) = project
6727                .update(cx, |p, cx| {
6728                    p.find_or_create_local_worktree(root_path, true, cx)
6729                })
6730                .await
6731                .unwrap();
6732            worktree
6733                .read_with(cx, |tree, _| tree.as_local().unwrap().scan_complete())
6734                .await;
6735            project
6736                .update(cx, |project, _| project.next_remote_id())
6737                .await;
6738            (project, worktree.read_with(cx, |tree, _| tree.id()))
6739        }
6740
6741        async fn build_remote_project(
6742            &mut self,
6743            project_id: u64,
6744            cx: &mut TestAppContext,
6745        ) -> ModelHandle<Project> {
6746            let project = Project::remote(
6747                project_id,
6748                self.client.clone(),
6749                self.user_store.clone(),
6750                self.language_registry.clone(),
6751                FakeFs::new(cx.background()),
6752                &mut cx.to_async(),
6753            )
6754            .await
6755            .unwrap();
6756            self.project = Some(project.clone());
6757            project
6758        }
6759
6760        fn build_workspace(
6761            &self,
6762            project: &ModelHandle<Project>,
6763            cx: &mut TestAppContext,
6764        ) -> ViewHandle<Workspace> {
6765            let (window_id, _) = cx.add_window(|_| EmptyView);
6766            cx.add_view(window_id, |cx| {
6767                let fs = project.read(cx).fs().clone();
6768                Workspace::new(
6769                    &WorkspaceParams {
6770                        fs,
6771                        project: project.clone(),
6772                        user_store: self.user_store.clone(),
6773                        languages: self.language_registry.clone(),
6774                        themes: ThemeRegistry::new((), cx.font_cache().clone()),
6775                        channel_list: cx.add_model(|cx| {
6776                            ChannelList::new(self.user_store.clone(), self.client.clone(), cx)
6777                        }),
6778                        client: self.client.clone(),
6779                    },
6780                    cx,
6781                )
6782            })
6783        }
6784
6785        async fn simulate_host(
6786            mut self,
6787            project: ModelHandle<Project>,
6788            op_start_signal: futures::channel::mpsc::UnboundedReceiver<()>,
6789            rng: Arc<Mutex<StdRng>>,
6790            mut cx: TestAppContext,
6791        ) -> (Self, TestAppContext, Option<anyhow::Error>) {
6792            async fn simulate_host_internal(
6793                client: &mut TestClient,
6794                project: ModelHandle<Project>,
6795                mut op_start_signal: futures::channel::mpsc::UnboundedReceiver<()>,
6796                rng: Arc<Mutex<StdRng>>,
6797                cx: &mut TestAppContext,
6798            ) -> anyhow::Result<()> {
6799                let fs = project.read_with(cx, |project, _| project.fs().clone());
6800
6801                while op_start_signal.next().await.is_some() {
6802                    let distribution = rng.lock().gen_range::<usize, _>(0..100);
6803                    let files = fs.as_fake().files().await;
6804                    match distribution {
6805                        0..=20 if !files.is_empty() => {
6806                            let path = files.choose(&mut *rng.lock()).unwrap();
6807                            let mut path = path.as_path();
6808                            while let Some(parent_path) = path.parent() {
6809                                path = parent_path;
6810                                if rng.lock().gen() {
6811                                    break;
6812                                }
6813                            }
6814
6815                            log::info!("Host: find/create local worktree {:?}", path);
6816                            let find_or_create_worktree = project.update(cx, |project, cx| {
6817                                project.find_or_create_local_worktree(path, true, cx)
6818                            });
6819                            if rng.lock().gen() {
6820                                cx.background().spawn(find_or_create_worktree).detach();
6821                            } else {
6822                                find_or_create_worktree.await?;
6823                            }
6824                        }
6825                        10..=80 if !files.is_empty() => {
6826                            let buffer = if client.buffers.is_empty() || rng.lock().gen() {
6827                                let file = files.choose(&mut *rng.lock()).unwrap();
6828                                let (worktree, path) = project
6829                                    .update(cx, |project, cx| {
6830                                        project.find_or_create_local_worktree(
6831                                            file.clone(),
6832                                            true,
6833                                            cx,
6834                                        )
6835                                    })
6836                                    .await?;
6837                                let project_path =
6838                                    worktree.read_with(cx, |worktree, _| (worktree.id(), path));
6839                                log::info!(
6840                                    "Host: opening path {:?}, worktree {}, relative_path {:?}",
6841                                    file,
6842                                    project_path.0,
6843                                    project_path.1
6844                                );
6845                                let buffer = project
6846                                    .update(cx, |project, cx| project.open_buffer(project_path, cx))
6847                                    .await
6848                                    .unwrap();
6849                                client.buffers.insert(buffer.clone());
6850                                buffer
6851                            } else {
6852                                client
6853                                    .buffers
6854                                    .iter()
6855                                    .choose(&mut *rng.lock())
6856                                    .unwrap()
6857                                    .clone()
6858                            };
6859
6860                            if rng.lock().gen_bool(0.1) {
6861                                cx.update(|cx| {
6862                                    log::info!(
6863                                        "Host: dropping buffer {:?}",
6864                                        buffer.read(cx).file().unwrap().full_path(cx)
6865                                    );
6866                                    client.buffers.remove(&buffer);
6867                                    drop(buffer);
6868                                });
6869                            } else {
6870                                buffer.update(cx, |buffer, cx| {
6871                                    log::info!(
6872                                        "Host: updating buffer {:?} ({})",
6873                                        buffer.file().unwrap().full_path(cx),
6874                                        buffer.remote_id()
6875                                    );
6876
6877                                    if rng.lock().gen_bool(0.7) {
6878                                        buffer.randomly_edit(&mut *rng.lock(), 5, cx);
6879                                    } else {
6880                                        buffer.randomly_undo_redo(&mut *rng.lock(), cx);
6881                                    }
6882                                });
6883                            }
6884                        }
6885                        _ => loop {
6886                            let path_component_count = rng.lock().gen_range::<usize, _>(1..=5);
6887                            let mut path = PathBuf::new();
6888                            path.push("/");
6889                            for _ in 0..path_component_count {
6890                                let letter = rng.lock().gen_range(b'a'..=b'z');
6891                                path.push(std::str::from_utf8(&[letter]).unwrap());
6892                            }
6893                            path.set_extension("rs");
6894                            let parent_path = path.parent().unwrap();
6895
6896                            log::info!("Host: creating file {:?}", path,);
6897
6898                            if fs.create_dir(&parent_path).await.is_ok()
6899                                && fs.create_file(&path, Default::default()).await.is_ok()
6900                            {
6901                                break;
6902                            } else {
6903                                log::info!("Host: cannot create file");
6904                            }
6905                        },
6906                    }
6907
6908                    cx.background().simulate_random_delay().await;
6909                }
6910
6911                Ok(())
6912            }
6913
6914            let result =
6915                simulate_host_internal(&mut self, project.clone(), op_start_signal, rng, &mut cx)
6916                    .await;
6917            log::info!("Host done");
6918            self.project = Some(project);
6919            (self, cx, result.err())
6920        }
6921
6922        pub async fn simulate_guest(
6923            mut self,
6924            guest_username: String,
6925            project: ModelHandle<Project>,
6926            op_start_signal: futures::channel::mpsc::UnboundedReceiver<()>,
6927            rng: Arc<Mutex<StdRng>>,
6928            mut cx: TestAppContext,
6929        ) -> (Self, TestAppContext, Option<anyhow::Error>) {
6930            async fn simulate_guest_internal(
6931                client: &mut TestClient,
6932                guest_username: &str,
6933                project: ModelHandle<Project>,
6934                mut op_start_signal: futures::channel::mpsc::UnboundedReceiver<()>,
6935                rng: Arc<Mutex<StdRng>>,
6936                cx: &mut TestAppContext,
6937            ) -> anyhow::Result<()> {
6938                while op_start_signal.next().await.is_some() {
6939                    let buffer = if client.buffers.is_empty() || rng.lock().gen() {
6940                        let worktree = if let Some(worktree) =
6941                            project.read_with(cx, |project, cx| {
6942                                project
6943                                    .worktrees(&cx)
6944                                    .filter(|worktree| {
6945                                        let worktree = worktree.read(cx);
6946                                        worktree.is_visible()
6947                                            && worktree.entries(false).any(|e| e.is_file())
6948                                    })
6949                                    .choose(&mut *rng.lock())
6950                            }) {
6951                            worktree
6952                        } else {
6953                            cx.background().simulate_random_delay().await;
6954                            continue;
6955                        };
6956
6957                        let (worktree_root_name, project_path) =
6958                            worktree.read_with(cx, |worktree, _| {
6959                                let entry = worktree
6960                                    .entries(false)
6961                                    .filter(|e| e.is_file())
6962                                    .choose(&mut *rng.lock())
6963                                    .unwrap();
6964                                (
6965                                    worktree.root_name().to_string(),
6966                                    (worktree.id(), entry.path.clone()),
6967                                )
6968                            });
6969                        log::info!(
6970                            "{}: opening path {:?} in worktree {} ({})",
6971                            guest_username,
6972                            project_path.1,
6973                            project_path.0,
6974                            worktree_root_name,
6975                        );
6976                        let buffer = project
6977                            .update(cx, |project, cx| {
6978                                project.open_buffer(project_path.clone(), cx)
6979                            })
6980                            .await?;
6981                        log::info!(
6982                            "{}: opened path {:?} in worktree {} ({}) with buffer id {}",
6983                            guest_username,
6984                            project_path.1,
6985                            project_path.0,
6986                            worktree_root_name,
6987                            buffer.read_with(cx, |buffer, _| buffer.remote_id())
6988                        );
6989                        client.buffers.insert(buffer.clone());
6990                        buffer
6991                    } else {
6992                        client
6993                            .buffers
6994                            .iter()
6995                            .choose(&mut *rng.lock())
6996                            .unwrap()
6997                            .clone()
6998                    };
6999
7000                    let choice = rng.lock().gen_range(0..100);
7001                    match choice {
7002                        0..=9 => {
7003                            cx.update(|cx| {
7004                                log::info!(
7005                                    "{}: dropping buffer {:?}",
7006                                    guest_username,
7007                                    buffer.read(cx).file().unwrap().full_path(cx)
7008                                );
7009                                client.buffers.remove(&buffer);
7010                                drop(buffer);
7011                            });
7012                        }
7013                        10..=19 => {
7014                            let completions = project.update(cx, |project, cx| {
7015                                log::info!(
7016                                    "{}: requesting completions for buffer {} ({:?})",
7017                                    guest_username,
7018                                    buffer.read(cx).remote_id(),
7019                                    buffer.read(cx).file().unwrap().full_path(cx)
7020                                );
7021                                let offset = rng.lock().gen_range(0..=buffer.read(cx).len());
7022                                project.completions(&buffer, offset, cx)
7023                            });
7024                            let completions = cx.background().spawn(async move {
7025                                completions
7026                                    .await
7027                                    .map_err(|err| anyhow!("completions request failed: {:?}", err))
7028                            });
7029                            if rng.lock().gen_bool(0.3) {
7030                                log::info!("{}: detaching completions request", guest_username);
7031                                cx.update(|cx| completions.detach_and_log_err(cx));
7032                            } else {
7033                                completions.await?;
7034                            }
7035                        }
7036                        20..=29 => {
7037                            let code_actions = project.update(cx, |project, cx| {
7038                                log::info!(
7039                                    "{}: requesting code actions for buffer {} ({:?})",
7040                                    guest_username,
7041                                    buffer.read(cx).remote_id(),
7042                                    buffer.read(cx).file().unwrap().full_path(cx)
7043                                );
7044                                let range = buffer.read(cx).random_byte_range(0, &mut *rng.lock());
7045                                project.code_actions(&buffer, range, cx)
7046                            });
7047                            let code_actions = cx.background().spawn(async move {
7048                                code_actions.await.map_err(|err| {
7049                                    anyhow!("code actions request failed: {:?}", err)
7050                                })
7051                            });
7052                            if rng.lock().gen_bool(0.3) {
7053                                log::info!("{}: detaching code actions request", guest_username);
7054                                cx.update(|cx| code_actions.detach_and_log_err(cx));
7055                            } else {
7056                                code_actions.await?;
7057                            }
7058                        }
7059                        30..=39 if buffer.read_with(cx, |buffer, _| buffer.is_dirty()) => {
7060                            let (requested_version, save) = buffer.update(cx, |buffer, cx| {
7061                                log::info!(
7062                                    "{}: saving buffer {} ({:?})",
7063                                    guest_username,
7064                                    buffer.remote_id(),
7065                                    buffer.file().unwrap().full_path(cx)
7066                                );
7067                                (buffer.version(), buffer.save(cx))
7068                            });
7069                            let save = cx.background().spawn(async move {
7070                                let (saved_version, _) = save
7071                                    .await
7072                                    .map_err(|err| anyhow!("save request failed: {:?}", err))?;
7073                                assert!(saved_version.observed_all(&requested_version));
7074                                Ok::<_, anyhow::Error>(())
7075                            });
7076                            if rng.lock().gen_bool(0.3) {
7077                                log::info!("{}: detaching save request", guest_username);
7078                                cx.update(|cx| save.detach_and_log_err(cx));
7079                            } else {
7080                                save.await?;
7081                            }
7082                        }
7083                        40..=44 => {
7084                            let prepare_rename = project.update(cx, |project, cx| {
7085                                log::info!(
7086                                    "{}: preparing rename for buffer {} ({:?})",
7087                                    guest_username,
7088                                    buffer.read(cx).remote_id(),
7089                                    buffer.read(cx).file().unwrap().full_path(cx)
7090                                );
7091                                let offset = rng.lock().gen_range(0..=buffer.read(cx).len());
7092                                project.prepare_rename(buffer, offset, cx)
7093                            });
7094                            let prepare_rename = cx.background().spawn(async move {
7095                                prepare_rename.await.map_err(|err| {
7096                                    anyhow!("prepare rename request failed: {:?}", err)
7097                                })
7098                            });
7099                            if rng.lock().gen_bool(0.3) {
7100                                log::info!("{}: detaching prepare rename request", guest_username);
7101                                cx.update(|cx| prepare_rename.detach_and_log_err(cx));
7102                            } else {
7103                                prepare_rename.await?;
7104                            }
7105                        }
7106                        45..=49 => {
7107                            let definitions = project.update(cx, |project, cx| {
7108                                log::info!(
7109                                    "{}: requesting definitions for buffer {} ({:?})",
7110                                    guest_username,
7111                                    buffer.read(cx).remote_id(),
7112                                    buffer.read(cx).file().unwrap().full_path(cx)
7113                                );
7114                                let offset = rng.lock().gen_range(0..=buffer.read(cx).len());
7115                                project.definition(&buffer, offset, cx)
7116                            });
7117                            let definitions = cx.background().spawn(async move {
7118                                definitions
7119                                    .await
7120                                    .map_err(|err| anyhow!("definitions request failed: {:?}", err))
7121                            });
7122                            if rng.lock().gen_bool(0.3) {
7123                                log::info!("{}: detaching definitions request", guest_username);
7124                                cx.update(|cx| definitions.detach_and_log_err(cx));
7125                            } else {
7126                                client
7127                                    .buffers
7128                                    .extend(definitions.await?.into_iter().map(|loc| loc.buffer));
7129                            }
7130                        }
7131                        50..=54 => {
7132                            let highlights = project.update(cx, |project, cx| {
7133                                log::info!(
7134                                    "{}: requesting highlights for buffer {} ({:?})",
7135                                    guest_username,
7136                                    buffer.read(cx).remote_id(),
7137                                    buffer.read(cx).file().unwrap().full_path(cx)
7138                                );
7139                                let offset = rng.lock().gen_range(0..=buffer.read(cx).len());
7140                                project.document_highlights(&buffer, offset, cx)
7141                            });
7142                            let highlights = cx.background().spawn(async move {
7143                                highlights
7144                                    .await
7145                                    .map_err(|err| anyhow!("highlights request failed: {:?}", err))
7146                            });
7147                            if rng.lock().gen_bool(0.3) {
7148                                log::info!("{}: detaching highlights request", guest_username);
7149                                cx.update(|cx| highlights.detach_and_log_err(cx));
7150                            } else {
7151                                highlights.await?;
7152                            }
7153                        }
7154                        55..=59 => {
7155                            let search = project.update(cx, |project, cx| {
7156                                let query = rng.lock().gen_range('a'..='z');
7157                                log::info!("{}: project-wide search {:?}", guest_username, query);
7158                                project.search(SearchQuery::text(query, false, false), cx)
7159                            });
7160                            let search = cx.background().spawn(async move {
7161                                search
7162                                    .await
7163                                    .map_err(|err| anyhow!("search request failed: {:?}", err))
7164                            });
7165                            if rng.lock().gen_bool(0.3) {
7166                                log::info!("{}: detaching search request", guest_username);
7167                                cx.update(|cx| search.detach_and_log_err(cx));
7168                            } else {
7169                                client.buffers.extend(search.await?.into_keys());
7170                            }
7171                        }
7172                        60..=69 => {
7173                            let worktree = project
7174                                .read_with(cx, |project, cx| {
7175                                    project
7176                                        .worktrees(&cx)
7177                                        .filter(|worktree| {
7178                                            let worktree = worktree.read(cx);
7179                                            worktree.is_visible()
7180                                                && worktree.entries(false).any(|e| e.is_file())
7181                                                && worktree
7182                                                    .root_entry()
7183                                                    .map_or(false, |e| e.is_dir())
7184                                        })
7185                                        .choose(&mut *rng.lock())
7186                                })
7187                                .unwrap();
7188                            let (worktree_id, worktree_root_name) = worktree
7189                                .read_with(cx, |worktree, _| {
7190                                    (worktree.id(), worktree.root_name().to_string())
7191                                });
7192
7193                            let mut new_name = String::new();
7194                            for _ in 0..10 {
7195                                let letter = rng.lock().gen_range('a'..='z');
7196                                new_name.push(letter);
7197                            }
7198                            let mut new_path = PathBuf::new();
7199                            new_path.push(new_name);
7200                            new_path.set_extension("rs");
7201                            log::info!(
7202                                "{}: creating {:?} in worktree {} ({})",
7203                                guest_username,
7204                                new_path,
7205                                worktree_id,
7206                                worktree_root_name,
7207                            );
7208                            project
7209                                .update(cx, |project, cx| {
7210                                    project.create_entry((worktree_id, new_path), false, cx)
7211                                })
7212                                .unwrap()
7213                                .await?;
7214                        }
7215                        _ => {
7216                            buffer.update(cx, |buffer, cx| {
7217                                log::info!(
7218                                    "{}: updating buffer {} ({:?})",
7219                                    guest_username,
7220                                    buffer.remote_id(),
7221                                    buffer.file().unwrap().full_path(cx)
7222                                );
7223                                if rng.lock().gen_bool(0.7) {
7224                                    buffer.randomly_edit(&mut *rng.lock(), 5, cx);
7225                                } else {
7226                                    buffer.randomly_undo_redo(&mut *rng.lock(), cx);
7227                                }
7228                            });
7229                        }
7230                    }
7231                    cx.background().simulate_random_delay().await;
7232                }
7233                Ok(())
7234            }
7235
7236            let result = simulate_guest_internal(
7237                &mut self,
7238                &guest_username,
7239                project.clone(),
7240                op_start_signal,
7241                rng,
7242                &mut cx,
7243            )
7244            .await;
7245            log::info!("{}: done", guest_username);
7246
7247            self.project = Some(project);
7248            (self, cx, result.err())
7249        }
7250    }
7251
7252    impl Drop for TestClient {
7253        fn drop(&mut self) {
7254            self.client.tear_down();
7255        }
7256    }
7257
7258    impl Executor for Arc<gpui::executor::Background> {
7259        type Sleep = gpui::executor::Timer;
7260
7261        fn spawn_detached<F: 'static + Send + Future<Output = ()>>(&self, future: F) {
7262            self.spawn(future).detach();
7263        }
7264
7265        fn sleep(&self, duration: Duration) -> Self::Sleep {
7266            self.as_ref().timer(duration)
7267        }
7268    }
7269
7270    fn channel_messages(channel: &Channel) -> Vec<(String, String, bool)> {
7271        channel
7272            .messages()
7273            .cursor::<()>()
7274            .map(|m| {
7275                (
7276                    m.sender.github_login.clone(),
7277                    m.body.clone(),
7278                    m.is_pending(),
7279                )
7280            })
7281            .collect()
7282    }
7283
7284    struct EmptyView;
7285
7286    impl gpui::Entity for EmptyView {
7287        type Event = ();
7288    }
7289
7290    impl gpui::View for EmptyView {
7291        fn ui_name() -> &'static str {
7292            "empty view"
7293        }
7294
7295        fn render(&mut self, _: &mut gpui::RenderContext<Self>) -> gpui::ElementBox {
7296            gpui::Element::boxed(gpui::elements::Empty)
7297        }
7298    }
7299}