rpc.rs

   1mod store;
   2
   3use crate::{
   4    auth,
   5    db::{ChannelId, MessageId, UserId},
   6    AppState, Result,
   7};
   8use anyhow::anyhow;
   9use async_tungstenite::tungstenite::{
  10    protocol::CloseFrame as TungsteniteCloseFrame, Message as TungsteniteMessage,
  11};
  12use axum::{
  13    body::Body,
  14    extract::{
  15        ws::{CloseFrame as AxumCloseFrame, Message as AxumMessage},
  16        ConnectInfo, WebSocketUpgrade,
  17    },
  18    headers::{Header, HeaderName},
  19    http::StatusCode,
  20    middleware,
  21    response::IntoResponse,
  22    routing::get,
  23    Extension, Router, TypedHeader,
  24};
  25use collections::HashMap;
  26use futures::{channel::mpsc, future::BoxFuture, FutureExt, SinkExt, StreamExt, TryStreamExt};
  27use lazy_static::lazy_static;
  28use rpc::{
  29    proto::{self, AnyTypedEnvelope, EntityMessage, EnvelopedMessage, RequestMessage},
  30    Connection, ConnectionId, Peer, Receipt, TypedEnvelope,
  31};
  32use std::{
  33    any::TypeId,
  34    future::Future,
  35    marker::PhantomData,
  36    net::SocketAddr,
  37    ops::{Deref, DerefMut},
  38    rc::Rc,
  39    sync::{
  40        atomic::{AtomicBool, Ordering::SeqCst},
  41        Arc,
  42    },
  43    time::Duration,
  44};
  45use store::{Store, Worktree};
  46use time::OffsetDateTime;
  47use tokio::{
  48    sync::{RwLock, RwLockReadGuard, RwLockWriteGuard},
  49    time::Sleep,
  50};
  51use tower::ServiceBuilder;
  52use tracing::{info_span, Instrument};
  53
  54type MessageHandler =
  55    Box<dyn Send + Sync + Fn(Arc<Server>, Box<dyn AnyTypedEnvelope>) -> BoxFuture<'static, ()>>;
  56
  57struct Response<R> {
  58    server: Arc<Server>,
  59    receipt: Receipt<R>,
  60    responded: Arc<AtomicBool>,
  61}
  62
  63impl<R: RequestMessage> Response<R> {
  64    fn send(self, payload: R::Response) -> Result<()> {
  65        self.responded.store(true, SeqCst);
  66        self.server.peer.respond(self.receipt, payload)?;
  67        Ok(())
  68    }
  69}
  70
  71pub struct Server {
  72    peer: Arc<Peer>,
  73    store: RwLock<Store>,
  74    app_state: Arc<AppState>,
  75    handlers: HashMap<TypeId, MessageHandler>,
  76    notifications: Option<mpsc::UnboundedSender<()>>,
  77}
  78
  79pub trait Executor: Send + Clone {
  80    type Sleep: Send + Future;
  81    fn spawn_detached<F: 'static + Send + Future<Output = ()>>(&self, future: F);
  82    fn sleep(&self, duration: Duration) -> Self::Sleep;
  83}
  84
  85#[derive(Clone)]
  86pub struct RealExecutor;
  87
  88const MESSAGE_COUNT_PER_PAGE: usize = 100;
  89const MAX_MESSAGE_LEN: usize = 1024;
  90
  91struct StoreReadGuard<'a> {
  92    guard: RwLockReadGuard<'a, Store>,
  93    _not_send: PhantomData<Rc<()>>,
  94}
  95
  96struct StoreWriteGuard<'a> {
  97    guard: RwLockWriteGuard<'a, Store>,
  98    _not_send: PhantomData<Rc<()>>,
  99}
 100
 101impl Server {
 102    pub fn new(
 103        app_state: Arc<AppState>,
 104        notifications: Option<mpsc::UnboundedSender<()>>,
 105    ) -> Arc<Self> {
 106        let mut server = Self {
 107            peer: Peer::new(),
 108            app_state,
 109            store: Default::default(),
 110            handlers: Default::default(),
 111            notifications,
 112        };
 113
 114        server
 115            .add_request_handler(Server::ping)
 116            .add_request_handler(Server::register_project)
 117            .add_message_handler(Server::unregister_project)
 118            .add_request_handler(Server::share_project)
 119            .add_message_handler(Server::unshare_project)
 120            .add_request_handler(Server::join_project)
 121            .add_message_handler(Server::leave_project)
 122            .add_request_handler(Server::register_worktree)
 123            .add_message_handler(Server::unregister_worktree)
 124            .add_request_handler(Server::update_worktree)
 125            .add_message_handler(Server::start_language_server)
 126            .add_message_handler(Server::update_language_server)
 127            .add_message_handler(Server::update_diagnostic_summary)
 128            .add_request_handler(Server::forward_project_request::<proto::GetDefinition>)
 129            .add_request_handler(Server::forward_project_request::<proto::GetReferences>)
 130            .add_request_handler(Server::forward_project_request::<proto::SearchProject>)
 131            .add_request_handler(Server::forward_project_request::<proto::GetDocumentHighlights>)
 132            .add_request_handler(Server::forward_project_request::<proto::GetProjectSymbols>)
 133            .add_request_handler(Server::forward_project_request::<proto::OpenBufferForSymbol>)
 134            .add_request_handler(Server::forward_project_request::<proto::OpenBufferById>)
 135            .add_request_handler(Server::forward_project_request::<proto::OpenBufferByPath>)
 136            .add_request_handler(Server::forward_project_request::<proto::GetCompletions>)
 137            .add_request_handler(
 138                Server::forward_project_request::<proto::ApplyCompletionAdditionalEdits>,
 139            )
 140            .add_request_handler(Server::forward_project_request::<proto::GetCodeActions>)
 141            .add_request_handler(Server::forward_project_request::<proto::ApplyCodeAction>)
 142            .add_request_handler(Server::forward_project_request::<proto::PrepareRename>)
 143            .add_request_handler(Server::forward_project_request::<proto::PerformRename>)
 144            .add_request_handler(Server::forward_project_request::<proto::ReloadBuffers>)
 145            .add_request_handler(Server::forward_project_request::<proto::FormatBuffers>)
 146            .add_request_handler(Server::forward_project_request::<proto::CreateProjectEntry>)
 147            .add_request_handler(Server::forward_project_request::<proto::RenameProjectEntry>)
 148            .add_request_handler(Server::forward_project_request::<proto::DeleteProjectEntry>)
 149            .add_request_handler(Server::update_buffer)
 150            .add_message_handler(Server::update_buffer_file)
 151            .add_message_handler(Server::buffer_reloaded)
 152            .add_message_handler(Server::buffer_saved)
 153            .add_request_handler(Server::save_buffer)
 154            .add_request_handler(Server::get_channels)
 155            .add_request_handler(Server::get_users)
 156            .add_request_handler(Server::fuzzy_search_users)
 157            .add_request_handler(Server::request_contact)
 158            .add_request_handler(Server::remove_contact)
 159            .add_request_handler(Server::respond_to_contact_request)
 160            .add_request_handler(Server::join_channel)
 161            .add_message_handler(Server::leave_channel)
 162            .add_request_handler(Server::send_channel_message)
 163            .add_request_handler(Server::follow)
 164            .add_message_handler(Server::unfollow)
 165            .add_message_handler(Server::update_followers)
 166            .add_request_handler(Server::get_channel_messages);
 167
 168        Arc::new(server)
 169    }
 170
 171    fn add_message_handler<F, Fut, M>(&mut self, handler: F) -> &mut Self
 172    where
 173        F: 'static + Send + Sync + Fn(Arc<Self>, TypedEnvelope<M>) -> Fut,
 174        Fut: 'static + Send + Future<Output = Result<()>>,
 175        M: EnvelopedMessage,
 176    {
 177        let prev_handler = self.handlers.insert(
 178            TypeId::of::<M>(),
 179            Box::new(move |server, envelope| {
 180                let envelope = envelope.into_any().downcast::<TypedEnvelope<M>>().unwrap();
 181                let span = info_span!(
 182                    "handle message",
 183                    payload_type = envelope.payload_type_name(),
 184                    payload = format!("{:?}", envelope.payload).as_str(),
 185                );
 186                let future = (handler)(server, *envelope);
 187                async move {
 188                    if let Err(error) = future.await {
 189                        tracing::error!(%error, "error handling message");
 190                    }
 191                }
 192                .instrument(span)
 193                .boxed()
 194            }),
 195        );
 196        if prev_handler.is_some() {
 197            panic!("registered a handler for the same message twice");
 198        }
 199        self
 200    }
 201
 202    /// Handle a request while holding a lock to the store. This is useful when we're registering
 203    /// a connection but we want to respond on the connection before anybody else can send on it.
 204    fn add_request_handler<F, Fut, M>(&mut self, handler: F) -> &mut Self
 205    where
 206        F: 'static + Send + Sync + Fn(Arc<Self>, TypedEnvelope<M>, Response<M>) -> Fut,
 207        Fut: Send + Future<Output = Result<()>>,
 208        M: RequestMessage,
 209    {
 210        let handler = Arc::new(handler);
 211        self.add_message_handler(move |server, envelope| {
 212            let receipt = envelope.receipt();
 213            let handler = handler.clone();
 214            async move {
 215                let responded = Arc::new(AtomicBool::default());
 216                let response = Response {
 217                    server: server.clone(),
 218                    responded: responded.clone(),
 219                    receipt: envelope.receipt(),
 220                };
 221                match (handler)(server.clone(), envelope, response).await {
 222                    Ok(()) => {
 223                        if responded.load(std::sync::atomic::Ordering::SeqCst) {
 224                            Ok(())
 225                        } else {
 226                            Err(anyhow!("handler did not send a response"))?
 227                        }
 228                    }
 229                    Err(error) => {
 230                        server.peer.respond_with_error(
 231                            receipt,
 232                            proto::Error {
 233                                message: error.to_string(),
 234                            },
 235                        )?;
 236                        Err(error)
 237                    }
 238                }
 239            }
 240        })
 241    }
 242
 243    pub fn handle_connection<E: Executor>(
 244        self: &Arc<Self>,
 245        connection: Connection,
 246        address: String,
 247        user_id: UserId,
 248        mut send_connection_id: Option<mpsc::Sender<ConnectionId>>,
 249        executor: E,
 250    ) -> impl Future<Output = Result<()>> {
 251        let mut this = self.clone();
 252        let span = info_span!("handle connection", %user_id, %address);
 253        async move {
 254            let (connection_id, handle_io, mut incoming_rx) = this
 255                .peer
 256                .add_connection(connection, {
 257                    let executor = executor.clone();
 258                    move |duration| {
 259                        let timer = executor.sleep(duration);
 260                        async move {
 261                            timer.await;
 262                        }
 263                    }
 264                })
 265                .await;
 266
 267            tracing::info!(%user_id, %connection_id, %address, "connection opened");
 268
 269            if let Some(send_connection_id) = send_connection_id.as_mut() {
 270                let _ = send_connection_id.send(connection_id).await;
 271            }
 272
 273            let contacts = this.app_state.db.get_contacts(user_id).await?;
 274
 275            {
 276                let mut store = this.store_mut().await;
 277                store.add_connection(connection_id, user_id);
 278                this.peer.send(connection_id, store.build_initial_contacts_update(contacts))?;
 279            }
 280            this.update_user_contacts(user_id).await?;
 281
 282            let handle_io = handle_io.fuse();
 283            futures::pin_mut!(handle_io);
 284            loop {
 285                let next_message = incoming_rx.next().fuse();
 286                futures::pin_mut!(next_message);
 287                futures::select_biased! {
 288                    result = handle_io => {
 289                        if let Err(error) = result {
 290                            tracing::error!(%error, "error handling I/O");
 291                        }
 292                        break;
 293                    }
 294                    message = next_message => {
 295                        if let Some(message) = message {
 296                            let type_name = message.payload_type_name();
 297                            let span = tracing::info_span!("receive message", %user_id, %connection_id, %address, type_name);
 298                            async {
 299                                if let Some(handler) = this.handlers.get(&message.payload_type_id()) {
 300                                    let notifications = this.notifications.clone();
 301                                    let is_background = message.is_background();
 302                                    let handle_message = (handler)(this.clone(), message);
 303                                    let handle_message = async move {
 304                                        handle_message.await;
 305                                        if let Some(mut notifications) = notifications {
 306                                            let _ = notifications.send(()).await;
 307                                        }
 308                                    };
 309                                    if is_background {
 310                                        executor.spawn_detached(handle_message);
 311                                    } else {
 312                                        handle_message.await;
 313                                    }
 314                                } else {
 315                                    tracing::error!("no message handler");
 316                                }
 317                            }.instrument(span).await;
 318                        } else {
 319                            tracing::info!(%user_id, %connection_id, %address, "connection closed");
 320                            break;
 321                        }
 322                    }
 323                }
 324            }
 325
 326            if let Err(error) = this.sign_out(connection_id).await {
 327                tracing::error!(%error, "error signing out");
 328            }
 329
 330            Ok(())
 331        }.instrument(span)
 332    }
 333
 334    async fn sign_out(self: &mut Arc<Self>, connection_id: ConnectionId) -> Result<()> {
 335        self.peer.disconnect(connection_id);
 336        let removed_connection = self.store_mut().await.remove_connection(connection_id)?;
 337
 338        for (project_id, project) in removed_connection.hosted_projects {
 339            if let Some(share) = project.share {
 340                broadcast(connection_id, share.guests.keys().copied(), |conn_id| {
 341                    self.peer
 342                        .send(conn_id, proto::UnshareProject { project_id })
 343                });
 344            }
 345        }
 346
 347        for (project_id, peer_ids) in removed_connection.guest_project_ids {
 348            broadcast(connection_id, peer_ids, |conn_id| {
 349                self.peer.send(
 350                    conn_id,
 351                    proto::RemoveProjectCollaborator {
 352                        project_id,
 353                        peer_id: connection_id.0,
 354                    },
 355                )
 356            });
 357        }
 358
 359        self.update_user_contacts(removed_connection.user_id)
 360            .await?;
 361
 362        Ok(())
 363    }
 364
 365    async fn ping(
 366        self: Arc<Server>,
 367        _: TypedEnvelope<proto::Ping>,
 368        response: Response<proto::Ping>,
 369    ) -> Result<()> {
 370        response.send(proto::Ack {})?;
 371        Ok(())
 372    }
 373
 374    async fn register_project(
 375        self: Arc<Server>,
 376        request: TypedEnvelope<proto::RegisterProject>,
 377        response: Response<proto::RegisterProject>,
 378    ) -> Result<()> {
 379        let project_id = {
 380            let mut state = self.store_mut().await;
 381            let user_id = state.user_id_for_connection(request.sender_id)?;
 382            state.register_project(request.sender_id, user_id)
 383        };
 384        response.send(proto::RegisterProjectResponse { project_id })?;
 385        Ok(())
 386    }
 387
 388    async fn unregister_project(
 389        self: Arc<Server>,
 390        request: TypedEnvelope<proto::UnregisterProject>,
 391    ) -> Result<()> {
 392        let user_id = {
 393            let mut state = self.store_mut().await;
 394            state.unregister_project(request.payload.project_id, request.sender_id)?;
 395            state.user_id_for_connection(request.sender_id)?
 396        };
 397
 398        self.update_user_contacts(user_id).await?;
 399        Ok(())
 400    }
 401
 402    async fn share_project(
 403        self: Arc<Server>,
 404        request: TypedEnvelope<proto::ShareProject>,
 405        response: Response<proto::ShareProject>,
 406    ) -> Result<()> {
 407        let user_id = {
 408            let mut state = self.store_mut().await;
 409            state.share_project(request.payload.project_id, request.sender_id)?;
 410            state.user_id_for_connection(request.sender_id)?
 411        };
 412        self.update_user_contacts(user_id).await?;
 413        response.send(proto::Ack {})?;
 414        Ok(())
 415    }
 416
 417    async fn update_user_contacts(self: &Arc<Server>, user_id: UserId) -> Result<()> {
 418        let contacts = self.app_state.db.get_contacts(user_id).await?;
 419        let store = self.store().await;
 420        let updated_contact = store.contact_for_user(user_id);
 421        for contact_user_id in contacts.current {
 422            for contact_conn_id in store.connection_ids_for_user(contact_user_id) {
 423                self.peer
 424                    .send(
 425                        contact_conn_id,
 426                        proto::UpdateContacts {
 427                            contacts: vec![updated_contact.clone()],
 428                            remove_contacts: Default::default(),
 429                            incoming_requests: Default::default(),
 430                            remove_incoming_requests: Default::default(),
 431                            outgoing_requests: Default::default(),
 432                            remove_outgoing_requests: Default::default(),
 433                        },
 434                    )
 435                    .trace_err();
 436            }
 437        }
 438        Ok(())
 439    }
 440
 441    async fn unshare_project(
 442        self: Arc<Server>,
 443        request: TypedEnvelope<proto::UnshareProject>,
 444    ) -> Result<()> {
 445        let project_id = request.payload.project_id;
 446        let project;
 447        {
 448            let mut state = self.store_mut().await;
 449            project = state.unshare_project(project_id, request.sender_id)?;
 450            broadcast(request.sender_id, project.connection_ids, |conn_id| {
 451                self.peer
 452                    .send(conn_id, proto::UnshareProject { project_id })
 453            });
 454        }
 455        self.update_user_contacts(project.host_user_id).await?;
 456        Ok(())
 457    }
 458
 459    async fn join_project(
 460        self: Arc<Server>,
 461        request: TypedEnvelope<proto::JoinProject>,
 462        response: Response<proto::JoinProject>,
 463    ) -> Result<()> {
 464        let project_id = request.payload.project_id;
 465        let host_user_id;
 466        let guest_user_id;
 467        {
 468            let state = self.store().await;
 469            host_user_id = state.project(project_id)?.host_user_id;
 470            guest_user_id = state.user_id_for_connection(request.sender_id)?;
 471        };
 472
 473        let guest_contacts = self.app_state.db.get_contacts(guest_user_id).await?;
 474        if !guest_contacts.current.contains(&host_user_id) {
 475            return Err(anyhow!("no such project"))?;
 476        }
 477
 478        {
 479            let state = &mut *self.store_mut().await;
 480            let joined = state.join_project(request.sender_id, guest_user_id, project_id)?;
 481            let share = joined.project.share()?;
 482            let peer_count = share.guests.len();
 483            let mut collaborators = Vec::with_capacity(peer_count);
 484            collaborators.push(proto::Collaborator {
 485                peer_id: joined.project.host_connection_id.0,
 486                replica_id: 0,
 487                user_id: joined.project.host_user_id.to_proto(),
 488            });
 489            let worktrees = share
 490                .worktrees
 491                .iter()
 492                .filter_map(|(id, shared_worktree)| {
 493                    let worktree = joined.project.worktrees.get(&id)?;
 494                    Some(proto::Worktree {
 495                        id: *id,
 496                        root_name: worktree.root_name.clone(),
 497                        entries: shared_worktree.entries.values().cloned().collect(),
 498                        diagnostic_summaries: shared_worktree
 499                            .diagnostic_summaries
 500                            .values()
 501                            .cloned()
 502                            .collect(),
 503                        visible: worktree.visible,
 504                        scan_id: shared_worktree.scan_id,
 505                    })
 506                })
 507                .collect();
 508            for (peer_conn_id, (peer_replica_id, peer_user_id)) in &share.guests {
 509                if *peer_conn_id != request.sender_id {
 510                    collaborators.push(proto::Collaborator {
 511                        peer_id: peer_conn_id.0,
 512                        replica_id: *peer_replica_id as u32,
 513                        user_id: peer_user_id.to_proto(),
 514                    });
 515                }
 516            }
 517            broadcast(
 518                request.sender_id,
 519                joined.project.connection_ids(),
 520                |conn_id| {
 521                    self.peer.send(
 522                        conn_id,
 523                        proto::AddProjectCollaborator {
 524                            project_id,
 525                            collaborator: Some(proto::Collaborator {
 526                                peer_id: request.sender_id.0,
 527                                replica_id: joined.replica_id as u32,
 528                                user_id: guest_user_id.to_proto(),
 529                            }),
 530                        },
 531                    )
 532                },
 533            );
 534            response.send(proto::JoinProjectResponse {
 535                worktrees,
 536                replica_id: joined.replica_id as u32,
 537                collaborators,
 538                language_servers: joined.project.language_servers.clone(),
 539            })?;
 540        }
 541        self.update_user_contacts(host_user_id).await?;
 542        Ok(())
 543    }
 544
 545    async fn leave_project(
 546        self: Arc<Server>,
 547        request: TypedEnvelope<proto::LeaveProject>,
 548    ) -> Result<()> {
 549        let sender_id = request.sender_id;
 550        let project_id = request.payload.project_id;
 551        let project;
 552        {
 553            let mut state = self.store_mut().await;
 554            project = state.leave_project(sender_id, project_id)?;
 555            broadcast(sender_id, project.connection_ids, |conn_id| {
 556                self.peer.send(
 557                    conn_id,
 558                    proto::RemoveProjectCollaborator {
 559                        project_id,
 560                        peer_id: sender_id.0,
 561                    },
 562                )
 563            });
 564        }
 565        self.update_user_contacts(project.host_user_id).await?;
 566        Ok(())
 567    }
 568
 569    async fn register_worktree(
 570        self: Arc<Server>,
 571        request: TypedEnvelope<proto::RegisterWorktree>,
 572        response: Response<proto::RegisterWorktree>,
 573    ) -> Result<()> {
 574        let host_user_id;
 575        {
 576            let mut state = self.store_mut().await;
 577            host_user_id = state.user_id_for_connection(request.sender_id)?;
 578
 579            let guest_connection_ids = state
 580                .read_project(request.payload.project_id, request.sender_id)?
 581                .guest_connection_ids();
 582            state.register_worktree(
 583                request.payload.project_id,
 584                request.payload.worktree_id,
 585                request.sender_id,
 586                Worktree {
 587                    root_name: request.payload.root_name.clone(),
 588                    visible: request.payload.visible,
 589                },
 590            )?;
 591
 592            broadcast(request.sender_id, guest_connection_ids, |connection_id| {
 593                self.peer
 594                    .forward_send(request.sender_id, connection_id, request.payload.clone())
 595            });
 596        }
 597        self.update_user_contacts(host_user_id).await?;
 598        response.send(proto::Ack {})?;
 599        Ok(())
 600    }
 601
 602    async fn unregister_worktree(
 603        self: Arc<Server>,
 604        request: TypedEnvelope<proto::UnregisterWorktree>,
 605    ) -> Result<()> {
 606        let host_user_id;
 607        let project_id = request.payload.project_id;
 608        let worktree_id = request.payload.worktree_id;
 609        {
 610            let mut state = self.store_mut().await;
 611            let (_, guest_connection_ids) =
 612                state.unregister_worktree(project_id, worktree_id, request.sender_id)?;
 613            host_user_id = state.user_id_for_connection(request.sender_id)?;
 614            broadcast(request.sender_id, guest_connection_ids, |conn_id| {
 615                self.peer.send(
 616                    conn_id,
 617                    proto::UnregisterWorktree {
 618                        project_id,
 619                        worktree_id,
 620                    },
 621                )
 622            });
 623        }
 624        self.update_user_contacts(host_user_id).await?;
 625        Ok(())
 626    }
 627
 628    async fn update_worktree(
 629        self: Arc<Server>,
 630        request: TypedEnvelope<proto::UpdateWorktree>,
 631        response: Response<proto::UpdateWorktree>,
 632    ) -> Result<()> {
 633        let connection_ids = self.store_mut().await.update_worktree(
 634            request.sender_id,
 635            request.payload.project_id,
 636            request.payload.worktree_id,
 637            &request.payload.removed_entries,
 638            &request.payload.updated_entries,
 639            request.payload.scan_id,
 640        )?;
 641
 642        broadcast(request.sender_id, connection_ids, |connection_id| {
 643            self.peer
 644                .forward_send(request.sender_id, connection_id, request.payload.clone())
 645        });
 646        response.send(proto::Ack {})?;
 647        Ok(())
 648    }
 649
 650    async fn update_diagnostic_summary(
 651        self: Arc<Server>,
 652        request: TypedEnvelope<proto::UpdateDiagnosticSummary>,
 653    ) -> Result<()> {
 654        let summary = request
 655            .payload
 656            .summary
 657            .clone()
 658            .ok_or_else(|| anyhow!("invalid summary"))?;
 659        let receiver_ids = self.store_mut().await.update_diagnostic_summary(
 660            request.payload.project_id,
 661            request.payload.worktree_id,
 662            request.sender_id,
 663            summary,
 664        )?;
 665
 666        broadcast(request.sender_id, receiver_ids, |connection_id| {
 667            self.peer
 668                .forward_send(request.sender_id, connection_id, request.payload.clone())
 669        });
 670        Ok(())
 671    }
 672
 673    async fn start_language_server(
 674        self: Arc<Server>,
 675        request: TypedEnvelope<proto::StartLanguageServer>,
 676    ) -> Result<()> {
 677        let receiver_ids = self.store_mut().await.start_language_server(
 678            request.payload.project_id,
 679            request.sender_id,
 680            request
 681                .payload
 682                .server
 683                .clone()
 684                .ok_or_else(|| anyhow!("invalid language server"))?,
 685        )?;
 686        broadcast(request.sender_id, receiver_ids, |connection_id| {
 687            self.peer
 688                .forward_send(request.sender_id, connection_id, request.payload.clone())
 689        });
 690        Ok(())
 691    }
 692
 693    async fn update_language_server(
 694        self: Arc<Server>,
 695        request: TypedEnvelope<proto::UpdateLanguageServer>,
 696    ) -> Result<()> {
 697        let receiver_ids = self
 698            .store()
 699            .await
 700            .project_connection_ids(request.payload.project_id, request.sender_id)?;
 701        broadcast(request.sender_id, receiver_ids, |connection_id| {
 702            self.peer
 703                .forward_send(request.sender_id, connection_id, request.payload.clone())
 704        });
 705        Ok(())
 706    }
 707
 708    async fn forward_project_request<T>(
 709        self: Arc<Server>,
 710        request: TypedEnvelope<T>,
 711        response: Response<T>,
 712    ) -> Result<()>
 713    where
 714        T: EntityMessage + RequestMessage,
 715    {
 716        let host_connection_id = self
 717            .store()
 718            .await
 719            .read_project(request.payload.remote_entity_id(), request.sender_id)?
 720            .host_connection_id;
 721
 722        response.send(
 723            self.peer
 724                .forward_request(request.sender_id, host_connection_id, request.payload)
 725                .await?,
 726        )?;
 727        Ok(())
 728    }
 729
 730    async fn save_buffer(
 731        self: Arc<Server>,
 732        request: TypedEnvelope<proto::SaveBuffer>,
 733        response: Response<proto::SaveBuffer>,
 734    ) -> Result<()> {
 735        let host = self
 736            .store()
 737            .await
 738            .read_project(request.payload.project_id, request.sender_id)?
 739            .host_connection_id;
 740        let response_payload = self
 741            .peer
 742            .forward_request(request.sender_id, host, request.payload.clone())
 743            .await?;
 744
 745        let mut guests = self
 746            .store()
 747            .await
 748            .read_project(request.payload.project_id, request.sender_id)?
 749            .connection_ids();
 750        guests.retain(|guest_connection_id| *guest_connection_id != request.sender_id);
 751        broadcast(host, guests, |conn_id| {
 752            self.peer
 753                .forward_send(host, conn_id, response_payload.clone())
 754        });
 755        response.send(response_payload)?;
 756        Ok(())
 757    }
 758
 759    async fn update_buffer(
 760        self: Arc<Server>,
 761        request: TypedEnvelope<proto::UpdateBuffer>,
 762        response: Response<proto::UpdateBuffer>,
 763    ) -> Result<()> {
 764        let receiver_ids = self
 765            .store()
 766            .await
 767            .project_connection_ids(request.payload.project_id, request.sender_id)?;
 768        broadcast(request.sender_id, receiver_ids, |connection_id| {
 769            self.peer
 770                .forward_send(request.sender_id, connection_id, request.payload.clone())
 771        });
 772        response.send(proto::Ack {})?;
 773        Ok(())
 774    }
 775
 776    async fn update_buffer_file(
 777        self: Arc<Server>,
 778        request: TypedEnvelope<proto::UpdateBufferFile>,
 779    ) -> Result<()> {
 780        let receiver_ids = self
 781            .store()
 782            .await
 783            .project_connection_ids(request.payload.project_id, request.sender_id)?;
 784        broadcast(request.sender_id, receiver_ids, |connection_id| {
 785            self.peer
 786                .forward_send(request.sender_id, connection_id, request.payload.clone())
 787        });
 788        Ok(())
 789    }
 790
 791    async fn buffer_reloaded(
 792        self: Arc<Server>,
 793        request: TypedEnvelope<proto::BufferReloaded>,
 794    ) -> Result<()> {
 795        let receiver_ids = self
 796            .store()
 797            .await
 798            .project_connection_ids(request.payload.project_id, request.sender_id)?;
 799        broadcast(request.sender_id, receiver_ids, |connection_id| {
 800            self.peer
 801                .forward_send(request.sender_id, connection_id, request.payload.clone())
 802        });
 803        Ok(())
 804    }
 805
 806    async fn buffer_saved(
 807        self: Arc<Server>,
 808        request: TypedEnvelope<proto::BufferSaved>,
 809    ) -> Result<()> {
 810        let receiver_ids = self
 811            .store()
 812            .await
 813            .project_connection_ids(request.payload.project_id, request.sender_id)?;
 814        broadcast(request.sender_id, receiver_ids, |connection_id| {
 815            self.peer
 816                .forward_send(request.sender_id, connection_id, request.payload.clone())
 817        });
 818        Ok(())
 819    }
 820
 821    async fn follow(
 822        self: Arc<Self>,
 823        request: TypedEnvelope<proto::Follow>,
 824        response: Response<proto::Follow>,
 825    ) -> Result<()> {
 826        let leader_id = ConnectionId(request.payload.leader_id);
 827        let follower_id = request.sender_id;
 828        if !self
 829            .store()
 830            .await
 831            .project_connection_ids(request.payload.project_id, follower_id)?
 832            .contains(&leader_id)
 833        {
 834            Err(anyhow!("no such peer"))?;
 835        }
 836        let mut response_payload = self
 837            .peer
 838            .forward_request(request.sender_id, leader_id, request.payload)
 839            .await?;
 840        response_payload
 841            .views
 842            .retain(|view| view.leader_id != Some(follower_id.0));
 843        response.send(response_payload)?;
 844        Ok(())
 845    }
 846
 847    async fn unfollow(self: Arc<Self>, request: TypedEnvelope<proto::Unfollow>) -> Result<()> {
 848        let leader_id = ConnectionId(request.payload.leader_id);
 849        if !self
 850            .store()
 851            .await
 852            .project_connection_ids(request.payload.project_id, request.sender_id)?
 853            .contains(&leader_id)
 854        {
 855            Err(anyhow!("no such peer"))?;
 856        }
 857        self.peer
 858            .forward_send(request.sender_id, leader_id, request.payload)?;
 859        Ok(())
 860    }
 861
 862    async fn update_followers(
 863        self: Arc<Self>,
 864        request: TypedEnvelope<proto::UpdateFollowers>,
 865    ) -> Result<()> {
 866        let connection_ids = self
 867            .store()
 868            .await
 869            .project_connection_ids(request.payload.project_id, request.sender_id)?;
 870        let leader_id = request
 871            .payload
 872            .variant
 873            .as_ref()
 874            .and_then(|variant| match variant {
 875                proto::update_followers::Variant::CreateView(payload) => payload.leader_id,
 876                proto::update_followers::Variant::UpdateView(payload) => payload.leader_id,
 877                proto::update_followers::Variant::UpdateActiveView(payload) => payload.leader_id,
 878            });
 879        for follower_id in &request.payload.follower_ids {
 880            let follower_id = ConnectionId(*follower_id);
 881            if connection_ids.contains(&follower_id) && Some(follower_id.0) != leader_id {
 882                self.peer
 883                    .forward_send(request.sender_id, follower_id, request.payload.clone())?;
 884            }
 885        }
 886        Ok(())
 887    }
 888
 889    async fn get_channels(
 890        self: Arc<Server>,
 891        request: TypedEnvelope<proto::GetChannels>,
 892        response: Response<proto::GetChannels>,
 893    ) -> Result<()> {
 894        let user_id = self
 895            .store()
 896            .await
 897            .user_id_for_connection(request.sender_id)?;
 898        let channels = self.app_state.db.get_accessible_channels(user_id).await?;
 899        response.send(proto::GetChannelsResponse {
 900            channels: channels
 901                .into_iter()
 902                .map(|chan| proto::Channel {
 903                    id: chan.id.to_proto(),
 904                    name: chan.name,
 905                })
 906                .collect(),
 907        })?;
 908        Ok(())
 909    }
 910
 911    async fn get_users(
 912        self: Arc<Server>,
 913        request: TypedEnvelope<proto::GetUsers>,
 914        response: Response<proto::GetUsers>,
 915    ) -> Result<()> {
 916        let user_ids = request
 917            .payload
 918            .user_ids
 919            .into_iter()
 920            .map(UserId::from_proto)
 921            .collect();
 922        let users = self
 923            .app_state
 924            .db
 925            .get_users_by_ids(user_ids)
 926            .await?
 927            .into_iter()
 928            .map(|user| proto::User {
 929                id: user.id.to_proto(),
 930                avatar_url: format!("https://github.com/{}.png?size=128", user.github_login),
 931                github_login: user.github_login,
 932            })
 933            .collect();
 934        response.send(proto::UsersResponse { users })?;
 935        Ok(())
 936    }
 937
 938    async fn fuzzy_search_users(
 939        self: Arc<Server>,
 940        request: TypedEnvelope<proto::FuzzySearchUsers>,
 941        response: Response<proto::FuzzySearchUsers>,
 942    ) -> Result<()> {
 943        let query = request.payload.query;
 944        let db = &self.app_state.db;
 945        let users = match query.len() {
 946            0 => vec![],
 947            1 | 2 => db
 948                .get_user_by_github_login(&query)
 949                .await?
 950                .into_iter()
 951                .collect(),
 952            _ => db.fuzzy_search_users(&query, 10).await?,
 953        };
 954        let users = users
 955            .into_iter()
 956            .map(|user| proto::User {
 957                id: user.id.to_proto(),
 958                avatar_url: format!("https://github.com/{}.png?size=128", user.github_login),
 959                github_login: user.github_login,
 960            })
 961            .collect();
 962        response.send(proto::UsersResponse { users })?;
 963        Ok(())
 964    }
 965
 966    async fn request_contact(
 967        self: Arc<Server>,
 968        request: TypedEnvelope<proto::RequestContact>,
 969        response: Response<proto::RequestContact>,
 970    ) -> Result<()> {
 971        let requester_id = self
 972            .store()
 973            .await
 974            .user_id_for_connection(request.sender_id)?;
 975        let responder_id = UserId::from_proto(request.payload.responder_id);
 976        self.app_state
 977            .db
 978            .send_contact_request(requester_id, responder_id)
 979            .await?;
 980
 981        // Update outgoing contact requests of requester
 982        let mut update = proto::UpdateContacts::default();
 983        update.outgoing_requests.push(responder_id.to_proto());
 984        for connection_id in self.store().await.connection_ids_for_user(requester_id) {
 985            self.peer.send(connection_id, update.clone())?;
 986        }
 987
 988        // Update incoming contact requests of responder
 989        let mut update = proto::UpdateContacts::default();
 990        update
 991            .incoming_requests
 992            .push(proto::IncomingContactRequest {
 993                requester_id: requester_id.to_proto(),
 994                should_notify: true,
 995            });
 996        for connection_id in self.store().await.connection_ids_for_user(responder_id) {
 997            self.peer.send(connection_id, update.clone())?;
 998        }
 999
1000        response.send(proto::Ack {})?;
1001        Ok(())
1002    }
1003
1004    async fn respond_to_contact_request(
1005        self: Arc<Server>,
1006        request: TypedEnvelope<proto::RespondToContactRequest>,
1007        response: Response<proto::RespondToContactRequest>,
1008    ) -> Result<()> {
1009        let responder_id = self
1010            .store()
1011            .await
1012            .user_id_for_connection(request.sender_id)?;
1013        let requester_id = UserId::from_proto(request.payload.requester_id);
1014        let accept = request.payload.response == proto::ContactRequestResponse::Accept as i32;
1015        self.app_state
1016            .db
1017            .respond_to_contact_request(responder_id, requester_id, accept)
1018            .await?;
1019
1020        let store = self.store().await;
1021        // Update responder with new contact
1022        let mut update = proto::UpdateContacts::default();
1023        if accept {
1024            update.contacts.push(store.contact_for_user(requester_id));
1025        }
1026        update
1027            .remove_incoming_requests
1028            .push(requester_id.to_proto());
1029        for connection_id in store.connection_ids_for_user(responder_id) {
1030            self.peer.send(connection_id, update.clone())?;
1031        }
1032
1033        // Update requester with new contact
1034        let mut update = proto::UpdateContacts::default();
1035        if accept {
1036            update.contacts.push(store.contact_for_user(responder_id));
1037        }
1038        update
1039            .remove_outgoing_requests
1040            .push(responder_id.to_proto());
1041        for connection_id in store.connection_ids_for_user(requester_id) {
1042            self.peer.send(connection_id, update.clone())?;
1043        }
1044
1045        response.send(proto::Ack {})?;
1046        Ok(())
1047    }
1048
1049    async fn remove_contact(
1050        self: Arc<Server>,
1051        request: TypedEnvelope<proto::RemoveContact>,
1052        response: Response<proto::RemoveContact>,
1053    ) -> Result<()> {
1054        let requester_id = self
1055            .store()
1056            .await
1057            .user_id_for_connection(request.sender_id)?;
1058        let responder_id = UserId::from_proto(request.payload.user_id);
1059        self.app_state
1060            .db
1061            .remove_contact(requester_id, responder_id)
1062            .await?;
1063
1064        // Update outgoing contact requests of requester
1065        let mut update = proto::UpdateContacts::default();
1066        update
1067            .remove_outgoing_requests
1068            .push(responder_id.to_proto());
1069        for connection_id in self.store().await.connection_ids_for_user(requester_id) {
1070            self.peer.send(connection_id, update.clone())?;
1071        }
1072
1073        // Update incoming contact requests of responder
1074        let mut update = proto::UpdateContacts::default();
1075        update
1076            .remove_incoming_requests
1077            .push(requester_id.to_proto());
1078        for connection_id in self.store().await.connection_ids_for_user(responder_id) {
1079            self.peer.send(connection_id, update.clone())?;
1080        }
1081
1082        response.send(proto::Ack {})?;
1083        Ok(())
1084    }
1085
1086    // #[instrument(skip(self, state, user_ids))]
1087    // fn update_contacts_for_users<'a>(
1088    //     self: &Arc<Self>,
1089    //     state: &Store,
1090    //     user_ids: impl IntoIterator<Item = &'a UserId>,
1091    // ) {
1092    //     for user_id in user_ids {
1093    //         let contacts = state.contacts_for_user(*user_id);
1094    //         for connection_id in state.connection_ids_for_user(*user_id) {
1095    //             self.peer
1096    //                 .send(
1097    //                     connection_id,
1098    //                     proto::UpdateContacts {
1099    //                         contacts: contacts.clone(),
1100    //                         pending_requests_from_user_ids: Default::default(),
1101    //                         pending_requests_to_user_ids: Default::default(),
1102    //                     },
1103    //                 )
1104    //                 .trace_err();
1105    //         }
1106    //     }
1107    // }
1108
1109    async fn join_channel(
1110        self: Arc<Self>,
1111        request: TypedEnvelope<proto::JoinChannel>,
1112        response: Response<proto::JoinChannel>,
1113    ) -> Result<()> {
1114        let user_id = self
1115            .store()
1116            .await
1117            .user_id_for_connection(request.sender_id)?;
1118        let channel_id = ChannelId::from_proto(request.payload.channel_id);
1119        if !self
1120            .app_state
1121            .db
1122            .can_user_access_channel(user_id, channel_id)
1123            .await?
1124        {
1125            Err(anyhow!("access denied"))?;
1126        }
1127
1128        self.store_mut()
1129            .await
1130            .join_channel(request.sender_id, channel_id);
1131        let messages = self
1132            .app_state
1133            .db
1134            .get_channel_messages(channel_id, MESSAGE_COUNT_PER_PAGE, None)
1135            .await?
1136            .into_iter()
1137            .map(|msg| proto::ChannelMessage {
1138                id: msg.id.to_proto(),
1139                body: msg.body,
1140                timestamp: msg.sent_at.unix_timestamp() as u64,
1141                sender_id: msg.sender_id.to_proto(),
1142                nonce: Some(msg.nonce.as_u128().into()),
1143            })
1144            .collect::<Vec<_>>();
1145        response.send(proto::JoinChannelResponse {
1146            done: messages.len() < MESSAGE_COUNT_PER_PAGE,
1147            messages,
1148        })?;
1149        Ok(())
1150    }
1151
1152    async fn leave_channel(
1153        self: Arc<Self>,
1154        request: TypedEnvelope<proto::LeaveChannel>,
1155    ) -> Result<()> {
1156        let user_id = self
1157            .store()
1158            .await
1159            .user_id_for_connection(request.sender_id)?;
1160        let channel_id = ChannelId::from_proto(request.payload.channel_id);
1161        if !self
1162            .app_state
1163            .db
1164            .can_user_access_channel(user_id, channel_id)
1165            .await?
1166        {
1167            Err(anyhow!("access denied"))?;
1168        }
1169
1170        self.store_mut()
1171            .await
1172            .leave_channel(request.sender_id, channel_id);
1173
1174        Ok(())
1175    }
1176
1177    async fn send_channel_message(
1178        self: Arc<Self>,
1179        request: TypedEnvelope<proto::SendChannelMessage>,
1180        response: Response<proto::SendChannelMessage>,
1181    ) -> Result<()> {
1182        let channel_id = ChannelId::from_proto(request.payload.channel_id);
1183        let user_id;
1184        let connection_ids;
1185        {
1186            let state = self.store().await;
1187            user_id = state.user_id_for_connection(request.sender_id)?;
1188            connection_ids = state.channel_connection_ids(channel_id)?;
1189        }
1190
1191        // Validate the message body.
1192        let body = request.payload.body.trim().to_string();
1193        if body.len() > MAX_MESSAGE_LEN {
1194            return Err(anyhow!("message is too long"))?;
1195        }
1196        if body.is_empty() {
1197            return Err(anyhow!("message can't be blank"))?;
1198        }
1199
1200        let timestamp = OffsetDateTime::now_utc();
1201        let nonce = request
1202            .payload
1203            .nonce
1204            .ok_or_else(|| anyhow!("nonce can't be blank"))?;
1205
1206        let message_id = self
1207            .app_state
1208            .db
1209            .create_channel_message(channel_id, user_id, &body, timestamp, nonce.clone().into())
1210            .await?
1211            .to_proto();
1212        let message = proto::ChannelMessage {
1213            sender_id: user_id.to_proto(),
1214            id: message_id,
1215            body,
1216            timestamp: timestamp.unix_timestamp() as u64,
1217            nonce: Some(nonce),
1218        };
1219        broadcast(request.sender_id, connection_ids, |conn_id| {
1220            self.peer.send(
1221                conn_id,
1222                proto::ChannelMessageSent {
1223                    channel_id: channel_id.to_proto(),
1224                    message: Some(message.clone()),
1225                },
1226            )
1227        });
1228        response.send(proto::SendChannelMessageResponse {
1229            message: Some(message),
1230        })?;
1231        Ok(())
1232    }
1233
1234    async fn get_channel_messages(
1235        self: Arc<Self>,
1236        request: TypedEnvelope<proto::GetChannelMessages>,
1237        response: Response<proto::GetChannelMessages>,
1238    ) -> Result<()> {
1239        let user_id = self
1240            .store()
1241            .await
1242            .user_id_for_connection(request.sender_id)?;
1243        let channel_id = ChannelId::from_proto(request.payload.channel_id);
1244        if !self
1245            .app_state
1246            .db
1247            .can_user_access_channel(user_id, channel_id)
1248            .await?
1249        {
1250            Err(anyhow!("access denied"))?;
1251        }
1252
1253        let messages = self
1254            .app_state
1255            .db
1256            .get_channel_messages(
1257                channel_id,
1258                MESSAGE_COUNT_PER_PAGE,
1259                Some(MessageId::from_proto(request.payload.before_message_id)),
1260            )
1261            .await?
1262            .into_iter()
1263            .map(|msg| proto::ChannelMessage {
1264                id: msg.id.to_proto(),
1265                body: msg.body,
1266                timestamp: msg.sent_at.unix_timestamp() as u64,
1267                sender_id: msg.sender_id.to_proto(),
1268                nonce: Some(msg.nonce.as_u128().into()),
1269            })
1270            .collect::<Vec<_>>();
1271        response.send(proto::GetChannelMessagesResponse {
1272            done: messages.len() < MESSAGE_COUNT_PER_PAGE,
1273            messages,
1274        })?;
1275        Ok(())
1276    }
1277
1278    async fn store<'a>(self: &'a Arc<Self>) -> StoreReadGuard<'a> {
1279        #[cfg(test)]
1280        tokio::task::yield_now().await;
1281        let guard = self.store.read().await;
1282        #[cfg(test)]
1283        tokio::task::yield_now().await;
1284        StoreReadGuard {
1285            guard,
1286            _not_send: PhantomData,
1287        }
1288    }
1289
1290    async fn store_mut<'a>(self: &'a Arc<Self>) -> StoreWriteGuard<'a> {
1291        #[cfg(test)]
1292        tokio::task::yield_now().await;
1293        let guard = self.store.write().await;
1294        #[cfg(test)]
1295        tokio::task::yield_now().await;
1296        StoreWriteGuard {
1297            guard,
1298            _not_send: PhantomData,
1299        }
1300    }
1301}
1302
1303impl<'a> Deref for StoreReadGuard<'a> {
1304    type Target = Store;
1305
1306    fn deref(&self) -> &Self::Target {
1307        &*self.guard
1308    }
1309}
1310
1311impl<'a> Deref for StoreWriteGuard<'a> {
1312    type Target = Store;
1313
1314    fn deref(&self) -> &Self::Target {
1315        &*self.guard
1316    }
1317}
1318
1319impl<'a> DerefMut for StoreWriteGuard<'a> {
1320    fn deref_mut(&mut self) -> &mut Self::Target {
1321        &mut *self.guard
1322    }
1323}
1324
1325impl<'a> Drop for StoreWriteGuard<'a> {
1326    fn drop(&mut self) {
1327        #[cfg(test)]
1328        self.check_invariants();
1329
1330        let metrics = self.metrics();
1331        tracing::info!(
1332            connections = metrics.connections,
1333            registered_projects = metrics.registered_projects,
1334            shared_projects = metrics.shared_projects,
1335            "metrics"
1336        );
1337    }
1338}
1339
1340impl Executor for RealExecutor {
1341    type Sleep = Sleep;
1342
1343    fn spawn_detached<F: 'static + Send + Future<Output = ()>>(&self, future: F) {
1344        tokio::task::spawn(future);
1345    }
1346
1347    fn sleep(&self, duration: Duration) -> Self::Sleep {
1348        tokio::time::sleep(duration)
1349    }
1350}
1351
1352fn broadcast<F>(
1353    sender_id: ConnectionId,
1354    receiver_ids: impl IntoIterator<Item = ConnectionId>,
1355    mut f: F,
1356) where
1357    F: FnMut(ConnectionId) -> anyhow::Result<()>,
1358{
1359    for receiver_id in receiver_ids {
1360        if receiver_id != sender_id {
1361            f(receiver_id).trace_err();
1362        }
1363    }
1364}
1365
1366lazy_static! {
1367    static ref ZED_PROTOCOL_VERSION: HeaderName = HeaderName::from_static("x-zed-protocol-version");
1368}
1369
1370pub struct ProtocolVersion(u32);
1371
1372impl Header for ProtocolVersion {
1373    fn name() -> &'static HeaderName {
1374        &ZED_PROTOCOL_VERSION
1375    }
1376
1377    fn decode<'i, I>(values: &mut I) -> Result<Self, axum::headers::Error>
1378    where
1379        Self: Sized,
1380        I: Iterator<Item = &'i axum::http::HeaderValue>,
1381    {
1382        let version = values
1383            .next()
1384            .ok_or_else(|| axum::headers::Error::invalid())?
1385            .to_str()
1386            .map_err(|_| axum::headers::Error::invalid())?
1387            .parse()
1388            .map_err(|_| axum::headers::Error::invalid())?;
1389        Ok(Self(version))
1390    }
1391
1392    fn encode<E: Extend<axum::http::HeaderValue>>(&self, values: &mut E) {
1393        values.extend([self.0.to_string().parse().unwrap()]);
1394    }
1395}
1396
1397pub fn routes(app_state: Arc<AppState>) -> Router<Body> {
1398    let server = Server::new(app_state.clone(), None);
1399    Router::new()
1400        .route("/rpc", get(handle_websocket_request))
1401        .layer(
1402            ServiceBuilder::new()
1403                .layer(Extension(app_state))
1404                .layer(middleware::from_fn(auth::validate_header))
1405                .layer(Extension(server)),
1406        )
1407}
1408
1409pub async fn handle_websocket_request(
1410    TypedHeader(ProtocolVersion(protocol_version)): TypedHeader<ProtocolVersion>,
1411    ConnectInfo(socket_address): ConnectInfo<SocketAddr>,
1412    Extension(server): Extension<Arc<Server>>,
1413    Extension(user_id): Extension<UserId>,
1414    ws: WebSocketUpgrade,
1415) -> axum::response::Response {
1416    if protocol_version != rpc::PROTOCOL_VERSION {
1417        return (
1418            StatusCode::UPGRADE_REQUIRED,
1419            "client must be upgraded".to_string(),
1420        )
1421            .into_response();
1422    }
1423    let socket_address = socket_address.to_string();
1424    ws.on_upgrade(move |socket| {
1425        use util::ResultExt;
1426        let socket = socket
1427            .map_ok(to_tungstenite_message)
1428            .err_into()
1429            .with(|message| async move { Ok(to_axum_message(message)) });
1430        let connection = Connection::new(Box::pin(socket));
1431        async move {
1432            server
1433                .handle_connection(connection, socket_address, user_id, None, RealExecutor)
1434                .await
1435                .log_err();
1436        }
1437    })
1438}
1439
1440fn to_axum_message(message: TungsteniteMessage) -> AxumMessage {
1441    match message {
1442        TungsteniteMessage::Text(payload) => AxumMessage::Text(payload),
1443        TungsteniteMessage::Binary(payload) => AxumMessage::Binary(payload),
1444        TungsteniteMessage::Ping(payload) => AxumMessage::Ping(payload),
1445        TungsteniteMessage::Pong(payload) => AxumMessage::Pong(payload),
1446        TungsteniteMessage::Close(frame) => AxumMessage::Close(frame.map(|frame| AxumCloseFrame {
1447            code: frame.code.into(),
1448            reason: frame.reason,
1449        })),
1450    }
1451}
1452
1453fn to_tungstenite_message(message: AxumMessage) -> TungsteniteMessage {
1454    match message {
1455        AxumMessage::Text(payload) => TungsteniteMessage::Text(payload),
1456        AxumMessage::Binary(payload) => TungsteniteMessage::Binary(payload),
1457        AxumMessage::Ping(payload) => TungsteniteMessage::Ping(payload),
1458        AxumMessage::Pong(payload) => TungsteniteMessage::Pong(payload),
1459        AxumMessage::Close(frame) => {
1460            TungsteniteMessage::Close(frame.map(|frame| TungsteniteCloseFrame {
1461                code: frame.code.into(),
1462                reason: frame.reason,
1463            }))
1464        }
1465    }
1466}
1467
1468pub trait ResultExt {
1469    type Ok;
1470
1471    fn trace_err(self) -> Option<Self::Ok>;
1472}
1473
1474impl<T, E> ResultExt for Result<T, E>
1475where
1476    E: std::fmt::Debug,
1477{
1478    type Ok = T;
1479
1480    fn trace_err(self) -> Option<T> {
1481        match self {
1482            Ok(value) => Some(value),
1483            Err(error) => {
1484                tracing::error!("{:?}", error);
1485                None
1486            }
1487        }
1488    }
1489}
1490
1491#[cfg(test)]
1492mod tests {
1493    use super::*;
1494    use crate::{
1495        db::{tests::TestDb, UserId},
1496        AppState,
1497    };
1498    use ::rpc::Peer;
1499    use client::{
1500        self, test::FakeHttpClient, Channel, ChannelDetails, ChannelList, Client, Credentials,
1501        EstablishConnectionError, UserStore, RECEIVE_TIMEOUT,
1502    };
1503    use collections::{BTreeMap, HashSet};
1504    use editor::{
1505        self, ConfirmCodeAction, ConfirmCompletion, ConfirmRename, Editor, Input, Redo, Rename,
1506        ToOffset, ToggleCodeActions, Undo,
1507    };
1508    use gpui::{
1509        executor::{self, Deterministic},
1510        geometry::vector::vec2f,
1511        ModelHandle, TestAppContext, ViewHandle,
1512    };
1513    use language::{
1514        range_to_lsp, tree_sitter_rust, Diagnostic, DiagnosticEntry, FakeLspAdapter, Language,
1515        LanguageConfig, LanguageRegistry, OffsetRangeExt, Point, Rope,
1516    };
1517    use lsp::{self, FakeLanguageServer};
1518    use parking_lot::Mutex;
1519    use project::{
1520        fs::{FakeFs, Fs as _},
1521        search::SearchQuery,
1522        worktree::WorktreeHandle,
1523        DiagnosticSummary, Project, ProjectPath, WorktreeId,
1524    };
1525    use rand::prelude::*;
1526    use rpc::PeerId;
1527    use serde_json::json;
1528    use settings::Settings;
1529    use sqlx::types::time::OffsetDateTime;
1530    use std::{
1531        env,
1532        ops::Deref,
1533        path::{Path, PathBuf},
1534        rc::Rc,
1535        sync::{
1536            atomic::{AtomicBool, Ordering::SeqCst},
1537            Arc,
1538        },
1539        time::Duration,
1540    };
1541    use theme::ThemeRegistry;
1542    use workspace::{Item, SplitDirection, ToggleFollow, Workspace, WorkspaceParams};
1543
1544    #[cfg(test)]
1545    #[ctor::ctor]
1546    fn init_logger() {
1547        if std::env::var("RUST_LOG").is_ok() {
1548            env_logger::init();
1549        }
1550    }
1551
1552    #[gpui::test(iterations = 10)]
1553    async fn test_share_project(cx_a: &mut TestAppContext, cx_b: &mut TestAppContext) {
1554        let (window_b, _) = cx_b.add_window(|_| EmptyView);
1555        let lang_registry = Arc::new(LanguageRegistry::test());
1556        let fs = FakeFs::new(cx_a.background());
1557        cx_a.foreground().forbid_parking();
1558
1559        // Connect to a server as 2 clients.
1560        let mut server = TestServer::start(cx_a.foreground(), cx_a.background()).await;
1561        let client_a = server.create_client(cx_a, "user_a").await;
1562        let client_b = server.create_client(cx_b, "user_b").await;
1563        server
1564            .make_contacts(vec![(&client_a, cx_a), (&client_b, cx_b)])
1565            .await;
1566
1567        // Share a project as client A
1568        fs.insert_tree(
1569            "/a",
1570            json!({
1571                "a.txt": "a-contents",
1572                "b.txt": "b-contents",
1573            }),
1574        )
1575        .await;
1576        let project_a = cx_a.update(|cx| {
1577            Project::local(
1578                client_a.clone(),
1579                client_a.user_store.clone(),
1580                lang_registry.clone(),
1581                fs.clone(),
1582                cx,
1583            )
1584        });
1585        let (worktree_a, _) = project_a
1586            .update(cx_a, |p, cx| {
1587                p.find_or_create_local_worktree("/a", true, cx)
1588            })
1589            .await
1590            .unwrap();
1591        let worktree_id = worktree_a.read_with(cx_a, |tree, _| tree.id());
1592        worktree_a
1593            .read_with(cx_a, |tree, _| tree.as_local().unwrap().scan_complete())
1594            .await;
1595        let project_id = project_a.update(cx_a, |p, _| p.next_remote_id()).await;
1596        project_a.update(cx_a, |p, cx| p.share(cx)).await.unwrap();
1597
1598        // Join that project as client B
1599        let project_b = Project::remote(
1600            project_id,
1601            client_b.clone(),
1602            client_b.user_store.clone(),
1603            lang_registry.clone(),
1604            fs.clone(),
1605            &mut cx_b.to_async(),
1606        )
1607        .await
1608        .unwrap();
1609
1610        let replica_id_b = project_b.read_with(cx_b, |project, _| {
1611            assert_eq!(
1612                project
1613                    .collaborators()
1614                    .get(&client_a.peer_id)
1615                    .unwrap()
1616                    .user
1617                    .github_login,
1618                "user_a"
1619            );
1620            project.replica_id()
1621        });
1622        project_a
1623            .condition(&cx_a, |tree, _| {
1624                tree.collaborators()
1625                    .get(&client_b.peer_id)
1626                    .map_or(false, |collaborator| {
1627                        collaborator.replica_id == replica_id_b
1628                            && collaborator.user.github_login == "user_b"
1629                    })
1630            })
1631            .await;
1632
1633        // Open the same file as client B and client A.
1634        let buffer_b = project_b
1635            .update(cx_b, |p, cx| p.open_buffer((worktree_id, "b.txt"), cx))
1636            .await
1637            .unwrap();
1638        buffer_b.read_with(cx_b, |buf, _| assert_eq!(buf.text(), "b-contents"));
1639        project_a.read_with(cx_a, |project, cx| {
1640            assert!(project.has_open_buffer((worktree_id, "b.txt"), cx))
1641        });
1642        let buffer_a = project_a
1643            .update(cx_a, |p, cx| p.open_buffer((worktree_id, "b.txt"), cx))
1644            .await
1645            .unwrap();
1646
1647        let editor_b = cx_b.add_view(window_b, |cx| Editor::for_buffer(buffer_b, None, cx));
1648
1649        // TODO
1650        // // Create a selection set as client B and see that selection set as client A.
1651        // buffer_a
1652        //     .condition(&cx_a, |buffer, _| buffer.selection_sets().count() == 1)
1653        //     .await;
1654
1655        // Edit the buffer as client B and see that edit as client A.
1656        editor_b.update(cx_b, |editor, cx| {
1657            editor.handle_input(&Input("ok, ".into()), cx)
1658        });
1659        buffer_a
1660            .condition(&cx_a, |buffer, _| buffer.text() == "ok, b-contents")
1661            .await;
1662
1663        // TODO
1664        // // Remove the selection set as client B, see those selections disappear as client A.
1665        cx_b.update(move |_| drop(editor_b));
1666        // buffer_a
1667        //     .condition(&cx_a, |buffer, _| buffer.selection_sets().count() == 0)
1668        //     .await;
1669
1670        // Dropping the client B's project removes client B from client A's collaborators.
1671        cx_b.update(move |_| drop(project_b));
1672        project_a
1673            .condition(&cx_a, |project, _| project.collaborators().is_empty())
1674            .await;
1675    }
1676
1677    #[gpui::test(iterations = 10)]
1678    async fn test_unshare_project(cx_a: &mut TestAppContext, cx_b: &mut TestAppContext) {
1679        let lang_registry = Arc::new(LanguageRegistry::test());
1680        let fs = FakeFs::new(cx_a.background());
1681        cx_a.foreground().forbid_parking();
1682
1683        // Connect to a server as 2 clients.
1684        let mut server = TestServer::start(cx_a.foreground(), cx_a.background()).await;
1685        let client_a = server.create_client(cx_a, "user_a").await;
1686        let client_b = server.create_client(cx_b, "user_b").await;
1687        server
1688            .make_contacts(vec![(&client_a, cx_a), (&client_b, cx_b)])
1689            .await;
1690
1691        // Share a project as client A
1692        fs.insert_tree(
1693            "/a",
1694            json!({
1695                "a.txt": "a-contents",
1696                "b.txt": "b-contents",
1697            }),
1698        )
1699        .await;
1700        let project_a = cx_a.update(|cx| {
1701            Project::local(
1702                client_a.clone(),
1703                client_a.user_store.clone(),
1704                lang_registry.clone(),
1705                fs.clone(),
1706                cx,
1707            )
1708        });
1709        let (worktree_a, _) = project_a
1710            .update(cx_a, |p, cx| {
1711                p.find_or_create_local_worktree("/a", true, cx)
1712            })
1713            .await
1714            .unwrap();
1715        worktree_a
1716            .read_with(cx_a, |tree, _| tree.as_local().unwrap().scan_complete())
1717            .await;
1718        let project_id = project_a.update(cx_a, |p, _| p.next_remote_id()).await;
1719        let worktree_id = worktree_a.read_with(cx_a, |tree, _| tree.id());
1720        project_a.update(cx_a, |p, cx| p.share(cx)).await.unwrap();
1721        assert!(worktree_a.read_with(cx_a, |tree, _| tree.as_local().unwrap().is_shared()));
1722
1723        // Join that project as client B
1724        let project_b = Project::remote(
1725            project_id,
1726            client_b.clone(),
1727            client_b.user_store.clone(),
1728            lang_registry.clone(),
1729            fs.clone(),
1730            &mut cx_b.to_async(),
1731        )
1732        .await
1733        .unwrap();
1734        project_b
1735            .update(cx_b, |p, cx| p.open_buffer((worktree_id, "a.txt"), cx))
1736            .await
1737            .unwrap();
1738
1739        // Unshare the project as client A
1740        project_a.update(cx_a, |project, cx| project.unshare(cx));
1741        project_b
1742            .condition(cx_b, |project, _| project.is_read_only())
1743            .await;
1744        assert!(worktree_a.read_with(cx_a, |tree, _| !tree.as_local().unwrap().is_shared()));
1745        cx_b.update(|_| {
1746            drop(project_b);
1747        });
1748
1749        // Share the project again and ensure guests can still join.
1750        project_a
1751            .update(cx_a, |project, cx| project.share(cx))
1752            .await
1753            .unwrap();
1754        assert!(worktree_a.read_with(cx_a, |tree, _| tree.as_local().unwrap().is_shared()));
1755
1756        let project_b2 = Project::remote(
1757            project_id,
1758            client_b.clone(),
1759            client_b.user_store.clone(),
1760            lang_registry.clone(),
1761            fs.clone(),
1762            &mut cx_b.to_async(),
1763        )
1764        .await
1765        .unwrap();
1766        project_b2
1767            .update(cx_b, |p, cx| p.open_buffer((worktree_id, "a.txt"), cx))
1768            .await
1769            .unwrap();
1770    }
1771
1772    #[gpui::test(iterations = 10)]
1773    async fn test_host_disconnect(cx_a: &mut TestAppContext, cx_b: &mut TestAppContext) {
1774        let lang_registry = Arc::new(LanguageRegistry::test());
1775        let fs = FakeFs::new(cx_a.background());
1776        cx_a.foreground().forbid_parking();
1777
1778        // Connect to a server as 2 clients.
1779        let mut server = TestServer::start(cx_a.foreground(), cx_a.background()).await;
1780        let client_a = server.create_client(cx_a, "user_a").await;
1781        let client_b = server.create_client(cx_b, "user_b").await;
1782        server
1783            .make_contacts(vec![(&client_a, cx_a), (&client_b, cx_b)])
1784            .await;
1785
1786        // Share a project as client A
1787        fs.insert_tree(
1788            "/a",
1789            json!({
1790                "a.txt": "a-contents",
1791                "b.txt": "b-contents",
1792            }),
1793        )
1794        .await;
1795        let project_a = cx_a.update(|cx| {
1796            Project::local(
1797                client_a.clone(),
1798                client_a.user_store.clone(),
1799                lang_registry.clone(),
1800                fs.clone(),
1801                cx,
1802            )
1803        });
1804        let (worktree_a, _) = project_a
1805            .update(cx_a, |p, cx| {
1806                p.find_or_create_local_worktree("/a", true, cx)
1807            })
1808            .await
1809            .unwrap();
1810        worktree_a
1811            .read_with(cx_a, |tree, _| tree.as_local().unwrap().scan_complete())
1812            .await;
1813        let project_id = project_a.update(cx_a, |p, _| p.next_remote_id()).await;
1814        let worktree_id = worktree_a.read_with(cx_a, |tree, _| tree.id());
1815        project_a.update(cx_a, |p, cx| p.share(cx)).await.unwrap();
1816        assert!(worktree_a.read_with(cx_a, |tree, _| tree.as_local().unwrap().is_shared()));
1817
1818        // Join that project as client B
1819        let project_b = Project::remote(
1820            project_id,
1821            client_b.clone(),
1822            client_b.user_store.clone(),
1823            lang_registry.clone(),
1824            fs.clone(),
1825            &mut cx_b.to_async(),
1826        )
1827        .await
1828        .unwrap();
1829        project_b
1830            .update(cx_b, |p, cx| p.open_buffer((worktree_id, "a.txt"), cx))
1831            .await
1832            .unwrap();
1833
1834        // Drop client A's connection. Collaborators should disappear and the project should not be shown as shared.
1835        server.disconnect_client(client_a.current_user_id(cx_a));
1836        cx_a.foreground().advance_clock(rpc::RECEIVE_TIMEOUT);
1837        project_a
1838            .condition(cx_a, |project, _| project.collaborators().is_empty())
1839            .await;
1840        project_a.read_with(cx_a, |project, _| assert!(!project.is_shared()));
1841        project_b
1842            .condition(cx_b, |project, _| project.is_read_only())
1843            .await;
1844        assert!(worktree_a.read_with(cx_a, |tree, _| !tree.as_local().unwrap().is_shared()));
1845        cx_b.update(|_| {
1846            drop(project_b);
1847        });
1848
1849        // Await reconnection
1850        let project_id = project_a.update(cx_a, |p, _| p.next_remote_id()).await;
1851
1852        // Share the project again and ensure guests can still join.
1853        project_a
1854            .update(cx_a, |project, cx| project.share(cx))
1855            .await
1856            .unwrap();
1857        assert!(worktree_a.read_with(cx_a, |tree, _| tree.as_local().unwrap().is_shared()));
1858
1859        let project_b2 = Project::remote(
1860            project_id,
1861            client_b.clone(),
1862            client_b.user_store.clone(),
1863            lang_registry.clone(),
1864            fs.clone(),
1865            &mut cx_b.to_async(),
1866        )
1867        .await
1868        .unwrap();
1869        project_b2
1870            .update(cx_b, |p, cx| p.open_buffer((worktree_id, "a.txt"), cx))
1871            .await
1872            .unwrap();
1873    }
1874
1875    #[gpui::test(iterations = 10)]
1876    async fn test_propagate_saves_and_fs_changes(
1877        cx_a: &mut TestAppContext,
1878        cx_b: &mut TestAppContext,
1879        cx_c: &mut TestAppContext,
1880    ) {
1881        let lang_registry = Arc::new(LanguageRegistry::test());
1882        let fs = FakeFs::new(cx_a.background());
1883        cx_a.foreground().forbid_parking();
1884
1885        // Connect to a server as 3 clients.
1886        let mut server = TestServer::start(cx_a.foreground(), cx_a.background()).await;
1887        let client_a = server.create_client(cx_a, "user_a").await;
1888        let client_b = server.create_client(cx_b, "user_b").await;
1889        let client_c = server.create_client(cx_c, "user_c").await;
1890        server
1891            .make_contacts(vec![
1892                (&client_a, cx_a),
1893                (&client_b, cx_b),
1894                (&client_c, cx_c),
1895            ])
1896            .await;
1897
1898        // Share a worktree as client A.
1899        fs.insert_tree(
1900            "/a",
1901            json!({
1902                "file1": "",
1903                "file2": ""
1904            }),
1905        )
1906        .await;
1907        let project_a = cx_a.update(|cx| {
1908            Project::local(
1909                client_a.clone(),
1910                client_a.user_store.clone(),
1911                lang_registry.clone(),
1912                fs.clone(),
1913                cx,
1914            )
1915        });
1916        let (worktree_a, _) = project_a
1917            .update(cx_a, |p, cx| {
1918                p.find_or_create_local_worktree("/a", true, cx)
1919            })
1920            .await
1921            .unwrap();
1922        worktree_a
1923            .read_with(cx_a, |tree, _| tree.as_local().unwrap().scan_complete())
1924            .await;
1925        let project_id = project_a.update(cx_a, |p, _| p.next_remote_id()).await;
1926        let worktree_id = worktree_a.read_with(cx_a, |tree, _| tree.id());
1927        project_a.update(cx_a, |p, cx| p.share(cx)).await.unwrap();
1928
1929        // Join that worktree as clients B and C.
1930        let project_b = Project::remote(
1931            project_id,
1932            client_b.clone(),
1933            client_b.user_store.clone(),
1934            lang_registry.clone(),
1935            fs.clone(),
1936            &mut cx_b.to_async(),
1937        )
1938        .await
1939        .unwrap();
1940        let project_c = Project::remote(
1941            project_id,
1942            client_c.clone(),
1943            client_c.user_store.clone(),
1944            lang_registry.clone(),
1945            fs.clone(),
1946            &mut cx_c.to_async(),
1947        )
1948        .await
1949        .unwrap();
1950        let worktree_b = project_b.read_with(cx_b, |p, cx| p.worktrees(cx).next().unwrap());
1951        let worktree_c = project_c.read_with(cx_c, |p, cx| p.worktrees(cx).next().unwrap());
1952
1953        // Open and edit a buffer as both guests B and C.
1954        let buffer_b = project_b
1955            .update(cx_b, |p, cx| p.open_buffer((worktree_id, "file1"), cx))
1956            .await
1957            .unwrap();
1958        let buffer_c = project_c
1959            .update(cx_c, |p, cx| p.open_buffer((worktree_id, "file1"), cx))
1960            .await
1961            .unwrap();
1962        buffer_b.update(cx_b, |buf, cx| buf.edit([(0..0, "i-am-b, ")], cx));
1963        buffer_c.update(cx_c, |buf, cx| buf.edit([(0..0, "i-am-c, ")], cx));
1964
1965        // Open and edit that buffer as the host.
1966        let buffer_a = project_a
1967            .update(cx_a, |p, cx| p.open_buffer((worktree_id, "file1"), cx))
1968            .await
1969            .unwrap();
1970
1971        buffer_a
1972            .condition(cx_a, |buf, _| buf.text() == "i-am-c, i-am-b, ")
1973            .await;
1974        buffer_a.update(cx_a, |buf, cx| {
1975            buf.edit([(buf.len()..buf.len(), "i-am-a")], cx)
1976        });
1977
1978        // Wait for edits to propagate
1979        buffer_a
1980            .condition(cx_a, |buf, _| buf.text() == "i-am-c, i-am-b, i-am-a")
1981            .await;
1982        buffer_b
1983            .condition(cx_b, |buf, _| buf.text() == "i-am-c, i-am-b, i-am-a")
1984            .await;
1985        buffer_c
1986            .condition(cx_c, |buf, _| buf.text() == "i-am-c, i-am-b, i-am-a")
1987            .await;
1988
1989        // Edit the buffer as the host and concurrently save as guest B.
1990        let save_b = buffer_b.update(cx_b, |buf, cx| buf.save(cx));
1991        buffer_a.update(cx_a, |buf, cx| buf.edit([(0..0, "hi-a, ")], cx));
1992        save_b.await.unwrap();
1993        assert_eq!(
1994            fs.load("/a/file1".as_ref()).await.unwrap(),
1995            "hi-a, i-am-c, i-am-b, i-am-a"
1996        );
1997        buffer_a.read_with(cx_a, |buf, _| assert!(!buf.is_dirty()));
1998        buffer_b.read_with(cx_b, |buf, _| assert!(!buf.is_dirty()));
1999        buffer_c.condition(cx_c, |buf, _| !buf.is_dirty()).await;
2000
2001        worktree_a.flush_fs_events(cx_a).await;
2002
2003        // Make changes on host's file system, see those changes on guest worktrees.
2004        fs.rename(
2005            "/a/file1".as_ref(),
2006            "/a/file1-renamed".as_ref(),
2007            Default::default(),
2008        )
2009        .await
2010        .unwrap();
2011
2012        fs.rename("/a/file2".as_ref(), "/a/file3".as_ref(), Default::default())
2013            .await
2014            .unwrap();
2015        fs.insert_file(Path::new("/a/file4"), "4".into()).await;
2016
2017        worktree_a
2018            .condition(&cx_a, |tree, _| {
2019                tree.paths()
2020                    .map(|p| p.to_string_lossy())
2021                    .collect::<Vec<_>>()
2022                    == ["file1-renamed", "file3", "file4"]
2023            })
2024            .await;
2025        worktree_b
2026            .condition(&cx_b, |tree, _| {
2027                tree.paths()
2028                    .map(|p| p.to_string_lossy())
2029                    .collect::<Vec<_>>()
2030                    == ["file1-renamed", "file3", "file4"]
2031            })
2032            .await;
2033        worktree_c
2034            .condition(&cx_c, |tree, _| {
2035                tree.paths()
2036                    .map(|p| p.to_string_lossy())
2037                    .collect::<Vec<_>>()
2038                    == ["file1-renamed", "file3", "file4"]
2039            })
2040            .await;
2041
2042        // Ensure buffer files are updated as well.
2043        buffer_a
2044            .condition(&cx_a, |buf, _| {
2045                buf.file().unwrap().path().to_str() == Some("file1-renamed")
2046            })
2047            .await;
2048        buffer_b
2049            .condition(&cx_b, |buf, _| {
2050                buf.file().unwrap().path().to_str() == Some("file1-renamed")
2051            })
2052            .await;
2053        buffer_c
2054            .condition(&cx_c, |buf, _| {
2055                buf.file().unwrap().path().to_str() == Some("file1-renamed")
2056            })
2057            .await;
2058    }
2059
2060    #[gpui::test(iterations = 10)]
2061    async fn test_fs_operations(
2062        executor: Arc<Deterministic>,
2063        cx_a: &mut TestAppContext,
2064        cx_b: &mut TestAppContext,
2065    ) {
2066        executor.forbid_parking();
2067        let fs = FakeFs::new(cx_a.background());
2068
2069        // Connect to a server as 2 clients.
2070        let mut server = TestServer::start(cx_a.foreground(), cx_a.background()).await;
2071        let mut client_a = server.create_client(cx_a, "user_a").await;
2072        let mut client_b = server.create_client(cx_b, "user_b").await;
2073        server
2074            .make_contacts(vec![(&client_a, cx_a), (&client_b, cx_b)])
2075            .await;
2076
2077        // Share a project as client A
2078        fs.insert_tree(
2079            "/dir",
2080            json!({
2081                "a.txt": "a-contents",
2082                "b.txt": "b-contents",
2083            }),
2084        )
2085        .await;
2086
2087        let (project_a, worktree_id) = client_a.build_local_project(fs, "/dir", cx_a).await;
2088        let project_id = project_a.read_with(cx_a, |project, _| project.remote_id().unwrap());
2089        project_a
2090            .update(cx_a, |project, cx| project.share(cx))
2091            .await
2092            .unwrap();
2093
2094        let project_b = client_b.build_remote_project(project_id, cx_b).await;
2095
2096        let worktree_a =
2097            project_a.read_with(cx_a, |project, cx| project.worktrees(cx).next().unwrap());
2098        let worktree_b =
2099            project_b.read_with(cx_b, |project, cx| project.worktrees(cx).next().unwrap());
2100
2101        let entry = project_b
2102            .update(cx_b, |project, cx| {
2103                project
2104                    .create_entry((worktree_id, "c.txt"), false, cx)
2105                    .unwrap()
2106            })
2107            .await
2108            .unwrap();
2109        worktree_a.read_with(cx_a, |worktree, _| {
2110            assert_eq!(
2111                worktree
2112                    .paths()
2113                    .map(|p| p.to_string_lossy())
2114                    .collect::<Vec<_>>(),
2115                ["a.txt", "b.txt", "c.txt"]
2116            );
2117        });
2118        worktree_b.read_with(cx_b, |worktree, _| {
2119            assert_eq!(
2120                worktree
2121                    .paths()
2122                    .map(|p| p.to_string_lossy())
2123                    .collect::<Vec<_>>(),
2124                ["a.txt", "b.txt", "c.txt"]
2125            );
2126        });
2127
2128        project_b
2129            .update(cx_b, |project, cx| {
2130                project.rename_entry(entry.id, Path::new("d.txt"), cx)
2131            })
2132            .unwrap()
2133            .await
2134            .unwrap();
2135        worktree_a.read_with(cx_a, |worktree, _| {
2136            assert_eq!(
2137                worktree
2138                    .paths()
2139                    .map(|p| p.to_string_lossy())
2140                    .collect::<Vec<_>>(),
2141                ["a.txt", "b.txt", "d.txt"]
2142            );
2143        });
2144        worktree_b.read_with(cx_b, |worktree, _| {
2145            assert_eq!(
2146                worktree
2147                    .paths()
2148                    .map(|p| p.to_string_lossy())
2149                    .collect::<Vec<_>>(),
2150                ["a.txt", "b.txt", "d.txt"]
2151            );
2152        });
2153
2154        let dir_entry = project_b
2155            .update(cx_b, |project, cx| {
2156                project
2157                    .create_entry((worktree_id, "DIR"), true, cx)
2158                    .unwrap()
2159            })
2160            .await
2161            .unwrap();
2162        worktree_a.read_with(cx_a, |worktree, _| {
2163            assert_eq!(
2164                worktree
2165                    .paths()
2166                    .map(|p| p.to_string_lossy())
2167                    .collect::<Vec<_>>(),
2168                ["DIR", "a.txt", "b.txt", "d.txt"]
2169            );
2170        });
2171        worktree_b.read_with(cx_b, |worktree, _| {
2172            assert_eq!(
2173                worktree
2174                    .paths()
2175                    .map(|p| p.to_string_lossy())
2176                    .collect::<Vec<_>>(),
2177                ["DIR", "a.txt", "b.txt", "d.txt"]
2178            );
2179        });
2180
2181        project_b
2182            .update(cx_b, |project, cx| {
2183                project.delete_entry(dir_entry.id, cx).unwrap()
2184            })
2185            .await
2186            .unwrap();
2187        worktree_a.read_with(cx_a, |worktree, _| {
2188            assert_eq!(
2189                worktree
2190                    .paths()
2191                    .map(|p| p.to_string_lossy())
2192                    .collect::<Vec<_>>(),
2193                ["a.txt", "b.txt", "d.txt"]
2194            );
2195        });
2196        worktree_b.read_with(cx_b, |worktree, _| {
2197            assert_eq!(
2198                worktree
2199                    .paths()
2200                    .map(|p| p.to_string_lossy())
2201                    .collect::<Vec<_>>(),
2202                ["a.txt", "b.txt", "d.txt"]
2203            );
2204        });
2205
2206        project_b
2207            .update(cx_b, |project, cx| {
2208                project.delete_entry(entry.id, cx).unwrap()
2209            })
2210            .await
2211            .unwrap();
2212        worktree_a.read_with(cx_a, |worktree, _| {
2213            assert_eq!(
2214                worktree
2215                    .paths()
2216                    .map(|p| p.to_string_lossy())
2217                    .collect::<Vec<_>>(),
2218                ["a.txt", "b.txt"]
2219            );
2220        });
2221        worktree_b.read_with(cx_b, |worktree, _| {
2222            assert_eq!(
2223                worktree
2224                    .paths()
2225                    .map(|p| p.to_string_lossy())
2226                    .collect::<Vec<_>>(),
2227                ["a.txt", "b.txt"]
2228            );
2229        });
2230    }
2231
2232    #[gpui::test(iterations = 10)]
2233    async fn test_buffer_conflict_after_save(cx_a: &mut TestAppContext, cx_b: &mut TestAppContext) {
2234        cx_a.foreground().forbid_parking();
2235        let lang_registry = Arc::new(LanguageRegistry::test());
2236        let fs = FakeFs::new(cx_a.background());
2237
2238        // Connect to a server as 2 clients.
2239        let mut server = TestServer::start(cx_a.foreground(), cx_a.background()).await;
2240        let client_a = server.create_client(cx_a, "user_a").await;
2241        let client_b = server.create_client(cx_b, "user_b").await;
2242        server
2243            .make_contacts(vec![(&client_a, cx_a), (&client_b, cx_b)])
2244            .await;
2245
2246        // Share a project as client A
2247        fs.insert_tree(
2248            "/dir",
2249            json!({
2250                "a.txt": "a-contents",
2251            }),
2252        )
2253        .await;
2254
2255        let project_a = cx_a.update(|cx| {
2256            Project::local(
2257                client_a.clone(),
2258                client_a.user_store.clone(),
2259                lang_registry.clone(),
2260                fs.clone(),
2261                cx,
2262            )
2263        });
2264        let (worktree_a, _) = project_a
2265            .update(cx_a, |p, cx| {
2266                p.find_or_create_local_worktree("/dir", true, cx)
2267            })
2268            .await
2269            .unwrap();
2270        worktree_a
2271            .read_with(cx_a, |tree, _| tree.as_local().unwrap().scan_complete())
2272            .await;
2273        let project_id = project_a.update(cx_a, |p, _| p.next_remote_id()).await;
2274        let worktree_id = worktree_a.read_with(cx_a, |tree, _| tree.id());
2275        project_a.update(cx_a, |p, cx| p.share(cx)).await.unwrap();
2276
2277        // Join that project as client B
2278        let project_b = Project::remote(
2279            project_id,
2280            client_b.clone(),
2281            client_b.user_store.clone(),
2282            lang_registry.clone(),
2283            fs.clone(),
2284            &mut cx_b.to_async(),
2285        )
2286        .await
2287        .unwrap();
2288
2289        // Open a buffer as client B
2290        let buffer_b = project_b
2291            .update(cx_b, |p, cx| p.open_buffer((worktree_id, "a.txt"), cx))
2292            .await
2293            .unwrap();
2294
2295        buffer_b.update(cx_b, |buf, cx| buf.edit([(0..0, "world ")], cx));
2296        buffer_b.read_with(cx_b, |buf, _| {
2297            assert!(buf.is_dirty());
2298            assert!(!buf.has_conflict());
2299        });
2300
2301        buffer_b.update(cx_b, |buf, cx| buf.save(cx)).await.unwrap();
2302        buffer_b
2303            .condition(&cx_b, |buffer_b, _| !buffer_b.is_dirty())
2304            .await;
2305        buffer_b.read_with(cx_b, |buf, _| {
2306            assert!(!buf.has_conflict());
2307        });
2308
2309        buffer_b.update(cx_b, |buf, cx| buf.edit([(0..0, "hello ")], cx));
2310        buffer_b.read_with(cx_b, |buf, _| {
2311            assert!(buf.is_dirty());
2312            assert!(!buf.has_conflict());
2313        });
2314    }
2315
2316    #[gpui::test(iterations = 10)]
2317    async fn test_buffer_reloading(cx_a: &mut TestAppContext, cx_b: &mut TestAppContext) {
2318        cx_a.foreground().forbid_parking();
2319        let lang_registry = Arc::new(LanguageRegistry::test());
2320        let fs = FakeFs::new(cx_a.background());
2321
2322        // Connect to a server as 2 clients.
2323        let mut server = TestServer::start(cx_a.foreground(), cx_a.background()).await;
2324        let client_a = server.create_client(cx_a, "user_a").await;
2325        let client_b = server.create_client(cx_b, "user_b").await;
2326        server
2327            .make_contacts(vec![(&client_a, cx_a), (&client_b, cx_b)])
2328            .await;
2329
2330        // Share a project as client A
2331        fs.insert_tree(
2332            "/dir",
2333            json!({
2334                "a.txt": "a-contents",
2335            }),
2336        )
2337        .await;
2338
2339        let project_a = cx_a.update(|cx| {
2340            Project::local(
2341                client_a.clone(),
2342                client_a.user_store.clone(),
2343                lang_registry.clone(),
2344                fs.clone(),
2345                cx,
2346            )
2347        });
2348        let (worktree_a, _) = project_a
2349            .update(cx_a, |p, cx| {
2350                p.find_or_create_local_worktree("/dir", true, cx)
2351            })
2352            .await
2353            .unwrap();
2354        worktree_a
2355            .read_with(cx_a, |tree, _| tree.as_local().unwrap().scan_complete())
2356            .await;
2357        let project_id = project_a.update(cx_a, |p, _| p.next_remote_id()).await;
2358        let worktree_id = worktree_a.read_with(cx_a, |tree, _| tree.id());
2359        project_a.update(cx_a, |p, cx| p.share(cx)).await.unwrap();
2360
2361        // Join that project as client B
2362        let project_b = Project::remote(
2363            project_id,
2364            client_b.clone(),
2365            client_b.user_store.clone(),
2366            lang_registry.clone(),
2367            fs.clone(),
2368            &mut cx_b.to_async(),
2369        )
2370        .await
2371        .unwrap();
2372        let _worktree_b = project_b.update(cx_b, |p, cx| p.worktrees(cx).next().unwrap());
2373
2374        // Open a buffer as client B
2375        let buffer_b = project_b
2376            .update(cx_b, |p, cx| p.open_buffer((worktree_id, "a.txt"), cx))
2377            .await
2378            .unwrap();
2379        buffer_b.read_with(cx_b, |buf, _| {
2380            assert!(!buf.is_dirty());
2381            assert!(!buf.has_conflict());
2382        });
2383
2384        fs.save(Path::new("/dir/a.txt"), &"new contents".into())
2385            .await
2386            .unwrap();
2387        buffer_b
2388            .condition(&cx_b, |buf, _| {
2389                buf.text() == "new contents" && !buf.is_dirty()
2390            })
2391            .await;
2392        buffer_b.read_with(cx_b, |buf, _| {
2393            assert!(!buf.has_conflict());
2394        });
2395    }
2396
2397    #[gpui::test(iterations = 10)]
2398    async fn test_editing_while_guest_opens_buffer(
2399        cx_a: &mut TestAppContext,
2400        cx_b: &mut TestAppContext,
2401    ) {
2402        cx_a.foreground().forbid_parking();
2403        let lang_registry = Arc::new(LanguageRegistry::test());
2404        let fs = FakeFs::new(cx_a.background());
2405
2406        // Connect to a server as 2 clients.
2407        let mut server = TestServer::start(cx_a.foreground(), cx_a.background()).await;
2408        let client_a = server.create_client(cx_a, "user_a").await;
2409        let client_b = server.create_client(cx_b, "user_b").await;
2410        server
2411            .make_contacts(vec![(&client_a, cx_a), (&client_b, cx_b)])
2412            .await;
2413
2414        // Share a project as client A
2415        fs.insert_tree(
2416            "/dir",
2417            json!({
2418                "a.txt": "a-contents",
2419            }),
2420        )
2421        .await;
2422        let project_a = cx_a.update(|cx| {
2423            Project::local(
2424                client_a.clone(),
2425                client_a.user_store.clone(),
2426                lang_registry.clone(),
2427                fs.clone(),
2428                cx,
2429            )
2430        });
2431        let (worktree_a, _) = project_a
2432            .update(cx_a, |p, cx| {
2433                p.find_or_create_local_worktree("/dir", true, cx)
2434            })
2435            .await
2436            .unwrap();
2437        worktree_a
2438            .read_with(cx_a, |tree, _| tree.as_local().unwrap().scan_complete())
2439            .await;
2440        let project_id = project_a.update(cx_a, |p, _| p.next_remote_id()).await;
2441        let worktree_id = worktree_a.read_with(cx_a, |tree, _| tree.id());
2442        project_a.update(cx_a, |p, cx| p.share(cx)).await.unwrap();
2443
2444        // Join that project as client B
2445        let project_b = Project::remote(
2446            project_id,
2447            client_b.clone(),
2448            client_b.user_store.clone(),
2449            lang_registry.clone(),
2450            fs.clone(),
2451            &mut cx_b.to_async(),
2452        )
2453        .await
2454        .unwrap();
2455
2456        // Open a buffer as client A
2457        let buffer_a = project_a
2458            .update(cx_a, |p, cx| p.open_buffer((worktree_id, "a.txt"), cx))
2459            .await
2460            .unwrap();
2461
2462        // Start opening the same buffer as client B
2463        let buffer_b = cx_b
2464            .background()
2465            .spawn(project_b.update(cx_b, |p, cx| p.open_buffer((worktree_id, "a.txt"), cx)));
2466
2467        // Edit the buffer as client A while client B is still opening it.
2468        cx_b.background().simulate_random_delay().await;
2469        buffer_a.update(cx_a, |buf, cx| buf.edit([(0..0, "X")], cx));
2470        cx_b.background().simulate_random_delay().await;
2471        buffer_a.update(cx_a, |buf, cx| buf.edit([(1..1, "Y")], cx));
2472
2473        let text = buffer_a.read_with(cx_a, |buf, _| buf.text());
2474        let buffer_b = buffer_b.await.unwrap();
2475        buffer_b.condition(&cx_b, |buf, _| buf.text() == text).await;
2476    }
2477
2478    #[gpui::test(iterations = 10)]
2479    async fn test_leaving_worktree_while_opening_buffer(
2480        cx_a: &mut TestAppContext,
2481        cx_b: &mut TestAppContext,
2482    ) {
2483        cx_a.foreground().forbid_parking();
2484        let lang_registry = Arc::new(LanguageRegistry::test());
2485        let fs = FakeFs::new(cx_a.background());
2486
2487        // Connect to a server as 2 clients.
2488        let mut server = TestServer::start(cx_a.foreground(), cx_a.background()).await;
2489        let client_a = server.create_client(cx_a, "user_a").await;
2490        let client_b = server.create_client(cx_b, "user_b").await;
2491        server
2492            .make_contacts(vec![(&client_a, cx_a), (&client_b, cx_b)])
2493            .await;
2494
2495        // Share a project as client A
2496        fs.insert_tree(
2497            "/dir",
2498            json!({
2499                "a.txt": "a-contents",
2500            }),
2501        )
2502        .await;
2503        let project_a = cx_a.update(|cx| {
2504            Project::local(
2505                client_a.clone(),
2506                client_a.user_store.clone(),
2507                lang_registry.clone(),
2508                fs.clone(),
2509                cx,
2510            )
2511        });
2512        let (worktree_a, _) = project_a
2513            .update(cx_a, |p, cx| {
2514                p.find_or_create_local_worktree("/dir", true, cx)
2515            })
2516            .await
2517            .unwrap();
2518        worktree_a
2519            .read_with(cx_a, |tree, _| tree.as_local().unwrap().scan_complete())
2520            .await;
2521        let project_id = project_a.update(cx_a, |p, _| p.next_remote_id()).await;
2522        let worktree_id = worktree_a.read_with(cx_a, |tree, _| tree.id());
2523        project_a.update(cx_a, |p, cx| p.share(cx)).await.unwrap();
2524
2525        // Join that project as client B
2526        let project_b = Project::remote(
2527            project_id,
2528            client_b.clone(),
2529            client_b.user_store.clone(),
2530            lang_registry.clone(),
2531            fs.clone(),
2532            &mut cx_b.to_async(),
2533        )
2534        .await
2535        .unwrap();
2536
2537        // See that a guest has joined as client A.
2538        project_a
2539            .condition(&cx_a, |p, _| p.collaborators().len() == 1)
2540            .await;
2541
2542        // Begin opening a buffer as client B, but leave the project before the open completes.
2543        let buffer_b = cx_b
2544            .background()
2545            .spawn(project_b.update(cx_b, |p, cx| p.open_buffer((worktree_id, "a.txt"), cx)));
2546        cx_b.update(|_| drop(project_b));
2547        drop(buffer_b);
2548
2549        // See that the guest has left.
2550        project_a
2551            .condition(&cx_a, |p, _| p.collaborators().len() == 0)
2552            .await;
2553    }
2554
2555    #[gpui::test(iterations = 10)]
2556    async fn test_leaving_project(cx_a: &mut TestAppContext, cx_b: &mut TestAppContext) {
2557        cx_a.foreground().forbid_parking();
2558        let lang_registry = Arc::new(LanguageRegistry::test());
2559        let fs = FakeFs::new(cx_a.background());
2560
2561        // Connect to a server as 2 clients.
2562        let mut server = TestServer::start(cx_a.foreground(), cx_a.background()).await;
2563        let client_a = server.create_client(cx_a, "user_a").await;
2564        let client_b = server.create_client(cx_b, "user_b").await;
2565        server
2566            .make_contacts(vec![(&client_a, cx_a), (&client_b, cx_b)])
2567            .await;
2568
2569        // Share a project as client A
2570        fs.insert_tree(
2571            "/a",
2572            json!({
2573                "a.txt": "a-contents",
2574                "b.txt": "b-contents",
2575            }),
2576        )
2577        .await;
2578        let project_a = cx_a.update(|cx| {
2579            Project::local(
2580                client_a.clone(),
2581                client_a.user_store.clone(),
2582                lang_registry.clone(),
2583                fs.clone(),
2584                cx,
2585            )
2586        });
2587        let (worktree_a, _) = project_a
2588            .update(cx_a, |p, cx| {
2589                p.find_or_create_local_worktree("/a", true, cx)
2590            })
2591            .await
2592            .unwrap();
2593        worktree_a
2594            .read_with(cx_a, |tree, _| tree.as_local().unwrap().scan_complete())
2595            .await;
2596        let project_id = project_a
2597            .update(cx_a, |project, _| project.next_remote_id())
2598            .await;
2599        project_a
2600            .update(cx_a, |project, cx| project.share(cx))
2601            .await
2602            .unwrap();
2603
2604        // Join that project as client B
2605        let _project_b = Project::remote(
2606            project_id,
2607            client_b.clone(),
2608            client_b.user_store.clone(),
2609            lang_registry.clone(),
2610            fs.clone(),
2611            &mut cx_b.to_async(),
2612        )
2613        .await
2614        .unwrap();
2615
2616        // Client A sees that a guest has joined.
2617        project_a
2618            .condition(cx_a, |p, _| p.collaborators().len() == 1)
2619            .await;
2620
2621        // Drop client B's connection and ensure client A observes client B leaving the project.
2622        client_b.disconnect(&cx_b.to_async()).unwrap();
2623        project_a
2624            .condition(cx_a, |p, _| p.collaborators().len() == 0)
2625            .await;
2626
2627        // Rejoin the project as client B
2628        let _project_b = Project::remote(
2629            project_id,
2630            client_b.clone(),
2631            client_b.user_store.clone(),
2632            lang_registry.clone(),
2633            fs.clone(),
2634            &mut cx_b.to_async(),
2635        )
2636        .await
2637        .unwrap();
2638
2639        // Client A sees that a guest has re-joined.
2640        project_a
2641            .condition(cx_a, |p, _| p.collaborators().len() == 1)
2642            .await;
2643
2644        // Simulate connection loss for client B and ensure client A observes client B leaving the project.
2645        client_b.wait_for_current_user(cx_b).await;
2646        server.disconnect_client(client_b.current_user_id(cx_b));
2647        cx_a.foreground().advance_clock(rpc::RECEIVE_TIMEOUT);
2648        project_a
2649            .condition(cx_a, |p, _| p.collaborators().len() == 0)
2650            .await;
2651    }
2652
2653    #[gpui::test(iterations = 10)]
2654    async fn test_collaborating_with_diagnostics(
2655        cx_a: &mut TestAppContext,
2656        cx_b: &mut TestAppContext,
2657    ) {
2658        cx_a.foreground().forbid_parking();
2659        let lang_registry = Arc::new(LanguageRegistry::test());
2660        let fs = FakeFs::new(cx_a.background());
2661
2662        // Set up a fake language server.
2663        let mut language = Language::new(
2664            LanguageConfig {
2665                name: "Rust".into(),
2666                path_suffixes: vec!["rs".to_string()],
2667                ..Default::default()
2668            },
2669            Some(tree_sitter_rust::language()),
2670        );
2671        let mut fake_language_servers = language.set_fake_lsp_adapter(Default::default());
2672        lang_registry.add(Arc::new(language));
2673
2674        // Connect to a server as 2 clients.
2675        let mut server = TestServer::start(cx_a.foreground(), cx_a.background()).await;
2676        let client_a = server.create_client(cx_a, "user_a").await;
2677        let client_b = server.create_client(cx_b, "user_b").await;
2678        server
2679            .make_contacts(vec![(&client_a, cx_a), (&client_b, cx_b)])
2680            .await;
2681
2682        // Share a project as client A
2683        fs.insert_tree(
2684            "/a",
2685            json!({
2686                "a.rs": "let one = two",
2687                "other.rs": "",
2688            }),
2689        )
2690        .await;
2691        let project_a = cx_a.update(|cx| {
2692            Project::local(
2693                client_a.clone(),
2694                client_a.user_store.clone(),
2695                lang_registry.clone(),
2696                fs.clone(),
2697                cx,
2698            )
2699        });
2700        let (worktree_a, _) = project_a
2701            .update(cx_a, |p, cx| {
2702                p.find_or_create_local_worktree("/a", true, cx)
2703            })
2704            .await
2705            .unwrap();
2706        worktree_a
2707            .read_with(cx_a, |tree, _| tree.as_local().unwrap().scan_complete())
2708            .await;
2709        let project_id = project_a.update(cx_a, |p, _| p.next_remote_id()).await;
2710        let worktree_id = worktree_a.read_with(cx_a, |tree, _| tree.id());
2711        project_a.update(cx_a, |p, cx| p.share(cx)).await.unwrap();
2712
2713        // Cause the language server to start.
2714        let _ = cx_a
2715            .background()
2716            .spawn(project_a.update(cx_a, |project, cx| {
2717                project.open_buffer(
2718                    ProjectPath {
2719                        worktree_id,
2720                        path: Path::new("other.rs").into(),
2721                    },
2722                    cx,
2723                )
2724            }))
2725            .await
2726            .unwrap();
2727
2728        // Simulate a language server reporting errors for a file.
2729        let mut fake_language_server = fake_language_servers.next().await.unwrap();
2730        fake_language_server
2731            .receive_notification::<lsp::notification::DidOpenTextDocument>()
2732            .await;
2733        fake_language_server.notify::<lsp::notification::PublishDiagnostics>(
2734            lsp::PublishDiagnosticsParams {
2735                uri: lsp::Url::from_file_path("/a/a.rs").unwrap(),
2736                version: None,
2737                diagnostics: vec![lsp::Diagnostic {
2738                    severity: Some(lsp::DiagnosticSeverity::ERROR),
2739                    range: lsp::Range::new(lsp::Position::new(0, 4), lsp::Position::new(0, 7)),
2740                    message: "message 1".to_string(),
2741                    ..Default::default()
2742                }],
2743            },
2744        );
2745
2746        // Wait for server to see the diagnostics update.
2747        server
2748            .condition(|store| {
2749                let worktree = store
2750                    .project(project_id)
2751                    .unwrap()
2752                    .share
2753                    .as_ref()
2754                    .unwrap()
2755                    .worktrees
2756                    .get(&worktree_id.to_proto())
2757                    .unwrap();
2758
2759                !worktree.diagnostic_summaries.is_empty()
2760            })
2761            .await;
2762
2763        // Join the worktree as client B.
2764        let project_b = Project::remote(
2765            project_id,
2766            client_b.clone(),
2767            client_b.user_store.clone(),
2768            lang_registry.clone(),
2769            fs.clone(),
2770            &mut cx_b.to_async(),
2771        )
2772        .await
2773        .unwrap();
2774
2775        project_b.read_with(cx_b, |project, cx| {
2776            assert_eq!(
2777                project.diagnostic_summaries(cx).collect::<Vec<_>>(),
2778                &[(
2779                    ProjectPath {
2780                        worktree_id,
2781                        path: Arc::from(Path::new("a.rs")),
2782                    },
2783                    DiagnosticSummary {
2784                        error_count: 1,
2785                        warning_count: 0,
2786                        ..Default::default()
2787                    },
2788                )]
2789            )
2790        });
2791
2792        // Simulate a language server reporting more errors for a file.
2793        fake_language_server.notify::<lsp::notification::PublishDiagnostics>(
2794            lsp::PublishDiagnosticsParams {
2795                uri: lsp::Url::from_file_path("/a/a.rs").unwrap(),
2796                version: None,
2797                diagnostics: vec![
2798                    lsp::Diagnostic {
2799                        severity: Some(lsp::DiagnosticSeverity::ERROR),
2800                        range: lsp::Range::new(lsp::Position::new(0, 4), lsp::Position::new(0, 7)),
2801                        message: "message 1".to_string(),
2802                        ..Default::default()
2803                    },
2804                    lsp::Diagnostic {
2805                        severity: Some(lsp::DiagnosticSeverity::WARNING),
2806                        range: lsp::Range::new(
2807                            lsp::Position::new(0, 10),
2808                            lsp::Position::new(0, 13),
2809                        ),
2810                        message: "message 2".to_string(),
2811                        ..Default::default()
2812                    },
2813                ],
2814            },
2815        );
2816
2817        // Client b gets the updated summaries
2818        project_b
2819            .condition(&cx_b, |project, cx| {
2820                project.diagnostic_summaries(cx).collect::<Vec<_>>()
2821                    == &[(
2822                        ProjectPath {
2823                            worktree_id,
2824                            path: Arc::from(Path::new("a.rs")),
2825                        },
2826                        DiagnosticSummary {
2827                            error_count: 1,
2828                            warning_count: 1,
2829                            ..Default::default()
2830                        },
2831                    )]
2832            })
2833            .await;
2834
2835        // Open the file with the errors on client B. They should be present.
2836        let buffer_b = cx_b
2837            .background()
2838            .spawn(project_b.update(cx_b, |p, cx| p.open_buffer((worktree_id, "a.rs"), cx)))
2839            .await
2840            .unwrap();
2841
2842        buffer_b.read_with(cx_b, |buffer, _| {
2843            assert_eq!(
2844                buffer
2845                    .snapshot()
2846                    .diagnostics_in_range::<_, Point>(0..buffer.len(), false)
2847                    .map(|entry| entry)
2848                    .collect::<Vec<_>>(),
2849                &[
2850                    DiagnosticEntry {
2851                        range: Point::new(0, 4)..Point::new(0, 7),
2852                        diagnostic: Diagnostic {
2853                            group_id: 0,
2854                            message: "message 1".to_string(),
2855                            severity: lsp::DiagnosticSeverity::ERROR,
2856                            is_primary: true,
2857                            ..Default::default()
2858                        }
2859                    },
2860                    DiagnosticEntry {
2861                        range: Point::new(0, 10)..Point::new(0, 13),
2862                        diagnostic: Diagnostic {
2863                            group_id: 1,
2864                            severity: lsp::DiagnosticSeverity::WARNING,
2865                            message: "message 2".to_string(),
2866                            is_primary: true,
2867                            ..Default::default()
2868                        }
2869                    }
2870                ]
2871            );
2872        });
2873
2874        // Simulate a language server reporting no errors for a file.
2875        fake_language_server.notify::<lsp::notification::PublishDiagnostics>(
2876            lsp::PublishDiagnosticsParams {
2877                uri: lsp::Url::from_file_path("/a/a.rs").unwrap(),
2878                version: None,
2879                diagnostics: vec![],
2880            },
2881        );
2882        project_a
2883            .condition(cx_a, |project, cx| {
2884                project.diagnostic_summaries(cx).collect::<Vec<_>>() == &[]
2885            })
2886            .await;
2887        project_b
2888            .condition(cx_b, |project, cx| {
2889                project.diagnostic_summaries(cx).collect::<Vec<_>>() == &[]
2890            })
2891            .await;
2892    }
2893
2894    #[gpui::test(iterations = 10)]
2895    async fn test_collaborating_with_completion(
2896        cx_a: &mut TestAppContext,
2897        cx_b: &mut TestAppContext,
2898    ) {
2899        cx_a.foreground().forbid_parking();
2900        let lang_registry = Arc::new(LanguageRegistry::test());
2901        let fs = FakeFs::new(cx_a.background());
2902
2903        // Set up a fake language server.
2904        let mut language = Language::new(
2905            LanguageConfig {
2906                name: "Rust".into(),
2907                path_suffixes: vec!["rs".to_string()],
2908                ..Default::default()
2909            },
2910            Some(tree_sitter_rust::language()),
2911        );
2912        let mut fake_language_servers = language.set_fake_lsp_adapter(FakeLspAdapter {
2913            capabilities: lsp::ServerCapabilities {
2914                completion_provider: Some(lsp::CompletionOptions {
2915                    trigger_characters: Some(vec![".".to_string()]),
2916                    ..Default::default()
2917                }),
2918                ..Default::default()
2919            },
2920            ..Default::default()
2921        });
2922        lang_registry.add(Arc::new(language));
2923
2924        // Connect to a server as 2 clients.
2925        let mut server = TestServer::start(cx_a.foreground(), cx_a.background()).await;
2926        let client_a = server.create_client(cx_a, "user_a").await;
2927        let client_b = server.create_client(cx_b, "user_b").await;
2928        server
2929            .make_contacts(vec![(&client_a, cx_a), (&client_b, cx_b)])
2930            .await;
2931
2932        // Share a project as client A
2933        fs.insert_tree(
2934            "/a",
2935            json!({
2936                "main.rs": "fn main() { a }",
2937                "other.rs": "",
2938            }),
2939        )
2940        .await;
2941        let project_a = cx_a.update(|cx| {
2942            Project::local(
2943                client_a.clone(),
2944                client_a.user_store.clone(),
2945                lang_registry.clone(),
2946                fs.clone(),
2947                cx,
2948            )
2949        });
2950        let (worktree_a, _) = project_a
2951            .update(cx_a, |p, cx| {
2952                p.find_or_create_local_worktree("/a", true, cx)
2953            })
2954            .await
2955            .unwrap();
2956        worktree_a
2957            .read_with(cx_a, |tree, _| tree.as_local().unwrap().scan_complete())
2958            .await;
2959        let project_id = project_a.update(cx_a, |p, _| p.next_remote_id()).await;
2960        let worktree_id = worktree_a.read_with(cx_a, |tree, _| tree.id());
2961        project_a.update(cx_a, |p, cx| p.share(cx)).await.unwrap();
2962
2963        // Join the worktree as client B.
2964        let project_b = Project::remote(
2965            project_id,
2966            client_b.clone(),
2967            client_b.user_store.clone(),
2968            lang_registry.clone(),
2969            fs.clone(),
2970            &mut cx_b.to_async(),
2971        )
2972        .await
2973        .unwrap();
2974
2975        // Open a file in an editor as the guest.
2976        let buffer_b = project_b
2977            .update(cx_b, |p, cx| p.open_buffer((worktree_id, "main.rs"), cx))
2978            .await
2979            .unwrap();
2980        let (window_b, _) = cx_b.add_window(|_| EmptyView);
2981        let editor_b = cx_b.add_view(window_b, |cx| {
2982            Editor::for_buffer(buffer_b.clone(), Some(project_b.clone()), cx)
2983        });
2984
2985        let fake_language_server = fake_language_servers.next().await.unwrap();
2986        buffer_b
2987            .condition(&cx_b, |buffer, _| !buffer.completion_triggers().is_empty())
2988            .await;
2989
2990        // Type a completion trigger character as the guest.
2991        editor_b.update(cx_b, |editor, cx| {
2992            editor.select_ranges([13..13], None, cx);
2993            editor.handle_input(&Input(".".into()), cx);
2994            cx.focus(&editor_b);
2995        });
2996
2997        // Receive a completion request as the host's language server.
2998        // Return some completions from the host's language server.
2999        cx_a.foreground().start_waiting();
3000        fake_language_server
3001            .handle_request::<lsp::request::Completion, _, _>(|params, _| async move {
3002                assert_eq!(
3003                    params.text_document_position.text_document.uri,
3004                    lsp::Url::from_file_path("/a/main.rs").unwrap(),
3005                );
3006                assert_eq!(
3007                    params.text_document_position.position,
3008                    lsp::Position::new(0, 14),
3009                );
3010
3011                Ok(Some(lsp::CompletionResponse::Array(vec![
3012                    lsp::CompletionItem {
3013                        label: "first_method(…)".into(),
3014                        detail: Some("fn(&mut self, B) -> C".into()),
3015                        text_edit: Some(lsp::CompletionTextEdit::Edit(lsp::TextEdit {
3016                            new_text: "first_method($1)".to_string(),
3017                            range: lsp::Range::new(
3018                                lsp::Position::new(0, 14),
3019                                lsp::Position::new(0, 14),
3020                            ),
3021                        })),
3022                        insert_text_format: Some(lsp::InsertTextFormat::SNIPPET),
3023                        ..Default::default()
3024                    },
3025                    lsp::CompletionItem {
3026                        label: "second_method(…)".into(),
3027                        detail: Some("fn(&mut self, C) -> D<E>".into()),
3028                        text_edit: Some(lsp::CompletionTextEdit::Edit(lsp::TextEdit {
3029                            new_text: "second_method()".to_string(),
3030                            range: lsp::Range::new(
3031                                lsp::Position::new(0, 14),
3032                                lsp::Position::new(0, 14),
3033                            ),
3034                        })),
3035                        insert_text_format: Some(lsp::InsertTextFormat::SNIPPET),
3036                        ..Default::default()
3037                    },
3038                ])))
3039            })
3040            .next()
3041            .await
3042            .unwrap();
3043        cx_a.foreground().finish_waiting();
3044
3045        // Open the buffer on the host.
3046        let buffer_a = project_a
3047            .update(cx_a, |p, cx| p.open_buffer((worktree_id, "main.rs"), cx))
3048            .await
3049            .unwrap();
3050        buffer_a
3051            .condition(&cx_a, |buffer, _| buffer.text() == "fn main() { a. }")
3052            .await;
3053
3054        // Confirm a completion on the guest.
3055        editor_b
3056            .condition(&cx_b, |editor, _| editor.context_menu_visible())
3057            .await;
3058        editor_b.update(cx_b, |editor, cx| {
3059            editor.confirm_completion(&ConfirmCompletion { item_ix: Some(0) }, cx);
3060            assert_eq!(editor.text(cx), "fn main() { a.first_method() }");
3061        });
3062
3063        // Return a resolved completion from the host's language server.
3064        // The resolved completion has an additional text edit.
3065        fake_language_server.handle_request::<lsp::request::ResolveCompletionItem, _, _>(
3066            |params, _| async move {
3067                assert_eq!(params.label, "first_method(…)");
3068                Ok(lsp::CompletionItem {
3069                    label: "first_method(…)".into(),
3070                    detail: Some("fn(&mut self, B) -> C".into()),
3071                    text_edit: Some(lsp::CompletionTextEdit::Edit(lsp::TextEdit {
3072                        new_text: "first_method($1)".to_string(),
3073                        range: lsp::Range::new(
3074                            lsp::Position::new(0, 14),
3075                            lsp::Position::new(0, 14),
3076                        ),
3077                    })),
3078                    additional_text_edits: Some(vec![lsp::TextEdit {
3079                        new_text: "use d::SomeTrait;\n".to_string(),
3080                        range: lsp::Range::new(lsp::Position::new(0, 0), lsp::Position::new(0, 0)),
3081                    }]),
3082                    insert_text_format: Some(lsp::InsertTextFormat::SNIPPET),
3083                    ..Default::default()
3084                })
3085            },
3086        );
3087
3088        // The additional edit is applied.
3089        buffer_a
3090            .condition(&cx_a, |buffer, _| {
3091                buffer.text() == "use d::SomeTrait;\nfn main() { a.first_method() }"
3092            })
3093            .await;
3094        buffer_b
3095            .condition(&cx_b, |buffer, _| {
3096                buffer.text() == "use d::SomeTrait;\nfn main() { a.first_method() }"
3097            })
3098            .await;
3099    }
3100
3101    #[gpui::test(iterations = 10)]
3102    async fn test_reloading_buffer_manually(cx_a: &mut TestAppContext, cx_b: &mut TestAppContext) {
3103        cx_a.foreground().forbid_parking();
3104        let lang_registry = Arc::new(LanguageRegistry::test());
3105        let fs = FakeFs::new(cx_a.background());
3106
3107        // Connect to a server as 2 clients.
3108        let mut server = TestServer::start(cx_a.foreground(), cx_a.background()).await;
3109        let client_a = server.create_client(cx_a, "user_a").await;
3110        let client_b = server.create_client(cx_b, "user_b").await;
3111        server
3112            .make_contacts(vec![(&client_a, cx_a), (&client_b, cx_b)])
3113            .await;
3114
3115        // Share a project as client A
3116        fs.insert_tree(
3117            "/a",
3118            json!({
3119                "a.rs": "let one = 1;",
3120            }),
3121        )
3122        .await;
3123        let project_a = cx_a.update(|cx| {
3124            Project::local(
3125                client_a.clone(),
3126                client_a.user_store.clone(),
3127                lang_registry.clone(),
3128                fs.clone(),
3129                cx,
3130            )
3131        });
3132        let (worktree_a, _) = project_a
3133            .update(cx_a, |p, cx| {
3134                p.find_or_create_local_worktree("/a", true, cx)
3135            })
3136            .await
3137            .unwrap();
3138        worktree_a
3139            .read_with(cx_a, |tree, _| tree.as_local().unwrap().scan_complete())
3140            .await;
3141        let project_id = project_a.update(cx_a, |p, _| p.next_remote_id()).await;
3142        let worktree_id = worktree_a.read_with(cx_a, |tree, _| tree.id());
3143        project_a.update(cx_a, |p, cx| p.share(cx)).await.unwrap();
3144        let buffer_a = project_a
3145            .update(cx_a, |p, cx| p.open_buffer((worktree_id, "a.rs"), cx))
3146            .await
3147            .unwrap();
3148
3149        // Join the worktree as client B.
3150        let project_b = Project::remote(
3151            project_id,
3152            client_b.clone(),
3153            client_b.user_store.clone(),
3154            lang_registry.clone(),
3155            fs.clone(),
3156            &mut cx_b.to_async(),
3157        )
3158        .await
3159        .unwrap();
3160
3161        let buffer_b = cx_b
3162            .background()
3163            .spawn(project_b.update(cx_b, |p, cx| p.open_buffer((worktree_id, "a.rs"), cx)))
3164            .await
3165            .unwrap();
3166        buffer_b.update(cx_b, |buffer, cx| {
3167            buffer.edit([(4..7, "six")], cx);
3168            buffer.edit([(10..11, "6")], cx);
3169            assert_eq!(buffer.text(), "let six = 6;");
3170            assert!(buffer.is_dirty());
3171            assert!(!buffer.has_conflict());
3172        });
3173        buffer_a
3174            .condition(cx_a, |buffer, _| buffer.text() == "let six = 6;")
3175            .await;
3176
3177        fs.save(Path::new("/a/a.rs"), &Rope::from("let seven = 7;"))
3178            .await
3179            .unwrap();
3180        buffer_a
3181            .condition(cx_a, |buffer, _| buffer.has_conflict())
3182            .await;
3183        buffer_b
3184            .condition(cx_b, |buffer, _| buffer.has_conflict())
3185            .await;
3186
3187        project_b
3188            .update(cx_b, |project, cx| {
3189                project.reload_buffers(HashSet::from_iter([buffer_b.clone()]), true, cx)
3190            })
3191            .await
3192            .unwrap();
3193        buffer_a.read_with(cx_a, |buffer, _| {
3194            assert_eq!(buffer.text(), "let seven = 7;");
3195            assert!(!buffer.is_dirty());
3196            assert!(!buffer.has_conflict());
3197        });
3198        buffer_b.read_with(cx_b, |buffer, _| {
3199            assert_eq!(buffer.text(), "let seven = 7;");
3200            assert!(!buffer.is_dirty());
3201            assert!(!buffer.has_conflict());
3202        });
3203
3204        buffer_a.update(cx_a, |buffer, cx| {
3205            // Undoing on the host is a no-op when the reload was initiated by the guest.
3206            buffer.undo(cx);
3207            assert_eq!(buffer.text(), "let seven = 7;");
3208            assert!(!buffer.is_dirty());
3209            assert!(!buffer.has_conflict());
3210        });
3211        buffer_b.update(cx_b, |buffer, cx| {
3212            // Undoing on the guest rolls back the buffer to before it was reloaded but the conflict gets cleared.
3213            buffer.undo(cx);
3214            assert_eq!(buffer.text(), "let six = 6;");
3215            assert!(buffer.is_dirty());
3216            assert!(!buffer.has_conflict());
3217        });
3218    }
3219
3220    #[gpui::test(iterations = 10)]
3221    async fn test_formatting_buffer(cx_a: &mut TestAppContext, cx_b: &mut TestAppContext) {
3222        cx_a.foreground().forbid_parking();
3223        let lang_registry = Arc::new(LanguageRegistry::test());
3224        let fs = FakeFs::new(cx_a.background());
3225
3226        // Set up a fake language server.
3227        let mut language = Language::new(
3228            LanguageConfig {
3229                name: "Rust".into(),
3230                path_suffixes: vec!["rs".to_string()],
3231                ..Default::default()
3232            },
3233            Some(tree_sitter_rust::language()),
3234        );
3235        let mut fake_language_servers = language.set_fake_lsp_adapter(Default::default());
3236        lang_registry.add(Arc::new(language));
3237
3238        // Connect to a server as 2 clients.
3239        let mut server = TestServer::start(cx_a.foreground(), cx_a.background()).await;
3240        let client_a = server.create_client(cx_a, "user_a").await;
3241        let client_b = server.create_client(cx_b, "user_b").await;
3242        server
3243            .make_contacts(vec![(&client_a, cx_a), (&client_b, cx_b)])
3244            .await;
3245
3246        // Share a project as client A
3247        fs.insert_tree(
3248            "/a",
3249            json!({
3250                "a.rs": "let one = two",
3251            }),
3252        )
3253        .await;
3254        let project_a = cx_a.update(|cx| {
3255            Project::local(
3256                client_a.clone(),
3257                client_a.user_store.clone(),
3258                lang_registry.clone(),
3259                fs.clone(),
3260                cx,
3261            )
3262        });
3263        let (worktree_a, _) = project_a
3264            .update(cx_a, |p, cx| {
3265                p.find_or_create_local_worktree("/a", true, cx)
3266            })
3267            .await
3268            .unwrap();
3269        worktree_a
3270            .read_with(cx_a, |tree, _| tree.as_local().unwrap().scan_complete())
3271            .await;
3272        let project_id = project_a.update(cx_a, |p, _| p.next_remote_id()).await;
3273        let worktree_id = worktree_a.read_with(cx_a, |tree, _| tree.id());
3274        project_a.update(cx_a, |p, cx| p.share(cx)).await.unwrap();
3275
3276        // Join the worktree as client B.
3277        let project_b = Project::remote(
3278            project_id,
3279            client_b.clone(),
3280            client_b.user_store.clone(),
3281            lang_registry.clone(),
3282            fs.clone(),
3283            &mut cx_b.to_async(),
3284        )
3285        .await
3286        .unwrap();
3287
3288        let buffer_b = cx_b
3289            .background()
3290            .spawn(project_b.update(cx_b, |p, cx| p.open_buffer((worktree_id, "a.rs"), cx)))
3291            .await
3292            .unwrap();
3293
3294        let fake_language_server = fake_language_servers.next().await.unwrap();
3295        fake_language_server.handle_request::<lsp::request::Formatting, _, _>(|_, _| async move {
3296            Ok(Some(vec![
3297                lsp::TextEdit {
3298                    range: lsp::Range::new(lsp::Position::new(0, 4), lsp::Position::new(0, 4)),
3299                    new_text: "h".to_string(),
3300                },
3301                lsp::TextEdit {
3302                    range: lsp::Range::new(lsp::Position::new(0, 7), lsp::Position::new(0, 7)),
3303                    new_text: "y".to_string(),
3304                },
3305            ]))
3306        });
3307
3308        project_b
3309            .update(cx_b, |project, cx| {
3310                project.format(HashSet::from_iter([buffer_b.clone()]), true, cx)
3311            })
3312            .await
3313            .unwrap();
3314        assert_eq!(
3315            buffer_b.read_with(cx_b, |buffer, _| buffer.text()),
3316            "let honey = two"
3317        );
3318    }
3319
3320    #[gpui::test(iterations = 10)]
3321    async fn test_definition(cx_a: &mut TestAppContext, cx_b: &mut TestAppContext) {
3322        cx_a.foreground().forbid_parking();
3323        let lang_registry = Arc::new(LanguageRegistry::test());
3324        let fs = FakeFs::new(cx_a.background());
3325        fs.insert_tree(
3326            "/root-1",
3327            json!({
3328                "a.rs": "const ONE: usize = b::TWO + b::THREE;",
3329            }),
3330        )
3331        .await;
3332        fs.insert_tree(
3333            "/root-2",
3334            json!({
3335                "b.rs": "const TWO: usize = 2;\nconst THREE: usize = 3;",
3336            }),
3337        )
3338        .await;
3339
3340        // Set up a fake language server.
3341        let mut language = Language::new(
3342            LanguageConfig {
3343                name: "Rust".into(),
3344                path_suffixes: vec!["rs".to_string()],
3345                ..Default::default()
3346            },
3347            Some(tree_sitter_rust::language()),
3348        );
3349        let mut fake_language_servers = language.set_fake_lsp_adapter(Default::default());
3350        lang_registry.add(Arc::new(language));
3351
3352        // Connect to a server as 2 clients.
3353        let mut server = TestServer::start(cx_a.foreground(), cx_a.background()).await;
3354        let client_a = server.create_client(cx_a, "user_a").await;
3355        let client_b = server.create_client(cx_b, "user_b").await;
3356        server
3357            .make_contacts(vec![(&client_a, cx_a), (&client_b, cx_b)])
3358            .await;
3359
3360        // Share a project as client A
3361        let project_a = cx_a.update(|cx| {
3362            Project::local(
3363                client_a.clone(),
3364                client_a.user_store.clone(),
3365                lang_registry.clone(),
3366                fs.clone(),
3367                cx,
3368            )
3369        });
3370        let (worktree_a, _) = project_a
3371            .update(cx_a, |p, cx| {
3372                p.find_or_create_local_worktree("/root-1", true, cx)
3373            })
3374            .await
3375            .unwrap();
3376        worktree_a
3377            .read_with(cx_a, |tree, _| tree.as_local().unwrap().scan_complete())
3378            .await;
3379        let project_id = project_a.update(cx_a, |p, _| p.next_remote_id()).await;
3380        let worktree_id = worktree_a.read_with(cx_a, |tree, _| tree.id());
3381        project_a.update(cx_a, |p, cx| p.share(cx)).await.unwrap();
3382
3383        // Join the worktree as client B.
3384        let project_b = Project::remote(
3385            project_id,
3386            client_b.clone(),
3387            client_b.user_store.clone(),
3388            lang_registry.clone(),
3389            fs.clone(),
3390            &mut cx_b.to_async(),
3391        )
3392        .await
3393        .unwrap();
3394
3395        // Open the file on client B.
3396        let buffer_b = cx_b
3397            .background()
3398            .spawn(project_b.update(cx_b, |p, cx| p.open_buffer((worktree_id, "a.rs"), cx)))
3399            .await
3400            .unwrap();
3401
3402        // Request the definition of a symbol as the guest.
3403        let fake_language_server = fake_language_servers.next().await.unwrap();
3404        fake_language_server.handle_request::<lsp::request::GotoDefinition, _, _>(
3405            |_, _| async move {
3406                Ok(Some(lsp::GotoDefinitionResponse::Scalar(
3407                    lsp::Location::new(
3408                        lsp::Url::from_file_path("/root-2/b.rs").unwrap(),
3409                        lsp::Range::new(lsp::Position::new(0, 6), lsp::Position::new(0, 9)),
3410                    ),
3411                )))
3412            },
3413        );
3414
3415        let definitions_1 = project_b
3416            .update(cx_b, |p, cx| p.definition(&buffer_b, 23, cx))
3417            .await
3418            .unwrap();
3419        cx_b.read(|cx| {
3420            assert_eq!(definitions_1.len(), 1);
3421            assert_eq!(project_b.read(cx).worktrees(cx).count(), 2);
3422            let target_buffer = definitions_1[0].buffer.read(cx);
3423            assert_eq!(
3424                target_buffer.text(),
3425                "const TWO: usize = 2;\nconst THREE: usize = 3;"
3426            );
3427            assert_eq!(
3428                definitions_1[0].range.to_point(target_buffer),
3429                Point::new(0, 6)..Point::new(0, 9)
3430            );
3431        });
3432
3433        // Try getting more definitions for the same buffer, ensuring the buffer gets reused from
3434        // the previous call to `definition`.
3435        fake_language_server.handle_request::<lsp::request::GotoDefinition, _, _>(
3436            |_, _| async move {
3437                Ok(Some(lsp::GotoDefinitionResponse::Scalar(
3438                    lsp::Location::new(
3439                        lsp::Url::from_file_path("/root-2/b.rs").unwrap(),
3440                        lsp::Range::new(lsp::Position::new(1, 6), lsp::Position::new(1, 11)),
3441                    ),
3442                )))
3443            },
3444        );
3445
3446        let definitions_2 = project_b
3447            .update(cx_b, |p, cx| p.definition(&buffer_b, 33, cx))
3448            .await
3449            .unwrap();
3450        cx_b.read(|cx| {
3451            assert_eq!(definitions_2.len(), 1);
3452            assert_eq!(project_b.read(cx).worktrees(cx).count(), 2);
3453            let target_buffer = definitions_2[0].buffer.read(cx);
3454            assert_eq!(
3455                target_buffer.text(),
3456                "const TWO: usize = 2;\nconst THREE: usize = 3;"
3457            );
3458            assert_eq!(
3459                definitions_2[0].range.to_point(target_buffer),
3460                Point::new(1, 6)..Point::new(1, 11)
3461            );
3462        });
3463        assert_eq!(definitions_1[0].buffer, definitions_2[0].buffer);
3464    }
3465
3466    #[gpui::test(iterations = 10)]
3467    async fn test_references(cx_a: &mut TestAppContext, cx_b: &mut TestAppContext) {
3468        cx_a.foreground().forbid_parking();
3469        let lang_registry = Arc::new(LanguageRegistry::test());
3470        let fs = FakeFs::new(cx_a.background());
3471        fs.insert_tree(
3472            "/root-1",
3473            json!({
3474                "one.rs": "const ONE: usize = 1;",
3475                "two.rs": "const TWO: usize = one::ONE + one::ONE;",
3476            }),
3477        )
3478        .await;
3479        fs.insert_tree(
3480            "/root-2",
3481            json!({
3482                "three.rs": "const THREE: usize = two::TWO + one::ONE;",
3483            }),
3484        )
3485        .await;
3486
3487        // Set up a fake language server.
3488        let mut language = Language::new(
3489            LanguageConfig {
3490                name: "Rust".into(),
3491                path_suffixes: vec!["rs".to_string()],
3492                ..Default::default()
3493            },
3494            Some(tree_sitter_rust::language()),
3495        );
3496        let mut fake_language_servers = language.set_fake_lsp_adapter(Default::default());
3497        lang_registry.add(Arc::new(language));
3498
3499        // Connect to a server as 2 clients.
3500        let mut server = TestServer::start(cx_a.foreground(), cx_a.background()).await;
3501        let client_a = server.create_client(cx_a, "user_a").await;
3502        let client_b = server.create_client(cx_b, "user_b").await;
3503        server
3504            .make_contacts(vec![(&client_a, cx_a), (&client_b, cx_b)])
3505            .await;
3506
3507        // Share a project as client A
3508        let project_a = cx_a.update(|cx| {
3509            Project::local(
3510                client_a.clone(),
3511                client_a.user_store.clone(),
3512                lang_registry.clone(),
3513                fs.clone(),
3514                cx,
3515            )
3516        });
3517        let (worktree_a, _) = project_a
3518            .update(cx_a, |p, cx| {
3519                p.find_or_create_local_worktree("/root-1", true, cx)
3520            })
3521            .await
3522            .unwrap();
3523        worktree_a
3524            .read_with(cx_a, |tree, _| tree.as_local().unwrap().scan_complete())
3525            .await;
3526        let project_id = project_a.update(cx_a, |p, _| p.next_remote_id()).await;
3527        let worktree_id = worktree_a.read_with(cx_a, |tree, _| tree.id());
3528        project_a.update(cx_a, |p, cx| p.share(cx)).await.unwrap();
3529
3530        // Join the worktree as client B.
3531        let project_b = Project::remote(
3532            project_id,
3533            client_b.clone(),
3534            client_b.user_store.clone(),
3535            lang_registry.clone(),
3536            fs.clone(),
3537            &mut cx_b.to_async(),
3538        )
3539        .await
3540        .unwrap();
3541
3542        // Open the file on client B.
3543        let buffer_b = cx_b
3544            .background()
3545            .spawn(project_b.update(cx_b, |p, cx| p.open_buffer((worktree_id, "one.rs"), cx)))
3546            .await
3547            .unwrap();
3548
3549        // Request references to a symbol as the guest.
3550        let fake_language_server = fake_language_servers.next().await.unwrap();
3551        fake_language_server.handle_request::<lsp::request::References, _, _>(
3552            |params, _| async move {
3553                assert_eq!(
3554                    params.text_document_position.text_document.uri.as_str(),
3555                    "file:///root-1/one.rs"
3556                );
3557                Ok(Some(vec![
3558                    lsp::Location {
3559                        uri: lsp::Url::from_file_path("/root-1/two.rs").unwrap(),
3560                        range: lsp::Range::new(
3561                            lsp::Position::new(0, 24),
3562                            lsp::Position::new(0, 27),
3563                        ),
3564                    },
3565                    lsp::Location {
3566                        uri: lsp::Url::from_file_path("/root-1/two.rs").unwrap(),
3567                        range: lsp::Range::new(
3568                            lsp::Position::new(0, 35),
3569                            lsp::Position::new(0, 38),
3570                        ),
3571                    },
3572                    lsp::Location {
3573                        uri: lsp::Url::from_file_path("/root-2/three.rs").unwrap(),
3574                        range: lsp::Range::new(
3575                            lsp::Position::new(0, 37),
3576                            lsp::Position::new(0, 40),
3577                        ),
3578                    },
3579                ]))
3580            },
3581        );
3582
3583        let references = project_b
3584            .update(cx_b, |p, cx| p.references(&buffer_b, 7, cx))
3585            .await
3586            .unwrap();
3587        cx_b.read(|cx| {
3588            assert_eq!(references.len(), 3);
3589            assert_eq!(project_b.read(cx).worktrees(cx).count(), 2);
3590
3591            let two_buffer = references[0].buffer.read(cx);
3592            let three_buffer = references[2].buffer.read(cx);
3593            assert_eq!(
3594                two_buffer.file().unwrap().path().as_ref(),
3595                Path::new("two.rs")
3596            );
3597            assert_eq!(references[1].buffer, references[0].buffer);
3598            assert_eq!(
3599                three_buffer.file().unwrap().full_path(cx),
3600                Path::new("three.rs")
3601            );
3602
3603            assert_eq!(references[0].range.to_offset(&two_buffer), 24..27);
3604            assert_eq!(references[1].range.to_offset(&two_buffer), 35..38);
3605            assert_eq!(references[2].range.to_offset(&three_buffer), 37..40);
3606        });
3607    }
3608
3609    #[gpui::test(iterations = 10)]
3610    async fn test_project_search(cx_a: &mut TestAppContext, cx_b: &mut TestAppContext) {
3611        cx_a.foreground().forbid_parking();
3612        let lang_registry = Arc::new(LanguageRegistry::test());
3613        let fs = FakeFs::new(cx_a.background());
3614        fs.insert_tree(
3615            "/root-1",
3616            json!({
3617                "a": "hello world",
3618                "b": "goodnight moon",
3619                "c": "a world of goo",
3620                "d": "world champion of clown world",
3621            }),
3622        )
3623        .await;
3624        fs.insert_tree(
3625            "/root-2",
3626            json!({
3627                "e": "disney world is fun",
3628            }),
3629        )
3630        .await;
3631
3632        // Connect to a server as 2 clients.
3633        let mut server = TestServer::start(cx_a.foreground(), cx_a.background()).await;
3634        let client_a = server.create_client(cx_a, "user_a").await;
3635        let client_b = server.create_client(cx_b, "user_b").await;
3636        server
3637            .make_contacts(vec![(&client_a, cx_a), (&client_b, cx_b)])
3638            .await;
3639
3640        // Share a project as client A
3641        let project_a = cx_a.update(|cx| {
3642            Project::local(
3643                client_a.clone(),
3644                client_a.user_store.clone(),
3645                lang_registry.clone(),
3646                fs.clone(),
3647                cx,
3648            )
3649        });
3650        let project_id = project_a.update(cx_a, |p, _| p.next_remote_id()).await;
3651
3652        let (worktree_1, _) = project_a
3653            .update(cx_a, |p, cx| {
3654                p.find_or_create_local_worktree("/root-1", true, cx)
3655            })
3656            .await
3657            .unwrap();
3658        worktree_1
3659            .read_with(cx_a, |tree, _| tree.as_local().unwrap().scan_complete())
3660            .await;
3661        let (worktree_2, _) = project_a
3662            .update(cx_a, |p, cx| {
3663                p.find_or_create_local_worktree("/root-2", true, cx)
3664            })
3665            .await
3666            .unwrap();
3667        worktree_2
3668            .read_with(cx_a, |tree, _| tree.as_local().unwrap().scan_complete())
3669            .await;
3670
3671        project_a.update(cx_a, |p, cx| p.share(cx)).await.unwrap();
3672
3673        // Join the worktree as client B.
3674        let project_b = Project::remote(
3675            project_id,
3676            client_b.clone(),
3677            client_b.user_store.clone(),
3678            lang_registry.clone(),
3679            fs.clone(),
3680            &mut cx_b.to_async(),
3681        )
3682        .await
3683        .unwrap();
3684
3685        let results = project_b
3686            .update(cx_b, |project, cx| {
3687                project.search(SearchQuery::text("world", false, false), cx)
3688            })
3689            .await
3690            .unwrap();
3691
3692        let mut ranges_by_path = results
3693            .into_iter()
3694            .map(|(buffer, ranges)| {
3695                buffer.read_with(cx_b, |buffer, cx| {
3696                    let path = buffer.file().unwrap().full_path(cx);
3697                    let offset_ranges = ranges
3698                        .into_iter()
3699                        .map(|range| range.to_offset(buffer))
3700                        .collect::<Vec<_>>();
3701                    (path, offset_ranges)
3702                })
3703            })
3704            .collect::<Vec<_>>();
3705        ranges_by_path.sort_by_key(|(path, _)| path.clone());
3706
3707        assert_eq!(
3708            ranges_by_path,
3709            &[
3710                (PathBuf::from("root-1/a"), vec![6..11]),
3711                (PathBuf::from("root-1/c"), vec![2..7]),
3712                (PathBuf::from("root-1/d"), vec![0..5, 24..29]),
3713                (PathBuf::from("root-2/e"), vec![7..12]),
3714            ]
3715        );
3716    }
3717
3718    #[gpui::test(iterations = 10)]
3719    async fn test_document_highlights(cx_a: &mut TestAppContext, cx_b: &mut TestAppContext) {
3720        cx_a.foreground().forbid_parking();
3721        let lang_registry = Arc::new(LanguageRegistry::test());
3722        let fs = FakeFs::new(cx_a.background());
3723        fs.insert_tree(
3724            "/root-1",
3725            json!({
3726                "main.rs": "fn double(number: i32) -> i32 { number + number }",
3727            }),
3728        )
3729        .await;
3730
3731        // Set up a fake language server.
3732        let mut language = Language::new(
3733            LanguageConfig {
3734                name: "Rust".into(),
3735                path_suffixes: vec!["rs".to_string()],
3736                ..Default::default()
3737            },
3738            Some(tree_sitter_rust::language()),
3739        );
3740        let mut fake_language_servers = language.set_fake_lsp_adapter(Default::default());
3741        lang_registry.add(Arc::new(language));
3742
3743        // Connect to a server as 2 clients.
3744        let mut server = TestServer::start(cx_a.foreground(), cx_a.background()).await;
3745        let client_a = server.create_client(cx_a, "user_a").await;
3746        let client_b = server.create_client(cx_b, "user_b").await;
3747        server
3748            .make_contacts(vec![(&client_a, cx_a), (&client_b, cx_b)])
3749            .await;
3750
3751        // Share a project as client A
3752        let project_a = cx_a.update(|cx| {
3753            Project::local(
3754                client_a.clone(),
3755                client_a.user_store.clone(),
3756                lang_registry.clone(),
3757                fs.clone(),
3758                cx,
3759            )
3760        });
3761        let (worktree_a, _) = project_a
3762            .update(cx_a, |p, cx| {
3763                p.find_or_create_local_worktree("/root-1", true, cx)
3764            })
3765            .await
3766            .unwrap();
3767        worktree_a
3768            .read_with(cx_a, |tree, _| tree.as_local().unwrap().scan_complete())
3769            .await;
3770        let project_id = project_a.update(cx_a, |p, _| p.next_remote_id()).await;
3771        let worktree_id = worktree_a.read_with(cx_a, |tree, _| tree.id());
3772        project_a.update(cx_a, |p, cx| p.share(cx)).await.unwrap();
3773
3774        // Join the worktree as client B.
3775        let project_b = Project::remote(
3776            project_id,
3777            client_b.clone(),
3778            client_b.user_store.clone(),
3779            lang_registry.clone(),
3780            fs.clone(),
3781            &mut cx_b.to_async(),
3782        )
3783        .await
3784        .unwrap();
3785
3786        // Open the file on client B.
3787        let buffer_b = cx_b
3788            .background()
3789            .spawn(project_b.update(cx_b, |p, cx| p.open_buffer((worktree_id, "main.rs"), cx)))
3790            .await
3791            .unwrap();
3792
3793        // Request document highlights as the guest.
3794        let fake_language_server = fake_language_servers.next().await.unwrap();
3795        fake_language_server.handle_request::<lsp::request::DocumentHighlightRequest, _, _>(
3796            |params, _| async move {
3797                assert_eq!(
3798                    params
3799                        .text_document_position_params
3800                        .text_document
3801                        .uri
3802                        .as_str(),
3803                    "file:///root-1/main.rs"
3804                );
3805                assert_eq!(
3806                    params.text_document_position_params.position,
3807                    lsp::Position::new(0, 34)
3808                );
3809                Ok(Some(vec![
3810                    lsp::DocumentHighlight {
3811                        kind: Some(lsp::DocumentHighlightKind::WRITE),
3812                        range: lsp::Range::new(
3813                            lsp::Position::new(0, 10),
3814                            lsp::Position::new(0, 16),
3815                        ),
3816                    },
3817                    lsp::DocumentHighlight {
3818                        kind: Some(lsp::DocumentHighlightKind::READ),
3819                        range: lsp::Range::new(
3820                            lsp::Position::new(0, 32),
3821                            lsp::Position::new(0, 38),
3822                        ),
3823                    },
3824                    lsp::DocumentHighlight {
3825                        kind: Some(lsp::DocumentHighlightKind::READ),
3826                        range: lsp::Range::new(
3827                            lsp::Position::new(0, 41),
3828                            lsp::Position::new(0, 47),
3829                        ),
3830                    },
3831                ]))
3832            },
3833        );
3834
3835        let highlights = project_b
3836            .update(cx_b, |p, cx| p.document_highlights(&buffer_b, 34, cx))
3837            .await
3838            .unwrap();
3839        buffer_b.read_with(cx_b, |buffer, _| {
3840            let snapshot = buffer.snapshot();
3841
3842            let highlights = highlights
3843                .into_iter()
3844                .map(|highlight| (highlight.kind, highlight.range.to_offset(&snapshot)))
3845                .collect::<Vec<_>>();
3846            assert_eq!(
3847                highlights,
3848                &[
3849                    (lsp::DocumentHighlightKind::WRITE, 10..16),
3850                    (lsp::DocumentHighlightKind::READ, 32..38),
3851                    (lsp::DocumentHighlightKind::READ, 41..47)
3852                ]
3853            )
3854        });
3855    }
3856
3857    #[gpui::test(iterations = 10)]
3858    async fn test_project_symbols(cx_a: &mut TestAppContext, cx_b: &mut TestAppContext) {
3859        cx_a.foreground().forbid_parking();
3860        let lang_registry = Arc::new(LanguageRegistry::test());
3861        let fs = FakeFs::new(cx_a.background());
3862        fs.insert_tree(
3863            "/code",
3864            json!({
3865                "crate-1": {
3866                    "one.rs": "const ONE: usize = 1;",
3867                },
3868                "crate-2": {
3869                    "two.rs": "const TWO: usize = 2; const THREE: usize = 3;",
3870                },
3871                "private": {
3872                    "passwords.txt": "the-password",
3873                }
3874            }),
3875        )
3876        .await;
3877
3878        // Set up a fake language server.
3879        let mut language = Language::new(
3880            LanguageConfig {
3881                name: "Rust".into(),
3882                path_suffixes: vec!["rs".to_string()],
3883                ..Default::default()
3884            },
3885            Some(tree_sitter_rust::language()),
3886        );
3887        let mut fake_language_servers = language.set_fake_lsp_adapter(Default::default());
3888        lang_registry.add(Arc::new(language));
3889
3890        // Connect to a server as 2 clients.
3891        let mut server = TestServer::start(cx_a.foreground(), cx_a.background()).await;
3892        let client_a = server.create_client(cx_a, "user_a").await;
3893        let client_b = server.create_client(cx_b, "user_b").await;
3894        server
3895            .make_contacts(vec![(&client_a, cx_a), (&client_b, cx_b)])
3896            .await;
3897
3898        // Share a project as client A
3899        let project_a = cx_a.update(|cx| {
3900            Project::local(
3901                client_a.clone(),
3902                client_a.user_store.clone(),
3903                lang_registry.clone(),
3904                fs.clone(),
3905                cx,
3906            )
3907        });
3908        let (worktree_a, _) = project_a
3909            .update(cx_a, |p, cx| {
3910                p.find_or_create_local_worktree("/code/crate-1", true, cx)
3911            })
3912            .await
3913            .unwrap();
3914        worktree_a
3915            .read_with(cx_a, |tree, _| tree.as_local().unwrap().scan_complete())
3916            .await;
3917        let project_id = project_a.update(cx_a, |p, _| p.next_remote_id()).await;
3918        let worktree_id = worktree_a.read_with(cx_a, |tree, _| tree.id());
3919        project_a.update(cx_a, |p, cx| p.share(cx)).await.unwrap();
3920
3921        // Join the worktree as client B.
3922        let project_b = Project::remote(
3923            project_id,
3924            client_b.clone(),
3925            client_b.user_store.clone(),
3926            lang_registry.clone(),
3927            fs.clone(),
3928            &mut cx_b.to_async(),
3929        )
3930        .await
3931        .unwrap();
3932
3933        // Cause the language server to start.
3934        let _buffer = cx_b
3935            .background()
3936            .spawn(project_b.update(cx_b, |p, cx| p.open_buffer((worktree_id, "one.rs"), cx)))
3937            .await
3938            .unwrap();
3939
3940        let fake_language_server = fake_language_servers.next().await.unwrap();
3941        fake_language_server.handle_request::<lsp::request::WorkspaceSymbol, _, _>(
3942            |_, _| async move {
3943                #[allow(deprecated)]
3944                Ok(Some(vec![lsp::SymbolInformation {
3945                    name: "TWO".into(),
3946                    location: lsp::Location {
3947                        uri: lsp::Url::from_file_path("/code/crate-2/two.rs").unwrap(),
3948                        range: lsp::Range::new(lsp::Position::new(0, 6), lsp::Position::new(0, 9)),
3949                    },
3950                    kind: lsp::SymbolKind::CONSTANT,
3951                    tags: None,
3952                    container_name: None,
3953                    deprecated: None,
3954                }]))
3955            },
3956        );
3957
3958        // Request the definition of a symbol as the guest.
3959        let symbols = project_b
3960            .update(cx_b, |p, cx| p.symbols("two", cx))
3961            .await
3962            .unwrap();
3963        assert_eq!(symbols.len(), 1);
3964        assert_eq!(symbols[0].name, "TWO");
3965
3966        // Open one of the returned symbols.
3967        let buffer_b_2 = project_b
3968            .update(cx_b, |project, cx| {
3969                project.open_buffer_for_symbol(&symbols[0], cx)
3970            })
3971            .await
3972            .unwrap();
3973        buffer_b_2.read_with(cx_b, |buffer, _| {
3974            assert_eq!(
3975                buffer.file().unwrap().path().as_ref(),
3976                Path::new("../crate-2/two.rs")
3977            );
3978        });
3979
3980        // Attempt to craft a symbol and violate host's privacy by opening an arbitrary file.
3981        let mut fake_symbol = symbols[0].clone();
3982        fake_symbol.path = Path::new("/code/secrets").into();
3983        let error = project_b
3984            .update(cx_b, |project, cx| {
3985                project.open_buffer_for_symbol(&fake_symbol, cx)
3986            })
3987            .await
3988            .unwrap_err();
3989        assert!(error.to_string().contains("invalid symbol signature"));
3990    }
3991
3992    #[gpui::test(iterations = 10)]
3993    async fn test_open_buffer_while_getting_definition_pointing_to_it(
3994        cx_a: &mut TestAppContext,
3995        cx_b: &mut TestAppContext,
3996        mut rng: StdRng,
3997    ) {
3998        cx_a.foreground().forbid_parking();
3999        let lang_registry = Arc::new(LanguageRegistry::test());
4000        let fs = FakeFs::new(cx_a.background());
4001        fs.insert_tree(
4002            "/root",
4003            json!({
4004                "a.rs": "const ONE: usize = b::TWO;",
4005                "b.rs": "const TWO: usize = 2",
4006            }),
4007        )
4008        .await;
4009
4010        // Set up a fake language server.
4011        let mut language = Language::new(
4012            LanguageConfig {
4013                name: "Rust".into(),
4014                path_suffixes: vec!["rs".to_string()],
4015                ..Default::default()
4016            },
4017            Some(tree_sitter_rust::language()),
4018        );
4019        let mut fake_language_servers = language.set_fake_lsp_adapter(Default::default());
4020        lang_registry.add(Arc::new(language));
4021
4022        // Connect to a server as 2 clients.
4023        let mut server = TestServer::start(cx_a.foreground(), cx_a.background()).await;
4024        let client_a = server.create_client(cx_a, "user_a").await;
4025        let client_b = server.create_client(cx_b, "user_b").await;
4026        server
4027            .make_contacts(vec![(&client_a, cx_a), (&client_b, cx_b)])
4028            .await;
4029
4030        // Share a project as client A
4031        let project_a = cx_a.update(|cx| {
4032            Project::local(
4033                client_a.clone(),
4034                client_a.user_store.clone(),
4035                lang_registry.clone(),
4036                fs.clone(),
4037                cx,
4038            )
4039        });
4040
4041        let (worktree_a, _) = project_a
4042            .update(cx_a, |p, cx| {
4043                p.find_or_create_local_worktree("/root", true, cx)
4044            })
4045            .await
4046            .unwrap();
4047        worktree_a
4048            .read_with(cx_a, |tree, _| tree.as_local().unwrap().scan_complete())
4049            .await;
4050        let project_id = project_a.update(cx_a, |p, _| p.next_remote_id()).await;
4051        let worktree_id = worktree_a.read_with(cx_a, |tree, _| tree.id());
4052        project_a.update(cx_a, |p, cx| p.share(cx)).await.unwrap();
4053
4054        // Join the worktree as client B.
4055        let project_b = Project::remote(
4056            project_id,
4057            client_b.clone(),
4058            client_b.user_store.clone(),
4059            lang_registry.clone(),
4060            fs.clone(),
4061            &mut cx_b.to_async(),
4062        )
4063        .await
4064        .unwrap();
4065
4066        let buffer_b1 = cx_b
4067            .background()
4068            .spawn(project_b.update(cx_b, |p, cx| p.open_buffer((worktree_id, "a.rs"), cx)))
4069            .await
4070            .unwrap();
4071
4072        let fake_language_server = fake_language_servers.next().await.unwrap();
4073        fake_language_server.handle_request::<lsp::request::GotoDefinition, _, _>(
4074            |_, _| async move {
4075                Ok(Some(lsp::GotoDefinitionResponse::Scalar(
4076                    lsp::Location::new(
4077                        lsp::Url::from_file_path("/root/b.rs").unwrap(),
4078                        lsp::Range::new(lsp::Position::new(0, 6), lsp::Position::new(0, 9)),
4079                    ),
4080                )))
4081            },
4082        );
4083
4084        let definitions;
4085        let buffer_b2;
4086        if rng.gen() {
4087            definitions = project_b.update(cx_b, |p, cx| p.definition(&buffer_b1, 23, cx));
4088            buffer_b2 = project_b.update(cx_b, |p, cx| p.open_buffer((worktree_id, "b.rs"), cx));
4089        } else {
4090            buffer_b2 = project_b.update(cx_b, |p, cx| p.open_buffer((worktree_id, "b.rs"), cx));
4091            definitions = project_b.update(cx_b, |p, cx| p.definition(&buffer_b1, 23, cx));
4092        }
4093
4094        let buffer_b2 = buffer_b2.await.unwrap();
4095        let definitions = definitions.await.unwrap();
4096        assert_eq!(definitions.len(), 1);
4097        assert_eq!(definitions[0].buffer, buffer_b2);
4098    }
4099
4100    #[gpui::test(iterations = 10)]
4101    async fn test_collaborating_with_code_actions(
4102        cx_a: &mut TestAppContext,
4103        cx_b: &mut TestAppContext,
4104    ) {
4105        cx_a.foreground().forbid_parking();
4106        let lang_registry = Arc::new(LanguageRegistry::test());
4107        let fs = FakeFs::new(cx_a.background());
4108        cx_b.update(|cx| editor::init(cx));
4109
4110        // Set up a fake language server.
4111        let mut language = Language::new(
4112            LanguageConfig {
4113                name: "Rust".into(),
4114                path_suffixes: vec!["rs".to_string()],
4115                ..Default::default()
4116            },
4117            Some(tree_sitter_rust::language()),
4118        );
4119        let mut fake_language_servers = language.set_fake_lsp_adapter(Default::default());
4120        lang_registry.add(Arc::new(language));
4121
4122        // Connect to a server as 2 clients.
4123        let mut server = TestServer::start(cx_a.foreground(), cx_a.background()).await;
4124        let client_a = server.create_client(cx_a, "user_a").await;
4125        let client_b = server.create_client(cx_b, "user_b").await;
4126        server
4127            .make_contacts(vec![(&client_a, cx_a), (&client_b, cx_b)])
4128            .await;
4129
4130        // Share a project as client A
4131        fs.insert_tree(
4132            "/a",
4133            json!({
4134                "main.rs": "mod other;\nfn main() { let foo = other::foo(); }",
4135                "other.rs": "pub fn foo() -> usize { 4 }",
4136            }),
4137        )
4138        .await;
4139        let project_a = cx_a.update(|cx| {
4140            Project::local(
4141                client_a.clone(),
4142                client_a.user_store.clone(),
4143                lang_registry.clone(),
4144                fs.clone(),
4145                cx,
4146            )
4147        });
4148        let (worktree_a, _) = project_a
4149            .update(cx_a, |p, cx| {
4150                p.find_or_create_local_worktree("/a", true, cx)
4151            })
4152            .await
4153            .unwrap();
4154        worktree_a
4155            .read_with(cx_a, |tree, _| tree.as_local().unwrap().scan_complete())
4156            .await;
4157        let project_id = project_a.update(cx_a, |p, _| p.next_remote_id()).await;
4158        let worktree_id = worktree_a.read_with(cx_a, |tree, _| tree.id());
4159        project_a.update(cx_a, |p, cx| p.share(cx)).await.unwrap();
4160
4161        // Join the worktree as client B.
4162        let project_b = Project::remote(
4163            project_id,
4164            client_b.clone(),
4165            client_b.user_store.clone(),
4166            lang_registry.clone(),
4167            fs.clone(),
4168            &mut cx_b.to_async(),
4169        )
4170        .await
4171        .unwrap();
4172        let mut params = cx_b.update(WorkspaceParams::test);
4173        params.languages = lang_registry.clone();
4174        params.client = client_b.client.clone();
4175        params.user_store = client_b.user_store.clone();
4176        params.project = project_b;
4177
4178        let (_window_b, workspace_b) = cx_b.add_window(|cx| Workspace::new(&params, cx));
4179        let editor_b = workspace_b
4180            .update(cx_b, |workspace, cx| {
4181                workspace.open_path((worktree_id, "main.rs"), true, cx)
4182            })
4183            .await
4184            .unwrap()
4185            .downcast::<Editor>()
4186            .unwrap();
4187
4188        let mut fake_language_server = fake_language_servers.next().await.unwrap();
4189        fake_language_server
4190            .handle_request::<lsp::request::CodeActionRequest, _, _>(|params, _| async move {
4191                assert_eq!(
4192                    params.text_document.uri,
4193                    lsp::Url::from_file_path("/a/main.rs").unwrap(),
4194                );
4195                assert_eq!(params.range.start, lsp::Position::new(0, 0));
4196                assert_eq!(params.range.end, lsp::Position::new(0, 0));
4197                Ok(None)
4198            })
4199            .next()
4200            .await;
4201
4202        // Move cursor to a location that contains code actions.
4203        editor_b.update(cx_b, |editor, cx| {
4204            editor.select_ranges([Point::new(1, 31)..Point::new(1, 31)], None, cx);
4205            cx.focus(&editor_b);
4206        });
4207
4208        fake_language_server
4209            .handle_request::<lsp::request::CodeActionRequest, _, _>(|params, _| async move {
4210                assert_eq!(
4211                    params.text_document.uri,
4212                    lsp::Url::from_file_path("/a/main.rs").unwrap(),
4213                );
4214                assert_eq!(params.range.start, lsp::Position::new(1, 31));
4215                assert_eq!(params.range.end, lsp::Position::new(1, 31));
4216
4217                Ok(Some(vec![lsp::CodeActionOrCommand::CodeAction(
4218                    lsp::CodeAction {
4219                        title: "Inline into all callers".to_string(),
4220                        edit: Some(lsp::WorkspaceEdit {
4221                            changes: Some(
4222                                [
4223                                    (
4224                                        lsp::Url::from_file_path("/a/main.rs").unwrap(),
4225                                        vec![lsp::TextEdit::new(
4226                                            lsp::Range::new(
4227                                                lsp::Position::new(1, 22),
4228                                                lsp::Position::new(1, 34),
4229                                            ),
4230                                            "4".to_string(),
4231                                        )],
4232                                    ),
4233                                    (
4234                                        lsp::Url::from_file_path("/a/other.rs").unwrap(),
4235                                        vec![lsp::TextEdit::new(
4236                                            lsp::Range::new(
4237                                                lsp::Position::new(0, 0),
4238                                                lsp::Position::new(0, 27),
4239                                            ),
4240                                            "".to_string(),
4241                                        )],
4242                                    ),
4243                                ]
4244                                .into_iter()
4245                                .collect(),
4246                            ),
4247                            ..Default::default()
4248                        }),
4249                        data: Some(json!({
4250                            "codeActionParams": {
4251                                "range": {
4252                                    "start": {"line": 1, "column": 31},
4253                                    "end": {"line": 1, "column": 31},
4254                                }
4255                            }
4256                        })),
4257                        ..Default::default()
4258                    },
4259                )]))
4260            })
4261            .next()
4262            .await;
4263
4264        // Toggle code actions and wait for them to display.
4265        editor_b.update(cx_b, |editor, cx| {
4266            editor.toggle_code_actions(
4267                &ToggleCodeActions {
4268                    deployed_from_indicator: false,
4269                },
4270                cx,
4271            );
4272        });
4273        editor_b
4274            .condition(&cx_b, |editor, _| editor.context_menu_visible())
4275            .await;
4276
4277        fake_language_server.remove_request_handler::<lsp::request::CodeActionRequest>();
4278
4279        // Confirming the code action will trigger a resolve request.
4280        let confirm_action = workspace_b
4281            .update(cx_b, |workspace, cx| {
4282                Editor::confirm_code_action(workspace, &ConfirmCodeAction { item_ix: Some(0) }, cx)
4283            })
4284            .unwrap();
4285        fake_language_server.handle_request::<lsp::request::CodeActionResolveRequest, _, _>(
4286            |_, _| async move {
4287                Ok(lsp::CodeAction {
4288                    title: "Inline into all callers".to_string(),
4289                    edit: Some(lsp::WorkspaceEdit {
4290                        changes: Some(
4291                            [
4292                                (
4293                                    lsp::Url::from_file_path("/a/main.rs").unwrap(),
4294                                    vec![lsp::TextEdit::new(
4295                                        lsp::Range::new(
4296                                            lsp::Position::new(1, 22),
4297                                            lsp::Position::new(1, 34),
4298                                        ),
4299                                        "4".to_string(),
4300                                    )],
4301                                ),
4302                                (
4303                                    lsp::Url::from_file_path("/a/other.rs").unwrap(),
4304                                    vec![lsp::TextEdit::new(
4305                                        lsp::Range::new(
4306                                            lsp::Position::new(0, 0),
4307                                            lsp::Position::new(0, 27),
4308                                        ),
4309                                        "".to_string(),
4310                                    )],
4311                                ),
4312                            ]
4313                            .into_iter()
4314                            .collect(),
4315                        ),
4316                        ..Default::default()
4317                    }),
4318                    ..Default::default()
4319                })
4320            },
4321        );
4322
4323        // After the action is confirmed, an editor containing both modified files is opened.
4324        confirm_action.await.unwrap();
4325        let code_action_editor = workspace_b.read_with(cx_b, |workspace, cx| {
4326            workspace
4327                .active_item(cx)
4328                .unwrap()
4329                .downcast::<Editor>()
4330                .unwrap()
4331        });
4332        code_action_editor.update(cx_b, |editor, cx| {
4333            assert_eq!(editor.text(cx), "mod other;\nfn main() { let foo = 4; }\n");
4334            editor.undo(&Undo, cx);
4335            assert_eq!(
4336                editor.text(cx),
4337                "mod other;\nfn main() { let foo = other::foo(); }\npub fn foo() -> usize { 4 }"
4338            );
4339            editor.redo(&Redo, cx);
4340            assert_eq!(editor.text(cx), "mod other;\nfn main() { let foo = 4; }\n");
4341        });
4342    }
4343
4344    #[gpui::test(iterations = 10)]
4345    async fn test_collaborating_with_renames(cx_a: &mut TestAppContext, cx_b: &mut TestAppContext) {
4346        cx_a.foreground().forbid_parking();
4347        let lang_registry = Arc::new(LanguageRegistry::test());
4348        let fs = FakeFs::new(cx_a.background());
4349        cx_b.update(|cx| editor::init(cx));
4350
4351        // Set up a fake language server.
4352        let mut language = Language::new(
4353            LanguageConfig {
4354                name: "Rust".into(),
4355                path_suffixes: vec!["rs".to_string()],
4356                ..Default::default()
4357            },
4358            Some(tree_sitter_rust::language()),
4359        );
4360        let mut fake_language_servers = language.set_fake_lsp_adapter(FakeLspAdapter {
4361            capabilities: lsp::ServerCapabilities {
4362                rename_provider: Some(lsp::OneOf::Right(lsp::RenameOptions {
4363                    prepare_provider: Some(true),
4364                    work_done_progress_options: Default::default(),
4365                })),
4366                ..Default::default()
4367            },
4368            ..Default::default()
4369        });
4370        lang_registry.add(Arc::new(language));
4371
4372        // Connect to a server as 2 clients.
4373        let mut server = TestServer::start(cx_a.foreground(), cx_a.background()).await;
4374        let client_a = server.create_client(cx_a, "user_a").await;
4375        let client_b = server.create_client(cx_b, "user_b").await;
4376        server
4377            .make_contacts(vec![(&client_a, cx_a), (&client_b, cx_b)])
4378            .await;
4379
4380        // Share a project as client A
4381        fs.insert_tree(
4382            "/dir",
4383            json!({
4384                "one.rs": "const ONE: usize = 1;",
4385                "two.rs": "const TWO: usize = one::ONE + one::ONE;"
4386            }),
4387        )
4388        .await;
4389        let project_a = cx_a.update(|cx| {
4390            Project::local(
4391                client_a.clone(),
4392                client_a.user_store.clone(),
4393                lang_registry.clone(),
4394                fs.clone(),
4395                cx,
4396            )
4397        });
4398        let (worktree_a, _) = project_a
4399            .update(cx_a, |p, cx| {
4400                p.find_or_create_local_worktree("/dir", true, cx)
4401            })
4402            .await
4403            .unwrap();
4404        worktree_a
4405            .read_with(cx_a, |tree, _| tree.as_local().unwrap().scan_complete())
4406            .await;
4407        let project_id = project_a.update(cx_a, |p, _| p.next_remote_id()).await;
4408        let worktree_id = worktree_a.read_with(cx_a, |tree, _| tree.id());
4409        project_a.update(cx_a, |p, cx| p.share(cx)).await.unwrap();
4410
4411        // Join the worktree as client B.
4412        let project_b = Project::remote(
4413            project_id,
4414            client_b.clone(),
4415            client_b.user_store.clone(),
4416            lang_registry.clone(),
4417            fs.clone(),
4418            &mut cx_b.to_async(),
4419        )
4420        .await
4421        .unwrap();
4422        let mut params = cx_b.update(WorkspaceParams::test);
4423        params.languages = lang_registry.clone();
4424        params.client = client_b.client.clone();
4425        params.user_store = client_b.user_store.clone();
4426        params.project = project_b;
4427
4428        let (_window_b, workspace_b) = cx_b.add_window(|cx| Workspace::new(&params, cx));
4429        let editor_b = workspace_b
4430            .update(cx_b, |workspace, cx| {
4431                workspace.open_path((worktree_id, "one.rs"), true, cx)
4432            })
4433            .await
4434            .unwrap()
4435            .downcast::<Editor>()
4436            .unwrap();
4437        let fake_language_server = fake_language_servers.next().await.unwrap();
4438
4439        // Move cursor to a location that can be renamed.
4440        let prepare_rename = editor_b.update(cx_b, |editor, cx| {
4441            editor.select_ranges([7..7], None, cx);
4442            editor.rename(&Rename, cx).unwrap()
4443        });
4444
4445        fake_language_server
4446            .handle_request::<lsp::request::PrepareRenameRequest, _, _>(|params, _| async move {
4447                assert_eq!(params.text_document.uri.as_str(), "file:///dir/one.rs");
4448                assert_eq!(params.position, lsp::Position::new(0, 7));
4449                Ok(Some(lsp::PrepareRenameResponse::Range(lsp::Range::new(
4450                    lsp::Position::new(0, 6),
4451                    lsp::Position::new(0, 9),
4452                ))))
4453            })
4454            .next()
4455            .await
4456            .unwrap();
4457        prepare_rename.await.unwrap();
4458        editor_b.update(cx_b, |editor, cx| {
4459            let rename = editor.pending_rename().unwrap();
4460            let buffer = editor.buffer().read(cx).snapshot(cx);
4461            assert_eq!(
4462                rename.range.start.to_offset(&buffer)..rename.range.end.to_offset(&buffer),
4463                6..9
4464            );
4465            rename.editor.update(cx, |rename_editor, cx| {
4466                rename_editor.buffer().update(cx, |rename_buffer, cx| {
4467                    rename_buffer.edit([(0..3, "THREE")], cx);
4468                });
4469            });
4470        });
4471
4472        let confirm_rename = workspace_b.update(cx_b, |workspace, cx| {
4473            Editor::confirm_rename(workspace, &ConfirmRename, cx).unwrap()
4474        });
4475        fake_language_server
4476            .handle_request::<lsp::request::Rename, _, _>(|params, _| async move {
4477                assert_eq!(
4478                    params.text_document_position.text_document.uri.as_str(),
4479                    "file:///dir/one.rs"
4480                );
4481                assert_eq!(
4482                    params.text_document_position.position,
4483                    lsp::Position::new(0, 6)
4484                );
4485                assert_eq!(params.new_name, "THREE");
4486                Ok(Some(lsp::WorkspaceEdit {
4487                    changes: Some(
4488                        [
4489                            (
4490                                lsp::Url::from_file_path("/dir/one.rs").unwrap(),
4491                                vec![lsp::TextEdit::new(
4492                                    lsp::Range::new(
4493                                        lsp::Position::new(0, 6),
4494                                        lsp::Position::new(0, 9),
4495                                    ),
4496                                    "THREE".to_string(),
4497                                )],
4498                            ),
4499                            (
4500                                lsp::Url::from_file_path("/dir/two.rs").unwrap(),
4501                                vec![
4502                                    lsp::TextEdit::new(
4503                                        lsp::Range::new(
4504                                            lsp::Position::new(0, 24),
4505                                            lsp::Position::new(0, 27),
4506                                        ),
4507                                        "THREE".to_string(),
4508                                    ),
4509                                    lsp::TextEdit::new(
4510                                        lsp::Range::new(
4511                                            lsp::Position::new(0, 35),
4512                                            lsp::Position::new(0, 38),
4513                                        ),
4514                                        "THREE".to_string(),
4515                                    ),
4516                                ],
4517                            ),
4518                        ]
4519                        .into_iter()
4520                        .collect(),
4521                    ),
4522                    ..Default::default()
4523                }))
4524            })
4525            .next()
4526            .await
4527            .unwrap();
4528        confirm_rename.await.unwrap();
4529
4530        let rename_editor = workspace_b.read_with(cx_b, |workspace, cx| {
4531            workspace
4532                .active_item(cx)
4533                .unwrap()
4534                .downcast::<Editor>()
4535                .unwrap()
4536        });
4537        rename_editor.update(cx_b, |editor, cx| {
4538            assert_eq!(
4539                editor.text(cx),
4540                "const THREE: usize = 1;\nconst TWO: usize = one::THREE + one::THREE;"
4541            );
4542            editor.undo(&Undo, cx);
4543            assert_eq!(
4544                editor.text(cx),
4545                "const ONE: usize = 1;\nconst TWO: usize = one::ONE + one::ONE;"
4546            );
4547            editor.redo(&Redo, cx);
4548            assert_eq!(
4549                editor.text(cx),
4550                "const THREE: usize = 1;\nconst TWO: usize = one::THREE + one::THREE;"
4551            );
4552        });
4553
4554        // Ensure temporary rename edits cannot be undone/redone.
4555        editor_b.update(cx_b, |editor, cx| {
4556            editor.undo(&Undo, cx);
4557            assert_eq!(editor.text(cx), "const ONE: usize = 1;");
4558            editor.undo(&Undo, cx);
4559            assert_eq!(editor.text(cx), "const ONE: usize = 1;");
4560            editor.redo(&Redo, cx);
4561            assert_eq!(editor.text(cx), "const THREE: usize = 1;");
4562        })
4563    }
4564
4565    #[gpui::test(iterations = 10)]
4566    async fn test_basic_chat(cx_a: &mut TestAppContext, cx_b: &mut TestAppContext) {
4567        cx_a.foreground().forbid_parking();
4568
4569        // Connect to a server as 2 clients.
4570        let mut server = TestServer::start(cx_a.foreground(), cx_a.background()).await;
4571        let client_a = server.create_client(cx_a, "user_a").await;
4572        let client_b = server.create_client(cx_b, "user_b").await;
4573
4574        // Create an org that includes these 2 users.
4575        let db = &server.app_state.db;
4576        let org_id = db.create_org("Test Org", "test-org").await.unwrap();
4577        db.add_org_member(org_id, client_a.current_user_id(&cx_a), false)
4578            .await
4579            .unwrap();
4580        db.add_org_member(org_id, client_b.current_user_id(&cx_b), false)
4581            .await
4582            .unwrap();
4583
4584        // Create a channel that includes all the users.
4585        let channel_id = db.create_org_channel(org_id, "test-channel").await.unwrap();
4586        db.add_channel_member(channel_id, client_a.current_user_id(&cx_a), false)
4587            .await
4588            .unwrap();
4589        db.add_channel_member(channel_id, client_b.current_user_id(&cx_b), false)
4590            .await
4591            .unwrap();
4592        db.create_channel_message(
4593            channel_id,
4594            client_b.current_user_id(&cx_b),
4595            "hello A, it's B.",
4596            OffsetDateTime::now_utc(),
4597            1,
4598        )
4599        .await
4600        .unwrap();
4601
4602        let channels_a = cx_a
4603            .add_model(|cx| ChannelList::new(client_a.user_store.clone(), client_a.clone(), cx));
4604        channels_a
4605            .condition(cx_a, |list, _| list.available_channels().is_some())
4606            .await;
4607        channels_a.read_with(cx_a, |list, _| {
4608            assert_eq!(
4609                list.available_channels().unwrap(),
4610                &[ChannelDetails {
4611                    id: channel_id.to_proto(),
4612                    name: "test-channel".to_string()
4613                }]
4614            )
4615        });
4616        let channel_a = channels_a.update(cx_a, |this, cx| {
4617            this.get_channel(channel_id.to_proto(), cx).unwrap()
4618        });
4619        channel_a.read_with(cx_a, |channel, _| assert!(channel.messages().is_empty()));
4620        channel_a
4621            .condition(&cx_a, |channel, _| {
4622                channel_messages(channel)
4623                    == [("user_b".to_string(), "hello A, it's B.".to_string(), false)]
4624            })
4625            .await;
4626
4627        let channels_b = cx_b
4628            .add_model(|cx| ChannelList::new(client_b.user_store.clone(), client_b.clone(), cx));
4629        channels_b
4630            .condition(cx_b, |list, _| list.available_channels().is_some())
4631            .await;
4632        channels_b.read_with(cx_b, |list, _| {
4633            assert_eq!(
4634                list.available_channels().unwrap(),
4635                &[ChannelDetails {
4636                    id: channel_id.to_proto(),
4637                    name: "test-channel".to_string()
4638                }]
4639            )
4640        });
4641
4642        let channel_b = channels_b.update(cx_b, |this, cx| {
4643            this.get_channel(channel_id.to_proto(), cx).unwrap()
4644        });
4645        channel_b.read_with(cx_b, |channel, _| assert!(channel.messages().is_empty()));
4646        channel_b
4647            .condition(&cx_b, |channel, _| {
4648                channel_messages(channel)
4649                    == [("user_b".to_string(), "hello A, it's B.".to_string(), false)]
4650            })
4651            .await;
4652
4653        channel_a
4654            .update(cx_a, |channel, cx| {
4655                channel
4656                    .send_message("oh, hi B.".to_string(), cx)
4657                    .unwrap()
4658                    .detach();
4659                let task = channel.send_message("sup".to_string(), cx).unwrap();
4660                assert_eq!(
4661                    channel_messages(channel),
4662                    &[
4663                        ("user_b".to_string(), "hello A, it's B.".to_string(), false),
4664                        ("user_a".to_string(), "oh, hi B.".to_string(), true),
4665                        ("user_a".to_string(), "sup".to_string(), true)
4666                    ]
4667                );
4668                task
4669            })
4670            .await
4671            .unwrap();
4672
4673        channel_b
4674            .condition(&cx_b, |channel, _| {
4675                channel_messages(channel)
4676                    == [
4677                        ("user_b".to_string(), "hello A, it's B.".to_string(), false),
4678                        ("user_a".to_string(), "oh, hi B.".to_string(), false),
4679                        ("user_a".to_string(), "sup".to_string(), false),
4680                    ]
4681            })
4682            .await;
4683
4684        assert_eq!(
4685            server
4686                .state()
4687                .await
4688                .channel(channel_id)
4689                .unwrap()
4690                .connection_ids
4691                .len(),
4692            2
4693        );
4694        cx_b.update(|_| drop(channel_b));
4695        server
4696            .condition(|state| state.channel(channel_id).unwrap().connection_ids.len() == 1)
4697            .await;
4698
4699        cx_a.update(|_| drop(channel_a));
4700        server
4701            .condition(|state| state.channel(channel_id).is_none())
4702            .await;
4703    }
4704
4705    #[gpui::test(iterations = 10)]
4706    async fn test_chat_message_validation(cx_a: &mut TestAppContext) {
4707        cx_a.foreground().forbid_parking();
4708
4709        let mut server = TestServer::start(cx_a.foreground(), cx_a.background()).await;
4710        let client_a = server.create_client(cx_a, "user_a").await;
4711
4712        let db = &server.app_state.db;
4713        let org_id = db.create_org("Test Org", "test-org").await.unwrap();
4714        let channel_id = db.create_org_channel(org_id, "test-channel").await.unwrap();
4715        db.add_org_member(org_id, client_a.current_user_id(&cx_a), false)
4716            .await
4717            .unwrap();
4718        db.add_channel_member(channel_id, client_a.current_user_id(&cx_a), false)
4719            .await
4720            .unwrap();
4721
4722        let channels_a = cx_a
4723            .add_model(|cx| ChannelList::new(client_a.user_store.clone(), client_a.clone(), cx));
4724        channels_a
4725            .condition(cx_a, |list, _| list.available_channels().is_some())
4726            .await;
4727        let channel_a = channels_a.update(cx_a, |this, cx| {
4728            this.get_channel(channel_id.to_proto(), cx).unwrap()
4729        });
4730
4731        // Messages aren't allowed to be too long.
4732        channel_a
4733            .update(cx_a, |channel, cx| {
4734                let long_body = "this is long.\n".repeat(1024);
4735                channel.send_message(long_body, cx).unwrap()
4736            })
4737            .await
4738            .unwrap_err();
4739
4740        // Messages aren't allowed to be blank.
4741        channel_a.update(cx_a, |channel, cx| {
4742            channel.send_message(String::new(), cx).unwrap_err()
4743        });
4744
4745        // Leading and trailing whitespace are trimmed.
4746        channel_a
4747            .update(cx_a, |channel, cx| {
4748                channel
4749                    .send_message("\n surrounded by whitespace  \n".to_string(), cx)
4750                    .unwrap()
4751            })
4752            .await
4753            .unwrap();
4754        assert_eq!(
4755            db.get_channel_messages(channel_id, 10, None)
4756                .await
4757                .unwrap()
4758                .iter()
4759                .map(|m| &m.body)
4760                .collect::<Vec<_>>(),
4761            &["surrounded by whitespace"]
4762        );
4763    }
4764
4765    #[gpui::test(iterations = 10)]
4766    async fn test_chat_reconnection(cx_a: &mut TestAppContext, cx_b: &mut TestAppContext) {
4767        cx_a.foreground().forbid_parking();
4768
4769        // Connect to a server as 2 clients.
4770        let mut server = TestServer::start(cx_a.foreground(), cx_a.background()).await;
4771        let client_a = server.create_client(cx_a, "user_a").await;
4772        let client_b = server.create_client(cx_b, "user_b").await;
4773        let mut status_b = client_b.status();
4774
4775        // Create an org that includes these 2 users.
4776        let db = &server.app_state.db;
4777        let org_id = db.create_org("Test Org", "test-org").await.unwrap();
4778        db.add_org_member(org_id, client_a.current_user_id(&cx_a), false)
4779            .await
4780            .unwrap();
4781        db.add_org_member(org_id, client_b.current_user_id(&cx_b), false)
4782            .await
4783            .unwrap();
4784
4785        // Create a channel that includes all the users.
4786        let channel_id = db.create_org_channel(org_id, "test-channel").await.unwrap();
4787        db.add_channel_member(channel_id, client_a.current_user_id(&cx_a), false)
4788            .await
4789            .unwrap();
4790        db.add_channel_member(channel_id, client_b.current_user_id(&cx_b), false)
4791            .await
4792            .unwrap();
4793        db.create_channel_message(
4794            channel_id,
4795            client_b.current_user_id(&cx_b),
4796            "hello A, it's B.",
4797            OffsetDateTime::now_utc(),
4798            2,
4799        )
4800        .await
4801        .unwrap();
4802
4803        let channels_a = cx_a
4804            .add_model(|cx| ChannelList::new(client_a.user_store.clone(), client_a.clone(), cx));
4805        channels_a
4806            .condition(cx_a, |list, _| list.available_channels().is_some())
4807            .await;
4808
4809        channels_a.read_with(cx_a, |list, _| {
4810            assert_eq!(
4811                list.available_channels().unwrap(),
4812                &[ChannelDetails {
4813                    id: channel_id.to_proto(),
4814                    name: "test-channel".to_string()
4815                }]
4816            )
4817        });
4818        let channel_a = channels_a.update(cx_a, |this, cx| {
4819            this.get_channel(channel_id.to_proto(), cx).unwrap()
4820        });
4821        channel_a.read_with(cx_a, |channel, _| assert!(channel.messages().is_empty()));
4822        channel_a
4823            .condition(&cx_a, |channel, _| {
4824                channel_messages(channel)
4825                    == [("user_b".to_string(), "hello A, it's B.".to_string(), false)]
4826            })
4827            .await;
4828
4829        let channels_b = cx_b
4830            .add_model(|cx| ChannelList::new(client_b.user_store.clone(), client_b.clone(), cx));
4831        channels_b
4832            .condition(cx_b, |list, _| list.available_channels().is_some())
4833            .await;
4834        channels_b.read_with(cx_b, |list, _| {
4835            assert_eq!(
4836                list.available_channels().unwrap(),
4837                &[ChannelDetails {
4838                    id: channel_id.to_proto(),
4839                    name: "test-channel".to_string()
4840                }]
4841            )
4842        });
4843
4844        let channel_b = channels_b.update(cx_b, |this, cx| {
4845            this.get_channel(channel_id.to_proto(), cx).unwrap()
4846        });
4847        channel_b.read_with(cx_b, |channel, _| assert!(channel.messages().is_empty()));
4848        channel_b
4849            .condition(&cx_b, |channel, _| {
4850                channel_messages(channel)
4851                    == [("user_b".to_string(), "hello A, it's B.".to_string(), false)]
4852            })
4853            .await;
4854
4855        // Disconnect client B, ensuring we can still access its cached channel data.
4856        server.forbid_connections();
4857        server.disconnect_client(client_b.current_user_id(&cx_b));
4858        cx_b.foreground().advance_clock(rpc::RECEIVE_TIMEOUT);
4859        while !matches!(
4860            status_b.next().await,
4861            Some(client::Status::ReconnectionError { .. })
4862        ) {}
4863
4864        channels_b.read_with(cx_b, |channels, _| {
4865            assert_eq!(
4866                channels.available_channels().unwrap(),
4867                [ChannelDetails {
4868                    id: channel_id.to_proto(),
4869                    name: "test-channel".to_string()
4870                }]
4871            )
4872        });
4873        channel_b.read_with(cx_b, |channel, _| {
4874            assert_eq!(
4875                channel_messages(channel),
4876                [("user_b".to_string(), "hello A, it's B.".to_string(), false)]
4877            )
4878        });
4879
4880        // Send a message from client B while it is disconnected.
4881        channel_b
4882            .update(cx_b, |channel, cx| {
4883                let task = channel
4884                    .send_message("can you see this?".to_string(), cx)
4885                    .unwrap();
4886                assert_eq!(
4887                    channel_messages(channel),
4888                    &[
4889                        ("user_b".to_string(), "hello A, it's B.".to_string(), false),
4890                        ("user_b".to_string(), "can you see this?".to_string(), true)
4891                    ]
4892                );
4893                task
4894            })
4895            .await
4896            .unwrap_err();
4897
4898        // Send a message from client A while B is disconnected.
4899        channel_a
4900            .update(cx_a, |channel, cx| {
4901                channel
4902                    .send_message("oh, hi B.".to_string(), cx)
4903                    .unwrap()
4904                    .detach();
4905                let task = channel.send_message("sup".to_string(), cx).unwrap();
4906                assert_eq!(
4907                    channel_messages(channel),
4908                    &[
4909                        ("user_b".to_string(), "hello A, it's B.".to_string(), false),
4910                        ("user_a".to_string(), "oh, hi B.".to_string(), true),
4911                        ("user_a".to_string(), "sup".to_string(), true)
4912                    ]
4913                );
4914                task
4915            })
4916            .await
4917            .unwrap();
4918
4919        // Give client B a chance to reconnect.
4920        server.allow_connections();
4921        cx_b.foreground().advance_clock(Duration::from_secs(10));
4922
4923        // Verify that B sees the new messages upon reconnection, as well as the message client B
4924        // sent while offline.
4925        channel_b
4926            .condition(&cx_b, |channel, _| {
4927                channel_messages(channel)
4928                    == [
4929                        ("user_b".to_string(), "hello A, it's B.".to_string(), false),
4930                        ("user_a".to_string(), "oh, hi B.".to_string(), false),
4931                        ("user_a".to_string(), "sup".to_string(), false),
4932                        ("user_b".to_string(), "can you see this?".to_string(), false),
4933                    ]
4934            })
4935            .await;
4936
4937        // Ensure client A and B can communicate normally after reconnection.
4938        channel_a
4939            .update(cx_a, |channel, cx| {
4940                channel.send_message("you online?".to_string(), cx).unwrap()
4941            })
4942            .await
4943            .unwrap();
4944        channel_b
4945            .condition(&cx_b, |channel, _| {
4946                channel_messages(channel)
4947                    == [
4948                        ("user_b".to_string(), "hello A, it's B.".to_string(), false),
4949                        ("user_a".to_string(), "oh, hi B.".to_string(), false),
4950                        ("user_a".to_string(), "sup".to_string(), false),
4951                        ("user_b".to_string(), "can you see this?".to_string(), false),
4952                        ("user_a".to_string(), "you online?".to_string(), false),
4953                    ]
4954            })
4955            .await;
4956
4957        channel_b
4958            .update(cx_b, |channel, cx| {
4959                channel.send_message("yep".to_string(), cx).unwrap()
4960            })
4961            .await
4962            .unwrap();
4963        channel_a
4964            .condition(&cx_a, |channel, _| {
4965                channel_messages(channel)
4966                    == [
4967                        ("user_b".to_string(), "hello A, it's B.".to_string(), false),
4968                        ("user_a".to_string(), "oh, hi B.".to_string(), false),
4969                        ("user_a".to_string(), "sup".to_string(), false),
4970                        ("user_b".to_string(), "can you see this?".to_string(), false),
4971                        ("user_a".to_string(), "you online?".to_string(), false),
4972                        ("user_b".to_string(), "yep".to_string(), false),
4973                    ]
4974            })
4975            .await;
4976    }
4977
4978    #[gpui::test(iterations = 10)]
4979    async fn test_contacts(
4980        deterministic: Arc<Deterministic>,
4981        cx_a: &mut TestAppContext,
4982        cx_b: &mut TestAppContext,
4983        cx_c: &mut TestAppContext,
4984    ) {
4985        cx_a.foreground().forbid_parking();
4986        let lang_registry = Arc::new(LanguageRegistry::test());
4987        let fs = FakeFs::new(cx_a.background());
4988
4989        // Connect to a server as 3 clients.
4990        let mut server = TestServer::start(cx_a.foreground(), cx_a.background()).await;
4991        let client_a = server.create_client(cx_a, "user_a").await;
4992        let client_b = server.create_client(cx_b, "user_b").await;
4993        let client_c = server.create_client(cx_c, "user_c").await;
4994        server
4995            .make_contacts(vec![
4996                (&client_a, cx_a),
4997                (&client_b, cx_b),
4998                (&client_c, cx_c),
4999            ])
5000            .await;
5001        deterministic.run_until_parked();
5002        client_a.user_store.read_with(cx_a, |store, _| {
5003            assert_eq!(contacts(store), [("user_b", vec![]), ("user_c", vec![])])
5004        });
5005        client_b.user_store.read_with(cx_b, |store, _| {
5006            assert_eq!(contacts(store), [("user_a", vec![]), ("user_c", vec![])])
5007        });
5008        client_c.user_store.read_with(cx_c, |store, _| {
5009            assert_eq!(contacts(store), [("user_a", vec![]), ("user_b", vec![])])
5010        });
5011
5012        // Share a worktree as client A.
5013        fs.create_dir(Path::new("/a")).await.unwrap();
5014
5015        let project_a = cx_a.update(|cx| {
5016            Project::local(
5017                client_a.clone(),
5018                client_a.user_store.clone(),
5019                lang_registry.clone(),
5020                fs.clone(),
5021                cx,
5022            )
5023        });
5024        let (worktree_a, _) = project_a
5025            .update(cx_a, |p, cx| {
5026                p.find_or_create_local_worktree("/a", true, cx)
5027            })
5028            .await
5029            .unwrap();
5030        worktree_a
5031            .read_with(cx_a, |tree, _| tree.as_local().unwrap().scan_complete())
5032            .await;
5033
5034        deterministic.run_until_parked();
5035        client_a.user_store.read_with(cx_a, |store, _| {
5036            assert_eq!(contacts(store), [("user_b", vec![]), ("user_c", vec![])])
5037        });
5038        client_b.user_store.read_with(cx_b, |store, _| {
5039            assert_eq!(
5040                contacts(store),
5041                [("user_a", vec![("a", false, vec![])]), ("user_c", vec![])]
5042            )
5043        });
5044        client_c.user_store.read_with(cx_c, |store, _| {
5045            assert_eq!(
5046                contacts(store),
5047                [("user_a", vec![("a", false, vec![])]), ("user_b", vec![])]
5048            )
5049        });
5050
5051        let project_id = project_a
5052            .update(cx_a, |project, _| project.next_remote_id())
5053            .await;
5054        project_a
5055            .update(cx_a, |project, cx| project.share(cx))
5056            .await
5057            .unwrap();
5058        deterministic.run_until_parked();
5059        client_a.user_store.read_with(cx_a, |store, _| {
5060            assert_eq!(contacts(store), [("user_b", vec![]), ("user_c", vec![])])
5061        });
5062        client_b.user_store.read_with(cx_b, |store, _| {
5063            assert_eq!(
5064                contacts(store),
5065                [("user_a", vec![("a", true, vec![])]), ("user_c", vec![])]
5066            )
5067        });
5068        client_c.user_store.read_with(cx_c, |store, _| {
5069            assert_eq!(
5070                contacts(store),
5071                [("user_a", vec![("a", true, vec![])]), ("user_b", vec![])]
5072            )
5073        });
5074
5075        let _project_b = Project::remote(
5076            project_id,
5077            client_b.clone(),
5078            client_b.user_store.clone(),
5079            lang_registry.clone(),
5080            fs.clone(),
5081            &mut cx_b.to_async(),
5082        )
5083        .await
5084        .unwrap();
5085        deterministic.run_until_parked();
5086        client_a.user_store.read_with(cx_a, |store, _| {
5087            assert_eq!(contacts(store), [("user_b", vec![]), ("user_c", vec![])])
5088        });
5089        client_b.user_store.read_with(cx_b, |store, _| {
5090            assert_eq!(
5091                contacts(store),
5092                [
5093                    ("user_a", vec![("a", true, vec!["user_b"])]),
5094                    ("user_c", vec![])
5095                ]
5096            )
5097        });
5098        client_c.user_store.read_with(cx_c, |store, _| {
5099            assert_eq!(
5100                contacts(store),
5101                [
5102                    ("user_a", vec![("a", true, vec!["user_b"])]),
5103                    ("user_b", vec![])
5104                ]
5105            )
5106        });
5107
5108        project_a
5109            .condition(&cx_a, |project, _| {
5110                project.collaborators().contains_key(&client_b.peer_id)
5111            })
5112            .await;
5113
5114        cx_a.update(move |_| drop(project_a));
5115        deterministic.run_until_parked();
5116        client_a.user_store.read_with(cx_a, |store, _| {
5117            assert_eq!(contacts(store), [("user_b", vec![]), ("user_c", vec![])])
5118        });
5119        client_b.user_store.read_with(cx_b, |store, _| {
5120            assert_eq!(contacts(store), [("user_a", vec![]), ("user_c", vec![])])
5121        });
5122        client_c.user_store.read_with(cx_c, |store, _| {
5123            assert_eq!(contacts(store), [("user_a", vec![]), ("user_b", vec![])])
5124        });
5125
5126        fn contacts(user_store: &UserStore) -> Vec<(&str, Vec<(&str, bool, Vec<&str>)>)> {
5127            user_store
5128                .contacts()
5129                .iter()
5130                .map(|contact| {
5131                    let worktrees = contact
5132                        .projects
5133                        .iter()
5134                        .map(|p| {
5135                            (
5136                                p.worktree_root_names[0].as_str(),
5137                                p.is_shared,
5138                                p.guests.iter().map(|p| p.github_login.as_str()).collect(),
5139                            )
5140                        })
5141                        .collect();
5142                    (contact.user.github_login.as_str(), worktrees)
5143                })
5144                .collect()
5145        }
5146    }
5147
5148    #[gpui::test(iterations = 10)]
5149    async fn test_contact_requests(
5150        executor: Arc<Deterministic>,
5151        cx_a: &mut TestAppContext,
5152        cx_a2: &mut TestAppContext,
5153        cx_b: &mut TestAppContext,
5154        cx_b2: &mut TestAppContext,
5155        cx_c: &mut TestAppContext,
5156        cx_c2: &mut TestAppContext,
5157    ) {
5158        cx_a.foreground().forbid_parking();
5159
5160        // Connect to a server as 3 clients.
5161        let mut server = TestServer::start(cx_a.foreground(), cx_a.background()).await;
5162        let client_a = server.create_client(cx_a, "user_a").await;
5163        let client_a2 = server.create_client(cx_a2, "user_a").await;
5164        let client_b = server.create_client(cx_b, "user_b").await;
5165        let client_b2 = server.create_client(cx_b2, "user_b").await;
5166        let client_c = server.create_client(cx_c, "user_c").await;
5167        let client_c2 = server.create_client(cx_c2, "user_c").await;
5168
5169        assert_eq!(client_a.user_id().unwrap(), client_a2.user_id().unwrap());
5170        assert_eq!(client_b.user_id().unwrap(), client_b2.user_id().unwrap());
5171        assert_eq!(client_c.user_id().unwrap(), client_c2.user_id().unwrap());
5172
5173        // User A and User C request that user B become their contact.
5174        client_a
5175            .user_store
5176            .update(cx_a, |store, cx| {
5177                store.request_contact(client_b.user_id().unwrap(), cx)
5178            })
5179            .await
5180            .unwrap();
5181        client_c
5182            .user_store
5183            .update(cx_c, |store, cx| {
5184                store.request_contact(client_b.user_id().unwrap(), cx)
5185            })
5186            .await
5187            .unwrap();
5188        executor.run_until_parked();
5189
5190        // All users see the pending request appear in all their clients.
5191        assert_eq!(
5192            client_a.summarize_contacts(&cx_a).outgoing_requests,
5193            &["user_b"]
5194        );
5195        assert_eq!(
5196            client_a2.summarize_contacts(&cx_a2).outgoing_requests,
5197            &["user_b"]
5198        );
5199        assert_eq!(
5200            client_b.summarize_contacts(&cx_b).incoming_requests,
5201            &["user_a", "user_c"]
5202        );
5203        assert_eq!(
5204            client_b2.summarize_contacts(&cx_b2).incoming_requests,
5205            &["user_a", "user_c"]
5206        );
5207        assert_eq!(
5208            client_c.summarize_contacts(&cx_c).outgoing_requests,
5209            &["user_b"]
5210        );
5211        assert_eq!(
5212            client_c2.summarize_contacts(&cx_c2).outgoing_requests,
5213            &["user_b"]
5214        );
5215
5216        // Contact requests are present upon connecting (tested here via disconnect/reconnect)
5217        disconnect_and_reconnect(&client_a, cx_a).await;
5218        disconnect_and_reconnect(&client_b, cx_b).await;
5219        disconnect_and_reconnect(&client_c, cx_c).await;
5220        executor.run_until_parked();
5221        assert_eq!(
5222            client_a.summarize_contacts(&cx_a).outgoing_requests,
5223            &["user_b"]
5224        );
5225        assert_eq!(
5226            client_b.summarize_contacts(&cx_b).incoming_requests,
5227            &["user_a", "user_c"]
5228        );
5229        assert_eq!(
5230            client_c.summarize_contacts(&cx_c).outgoing_requests,
5231            &["user_b"]
5232        );
5233
5234        // User B accepts the request from user A.
5235        client_b
5236            .user_store
5237            .update(cx_b, |store, cx| {
5238                store.respond_to_contact_request(client_a.user_id().unwrap(), true, cx)
5239            })
5240            .await
5241            .unwrap();
5242
5243        executor.run_until_parked();
5244
5245        // User B sees user A as their contact now in all client, and the incoming request from them is removed.
5246        let contacts_b = client_b.summarize_contacts(&cx_b);
5247        assert_eq!(contacts_b.current, &["user_a"]);
5248        assert_eq!(contacts_b.incoming_requests, &["user_c"]);
5249        let contacts_b2 = client_b2.summarize_contacts(&cx_b2);
5250        assert_eq!(contacts_b2.current, &["user_a"]);
5251        assert_eq!(contacts_b2.incoming_requests, &["user_c"]);
5252
5253        // User A sees user B as their contact now in all clients, and the outgoing request to them is removed.
5254        let contacts_a = client_a.summarize_contacts(&cx_a);
5255        assert_eq!(contacts_a.current, &["user_b"]);
5256        assert!(contacts_a.outgoing_requests.is_empty());
5257        let contacts_a2 = client_a2.summarize_contacts(&cx_a2);
5258        assert_eq!(contacts_a2.current, &["user_b"]);
5259        assert!(contacts_a2.outgoing_requests.is_empty());
5260
5261        // Contacts are present upon connecting (tested here via disconnect/reconnect)
5262        disconnect_and_reconnect(&client_a, cx_a).await;
5263        disconnect_and_reconnect(&client_b, cx_b).await;
5264        disconnect_and_reconnect(&client_c, cx_c).await;
5265        executor.run_until_parked();
5266        assert_eq!(client_a.summarize_contacts(&cx_a).current, &["user_b"]);
5267        assert_eq!(client_b.summarize_contacts(&cx_b).current, &["user_a"]);
5268        assert_eq!(
5269            client_b.summarize_contacts(&cx_b).incoming_requests,
5270            &["user_c"]
5271        );
5272        assert!(client_c.summarize_contacts(&cx_c).current.is_empty());
5273        assert_eq!(
5274            client_c.summarize_contacts(&cx_c).outgoing_requests,
5275            &["user_b"]
5276        );
5277
5278        // User B rejects the request from user C.
5279        client_b
5280            .user_store
5281            .update(cx_b, |store, cx| {
5282                store.respond_to_contact_request(client_c.user_id().unwrap(), false, cx)
5283            })
5284            .await
5285            .unwrap();
5286
5287        executor.run_until_parked();
5288
5289        // User B doesn't see user C as their contact, and the incoming request from them is removed.
5290        let contacts_b = client_b.summarize_contacts(&cx_b);
5291        assert_eq!(contacts_b.current, &["user_a"]);
5292        assert!(contacts_b.incoming_requests.is_empty());
5293        let contacts_b2 = client_b2.summarize_contacts(&cx_b2);
5294        assert_eq!(contacts_b2.current, &["user_a"]);
5295        assert!(contacts_b2.incoming_requests.is_empty());
5296
5297        // User C doesn't see user B as their contact, and the outgoing request to them is removed.
5298        let contacts_c = client_c.summarize_contacts(&cx_c);
5299        assert!(contacts_c.current.is_empty());
5300        assert!(contacts_c.outgoing_requests.is_empty());
5301        let contacts_c2 = client_c2.summarize_contacts(&cx_c2);
5302        assert!(contacts_c2.current.is_empty());
5303        assert!(contacts_c2.outgoing_requests.is_empty());
5304
5305        // Incoming/outgoing requests are not present upon connecting (tested here via disconnect/reconnect)
5306        disconnect_and_reconnect(&client_a, cx_a).await;
5307        disconnect_and_reconnect(&client_b, cx_b).await;
5308        disconnect_and_reconnect(&client_c, cx_c).await;
5309        executor.run_until_parked();
5310        assert_eq!(client_a.summarize_contacts(&cx_a).current, &["user_b"]);
5311        assert_eq!(client_b.summarize_contacts(&cx_b).current, &["user_a"]);
5312        assert!(client_b
5313            .summarize_contacts(&cx_b)
5314            .incoming_requests
5315            .is_empty());
5316        assert!(client_c.summarize_contacts(&cx_c).current.is_empty());
5317        assert!(client_c
5318            .summarize_contacts(&cx_c)
5319            .outgoing_requests
5320            .is_empty());
5321
5322        async fn disconnect_and_reconnect(client: &TestClient, cx: &mut TestAppContext) {
5323            client.disconnect(&cx.to_async()).unwrap();
5324            client.clear_contacts(cx).await;
5325            client
5326                .authenticate_and_connect(false, &cx.to_async())
5327                .await
5328                .unwrap();
5329        }
5330    }
5331
5332    #[gpui::test(iterations = 10)]
5333    async fn test_following(cx_a: &mut TestAppContext, cx_b: &mut TestAppContext) {
5334        cx_a.foreground().forbid_parking();
5335        let fs = FakeFs::new(cx_a.background());
5336
5337        // 2 clients connect to a server.
5338        let mut server = TestServer::start(cx_a.foreground(), cx_a.background()).await;
5339        let mut client_a = server.create_client(cx_a, "user_a").await;
5340        let mut client_b = server.create_client(cx_b, "user_b").await;
5341        server
5342            .make_contacts(vec![(&client_a, cx_a), (&client_b, cx_b)])
5343            .await;
5344        cx_a.update(editor::init);
5345        cx_b.update(editor::init);
5346
5347        // Client A shares a project.
5348        fs.insert_tree(
5349            "/a",
5350            json!({
5351                "1.txt": "one",
5352                "2.txt": "two",
5353                "3.txt": "three",
5354            }),
5355        )
5356        .await;
5357        let (project_a, worktree_id) = client_a.build_local_project(fs.clone(), "/a", cx_a).await;
5358        project_a
5359            .update(cx_a, |project, cx| project.share(cx))
5360            .await
5361            .unwrap();
5362
5363        // Client B joins the project.
5364        let project_b = client_b
5365            .build_remote_project(
5366                project_a
5367                    .read_with(cx_a, |project, _| project.remote_id())
5368                    .unwrap(),
5369                cx_b,
5370            )
5371            .await;
5372
5373        // Client A opens some editors.
5374        let workspace_a = client_a.build_workspace(&project_a, cx_a);
5375        let pane_a = workspace_a.read_with(cx_a, |workspace, _| workspace.active_pane().clone());
5376        let editor_a1 = workspace_a
5377            .update(cx_a, |workspace, cx| {
5378                workspace.open_path((worktree_id, "1.txt"), true, cx)
5379            })
5380            .await
5381            .unwrap()
5382            .downcast::<Editor>()
5383            .unwrap();
5384        let editor_a2 = workspace_a
5385            .update(cx_a, |workspace, cx| {
5386                workspace.open_path((worktree_id, "2.txt"), true, cx)
5387            })
5388            .await
5389            .unwrap()
5390            .downcast::<Editor>()
5391            .unwrap();
5392
5393        // Client B opens an editor.
5394        let workspace_b = client_b.build_workspace(&project_b, cx_b);
5395        let editor_b1 = workspace_b
5396            .update(cx_b, |workspace, cx| {
5397                workspace.open_path((worktree_id, "1.txt"), true, cx)
5398            })
5399            .await
5400            .unwrap()
5401            .downcast::<Editor>()
5402            .unwrap();
5403
5404        let client_a_id = project_b.read_with(cx_b, |project, _| {
5405            project.collaborators().values().next().unwrap().peer_id
5406        });
5407        let client_b_id = project_a.read_with(cx_a, |project, _| {
5408            project.collaborators().values().next().unwrap().peer_id
5409        });
5410
5411        // When client B starts following client A, all visible view states are replicated to client B.
5412        editor_a1.update(cx_a, |editor, cx| editor.select_ranges([0..1], None, cx));
5413        editor_a2.update(cx_a, |editor, cx| editor.select_ranges([2..3], None, cx));
5414        workspace_b
5415            .update(cx_b, |workspace, cx| {
5416                workspace
5417                    .toggle_follow(&ToggleFollow(client_a_id), cx)
5418                    .unwrap()
5419            })
5420            .await
5421            .unwrap();
5422        let editor_b2 = workspace_b.read_with(cx_b, |workspace, cx| {
5423            workspace
5424                .active_item(cx)
5425                .unwrap()
5426                .downcast::<Editor>()
5427                .unwrap()
5428        });
5429        assert!(cx_b.read(|cx| editor_b2.is_focused(cx)));
5430        assert_eq!(
5431            editor_b2.read_with(cx_b, |editor, cx| editor.project_path(cx)),
5432            Some((worktree_id, "2.txt").into())
5433        );
5434        assert_eq!(
5435            editor_b2.read_with(cx_b, |editor, cx| editor.selected_ranges(cx)),
5436            vec![2..3]
5437        );
5438        assert_eq!(
5439            editor_b1.read_with(cx_b, |editor, cx| editor.selected_ranges(cx)),
5440            vec![0..1]
5441        );
5442
5443        // When client A activates a different editor, client B does so as well.
5444        workspace_a.update(cx_a, |workspace, cx| {
5445            workspace.activate_item(&editor_a1, cx)
5446        });
5447        workspace_b
5448            .condition(cx_b, |workspace, cx| {
5449                workspace.active_item(cx).unwrap().id() == editor_b1.id()
5450            })
5451            .await;
5452
5453        // When client A navigates back and forth, client B does so as well.
5454        workspace_a
5455            .update(cx_a, |workspace, cx| {
5456                workspace::Pane::go_back(workspace, None, cx)
5457            })
5458            .await;
5459        workspace_b
5460            .condition(cx_b, |workspace, cx| {
5461                workspace.active_item(cx).unwrap().id() == editor_b2.id()
5462            })
5463            .await;
5464
5465        workspace_a
5466            .update(cx_a, |workspace, cx| {
5467                workspace::Pane::go_forward(workspace, None, cx)
5468            })
5469            .await;
5470        workspace_b
5471            .condition(cx_b, |workspace, cx| {
5472                workspace.active_item(cx).unwrap().id() == editor_b1.id()
5473            })
5474            .await;
5475
5476        // Changes to client A's editor are reflected on client B.
5477        editor_a1.update(cx_a, |editor, cx| {
5478            editor.select_ranges([1..1, 2..2], None, cx);
5479        });
5480        editor_b1
5481            .condition(cx_b, |editor, cx| {
5482                editor.selected_ranges(cx) == vec![1..1, 2..2]
5483            })
5484            .await;
5485
5486        editor_a1.update(cx_a, |editor, cx| editor.set_text("TWO", cx));
5487        editor_b1
5488            .condition(cx_b, |editor, cx| editor.text(cx) == "TWO")
5489            .await;
5490
5491        editor_a1.update(cx_a, |editor, cx| {
5492            editor.select_ranges([3..3], None, cx);
5493            editor.set_scroll_position(vec2f(0., 100.), cx);
5494        });
5495        editor_b1
5496            .condition(cx_b, |editor, cx| editor.selected_ranges(cx) == vec![3..3])
5497            .await;
5498
5499        // After unfollowing, client B stops receiving updates from client A.
5500        workspace_b.update(cx_b, |workspace, cx| {
5501            workspace.unfollow(&workspace.active_pane().clone(), cx)
5502        });
5503        workspace_a.update(cx_a, |workspace, cx| {
5504            workspace.activate_item(&editor_a2, cx)
5505        });
5506        cx_a.foreground().run_until_parked();
5507        assert_eq!(
5508            workspace_b.read_with(cx_b, |workspace, cx| workspace
5509                .active_item(cx)
5510                .unwrap()
5511                .id()),
5512            editor_b1.id()
5513        );
5514
5515        // Client A starts following client B.
5516        workspace_a
5517            .update(cx_a, |workspace, cx| {
5518                workspace
5519                    .toggle_follow(&ToggleFollow(client_b_id), cx)
5520                    .unwrap()
5521            })
5522            .await
5523            .unwrap();
5524        assert_eq!(
5525            workspace_a.read_with(cx_a, |workspace, _| workspace.leader_for_pane(&pane_a)),
5526            Some(client_b_id)
5527        );
5528        assert_eq!(
5529            workspace_a.read_with(cx_a, |workspace, cx| workspace
5530                .active_item(cx)
5531                .unwrap()
5532                .id()),
5533            editor_a1.id()
5534        );
5535
5536        // Following interrupts when client B disconnects.
5537        client_b.disconnect(&cx_b.to_async()).unwrap();
5538        cx_a.foreground().run_until_parked();
5539        assert_eq!(
5540            workspace_a.read_with(cx_a, |workspace, _| workspace.leader_for_pane(&pane_a)),
5541            None
5542        );
5543    }
5544
5545    #[gpui::test(iterations = 10)]
5546    async fn test_peers_following_each_other(cx_a: &mut TestAppContext, cx_b: &mut TestAppContext) {
5547        cx_a.foreground().forbid_parking();
5548        let fs = FakeFs::new(cx_a.background());
5549
5550        // 2 clients connect to a server.
5551        let mut server = TestServer::start(cx_a.foreground(), cx_a.background()).await;
5552        let mut client_a = server.create_client(cx_a, "user_a").await;
5553        let mut client_b = server.create_client(cx_b, "user_b").await;
5554        server
5555            .make_contacts(vec![(&client_a, cx_a), (&client_b, cx_b)])
5556            .await;
5557        cx_a.update(editor::init);
5558        cx_b.update(editor::init);
5559
5560        // Client A shares a project.
5561        fs.insert_tree(
5562            "/a",
5563            json!({
5564                "1.txt": "one",
5565                "2.txt": "two",
5566                "3.txt": "three",
5567                "4.txt": "four",
5568            }),
5569        )
5570        .await;
5571        let (project_a, worktree_id) = client_a.build_local_project(fs.clone(), "/a", cx_a).await;
5572        project_a
5573            .update(cx_a, |project, cx| project.share(cx))
5574            .await
5575            .unwrap();
5576
5577        // Client B joins the project.
5578        let project_b = client_b
5579            .build_remote_project(
5580                project_a
5581                    .read_with(cx_a, |project, _| project.remote_id())
5582                    .unwrap(),
5583                cx_b,
5584            )
5585            .await;
5586
5587        // Client A opens some editors.
5588        let workspace_a = client_a.build_workspace(&project_a, cx_a);
5589        let pane_a1 = workspace_a.read_with(cx_a, |workspace, _| workspace.active_pane().clone());
5590        let _editor_a1 = workspace_a
5591            .update(cx_a, |workspace, cx| {
5592                workspace.open_path((worktree_id, "1.txt"), true, cx)
5593            })
5594            .await
5595            .unwrap()
5596            .downcast::<Editor>()
5597            .unwrap();
5598
5599        // Client B opens an editor.
5600        let workspace_b = client_b.build_workspace(&project_b, cx_b);
5601        let pane_b1 = workspace_b.read_with(cx_b, |workspace, _| workspace.active_pane().clone());
5602        let _editor_b1 = workspace_b
5603            .update(cx_b, |workspace, cx| {
5604                workspace.open_path((worktree_id, "2.txt"), true, cx)
5605            })
5606            .await
5607            .unwrap()
5608            .downcast::<Editor>()
5609            .unwrap();
5610
5611        // Clients A and B follow each other in split panes
5612        workspace_a
5613            .update(cx_a, |workspace, cx| {
5614                workspace.split_pane(workspace.active_pane().clone(), SplitDirection::Right, cx);
5615                assert_ne!(*workspace.active_pane(), pane_a1);
5616                let leader_id = *project_a.read(cx).collaborators().keys().next().unwrap();
5617                workspace
5618                    .toggle_follow(&workspace::ToggleFollow(leader_id), cx)
5619                    .unwrap()
5620            })
5621            .await
5622            .unwrap();
5623        workspace_b
5624            .update(cx_b, |workspace, cx| {
5625                workspace.split_pane(workspace.active_pane().clone(), SplitDirection::Right, cx);
5626                assert_ne!(*workspace.active_pane(), pane_b1);
5627                let leader_id = *project_b.read(cx).collaborators().keys().next().unwrap();
5628                workspace
5629                    .toggle_follow(&workspace::ToggleFollow(leader_id), cx)
5630                    .unwrap()
5631            })
5632            .await
5633            .unwrap();
5634
5635        workspace_a
5636            .update(cx_a, |workspace, cx| {
5637                workspace.activate_next_pane(cx);
5638                assert_eq!(*workspace.active_pane(), pane_a1);
5639                workspace.open_path((worktree_id, "3.txt"), true, cx)
5640            })
5641            .await
5642            .unwrap();
5643        workspace_b
5644            .update(cx_b, |workspace, cx| {
5645                workspace.activate_next_pane(cx);
5646                assert_eq!(*workspace.active_pane(), pane_b1);
5647                workspace.open_path((worktree_id, "4.txt"), true, cx)
5648            })
5649            .await
5650            .unwrap();
5651        cx_a.foreground().run_until_parked();
5652
5653        // Ensure leader updates don't change the active pane of followers
5654        workspace_a.read_with(cx_a, |workspace, _| {
5655            assert_eq!(*workspace.active_pane(), pane_a1);
5656        });
5657        workspace_b.read_with(cx_b, |workspace, _| {
5658            assert_eq!(*workspace.active_pane(), pane_b1);
5659        });
5660
5661        // Ensure peers following each other doesn't cause an infinite loop.
5662        assert_eq!(
5663            workspace_a.read_with(cx_a, |workspace, cx| workspace
5664                .active_item(cx)
5665                .unwrap()
5666                .project_path(cx)),
5667            Some((worktree_id, "3.txt").into())
5668        );
5669        workspace_a.update(cx_a, |workspace, cx| {
5670            assert_eq!(
5671                workspace.active_item(cx).unwrap().project_path(cx),
5672                Some((worktree_id, "3.txt").into())
5673            );
5674            workspace.activate_next_pane(cx);
5675            assert_eq!(
5676                workspace.active_item(cx).unwrap().project_path(cx),
5677                Some((worktree_id, "4.txt").into())
5678            );
5679        });
5680        workspace_b.update(cx_b, |workspace, cx| {
5681            assert_eq!(
5682                workspace.active_item(cx).unwrap().project_path(cx),
5683                Some((worktree_id, "4.txt").into())
5684            );
5685            workspace.activate_next_pane(cx);
5686            assert_eq!(
5687                workspace.active_item(cx).unwrap().project_path(cx),
5688                Some((worktree_id, "3.txt").into())
5689            );
5690        });
5691    }
5692
5693    #[gpui::test(iterations = 10)]
5694    async fn test_auto_unfollowing(cx_a: &mut TestAppContext, cx_b: &mut TestAppContext) {
5695        cx_a.foreground().forbid_parking();
5696        let fs = FakeFs::new(cx_a.background());
5697
5698        // 2 clients connect to a server.
5699        let mut server = TestServer::start(cx_a.foreground(), cx_a.background()).await;
5700        let mut client_a = server.create_client(cx_a, "user_a").await;
5701        let mut client_b = server.create_client(cx_b, "user_b").await;
5702        server
5703            .make_contacts(vec![(&client_a, cx_a), (&client_b, cx_b)])
5704            .await;
5705        cx_a.update(editor::init);
5706        cx_b.update(editor::init);
5707
5708        // Client A shares a project.
5709        fs.insert_tree(
5710            "/a",
5711            json!({
5712                "1.txt": "one",
5713                "2.txt": "two",
5714                "3.txt": "three",
5715            }),
5716        )
5717        .await;
5718        let (project_a, worktree_id) = client_a.build_local_project(fs.clone(), "/a", cx_a).await;
5719        project_a
5720            .update(cx_a, |project, cx| project.share(cx))
5721            .await
5722            .unwrap();
5723
5724        // Client B joins the project.
5725        let project_b = client_b
5726            .build_remote_project(
5727                project_a
5728                    .read_with(cx_a, |project, _| project.remote_id())
5729                    .unwrap(),
5730                cx_b,
5731            )
5732            .await;
5733
5734        // Client A opens some editors.
5735        let workspace_a = client_a.build_workspace(&project_a, cx_a);
5736        let _editor_a1 = workspace_a
5737            .update(cx_a, |workspace, cx| {
5738                workspace.open_path((worktree_id, "1.txt"), true, cx)
5739            })
5740            .await
5741            .unwrap()
5742            .downcast::<Editor>()
5743            .unwrap();
5744
5745        // Client B starts following client A.
5746        let workspace_b = client_b.build_workspace(&project_b, cx_b);
5747        let pane_b = workspace_b.read_with(cx_b, |workspace, _| workspace.active_pane().clone());
5748        let leader_id = project_b.read_with(cx_b, |project, _| {
5749            project.collaborators().values().next().unwrap().peer_id
5750        });
5751        workspace_b
5752            .update(cx_b, |workspace, cx| {
5753                workspace
5754                    .toggle_follow(&ToggleFollow(leader_id), cx)
5755                    .unwrap()
5756            })
5757            .await
5758            .unwrap();
5759        assert_eq!(
5760            workspace_b.read_with(cx_b, |workspace, _| workspace.leader_for_pane(&pane_b)),
5761            Some(leader_id)
5762        );
5763        let editor_b2 = workspace_b.read_with(cx_b, |workspace, cx| {
5764            workspace
5765                .active_item(cx)
5766                .unwrap()
5767                .downcast::<Editor>()
5768                .unwrap()
5769        });
5770
5771        // When client B moves, it automatically stops following client A.
5772        editor_b2.update(cx_b, |editor, cx| editor.move_right(&editor::MoveRight, cx));
5773        assert_eq!(
5774            workspace_b.read_with(cx_b, |workspace, _| workspace.leader_for_pane(&pane_b)),
5775            None
5776        );
5777
5778        workspace_b
5779            .update(cx_b, |workspace, cx| {
5780                workspace
5781                    .toggle_follow(&ToggleFollow(leader_id), cx)
5782                    .unwrap()
5783            })
5784            .await
5785            .unwrap();
5786        assert_eq!(
5787            workspace_b.read_with(cx_b, |workspace, _| workspace.leader_for_pane(&pane_b)),
5788            Some(leader_id)
5789        );
5790
5791        // When client B edits, it automatically stops following client A.
5792        editor_b2.update(cx_b, |editor, cx| editor.insert("X", cx));
5793        assert_eq!(
5794            workspace_b.read_with(cx_b, |workspace, _| workspace.leader_for_pane(&pane_b)),
5795            None
5796        );
5797
5798        workspace_b
5799            .update(cx_b, |workspace, cx| {
5800                workspace
5801                    .toggle_follow(&ToggleFollow(leader_id), cx)
5802                    .unwrap()
5803            })
5804            .await
5805            .unwrap();
5806        assert_eq!(
5807            workspace_b.read_with(cx_b, |workspace, _| workspace.leader_for_pane(&pane_b)),
5808            Some(leader_id)
5809        );
5810
5811        // When client B scrolls, it automatically stops following client A.
5812        editor_b2.update(cx_b, |editor, cx| {
5813            editor.set_scroll_position(vec2f(0., 3.), cx)
5814        });
5815        assert_eq!(
5816            workspace_b.read_with(cx_b, |workspace, _| workspace.leader_for_pane(&pane_b)),
5817            None
5818        );
5819
5820        workspace_b
5821            .update(cx_b, |workspace, cx| {
5822                workspace
5823                    .toggle_follow(&ToggleFollow(leader_id), cx)
5824                    .unwrap()
5825            })
5826            .await
5827            .unwrap();
5828        assert_eq!(
5829            workspace_b.read_with(cx_b, |workspace, _| workspace.leader_for_pane(&pane_b)),
5830            Some(leader_id)
5831        );
5832
5833        // When client B activates a different pane, it continues following client A in the original pane.
5834        workspace_b.update(cx_b, |workspace, cx| {
5835            workspace.split_pane(pane_b.clone(), SplitDirection::Right, cx)
5836        });
5837        assert_eq!(
5838            workspace_b.read_with(cx_b, |workspace, _| workspace.leader_for_pane(&pane_b)),
5839            Some(leader_id)
5840        );
5841
5842        workspace_b.update(cx_b, |workspace, cx| workspace.activate_next_pane(cx));
5843        assert_eq!(
5844            workspace_b.read_with(cx_b, |workspace, _| workspace.leader_for_pane(&pane_b)),
5845            Some(leader_id)
5846        );
5847
5848        // When client B activates a different item in the original pane, it automatically stops following client A.
5849        workspace_b
5850            .update(cx_b, |workspace, cx| {
5851                workspace.open_path((worktree_id, "2.txt"), true, cx)
5852            })
5853            .await
5854            .unwrap();
5855        assert_eq!(
5856            workspace_b.read_with(cx_b, |workspace, _| workspace.leader_for_pane(&pane_b)),
5857            None
5858        );
5859    }
5860
5861    #[gpui::test(iterations = 100)]
5862    async fn test_random_collaboration(
5863        cx: &mut TestAppContext,
5864        deterministic: Arc<Deterministic>,
5865        rng: StdRng,
5866    ) {
5867        cx.foreground().forbid_parking();
5868        let max_peers = env::var("MAX_PEERS")
5869            .map(|i| i.parse().expect("invalid `MAX_PEERS` variable"))
5870            .unwrap_or(5);
5871        assert!(max_peers <= 5);
5872
5873        let max_operations = env::var("OPERATIONS")
5874            .map(|i| i.parse().expect("invalid `OPERATIONS` variable"))
5875            .unwrap_or(10);
5876
5877        let rng = Arc::new(Mutex::new(rng));
5878
5879        let guest_lang_registry = Arc::new(LanguageRegistry::test());
5880        let host_language_registry = Arc::new(LanguageRegistry::test());
5881
5882        let fs = FakeFs::new(cx.background());
5883        fs.insert_tree("/_collab", json!({"init": ""})).await;
5884
5885        let mut server = TestServer::start(cx.foreground(), cx.background()).await;
5886        let db = server.app_state.db.clone();
5887        let host_user_id = db.create_user("host", false).await.unwrap();
5888        for username in ["guest-1", "guest-2", "guest-3", "guest-4"] {
5889            let guest_user_id = db.create_user(username, false).await.unwrap();
5890            server
5891                .app_state
5892                .db
5893                .send_contact_request(guest_user_id, host_user_id)
5894                .await
5895                .unwrap();
5896            server
5897                .app_state
5898                .db
5899                .respond_to_contact_request(host_user_id, guest_user_id, true)
5900                .await
5901                .unwrap();
5902        }
5903
5904        let mut clients = Vec::new();
5905        let mut user_ids = Vec::new();
5906        let mut op_start_signals = Vec::new();
5907
5908        let mut next_entity_id = 100000;
5909        let mut host_cx = TestAppContext::new(
5910            cx.foreground_platform(),
5911            cx.platform(),
5912            deterministic.build_foreground(next_entity_id),
5913            deterministic.build_background(),
5914            cx.font_cache(),
5915            cx.leak_detector(),
5916            next_entity_id,
5917        );
5918        let host = server.create_client(&mut host_cx, "host").await;
5919        let host_project = host_cx.update(|cx| {
5920            Project::local(
5921                host.client.clone(),
5922                host.user_store.clone(),
5923                host_language_registry.clone(),
5924                fs.clone(),
5925                cx,
5926            )
5927        });
5928        let host_project_id = host_project
5929            .update(&mut host_cx, |p, _| p.next_remote_id())
5930            .await;
5931
5932        let (collab_worktree, _) = host_project
5933            .update(&mut host_cx, |project, cx| {
5934                project.find_or_create_local_worktree("/_collab", true, cx)
5935            })
5936            .await
5937            .unwrap();
5938        collab_worktree
5939            .read_with(&host_cx, |tree, _| tree.as_local().unwrap().scan_complete())
5940            .await;
5941        host_project
5942            .update(&mut host_cx, |project, cx| project.share(cx))
5943            .await
5944            .unwrap();
5945
5946        // Set up fake language servers.
5947        let mut language = Language::new(
5948            LanguageConfig {
5949                name: "Rust".into(),
5950                path_suffixes: vec!["rs".to_string()],
5951                ..Default::default()
5952            },
5953            None,
5954        );
5955        let _fake_servers = language.set_fake_lsp_adapter(FakeLspAdapter {
5956            name: "the-fake-language-server",
5957            capabilities: lsp::LanguageServer::full_capabilities(),
5958            initializer: Some(Box::new({
5959                let rng = rng.clone();
5960                let fs = fs.clone();
5961                let project = host_project.downgrade();
5962                move |fake_server: &mut FakeLanguageServer| {
5963                    fake_server.handle_request::<lsp::request::Completion, _, _>(
5964                        |_, _| async move {
5965                            Ok(Some(lsp::CompletionResponse::Array(vec![
5966                                lsp::CompletionItem {
5967                                    text_edit: Some(lsp::CompletionTextEdit::Edit(lsp::TextEdit {
5968                                        range: lsp::Range::new(
5969                                            lsp::Position::new(0, 0),
5970                                            lsp::Position::new(0, 0),
5971                                        ),
5972                                        new_text: "the-new-text".to_string(),
5973                                    })),
5974                                    ..Default::default()
5975                                },
5976                            ])))
5977                        },
5978                    );
5979
5980                    fake_server.handle_request::<lsp::request::CodeActionRequest, _, _>(
5981                        |_, _| async move {
5982                            Ok(Some(vec![lsp::CodeActionOrCommand::CodeAction(
5983                                lsp::CodeAction {
5984                                    title: "the-code-action".to_string(),
5985                                    ..Default::default()
5986                                },
5987                            )]))
5988                        },
5989                    );
5990
5991                    fake_server.handle_request::<lsp::request::PrepareRenameRequest, _, _>(
5992                        |params, _| async move {
5993                            Ok(Some(lsp::PrepareRenameResponse::Range(lsp::Range::new(
5994                                params.position,
5995                                params.position,
5996                            ))))
5997                        },
5998                    );
5999
6000                    fake_server.handle_request::<lsp::request::GotoDefinition, _, _>({
6001                        let fs = fs.clone();
6002                        let rng = rng.clone();
6003                        move |_, _| {
6004                            let fs = fs.clone();
6005                            let rng = rng.clone();
6006                            async move {
6007                                let files = fs.files().await;
6008                                let mut rng = rng.lock();
6009                                let count = rng.gen_range::<usize, _>(1..3);
6010                                let files = (0..count)
6011                                    .map(|_| files.choose(&mut *rng).unwrap())
6012                                    .collect::<Vec<_>>();
6013                                log::info!("LSP: Returning definitions in files {:?}", &files);
6014                                Ok(Some(lsp::GotoDefinitionResponse::Array(
6015                                    files
6016                                        .into_iter()
6017                                        .map(|file| lsp::Location {
6018                                            uri: lsp::Url::from_file_path(file).unwrap(),
6019                                            range: Default::default(),
6020                                        })
6021                                        .collect(),
6022                                )))
6023                            }
6024                        }
6025                    });
6026
6027                    fake_server.handle_request::<lsp::request::DocumentHighlightRequest, _, _>({
6028                        let rng = rng.clone();
6029                        let project = project.clone();
6030                        move |params, mut cx| {
6031                            let highlights = if let Some(project) = project.upgrade(&cx) {
6032                                project.update(&mut cx, |project, cx| {
6033                                    let path = params
6034                                        .text_document_position_params
6035                                        .text_document
6036                                        .uri
6037                                        .to_file_path()
6038                                        .unwrap();
6039                                    let (worktree, relative_path) =
6040                                        project.find_local_worktree(&path, cx)?;
6041                                    let project_path =
6042                                        ProjectPath::from((worktree.read(cx).id(), relative_path));
6043                                    let buffer =
6044                                        project.get_open_buffer(&project_path, cx)?.read(cx);
6045
6046                                    let mut highlights = Vec::new();
6047                                    let highlight_count = rng.lock().gen_range(1..=5);
6048                                    let mut prev_end = 0;
6049                                    for _ in 0..highlight_count {
6050                                        let range =
6051                                            buffer.random_byte_range(prev_end, &mut *rng.lock());
6052
6053                                        highlights.push(lsp::DocumentHighlight {
6054                                            range: range_to_lsp(range.to_point_utf16(buffer)),
6055                                            kind: Some(lsp::DocumentHighlightKind::READ),
6056                                        });
6057                                        prev_end = range.end;
6058                                    }
6059                                    Some(highlights)
6060                                })
6061                            } else {
6062                                None
6063                            };
6064                            async move { Ok(highlights) }
6065                        }
6066                    });
6067                }
6068            })),
6069            ..Default::default()
6070        });
6071        host_language_registry.add(Arc::new(language));
6072
6073        let op_start_signal = futures::channel::mpsc::unbounded();
6074        user_ids.push(host.current_user_id(&host_cx));
6075        op_start_signals.push(op_start_signal.0);
6076        clients.push(host_cx.foreground().spawn(host.simulate_host(
6077            host_project,
6078            op_start_signal.1,
6079            rng.clone(),
6080            host_cx,
6081        )));
6082
6083        let disconnect_host_at = if rng.lock().gen_bool(0.2) {
6084            rng.lock().gen_range(0..max_operations)
6085        } else {
6086            max_operations
6087        };
6088        let mut available_guests = vec![
6089            "guest-1".to_string(),
6090            "guest-2".to_string(),
6091            "guest-3".to_string(),
6092            "guest-4".to_string(),
6093        ];
6094        let mut operations = 0;
6095        while operations < max_operations {
6096            if operations == disconnect_host_at {
6097                server.disconnect_client(user_ids[0]);
6098                cx.foreground().advance_clock(RECEIVE_TIMEOUT);
6099                drop(op_start_signals);
6100                let mut clients = futures::future::join_all(clients).await;
6101                cx.foreground().run_until_parked();
6102
6103                let (host, mut host_cx, host_err) = clients.remove(0);
6104                if let Some(host_err) = host_err {
6105                    log::error!("host error - {}", host_err);
6106                }
6107                host.project
6108                    .as_ref()
6109                    .unwrap()
6110                    .read_with(&host_cx, |project, _| assert!(!project.is_shared()));
6111                for (guest, mut guest_cx, guest_err) in clients {
6112                    if let Some(guest_err) = guest_err {
6113                        log::error!("{} error - {}", guest.username, guest_err);
6114                    }
6115                    // TODO
6116                    // let contacts = server
6117                    //     .store
6118                    //     .read()
6119                    //     .await
6120                    //     .contacts_for_user(guest.current_user_id(&guest_cx));
6121                    // assert!(!contacts
6122                    //     .iter()
6123                    //     .flat_map(|contact| &contact.projects)
6124                    //     .any(|project| project.id == host_project_id));
6125                    guest
6126                        .project
6127                        .as_ref()
6128                        .unwrap()
6129                        .read_with(&guest_cx, |project, _| assert!(project.is_read_only()));
6130                    guest_cx.update(|_| drop(guest));
6131                }
6132                host_cx.update(|_| drop(host));
6133
6134                return;
6135            }
6136
6137            let distribution = rng.lock().gen_range(0..100);
6138            match distribution {
6139                0..=19 if !available_guests.is_empty() => {
6140                    let guest_ix = rng.lock().gen_range(0..available_guests.len());
6141                    let guest_username = available_guests.remove(guest_ix);
6142                    log::info!("Adding new connection for {}", guest_username);
6143                    next_entity_id += 100000;
6144                    let mut guest_cx = TestAppContext::new(
6145                        cx.foreground_platform(),
6146                        cx.platform(),
6147                        deterministic.build_foreground(next_entity_id),
6148                        deterministic.build_background(),
6149                        cx.font_cache(),
6150                        cx.leak_detector(),
6151                        next_entity_id,
6152                    );
6153                    let guest = server.create_client(&mut guest_cx, &guest_username).await;
6154                    let guest_project = Project::remote(
6155                        host_project_id,
6156                        guest.client.clone(),
6157                        guest.user_store.clone(),
6158                        guest_lang_registry.clone(),
6159                        FakeFs::new(cx.background()),
6160                        &mut guest_cx.to_async(),
6161                    )
6162                    .await
6163                    .unwrap();
6164                    let op_start_signal = futures::channel::mpsc::unbounded();
6165                    user_ids.push(guest.current_user_id(&guest_cx));
6166                    op_start_signals.push(op_start_signal.0);
6167                    clients.push(guest_cx.foreground().spawn(guest.simulate_guest(
6168                        guest_username.clone(),
6169                        guest_project,
6170                        op_start_signal.1,
6171                        rng.clone(),
6172                        guest_cx,
6173                    )));
6174
6175                    log::info!("Added connection for {}", guest_username);
6176                    operations += 1;
6177                }
6178                20..=29 if clients.len() > 1 => {
6179                    let guest_ix = rng.lock().gen_range(1..clients.len());
6180                    log::info!("Removing guest {}", user_ids[guest_ix]);
6181                    let removed_guest_id = user_ids.remove(guest_ix);
6182                    let guest = clients.remove(guest_ix);
6183                    op_start_signals.remove(guest_ix);
6184                    server.disconnect_client(removed_guest_id);
6185                    cx.foreground().advance_clock(RECEIVE_TIMEOUT);
6186                    let (guest, mut guest_cx, guest_err) = guest.await;
6187                    if let Some(guest_err) = guest_err {
6188                        log::error!("{} error - {}", guest.username, guest_err);
6189                    }
6190                    guest
6191                        .project
6192                        .as_ref()
6193                        .unwrap()
6194                        .read_with(&guest_cx, |project, _| assert!(project.is_read_only()));
6195                    // TODO
6196                    // for user_id in &user_ids {
6197                    //     for contact in server.store.read().await.contacts_for_user(*user_id) {
6198                    //         assert_ne!(
6199                    //             contact.user_id, removed_guest_id.0 as u64,
6200                    //             "removed guest is still a contact of another peer"
6201                    //         );
6202                    //         for project in contact.projects {
6203                    //             for project_guest_id in project.guests {
6204                    //                 assert_ne!(
6205                    //                     project_guest_id, removed_guest_id.0 as u64,
6206                    //                     "removed guest appears as still participating on a project"
6207                    //                 );
6208                    //             }
6209                    //         }
6210                    //     }
6211                    // }
6212
6213                    log::info!("{} removed", guest.username);
6214                    available_guests.push(guest.username.clone());
6215                    guest_cx.update(|_| drop(guest));
6216
6217                    operations += 1;
6218                }
6219                _ => {
6220                    while operations < max_operations && rng.lock().gen_bool(0.7) {
6221                        op_start_signals
6222                            .choose(&mut *rng.lock())
6223                            .unwrap()
6224                            .unbounded_send(())
6225                            .unwrap();
6226                        operations += 1;
6227                    }
6228
6229                    if rng.lock().gen_bool(0.8) {
6230                        cx.foreground().run_until_parked();
6231                    }
6232                }
6233            }
6234        }
6235
6236        drop(op_start_signals);
6237        let mut clients = futures::future::join_all(clients).await;
6238        cx.foreground().run_until_parked();
6239
6240        let (host_client, mut host_cx, host_err) = clients.remove(0);
6241        if let Some(host_err) = host_err {
6242            panic!("host error - {}", host_err);
6243        }
6244        let host_project = host_client.project.as_ref().unwrap();
6245        let host_worktree_snapshots = host_project.read_with(&host_cx, |project, cx| {
6246            project
6247                .worktrees(cx)
6248                .map(|worktree| {
6249                    let snapshot = worktree.read(cx).snapshot();
6250                    (snapshot.id(), snapshot)
6251                })
6252                .collect::<BTreeMap<_, _>>()
6253        });
6254
6255        host_client
6256            .project
6257            .as_ref()
6258            .unwrap()
6259            .read_with(&host_cx, |project, cx| project.check_invariants(cx));
6260
6261        for (guest_client, mut guest_cx, guest_err) in clients.into_iter() {
6262            if let Some(guest_err) = guest_err {
6263                panic!("{} error - {}", guest_client.username, guest_err);
6264            }
6265            let worktree_snapshots =
6266                guest_client
6267                    .project
6268                    .as_ref()
6269                    .unwrap()
6270                    .read_with(&guest_cx, |project, cx| {
6271                        project
6272                            .worktrees(cx)
6273                            .map(|worktree| {
6274                                let worktree = worktree.read(cx);
6275                                (worktree.id(), worktree.snapshot())
6276                            })
6277                            .collect::<BTreeMap<_, _>>()
6278                    });
6279
6280            assert_eq!(
6281                worktree_snapshots.keys().collect::<Vec<_>>(),
6282                host_worktree_snapshots.keys().collect::<Vec<_>>(),
6283                "{} has different worktrees than the host",
6284                guest_client.username
6285            );
6286            for (id, host_snapshot) in &host_worktree_snapshots {
6287                let guest_snapshot = &worktree_snapshots[id];
6288                assert_eq!(
6289                    guest_snapshot.root_name(),
6290                    host_snapshot.root_name(),
6291                    "{} has different root name than the host for worktree {}",
6292                    guest_client.username,
6293                    id
6294                );
6295                assert_eq!(
6296                    guest_snapshot.entries(false).collect::<Vec<_>>(),
6297                    host_snapshot.entries(false).collect::<Vec<_>>(),
6298                    "{} has different snapshot than the host for worktree {}",
6299                    guest_client.username,
6300                    id
6301                );
6302                assert_eq!(guest_snapshot.scan_id(), host_snapshot.scan_id());
6303            }
6304
6305            guest_client
6306                .project
6307                .as_ref()
6308                .unwrap()
6309                .read_with(&guest_cx, |project, cx| project.check_invariants(cx));
6310
6311            for guest_buffer in &guest_client.buffers {
6312                let buffer_id = guest_buffer.read_with(&guest_cx, |buffer, _| buffer.remote_id());
6313                let host_buffer = host_project.read_with(&host_cx, |project, cx| {
6314                    project.buffer_for_id(buffer_id, cx).expect(&format!(
6315                        "host does not have buffer for guest:{}, peer:{}, id:{}",
6316                        guest_client.username, guest_client.peer_id, buffer_id
6317                    ))
6318                });
6319                let path = host_buffer
6320                    .read_with(&host_cx, |buffer, cx| buffer.file().unwrap().full_path(cx));
6321
6322                assert_eq!(
6323                    guest_buffer.read_with(&guest_cx, |buffer, _| buffer.deferred_ops_len()),
6324                    0,
6325                    "{}, buffer {}, path {:?} has deferred operations",
6326                    guest_client.username,
6327                    buffer_id,
6328                    path,
6329                );
6330                assert_eq!(
6331                    guest_buffer.read_with(&guest_cx, |buffer, _| buffer.text()),
6332                    host_buffer.read_with(&host_cx, |buffer, _| buffer.text()),
6333                    "{}, buffer {}, path {:?}, differs from the host's buffer",
6334                    guest_client.username,
6335                    buffer_id,
6336                    path
6337                );
6338            }
6339
6340            guest_cx.update(|_| drop(guest_client));
6341        }
6342
6343        host_cx.update(|_| drop(host_client));
6344    }
6345
6346    struct TestServer {
6347        peer: Arc<Peer>,
6348        app_state: Arc<AppState>,
6349        server: Arc<Server>,
6350        foreground: Rc<executor::Foreground>,
6351        notifications: mpsc::UnboundedReceiver<()>,
6352        connection_killers: Arc<Mutex<HashMap<UserId, Arc<AtomicBool>>>>,
6353        forbid_connections: Arc<AtomicBool>,
6354        _test_db: TestDb,
6355    }
6356
6357    impl TestServer {
6358        async fn start(
6359            foreground: Rc<executor::Foreground>,
6360            background: Arc<executor::Background>,
6361        ) -> Self {
6362            let test_db = TestDb::fake(background);
6363            let app_state = Self::build_app_state(&test_db).await;
6364            let peer = Peer::new();
6365            let notifications = mpsc::unbounded();
6366            let server = Server::new(app_state.clone(), Some(notifications.0));
6367            Self {
6368                peer,
6369                app_state,
6370                server,
6371                foreground,
6372                notifications: notifications.1,
6373                connection_killers: Default::default(),
6374                forbid_connections: Default::default(),
6375                _test_db: test_db,
6376            }
6377        }
6378
6379        async fn create_client(&mut self, cx: &mut TestAppContext, name: &str) -> TestClient {
6380            cx.update(|cx| {
6381                let settings = Settings::test(cx);
6382                cx.set_global(settings);
6383            });
6384
6385            let http = FakeHttpClient::with_404_response();
6386            let user_id =
6387                if let Ok(Some(user)) = self.app_state.db.get_user_by_github_login(name).await {
6388                    user.id
6389                } else {
6390                    self.app_state.db.create_user(name, false).await.unwrap()
6391                };
6392            let client_name = name.to_string();
6393            let mut client = Client::new(http.clone());
6394            let server = self.server.clone();
6395            let connection_killers = self.connection_killers.clone();
6396            let forbid_connections = self.forbid_connections.clone();
6397            let (connection_id_tx, mut connection_id_rx) = mpsc::channel(16);
6398
6399            Arc::get_mut(&mut client)
6400                .unwrap()
6401                .override_authenticate(move |cx| {
6402                    cx.spawn(|_| async move {
6403                        let access_token = "the-token".to_string();
6404                        Ok(Credentials {
6405                            user_id: user_id.0 as u64,
6406                            access_token,
6407                        })
6408                    })
6409                })
6410                .override_establish_connection(move |credentials, cx| {
6411                    assert_eq!(credentials.user_id, user_id.0 as u64);
6412                    assert_eq!(credentials.access_token, "the-token");
6413
6414                    let server = server.clone();
6415                    let connection_killers = connection_killers.clone();
6416                    let forbid_connections = forbid_connections.clone();
6417                    let client_name = client_name.clone();
6418                    let connection_id_tx = connection_id_tx.clone();
6419                    cx.spawn(move |cx| async move {
6420                        if forbid_connections.load(SeqCst) {
6421                            Err(EstablishConnectionError::other(anyhow!(
6422                                "server is forbidding connections"
6423                            )))
6424                        } else {
6425                            let (client_conn, server_conn, killed) =
6426                                Connection::in_memory(cx.background());
6427                            connection_killers.lock().insert(user_id, killed);
6428                            cx.background()
6429                                .spawn(server.handle_connection(
6430                                    server_conn,
6431                                    client_name,
6432                                    user_id,
6433                                    Some(connection_id_tx),
6434                                    cx.background(),
6435                                ))
6436                                .detach();
6437                            Ok(client_conn)
6438                        }
6439                    })
6440                });
6441
6442            client
6443                .authenticate_and_connect(false, &cx.to_async())
6444                .await
6445                .unwrap();
6446
6447            Channel::init(&client);
6448            Project::init(&client);
6449            cx.update(|cx| {
6450                workspace::init(&client, cx);
6451            });
6452
6453            let peer_id = PeerId(connection_id_rx.next().await.unwrap().0);
6454            let user_store = cx.add_model(|cx| UserStore::new(client.clone(), http, cx));
6455
6456            let client = TestClient {
6457                client,
6458                peer_id,
6459                username: name.to_string(),
6460                user_store,
6461                language_registry: Arc::new(LanguageRegistry::test()),
6462                project: Default::default(),
6463                buffers: Default::default(),
6464            };
6465            client.wait_for_current_user(cx).await;
6466            client
6467        }
6468
6469        fn disconnect_client(&self, user_id: UserId) {
6470            self.connection_killers
6471                .lock()
6472                .remove(&user_id)
6473                .unwrap()
6474                .store(true, SeqCst);
6475        }
6476
6477        fn forbid_connections(&self) {
6478            self.forbid_connections.store(true, SeqCst);
6479        }
6480
6481        fn allow_connections(&self) {
6482            self.forbid_connections.store(false, SeqCst);
6483        }
6484
6485        async fn make_contacts(&self, mut clients: Vec<(&TestClient, &mut TestAppContext)>) {
6486            while let Some((client_a, cx_a)) = clients.pop() {
6487                for (client_b, cx_b) in &mut clients {
6488                    client_a
6489                        .user_store
6490                        .update(cx_a, |store, cx| {
6491                            store.request_contact(client_b.user_id().unwrap(), cx)
6492                        })
6493                        .await
6494                        .unwrap();
6495                    cx_a.foreground().run_until_parked();
6496                    client_b
6497                        .user_store
6498                        .update(*cx_b, |store, cx| {
6499                            store.respond_to_contact_request(client_a.user_id().unwrap(), true, cx)
6500                        })
6501                        .await
6502                        .unwrap();
6503                }
6504            }
6505        }
6506
6507        async fn build_app_state(test_db: &TestDb) -> Arc<AppState> {
6508            Arc::new(AppState {
6509                db: test_db.db().clone(),
6510                api_token: Default::default(),
6511            })
6512        }
6513
6514        async fn state<'a>(&'a self) -> RwLockReadGuard<'a, Store> {
6515            self.server.store.read().await
6516        }
6517
6518        async fn condition<F>(&mut self, mut predicate: F)
6519        where
6520            F: FnMut(&Store) -> bool,
6521        {
6522            assert!(
6523                self.foreground.parking_forbidden(),
6524                "you must call forbid_parking to use server conditions so we don't block indefinitely"
6525            );
6526            while !(predicate)(&*self.server.store.read().await) {
6527                self.foreground.start_waiting();
6528                self.notifications.next().await;
6529                self.foreground.finish_waiting();
6530            }
6531        }
6532    }
6533
6534    impl Deref for TestServer {
6535        type Target = Server;
6536
6537        fn deref(&self) -> &Self::Target {
6538            &self.server
6539        }
6540    }
6541
6542    impl Drop for TestServer {
6543        fn drop(&mut self) {
6544            self.peer.reset();
6545        }
6546    }
6547
6548    struct TestClient {
6549        client: Arc<Client>,
6550        username: String,
6551        pub peer_id: PeerId,
6552        pub user_store: ModelHandle<UserStore>,
6553        language_registry: Arc<LanguageRegistry>,
6554        project: Option<ModelHandle<Project>>,
6555        buffers: HashSet<ModelHandle<language::Buffer>>,
6556    }
6557
6558    impl Deref for TestClient {
6559        type Target = Arc<Client>;
6560
6561        fn deref(&self) -> &Self::Target {
6562            &self.client
6563        }
6564    }
6565
6566    struct ContactsSummary {
6567        pub current: Vec<String>,
6568        pub outgoing_requests: Vec<String>,
6569        pub incoming_requests: Vec<String>,
6570    }
6571
6572    impl TestClient {
6573        pub fn current_user_id(&self, cx: &TestAppContext) -> UserId {
6574            UserId::from_proto(
6575                self.user_store
6576                    .read_with(cx, |user_store, _| user_store.current_user().unwrap().id),
6577            )
6578        }
6579
6580        async fn wait_for_current_user(&self, cx: &TestAppContext) {
6581            let mut authed_user = self
6582                .user_store
6583                .read_with(cx, |user_store, _| user_store.watch_current_user());
6584            while authed_user.next().await.unwrap().is_none() {}
6585        }
6586
6587        async fn clear_contacts(&self, cx: &mut TestAppContext) {
6588            self.user_store
6589                .update(cx, |store, _| store.clear_contacts())
6590                .await;
6591        }
6592
6593        fn summarize_contacts(&self, cx: &TestAppContext) -> ContactsSummary {
6594            self.user_store.read_with(cx, |store, _| ContactsSummary {
6595                current: store
6596                    .contacts()
6597                    .iter()
6598                    .map(|contact| contact.user.github_login.clone())
6599                    .collect(),
6600                outgoing_requests: store
6601                    .outgoing_contact_requests()
6602                    .iter()
6603                    .map(|user| user.github_login.clone())
6604                    .collect(),
6605                incoming_requests: store
6606                    .incoming_contact_requests()
6607                    .iter()
6608                    .map(|user| user.github_login.clone())
6609                    .collect(),
6610            })
6611        }
6612
6613        async fn build_local_project(
6614            &mut self,
6615            fs: Arc<FakeFs>,
6616            root_path: impl AsRef<Path>,
6617            cx: &mut TestAppContext,
6618        ) -> (ModelHandle<Project>, WorktreeId) {
6619            let project = cx.update(|cx| {
6620                Project::local(
6621                    self.client.clone(),
6622                    self.user_store.clone(),
6623                    self.language_registry.clone(),
6624                    fs,
6625                    cx,
6626                )
6627            });
6628            self.project = Some(project.clone());
6629            let (worktree, _) = project
6630                .update(cx, |p, cx| {
6631                    p.find_or_create_local_worktree(root_path, true, cx)
6632                })
6633                .await
6634                .unwrap();
6635            worktree
6636                .read_with(cx, |tree, _| tree.as_local().unwrap().scan_complete())
6637                .await;
6638            project
6639                .update(cx, |project, _| project.next_remote_id())
6640                .await;
6641            (project, worktree.read_with(cx, |tree, _| tree.id()))
6642        }
6643
6644        async fn build_remote_project(
6645            &mut self,
6646            project_id: u64,
6647            cx: &mut TestAppContext,
6648        ) -> ModelHandle<Project> {
6649            let project = Project::remote(
6650                project_id,
6651                self.client.clone(),
6652                self.user_store.clone(),
6653                self.language_registry.clone(),
6654                FakeFs::new(cx.background()),
6655                &mut cx.to_async(),
6656            )
6657            .await
6658            .unwrap();
6659            self.project = Some(project.clone());
6660            project
6661        }
6662
6663        fn build_workspace(
6664            &self,
6665            project: &ModelHandle<Project>,
6666            cx: &mut TestAppContext,
6667        ) -> ViewHandle<Workspace> {
6668            let (window_id, _) = cx.add_window(|_| EmptyView);
6669            cx.add_view(window_id, |cx| {
6670                let fs = project.read(cx).fs().clone();
6671                Workspace::new(
6672                    &WorkspaceParams {
6673                        fs,
6674                        project: project.clone(),
6675                        user_store: self.user_store.clone(),
6676                        languages: self.language_registry.clone(),
6677                        themes: ThemeRegistry::new((), cx.font_cache().clone()),
6678                        channel_list: cx.add_model(|cx| {
6679                            ChannelList::new(self.user_store.clone(), self.client.clone(), cx)
6680                        }),
6681                        client: self.client.clone(),
6682                    },
6683                    cx,
6684                )
6685            })
6686        }
6687
6688        async fn simulate_host(
6689            mut self,
6690            project: ModelHandle<Project>,
6691            op_start_signal: futures::channel::mpsc::UnboundedReceiver<()>,
6692            rng: Arc<Mutex<StdRng>>,
6693            mut cx: TestAppContext,
6694        ) -> (Self, TestAppContext, Option<anyhow::Error>) {
6695            async fn simulate_host_internal(
6696                client: &mut TestClient,
6697                project: ModelHandle<Project>,
6698                mut op_start_signal: futures::channel::mpsc::UnboundedReceiver<()>,
6699                rng: Arc<Mutex<StdRng>>,
6700                cx: &mut TestAppContext,
6701            ) -> anyhow::Result<()> {
6702                let fs = project.read_with(cx, |project, _| project.fs().clone());
6703
6704                while op_start_signal.next().await.is_some() {
6705                    let distribution = rng.lock().gen_range::<usize, _>(0..100);
6706                    let files = fs.as_fake().files().await;
6707                    match distribution {
6708                        0..=20 if !files.is_empty() => {
6709                            let path = files.choose(&mut *rng.lock()).unwrap();
6710                            let mut path = path.as_path();
6711                            while let Some(parent_path) = path.parent() {
6712                                path = parent_path;
6713                                if rng.lock().gen() {
6714                                    break;
6715                                }
6716                            }
6717
6718                            log::info!("Host: find/create local worktree {:?}", path);
6719                            let find_or_create_worktree = project.update(cx, |project, cx| {
6720                                project.find_or_create_local_worktree(path, true, cx)
6721                            });
6722                            if rng.lock().gen() {
6723                                cx.background().spawn(find_or_create_worktree).detach();
6724                            } else {
6725                                find_or_create_worktree.await?;
6726                            }
6727                        }
6728                        10..=80 if !files.is_empty() => {
6729                            let buffer = if client.buffers.is_empty() || rng.lock().gen() {
6730                                let file = files.choose(&mut *rng.lock()).unwrap();
6731                                let (worktree, path) = project
6732                                    .update(cx, |project, cx| {
6733                                        project.find_or_create_local_worktree(
6734                                            file.clone(),
6735                                            true,
6736                                            cx,
6737                                        )
6738                                    })
6739                                    .await?;
6740                                let project_path =
6741                                    worktree.read_with(cx, |worktree, _| (worktree.id(), path));
6742                                log::info!(
6743                                    "Host: opening path {:?}, worktree {}, relative_path {:?}",
6744                                    file,
6745                                    project_path.0,
6746                                    project_path.1
6747                                );
6748                                let buffer = project
6749                                    .update(cx, |project, cx| project.open_buffer(project_path, cx))
6750                                    .await
6751                                    .unwrap();
6752                                client.buffers.insert(buffer.clone());
6753                                buffer
6754                            } else {
6755                                client
6756                                    .buffers
6757                                    .iter()
6758                                    .choose(&mut *rng.lock())
6759                                    .unwrap()
6760                                    .clone()
6761                            };
6762
6763                            if rng.lock().gen_bool(0.1) {
6764                                cx.update(|cx| {
6765                                    log::info!(
6766                                        "Host: dropping buffer {:?}",
6767                                        buffer.read(cx).file().unwrap().full_path(cx)
6768                                    );
6769                                    client.buffers.remove(&buffer);
6770                                    drop(buffer);
6771                                });
6772                            } else {
6773                                buffer.update(cx, |buffer, cx| {
6774                                    log::info!(
6775                                        "Host: updating buffer {:?} ({})",
6776                                        buffer.file().unwrap().full_path(cx),
6777                                        buffer.remote_id()
6778                                    );
6779
6780                                    if rng.lock().gen_bool(0.7) {
6781                                        buffer.randomly_edit(&mut *rng.lock(), 5, cx);
6782                                    } else {
6783                                        buffer.randomly_undo_redo(&mut *rng.lock(), cx);
6784                                    }
6785                                });
6786                            }
6787                        }
6788                        _ => loop {
6789                            let path_component_count = rng.lock().gen_range::<usize, _>(1..=5);
6790                            let mut path = PathBuf::new();
6791                            path.push("/");
6792                            for _ in 0..path_component_count {
6793                                let letter = rng.lock().gen_range(b'a'..=b'z');
6794                                path.push(std::str::from_utf8(&[letter]).unwrap());
6795                            }
6796                            path.set_extension("rs");
6797                            let parent_path = path.parent().unwrap();
6798
6799                            log::info!("Host: creating file {:?}", path,);
6800
6801                            if fs.create_dir(&parent_path).await.is_ok()
6802                                && fs.create_file(&path, Default::default()).await.is_ok()
6803                            {
6804                                break;
6805                            } else {
6806                                log::info!("Host: cannot create file");
6807                            }
6808                        },
6809                    }
6810
6811                    cx.background().simulate_random_delay().await;
6812                }
6813
6814                Ok(())
6815            }
6816
6817            let result =
6818                simulate_host_internal(&mut self, project.clone(), op_start_signal, rng, &mut cx)
6819                    .await;
6820            log::info!("Host done");
6821            self.project = Some(project);
6822            (self, cx, result.err())
6823        }
6824
6825        pub async fn simulate_guest(
6826            mut self,
6827            guest_username: String,
6828            project: ModelHandle<Project>,
6829            op_start_signal: futures::channel::mpsc::UnboundedReceiver<()>,
6830            rng: Arc<Mutex<StdRng>>,
6831            mut cx: TestAppContext,
6832        ) -> (Self, TestAppContext, Option<anyhow::Error>) {
6833            async fn simulate_guest_internal(
6834                client: &mut TestClient,
6835                guest_username: &str,
6836                project: ModelHandle<Project>,
6837                mut op_start_signal: futures::channel::mpsc::UnboundedReceiver<()>,
6838                rng: Arc<Mutex<StdRng>>,
6839                cx: &mut TestAppContext,
6840            ) -> anyhow::Result<()> {
6841                while op_start_signal.next().await.is_some() {
6842                    let buffer = if client.buffers.is_empty() || rng.lock().gen() {
6843                        let worktree = if let Some(worktree) =
6844                            project.read_with(cx, |project, cx| {
6845                                project
6846                                    .worktrees(&cx)
6847                                    .filter(|worktree| {
6848                                        let worktree = worktree.read(cx);
6849                                        worktree.is_visible()
6850                                            && worktree.entries(false).any(|e| e.is_file())
6851                                    })
6852                                    .choose(&mut *rng.lock())
6853                            }) {
6854                            worktree
6855                        } else {
6856                            cx.background().simulate_random_delay().await;
6857                            continue;
6858                        };
6859
6860                        let (worktree_root_name, project_path) =
6861                            worktree.read_with(cx, |worktree, _| {
6862                                let entry = worktree
6863                                    .entries(false)
6864                                    .filter(|e| e.is_file())
6865                                    .choose(&mut *rng.lock())
6866                                    .unwrap();
6867                                (
6868                                    worktree.root_name().to_string(),
6869                                    (worktree.id(), entry.path.clone()),
6870                                )
6871                            });
6872                        log::info!(
6873                            "{}: opening path {:?} in worktree {} ({})",
6874                            guest_username,
6875                            project_path.1,
6876                            project_path.0,
6877                            worktree_root_name,
6878                        );
6879                        let buffer = project
6880                            .update(cx, |project, cx| {
6881                                project.open_buffer(project_path.clone(), cx)
6882                            })
6883                            .await?;
6884                        log::info!(
6885                            "{}: opened path {:?} in worktree {} ({}) with buffer id {}",
6886                            guest_username,
6887                            project_path.1,
6888                            project_path.0,
6889                            worktree_root_name,
6890                            buffer.read_with(cx, |buffer, _| buffer.remote_id())
6891                        );
6892                        client.buffers.insert(buffer.clone());
6893                        buffer
6894                    } else {
6895                        client
6896                            .buffers
6897                            .iter()
6898                            .choose(&mut *rng.lock())
6899                            .unwrap()
6900                            .clone()
6901                    };
6902
6903                    let choice = rng.lock().gen_range(0..100);
6904                    match choice {
6905                        0..=9 => {
6906                            cx.update(|cx| {
6907                                log::info!(
6908                                    "{}: dropping buffer {:?}",
6909                                    guest_username,
6910                                    buffer.read(cx).file().unwrap().full_path(cx)
6911                                );
6912                                client.buffers.remove(&buffer);
6913                                drop(buffer);
6914                            });
6915                        }
6916                        10..=19 => {
6917                            let completions = project.update(cx, |project, cx| {
6918                                log::info!(
6919                                    "{}: requesting completions for buffer {} ({:?})",
6920                                    guest_username,
6921                                    buffer.read(cx).remote_id(),
6922                                    buffer.read(cx).file().unwrap().full_path(cx)
6923                                );
6924                                let offset = rng.lock().gen_range(0..=buffer.read(cx).len());
6925                                project.completions(&buffer, offset, cx)
6926                            });
6927                            let completions = cx.background().spawn(async move {
6928                                completions
6929                                    .await
6930                                    .map_err(|err| anyhow!("completions request failed: {:?}", err))
6931                            });
6932                            if rng.lock().gen_bool(0.3) {
6933                                log::info!("{}: detaching completions request", guest_username);
6934                                cx.update(|cx| completions.detach_and_log_err(cx));
6935                            } else {
6936                                completions.await?;
6937                            }
6938                        }
6939                        20..=29 => {
6940                            let code_actions = project.update(cx, |project, cx| {
6941                                log::info!(
6942                                    "{}: requesting code actions for buffer {} ({:?})",
6943                                    guest_username,
6944                                    buffer.read(cx).remote_id(),
6945                                    buffer.read(cx).file().unwrap().full_path(cx)
6946                                );
6947                                let range = buffer.read(cx).random_byte_range(0, &mut *rng.lock());
6948                                project.code_actions(&buffer, range, cx)
6949                            });
6950                            let code_actions = cx.background().spawn(async move {
6951                                code_actions.await.map_err(|err| {
6952                                    anyhow!("code actions request failed: {:?}", err)
6953                                })
6954                            });
6955                            if rng.lock().gen_bool(0.3) {
6956                                log::info!("{}: detaching code actions request", guest_username);
6957                                cx.update(|cx| code_actions.detach_and_log_err(cx));
6958                            } else {
6959                                code_actions.await?;
6960                            }
6961                        }
6962                        30..=39 if buffer.read_with(cx, |buffer, _| buffer.is_dirty()) => {
6963                            let (requested_version, save) = buffer.update(cx, |buffer, cx| {
6964                                log::info!(
6965                                    "{}: saving buffer {} ({:?})",
6966                                    guest_username,
6967                                    buffer.remote_id(),
6968                                    buffer.file().unwrap().full_path(cx)
6969                                );
6970                                (buffer.version(), buffer.save(cx))
6971                            });
6972                            let save = cx.background().spawn(async move {
6973                                let (saved_version, _) = save
6974                                    .await
6975                                    .map_err(|err| anyhow!("save request failed: {:?}", err))?;
6976                                assert!(saved_version.observed_all(&requested_version));
6977                                Ok::<_, anyhow::Error>(())
6978                            });
6979                            if rng.lock().gen_bool(0.3) {
6980                                log::info!("{}: detaching save request", guest_username);
6981                                cx.update(|cx| save.detach_and_log_err(cx));
6982                            } else {
6983                                save.await?;
6984                            }
6985                        }
6986                        40..=44 => {
6987                            let prepare_rename = project.update(cx, |project, cx| {
6988                                log::info!(
6989                                    "{}: preparing rename for buffer {} ({:?})",
6990                                    guest_username,
6991                                    buffer.read(cx).remote_id(),
6992                                    buffer.read(cx).file().unwrap().full_path(cx)
6993                                );
6994                                let offset = rng.lock().gen_range(0..=buffer.read(cx).len());
6995                                project.prepare_rename(buffer, offset, cx)
6996                            });
6997                            let prepare_rename = cx.background().spawn(async move {
6998                                prepare_rename.await.map_err(|err| {
6999                                    anyhow!("prepare rename request failed: {:?}", err)
7000                                })
7001                            });
7002                            if rng.lock().gen_bool(0.3) {
7003                                log::info!("{}: detaching prepare rename request", guest_username);
7004                                cx.update(|cx| prepare_rename.detach_and_log_err(cx));
7005                            } else {
7006                                prepare_rename.await?;
7007                            }
7008                        }
7009                        45..=49 => {
7010                            let definitions = project.update(cx, |project, cx| {
7011                                log::info!(
7012                                    "{}: requesting definitions for buffer {} ({:?})",
7013                                    guest_username,
7014                                    buffer.read(cx).remote_id(),
7015                                    buffer.read(cx).file().unwrap().full_path(cx)
7016                                );
7017                                let offset = rng.lock().gen_range(0..=buffer.read(cx).len());
7018                                project.definition(&buffer, offset, cx)
7019                            });
7020                            let definitions = cx.background().spawn(async move {
7021                                definitions
7022                                    .await
7023                                    .map_err(|err| anyhow!("definitions request failed: {:?}", err))
7024                            });
7025                            if rng.lock().gen_bool(0.3) {
7026                                log::info!("{}: detaching definitions request", guest_username);
7027                                cx.update(|cx| definitions.detach_and_log_err(cx));
7028                            } else {
7029                                client
7030                                    .buffers
7031                                    .extend(definitions.await?.into_iter().map(|loc| loc.buffer));
7032                            }
7033                        }
7034                        50..=54 => {
7035                            let highlights = project.update(cx, |project, cx| {
7036                                log::info!(
7037                                    "{}: requesting highlights for buffer {} ({:?})",
7038                                    guest_username,
7039                                    buffer.read(cx).remote_id(),
7040                                    buffer.read(cx).file().unwrap().full_path(cx)
7041                                );
7042                                let offset = rng.lock().gen_range(0..=buffer.read(cx).len());
7043                                project.document_highlights(&buffer, offset, cx)
7044                            });
7045                            let highlights = cx.background().spawn(async move {
7046                                highlights
7047                                    .await
7048                                    .map_err(|err| anyhow!("highlights request failed: {:?}", err))
7049                            });
7050                            if rng.lock().gen_bool(0.3) {
7051                                log::info!("{}: detaching highlights request", guest_username);
7052                                cx.update(|cx| highlights.detach_and_log_err(cx));
7053                            } else {
7054                                highlights.await?;
7055                            }
7056                        }
7057                        55..=59 => {
7058                            let search = project.update(cx, |project, cx| {
7059                                let query = rng.lock().gen_range('a'..='z');
7060                                log::info!("{}: project-wide search {:?}", guest_username, query);
7061                                project.search(SearchQuery::text(query, false, false), cx)
7062                            });
7063                            let search = cx.background().spawn(async move {
7064                                search
7065                                    .await
7066                                    .map_err(|err| anyhow!("search request failed: {:?}", err))
7067                            });
7068                            if rng.lock().gen_bool(0.3) {
7069                                log::info!("{}: detaching search request", guest_username);
7070                                cx.update(|cx| search.detach_and_log_err(cx));
7071                            } else {
7072                                client.buffers.extend(search.await?.into_keys());
7073                            }
7074                        }
7075                        60..=69 => {
7076                            let worktree = project
7077                                .read_with(cx, |project, cx| {
7078                                    project
7079                                        .worktrees(&cx)
7080                                        .filter(|worktree| {
7081                                            let worktree = worktree.read(cx);
7082                                            worktree.is_visible()
7083                                                && worktree.entries(false).any(|e| e.is_file())
7084                                                && worktree
7085                                                    .root_entry()
7086                                                    .map_or(false, |e| e.is_dir())
7087                                        })
7088                                        .choose(&mut *rng.lock())
7089                                })
7090                                .unwrap();
7091                            let (worktree_id, worktree_root_name) = worktree
7092                                .read_with(cx, |worktree, _| {
7093                                    (worktree.id(), worktree.root_name().to_string())
7094                                });
7095
7096                            let mut new_name = String::new();
7097                            for _ in 0..10 {
7098                                let letter = rng.lock().gen_range('a'..='z');
7099                                new_name.push(letter);
7100                            }
7101                            let mut new_path = PathBuf::new();
7102                            new_path.push(new_name);
7103                            new_path.set_extension("rs");
7104                            log::info!(
7105                                "{}: creating {:?} in worktree {} ({})",
7106                                guest_username,
7107                                new_path,
7108                                worktree_id,
7109                                worktree_root_name,
7110                            );
7111                            project
7112                                .update(cx, |project, cx| {
7113                                    project.create_entry((worktree_id, new_path), false, cx)
7114                                })
7115                                .unwrap()
7116                                .await?;
7117                        }
7118                        _ => {
7119                            buffer.update(cx, |buffer, cx| {
7120                                log::info!(
7121                                    "{}: updating buffer {} ({:?})",
7122                                    guest_username,
7123                                    buffer.remote_id(),
7124                                    buffer.file().unwrap().full_path(cx)
7125                                );
7126                                if rng.lock().gen_bool(0.7) {
7127                                    buffer.randomly_edit(&mut *rng.lock(), 5, cx);
7128                                } else {
7129                                    buffer.randomly_undo_redo(&mut *rng.lock(), cx);
7130                                }
7131                            });
7132                        }
7133                    }
7134                    cx.background().simulate_random_delay().await;
7135                }
7136                Ok(())
7137            }
7138
7139            let result = simulate_guest_internal(
7140                &mut self,
7141                &guest_username,
7142                project.clone(),
7143                op_start_signal,
7144                rng,
7145                &mut cx,
7146            )
7147            .await;
7148            log::info!("{}: done", guest_username);
7149
7150            self.project = Some(project);
7151            (self, cx, result.err())
7152        }
7153    }
7154
7155    impl Drop for TestClient {
7156        fn drop(&mut self) {
7157            self.client.tear_down();
7158        }
7159    }
7160
7161    impl Executor for Arc<gpui::executor::Background> {
7162        type Sleep = gpui::executor::Timer;
7163
7164        fn spawn_detached<F: 'static + Send + Future<Output = ()>>(&self, future: F) {
7165            self.spawn(future).detach();
7166        }
7167
7168        fn sleep(&self, duration: Duration) -> Self::Sleep {
7169            self.as_ref().timer(duration)
7170        }
7171    }
7172
7173    fn channel_messages(channel: &Channel) -> Vec<(String, String, bool)> {
7174        channel
7175            .messages()
7176            .cursor::<()>()
7177            .map(|m| {
7178                (
7179                    m.sender.github_login.clone(),
7180                    m.body.clone(),
7181                    m.is_pending(),
7182                )
7183            })
7184            .collect()
7185    }
7186
7187    struct EmptyView;
7188
7189    impl gpui::Entity for EmptyView {
7190        type Event = ();
7191    }
7192
7193    impl gpui::View for EmptyView {
7194        fn ui_name() -> &'static str {
7195            "empty view"
7196        }
7197
7198        fn render(&mut self, _: &mut gpui::RenderContext<Self>) -> gpui::ElementBox {
7199            gpui::Element::boxed(gpui::elements::Empty)
7200        }
7201    }
7202}