rpc.rs

   1mod store;
   2
   3use crate::{
   4    auth,
   5    db::{self, ChannelId, MessageId, UserId},
   6    AppState, Result,
   7};
   8use anyhow::anyhow;
   9use async_tungstenite::tungstenite::{
  10    protocol::CloseFrame as TungsteniteCloseFrame, Message as TungsteniteMessage,
  11};
  12use axum::{
  13    body::Body,
  14    extract::{
  15        ws::{CloseFrame as AxumCloseFrame, Message as AxumMessage},
  16        ConnectInfo, WebSocketUpgrade,
  17    },
  18    headers::{Header, HeaderName},
  19    http::StatusCode,
  20    middleware,
  21    response::IntoResponse,
  22    routing::get,
  23    Extension, Router, TypedHeader,
  24};
  25use collections::HashMap;
  26use futures::{channel::mpsc, future::BoxFuture, FutureExt, SinkExt, StreamExt, TryStreamExt};
  27use lazy_static::lazy_static;
  28use rpc::{
  29    proto::{self, AnyTypedEnvelope, EntityMessage, EnvelopedMessage, RequestMessage},
  30    Connection, ConnectionId, Peer, Receipt, TypedEnvelope,
  31};
  32use std::{
  33    any::TypeId,
  34    future::Future,
  35    marker::PhantomData,
  36    net::SocketAddr,
  37    ops::{Deref, DerefMut},
  38    rc::Rc,
  39    sync::{
  40        atomic::{AtomicBool, Ordering::SeqCst},
  41        Arc,
  42    },
  43    time::Duration,
  44};
  45use store::{Store, Worktree};
  46use time::OffsetDateTime;
  47use tokio::{
  48    sync::{RwLock, RwLockReadGuard, RwLockWriteGuard},
  49    time::Sleep,
  50};
  51use tower::ServiceBuilder;
  52use tracing::{info_span, Instrument};
  53
  54type MessageHandler =
  55    Box<dyn Send + Sync + Fn(Arc<Server>, Box<dyn AnyTypedEnvelope>) -> BoxFuture<'static, ()>>;
  56
  57struct Response<R> {
  58    server: Arc<Server>,
  59    receipt: Receipt<R>,
  60    responded: Arc<AtomicBool>,
  61}
  62
  63impl<R: RequestMessage> Response<R> {
  64    fn send(self, payload: R::Response) -> Result<()> {
  65        self.responded.store(true, SeqCst);
  66        self.server.peer.respond(self.receipt, payload)?;
  67        Ok(())
  68    }
  69}
  70
  71pub struct Server {
  72    peer: Arc<Peer>,
  73    store: RwLock<Store>,
  74    app_state: Arc<AppState>,
  75    handlers: HashMap<TypeId, MessageHandler>,
  76    notifications: Option<mpsc::UnboundedSender<()>>,
  77}
  78
  79pub trait Executor: Send + Clone {
  80    type Sleep: Send + Future;
  81    fn spawn_detached<F: 'static + Send + Future<Output = ()>>(&self, future: F);
  82    fn sleep(&self, duration: Duration) -> Self::Sleep;
  83}
  84
  85#[derive(Clone)]
  86pub struct RealExecutor;
  87
  88const MESSAGE_COUNT_PER_PAGE: usize = 100;
  89const MAX_MESSAGE_LEN: usize = 1024;
  90
  91struct StoreReadGuard<'a> {
  92    guard: RwLockReadGuard<'a, Store>,
  93    _not_send: PhantomData<Rc<()>>,
  94}
  95
  96struct StoreWriteGuard<'a> {
  97    guard: RwLockWriteGuard<'a, Store>,
  98    _not_send: PhantomData<Rc<()>>,
  99}
 100
 101impl Server {
 102    pub fn new(
 103        app_state: Arc<AppState>,
 104        notifications: Option<mpsc::UnboundedSender<()>>,
 105    ) -> Arc<Self> {
 106        let mut server = Self {
 107            peer: Peer::new(),
 108            app_state,
 109            store: Default::default(),
 110            handlers: Default::default(),
 111            notifications,
 112        };
 113
 114        server
 115            .add_request_handler(Server::ping)
 116            .add_request_handler(Server::register_project)
 117            .add_message_handler(Server::unregister_project)
 118            .add_request_handler(Server::share_project)
 119            .add_message_handler(Server::unshare_project)
 120            .add_request_handler(Server::join_project)
 121            .add_message_handler(Server::leave_project)
 122            .add_request_handler(Server::register_worktree)
 123            .add_message_handler(Server::unregister_worktree)
 124            .add_request_handler(Server::update_worktree)
 125            .add_message_handler(Server::start_language_server)
 126            .add_message_handler(Server::update_language_server)
 127            .add_message_handler(Server::update_diagnostic_summary)
 128            .add_request_handler(Server::forward_project_request::<proto::GetDefinition>)
 129            .add_request_handler(Server::forward_project_request::<proto::GetReferences>)
 130            .add_request_handler(Server::forward_project_request::<proto::SearchProject>)
 131            .add_request_handler(Server::forward_project_request::<proto::GetDocumentHighlights>)
 132            .add_request_handler(Server::forward_project_request::<proto::GetProjectSymbols>)
 133            .add_request_handler(Server::forward_project_request::<proto::OpenBufferForSymbol>)
 134            .add_request_handler(Server::forward_project_request::<proto::OpenBufferById>)
 135            .add_request_handler(Server::forward_project_request::<proto::OpenBufferByPath>)
 136            .add_request_handler(Server::forward_project_request::<proto::GetCompletions>)
 137            .add_request_handler(
 138                Server::forward_project_request::<proto::ApplyCompletionAdditionalEdits>,
 139            )
 140            .add_request_handler(Server::forward_project_request::<proto::GetCodeActions>)
 141            .add_request_handler(Server::forward_project_request::<proto::ApplyCodeAction>)
 142            .add_request_handler(Server::forward_project_request::<proto::PrepareRename>)
 143            .add_request_handler(Server::forward_project_request::<proto::PerformRename>)
 144            .add_request_handler(Server::forward_project_request::<proto::ReloadBuffers>)
 145            .add_request_handler(Server::forward_project_request::<proto::FormatBuffers>)
 146            .add_request_handler(Server::forward_project_request::<proto::CreateProjectEntry>)
 147            .add_request_handler(Server::forward_project_request::<proto::RenameProjectEntry>)
 148            .add_request_handler(Server::forward_project_request::<proto::DeleteProjectEntry>)
 149            .add_request_handler(Server::update_buffer)
 150            .add_message_handler(Server::update_buffer_file)
 151            .add_message_handler(Server::buffer_reloaded)
 152            .add_message_handler(Server::buffer_saved)
 153            .add_request_handler(Server::save_buffer)
 154            .add_request_handler(Server::get_channels)
 155            .add_request_handler(Server::get_users)
 156            .add_request_handler(Server::fuzzy_search_users)
 157            .add_request_handler(Server::request_contact)
 158            .add_request_handler(Server::remove_contact)
 159            .add_request_handler(Server::respond_to_contact_request)
 160            .add_request_handler(Server::join_channel)
 161            .add_message_handler(Server::leave_channel)
 162            .add_request_handler(Server::send_channel_message)
 163            .add_request_handler(Server::follow)
 164            .add_message_handler(Server::unfollow)
 165            .add_message_handler(Server::update_followers)
 166            .add_request_handler(Server::get_channel_messages);
 167
 168        Arc::new(server)
 169    }
 170
 171    fn add_message_handler<F, Fut, M>(&mut self, handler: F) -> &mut Self
 172    where
 173        F: 'static + Send + Sync + Fn(Arc<Self>, TypedEnvelope<M>) -> Fut,
 174        Fut: 'static + Send + Future<Output = Result<()>>,
 175        M: EnvelopedMessage,
 176    {
 177        let prev_handler = self.handlers.insert(
 178            TypeId::of::<M>(),
 179            Box::new(move |server, envelope| {
 180                let envelope = envelope.into_any().downcast::<TypedEnvelope<M>>().unwrap();
 181                let span = info_span!(
 182                    "handle message",
 183                    payload_type = envelope.payload_type_name(),
 184                    payload = format!("{:?}", envelope.payload).as_str(),
 185                );
 186                let future = (handler)(server, *envelope);
 187                async move {
 188                    if let Err(error) = future.await {
 189                        tracing::error!(%error, "error handling message");
 190                    }
 191                }
 192                .instrument(span)
 193                .boxed()
 194            }),
 195        );
 196        if prev_handler.is_some() {
 197            panic!("registered a handler for the same message twice");
 198        }
 199        self
 200    }
 201
 202    /// Handle a request while holding a lock to the store. This is useful when we're registering
 203    /// a connection but we want to respond on the connection before anybody else can send on it.
 204    fn add_request_handler<F, Fut, M>(&mut self, handler: F) -> &mut Self
 205    where
 206        F: 'static + Send + Sync + Fn(Arc<Self>, TypedEnvelope<M>, Response<M>) -> Fut,
 207        Fut: Send + Future<Output = Result<()>>,
 208        M: RequestMessage,
 209    {
 210        let handler = Arc::new(handler);
 211        self.add_message_handler(move |server, envelope| {
 212            let receipt = envelope.receipt();
 213            let handler = handler.clone();
 214            async move {
 215                let responded = Arc::new(AtomicBool::default());
 216                let response = Response {
 217                    server: server.clone(),
 218                    responded: responded.clone(),
 219                    receipt: envelope.receipt(),
 220                };
 221                match (handler)(server.clone(), envelope, response).await {
 222                    Ok(()) => {
 223                        if responded.load(std::sync::atomic::Ordering::SeqCst) {
 224                            Ok(())
 225                        } else {
 226                            Err(anyhow!("handler did not send a response"))?
 227                        }
 228                    }
 229                    Err(error) => {
 230                        server.peer.respond_with_error(
 231                            receipt,
 232                            proto::Error {
 233                                message: error.to_string(),
 234                            },
 235                        )?;
 236                        Err(error)
 237                    }
 238                }
 239            }
 240        })
 241    }
 242
 243    pub fn handle_connection<E: Executor>(
 244        self: &Arc<Self>,
 245        connection: Connection,
 246        address: String,
 247        user_id: UserId,
 248        mut send_connection_id: Option<mpsc::Sender<ConnectionId>>,
 249        executor: E,
 250    ) -> impl Future<Output = Result<()>> {
 251        let mut this = self.clone();
 252        let span = info_span!("handle connection", %user_id, %address);
 253        async move {
 254            let (connection_id, handle_io, mut incoming_rx) = this
 255                .peer
 256                .add_connection(connection, {
 257                    let executor = executor.clone();
 258                    move |duration| {
 259                        let timer = executor.sleep(duration);
 260                        async move {
 261                            timer.await;
 262                        }
 263                    }
 264                })
 265                .await;
 266
 267            tracing::info!(%user_id, %connection_id, %address, "connection opened");
 268
 269            if let Some(send_connection_id) = send_connection_id.as_mut() {
 270                let _ = send_connection_id.send(connection_id).await;
 271            }
 272
 273            let contacts = this.app_state.db.get_contacts(user_id).await?;
 274
 275            {
 276                let mut store = this.store_mut().await;
 277                store.add_connection(connection_id, user_id);
 278                this.peer.send(connection_id, store.build_initial_contacts_update(contacts))?;
 279            }
 280            this.update_user_contacts(user_id).await?;
 281
 282            let handle_io = handle_io.fuse();
 283            futures::pin_mut!(handle_io);
 284            loop {
 285                let next_message = incoming_rx.next().fuse();
 286                futures::pin_mut!(next_message);
 287                futures::select_biased! {
 288                    result = handle_io => {
 289                        if let Err(error) = result {
 290                            tracing::error!(%error, "error handling I/O");
 291                        }
 292                        break;
 293                    }
 294                    message = next_message => {
 295                        if let Some(message) = message {
 296                            let type_name = message.payload_type_name();
 297                            let span = tracing::info_span!("receive message", %user_id, %connection_id, %address, type_name);
 298                            async {
 299                                if let Some(handler) = this.handlers.get(&message.payload_type_id()) {
 300                                    let notifications = this.notifications.clone();
 301                                    let is_background = message.is_background();
 302                                    let handle_message = (handler)(this.clone(), message);
 303                                    let handle_message = async move {
 304                                        handle_message.await;
 305                                        if let Some(mut notifications) = notifications {
 306                                            let _ = notifications.send(()).await;
 307                                        }
 308                                    };
 309                                    if is_background {
 310                                        executor.spawn_detached(handle_message);
 311                                    } else {
 312                                        handle_message.await;
 313                                    }
 314                                } else {
 315                                    tracing::error!("no message handler");
 316                                }
 317                            }.instrument(span).await;
 318                        } else {
 319                            tracing::info!(%user_id, %connection_id, %address, "connection closed");
 320                            break;
 321                        }
 322                    }
 323                }
 324            }
 325
 326            if let Err(error) = this.sign_out(connection_id).await {
 327                tracing::error!(%error, "error signing out");
 328            }
 329
 330            Ok(())
 331        }.instrument(span)
 332    }
 333
 334    async fn sign_out(self: &mut Arc<Self>, connection_id: ConnectionId) -> Result<()> {
 335        self.peer.disconnect(connection_id);
 336        let removed_connection = self.store_mut().await.remove_connection(connection_id)?;
 337
 338        for (project_id, project) in removed_connection.hosted_projects {
 339            if let Some(share) = project.share {
 340                broadcast(connection_id, share.guests.keys().copied(), |conn_id| {
 341                    self.peer
 342                        .send(conn_id, proto::UnshareProject { project_id })
 343                });
 344            }
 345        }
 346
 347        for (project_id, peer_ids) in removed_connection.guest_project_ids {
 348            broadcast(connection_id, peer_ids, |conn_id| {
 349                self.peer.send(
 350                    conn_id,
 351                    proto::RemoveProjectCollaborator {
 352                        project_id,
 353                        peer_id: connection_id.0,
 354                    },
 355                )
 356            });
 357        }
 358
 359        self.update_user_contacts(removed_connection.user_id)
 360            .await?;
 361
 362        Ok(())
 363    }
 364
 365    async fn ping(
 366        self: Arc<Server>,
 367        _: TypedEnvelope<proto::Ping>,
 368        response: Response<proto::Ping>,
 369    ) -> Result<()> {
 370        response.send(proto::Ack {})?;
 371        Ok(())
 372    }
 373
 374    async fn register_project(
 375        self: Arc<Server>,
 376        request: TypedEnvelope<proto::RegisterProject>,
 377        response: Response<proto::RegisterProject>,
 378    ) -> Result<()> {
 379        let user_id;
 380        let project_id;
 381        {
 382            let mut state = self.store_mut().await;
 383            user_id = state.user_id_for_connection(request.sender_id)?;
 384            project_id = state.register_project(request.sender_id, user_id);
 385        };
 386        self.update_user_contacts(user_id).await?;
 387        response.send(proto::RegisterProjectResponse { project_id })?;
 388        Ok(())
 389    }
 390
 391    async fn unregister_project(
 392        self: Arc<Server>,
 393        request: TypedEnvelope<proto::UnregisterProject>,
 394    ) -> Result<()> {
 395        let user_id = {
 396            let mut state = self.store_mut().await;
 397            state.unregister_project(request.payload.project_id, request.sender_id)?;
 398            state.user_id_for_connection(request.sender_id)?
 399        };
 400
 401        self.update_user_contacts(user_id).await?;
 402        Ok(())
 403    }
 404
 405    async fn share_project(
 406        self: Arc<Server>,
 407        request: TypedEnvelope<proto::ShareProject>,
 408        response: Response<proto::ShareProject>,
 409    ) -> Result<()> {
 410        let user_id = {
 411            let mut state = self.store_mut().await;
 412            state.share_project(request.payload.project_id, request.sender_id)?;
 413            state.user_id_for_connection(request.sender_id)?
 414        };
 415        self.update_user_contacts(user_id).await?;
 416        response.send(proto::Ack {})?;
 417        Ok(())
 418    }
 419
 420    async fn update_user_contacts(self: &Arc<Server>, user_id: UserId) -> Result<()> {
 421        let contacts = self.app_state.db.get_contacts(user_id).await?;
 422        let store = self.store().await;
 423        let updated_contact = store.contact_for_user(user_id, false);
 424        for contact in contacts {
 425            if let db::Contact::Accepted {
 426                user_id: contact_user_id,
 427                ..
 428            } = contact
 429            {
 430                for contact_conn_id in store.connection_ids_for_user(contact_user_id) {
 431                    self.peer
 432                        .send(
 433                            contact_conn_id,
 434                            proto::UpdateContacts {
 435                                contacts: vec![updated_contact.clone()],
 436                                remove_contacts: Default::default(),
 437                                incoming_requests: Default::default(),
 438                                remove_incoming_requests: Default::default(),
 439                                outgoing_requests: Default::default(),
 440                                remove_outgoing_requests: Default::default(),
 441                            },
 442                        )
 443                        .trace_err();
 444                }
 445            }
 446        }
 447        Ok(())
 448    }
 449
 450    async fn unshare_project(
 451        self: Arc<Server>,
 452        request: TypedEnvelope<proto::UnshareProject>,
 453    ) -> Result<()> {
 454        let project_id = request.payload.project_id;
 455        let project;
 456        {
 457            let mut state = self.store_mut().await;
 458            project = state.unshare_project(project_id, request.sender_id)?;
 459            broadcast(request.sender_id, project.connection_ids, |conn_id| {
 460                self.peer
 461                    .send(conn_id, proto::UnshareProject { project_id })
 462            });
 463        }
 464        self.update_user_contacts(project.host_user_id).await?;
 465        Ok(())
 466    }
 467
 468    async fn join_project(
 469        self: Arc<Server>,
 470        request: TypedEnvelope<proto::JoinProject>,
 471        response: Response<proto::JoinProject>,
 472    ) -> Result<()> {
 473        let project_id = request.payload.project_id;
 474        let host_user_id;
 475        let guest_user_id;
 476        {
 477            let state = self.store().await;
 478            host_user_id = state.project(project_id)?.host_user_id;
 479            guest_user_id = state.user_id_for_connection(request.sender_id)?;
 480        };
 481
 482        let has_contact = self
 483            .app_state
 484            .db
 485            .has_contact(guest_user_id, host_user_id)
 486            .await?;
 487        if !has_contact {
 488            return Err(anyhow!("no such project"))?;
 489        }
 490
 491        {
 492            let state = &mut *self.store_mut().await;
 493            let joined = state.join_project(request.sender_id, guest_user_id, project_id)?;
 494            let share = joined.project.share()?;
 495            let peer_count = share.guests.len();
 496            let mut collaborators = Vec::with_capacity(peer_count);
 497            collaborators.push(proto::Collaborator {
 498                peer_id: joined.project.host_connection_id.0,
 499                replica_id: 0,
 500                user_id: joined.project.host_user_id.to_proto(),
 501            });
 502            let worktrees = share
 503                .worktrees
 504                .iter()
 505                .filter_map(|(id, shared_worktree)| {
 506                    let worktree = joined.project.worktrees.get(&id)?;
 507                    Some(proto::Worktree {
 508                        id: *id,
 509                        root_name: worktree.root_name.clone(),
 510                        entries: shared_worktree.entries.values().cloned().collect(),
 511                        diagnostic_summaries: shared_worktree
 512                            .diagnostic_summaries
 513                            .values()
 514                            .cloned()
 515                            .collect(),
 516                        visible: worktree.visible,
 517                        scan_id: shared_worktree.scan_id,
 518                    })
 519                })
 520                .collect();
 521            for (peer_conn_id, (peer_replica_id, peer_user_id)) in &share.guests {
 522                if *peer_conn_id != request.sender_id {
 523                    collaborators.push(proto::Collaborator {
 524                        peer_id: peer_conn_id.0,
 525                        replica_id: *peer_replica_id as u32,
 526                        user_id: peer_user_id.to_proto(),
 527                    });
 528                }
 529            }
 530            broadcast(
 531                request.sender_id,
 532                joined.project.connection_ids(),
 533                |conn_id| {
 534                    self.peer.send(
 535                        conn_id,
 536                        proto::AddProjectCollaborator {
 537                            project_id,
 538                            collaborator: Some(proto::Collaborator {
 539                                peer_id: request.sender_id.0,
 540                                replica_id: joined.replica_id as u32,
 541                                user_id: guest_user_id.to_proto(),
 542                            }),
 543                        },
 544                    )
 545                },
 546            );
 547            response.send(proto::JoinProjectResponse {
 548                worktrees,
 549                replica_id: joined.replica_id as u32,
 550                collaborators,
 551                language_servers: joined.project.language_servers.clone(),
 552            })?;
 553        }
 554        self.update_user_contacts(host_user_id).await?;
 555        Ok(())
 556    }
 557
 558    async fn leave_project(
 559        self: Arc<Server>,
 560        request: TypedEnvelope<proto::LeaveProject>,
 561    ) -> Result<()> {
 562        let sender_id = request.sender_id;
 563        let project_id = request.payload.project_id;
 564        let project;
 565        {
 566            let mut state = self.store_mut().await;
 567            project = state.leave_project(sender_id, project_id)?;
 568            broadcast(sender_id, project.connection_ids, |conn_id| {
 569                self.peer.send(
 570                    conn_id,
 571                    proto::RemoveProjectCollaborator {
 572                        project_id,
 573                        peer_id: sender_id.0,
 574                    },
 575                )
 576            });
 577        }
 578        self.update_user_contacts(project.host_user_id).await?;
 579        Ok(())
 580    }
 581
 582    async fn register_worktree(
 583        self: Arc<Server>,
 584        request: TypedEnvelope<proto::RegisterWorktree>,
 585        response: Response<proto::RegisterWorktree>,
 586    ) -> Result<()> {
 587        let host_user_id;
 588        {
 589            let mut state = self.store_mut().await;
 590            host_user_id = state.user_id_for_connection(request.sender_id)?;
 591
 592            let guest_connection_ids = state
 593                .read_project(request.payload.project_id, request.sender_id)?
 594                .guest_connection_ids();
 595            state.register_worktree(
 596                request.payload.project_id,
 597                request.payload.worktree_id,
 598                request.sender_id,
 599                Worktree {
 600                    root_name: request.payload.root_name.clone(),
 601                    visible: request.payload.visible,
 602                },
 603            )?;
 604
 605            broadcast(request.sender_id, guest_connection_ids, |connection_id| {
 606                self.peer
 607                    .forward_send(request.sender_id, connection_id, request.payload.clone())
 608            });
 609        }
 610        self.update_user_contacts(host_user_id).await?;
 611        response.send(proto::Ack {})?;
 612        Ok(())
 613    }
 614
 615    async fn unregister_worktree(
 616        self: Arc<Server>,
 617        request: TypedEnvelope<proto::UnregisterWorktree>,
 618    ) -> Result<()> {
 619        let host_user_id;
 620        let project_id = request.payload.project_id;
 621        let worktree_id = request.payload.worktree_id;
 622        {
 623            let mut state = self.store_mut().await;
 624            let (_, guest_connection_ids) =
 625                state.unregister_worktree(project_id, worktree_id, request.sender_id)?;
 626            host_user_id = state.user_id_for_connection(request.sender_id)?;
 627            broadcast(request.sender_id, guest_connection_ids, |conn_id| {
 628                self.peer.send(
 629                    conn_id,
 630                    proto::UnregisterWorktree {
 631                        project_id,
 632                        worktree_id,
 633                    },
 634                )
 635            });
 636        }
 637        self.update_user_contacts(host_user_id).await?;
 638        Ok(())
 639    }
 640
 641    async fn update_worktree(
 642        self: Arc<Server>,
 643        request: TypedEnvelope<proto::UpdateWorktree>,
 644        response: Response<proto::UpdateWorktree>,
 645    ) -> Result<()> {
 646        let connection_ids = self.store_mut().await.update_worktree(
 647            request.sender_id,
 648            request.payload.project_id,
 649            request.payload.worktree_id,
 650            &request.payload.removed_entries,
 651            &request.payload.updated_entries,
 652            request.payload.scan_id,
 653        )?;
 654
 655        broadcast(request.sender_id, connection_ids, |connection_id| {
 656            self.peer
 657                .forward_send(request.sender_id, connection_id, request.payload.clone())
 658        });
 659        response.send(proto::Ack {})?;
 660        Ok(())
 661    }
 662
 663    async fn update_diagnostic_summary(
 664        self: Arc<Server>,
 665        request: TypedEnvelope<proto::UpdateDiagnosticSummary>,
 666    ) -> Result<()> {
 667        let summary = request
 668            .payload
 669            .summary
 670            .clone()
 671            .ok_or_else(|| anyhow!("invalid summary"))?;
 672        let receiver_ids = self.store_mut().await.update_diagnostic_summary(
 673            request.payload.project_id,
 674            request.payload.worktree_id,
 675            request.sender_id,
 676            summary,
 677        )?;
 678
 679        broadcast(request.sender_id, receiver_ids, |connection_id| {
 680            self.peer
 681                .forward_send(request.sender_id, connection_id, request.payload.clone())
 682        });
 683        Ok(())
 684    }
 685
 686    async fn start_language_server(
 687        self: Arc<Server>,
 688        request: TypedEnvelope<proto::StartLanguageServer>,
 689    ) -> Result<()> {
 690        let receiver_ids = self.store_mut().await.start_language_server(
 691            request.payload.project_id,
 692            request.sender_id,
 693            request
 694                .payload
 695                .server
 696                .clone()
 697                .ok_or_else(|| anyhow!("invalid language server"))?,
 698        )?;
 699        broadcast(request.sender_id, receiver_ids, |connection_id| {
 700            self.peer
 701                .forward_send(request.sender_id, connection_id, request.payload.clone())
 702        });
 703        Ok(())
 704    }
 705
 706    async fn update_language_server(
 707        self: Arc<Server>,
 708        request: TypedEnvelope<proto::UpdateLanguageServer>,
 709    ) -> Result<()> {
 710        let receiver_ids = self
 711            .store()
 712            .await
 713            .project_connection_ids(request.payload.project_id, request.sender_id)?;
 714        broadcast(request.sender_id, receiver_ids, |connection_id| {
 715            self.peer
 716                .forward_send(request.sender_id, connection_id, request.payload.clone())
 717        });
 718        Ok(())
 719    }
 720
 721    async fn forward_project_request<T>(
 722        self: Arc<Server>,
 723        request: TypedEnvelope<T>,
 724        response: Response<T>,
 725    ) -> Result<()>
 726    where
 727        T: EntityMessage + RequestMessage,
 728    {
 729        let host_connection_id = self
 730            .store()
 731            .await
 732            .read_project(request.payload.remote_entity_id(), request.sender_id)?
 733            .host_connection_id;
 734
 735        response.send(
 736            self.peer
 737                .forward_request(request.sender_id, host_connection_id, request.payload)
 738                .await?,
 739        )?;
 740        Ok(())
 741    }
 742
 743    async fn save_buffer(
 744        self: Arc<Server>,
 745        request: TypedEnvelope<proto::SaveBuffer>,
 746        response: Response<proto::SaveBuffer>,
 747    ) -> Result<()> {
 748        let host = self
 749            .store()
 750            .await
 751            .read_project(request.payload.project_id, request.sender_id)?
 752            .host_connection_id;
 753        let response_payload = self
 754            .peer
 755            .forward_request(request.sender_id, host, request.payload.clone())
 756            .await?;
 757
 758        let mut guests = self
 759            .store()
 760            .await
 761            .read_project(request.payload.project_id, request.sender_id)?
 762            .connection_ids();
 763        guests.retain(|guest_connection_id| *guest_connection_id != request.sender_id);
 764        broadcast(host, guests, |conn_id| {
 765            self.peer
 766                .forward_send(host, conn_id, response_payload.clone())
 767        });
 768        response.send(response_payload)?;
 769        Ok(())
 770    }
 771
 772    async fn update_buffer(
 773        self: Arc<Server>,
 774        request: TypedEnvelope<proto::UpdateBuffer>,
 775        response: Response<proto::UpdateBuffer>,
 776    ) -> Result<()> {
 777        let receiver_ids = self
 778            .store()
 779            .await
 780            .project_connection_ids(request.payload.project_id, request.sender_id)?;
 781        broadcast(request.sender_id, receiver_ids, |connection_id| {
 782            self.peer
 783                .forward_send(request.sender_id, connection_id, request.payload.clone())
 784        });
 785        response.send(proto::Ack {})?;
 786        Ok(())
 787    }
 788
 789    async fn update_buffer_file(
 790        self: Arc<Server>,
 791        request: TypedEnvelope<proto::UpdateBufferFile>,
 792    ) -> Result<()> {
 793        let receiver_ids = self
 794            .store()
 795            .await
 796            .project_connection_ids(request.payload.project_id, request.sender_id)?;
 797        broadcast(request.sender_id, receiver_ids, |connection_id| {
 798            self.peer
 799                .forward_send(request.sender_id, connection_id, request.payload.clone())
 800        });
 801        Ok(())
 802    }
 803
 804    async fn buffer_reloaded(
 805        self: Arc<Server>,
 806        request: TypedEnvelope<proto::BufferReloaded>,
 807    ) -> Result<()> {
 808        let receiver_ids = self
 809            .store()
 810            .await
 811            .project_connection_ids(request.payload.project_id, request.sender_id)?;
 812        broadcast(request.sender_id, receiver_ids, |connection_id| {
 813            self.peer
 814                .forward_send(request.sender_id, connection_id, request.payload.clone())
 815        });
 816        Ok(())
 817    }
 818
 819    async fn buffer_saved(
 820        self: Arc<Server>,
 821        request: TypedEnvelope<proto::BufferSaved>,
 822    ) -> Result<()> {
 823        let receiver_ids = self
 824            .store()
 825            .await
 826            .project_connection_ids(request.payload.project_id, request.sender_id)?;
 827        broadcast(request.sender_id, receiver_ids, |connection_id| {
 828            self.peer
 829                .forward_send(request.sender_id, connection_id, request.payload.clone())
 830        });
 831        Ok(())
 832    }
 833
 834    async fn follow(
 835        self: Arc<Self>,
 836        request: TypedEnvelope<proto::Follow>,
 837        response: Response<proto::Follow>,
 838    ) -> Result<()> {
 839        let leader_id = ConnectionId(request.payload.leader_id);
 840        let follower_id = request.sender_id;
 841        if !self
 842            .store()
 843            .await
 844            .project_connection_ids(request.payload.project_id, follower_id)?
 845            .contains(&leader_id)
 846        {
 847            Err(anyhow!("no such peer"))?;
 848        }
 849        let mut response_payload = self
 850            .peer
 851            .forward_request(request.sender_id, leader_id, request.payload)
 852            .await?;
 853        response_payload
 854            .views
 855            .retain(|view| view.leader_id != Some(follower_id.0));
 856        response.send(response_payload)?;
 857        Ok(())
 858    }
 859
 860    async fn unfollow(self: Arc<Self>, request: TypedEnvelope<proto::Unfollow>) -> Result<()> {
 861        let leader_id = ConnectionId(request.payload.leader_id);
 862        if !self
 863            .store()
 864            .await
 865            .project_connection_ids(request.payload.project_id, request.sender_id)?
 866            .contains(&leader_id)
 867        {
 868            Err(anyhow!("no such peer"))?;
 869        }
 870        self.peer
 871            .forward_send(request.sender_id, leader_id, request.payload)?;
 872        Ok(())
 873    }
 874
 875    async fn update_followers(
 876        self: Arc<Self>,
 877        request: TypedEnvelope<proto::UpdateFollowers>,
 878    ) -> Result<()> {
 879        let connection_ids = self
 880            .store()
 881            .await
 882            .project_connection_ids(request.payload.project_id, request.sender_id)?;
 883        let leader_id = request
 884            .payload
 885            .variant
 886            .as_ref()
 887            .and_then(|variant| match variant {
 888                proto::update_followers::Variant::CreateView(payload) => payload.leader_id,
 889                proto::update_followers::Variant::UpdateView(payload) => payload.leader_id,
 890                proto::update_followers::Variant::UpdateActiveView(payload) => payload.leader_id,
 891            });
 892        for follower_id in &request.payload.follower_ids {
 893            let follower_id = ConnectionId(*follower_id);
 894            if connection_ids.contains(&follower_id) && Some(follower_id.0) != leader_id {
 895                self.peer
 896                    .forward_send(request.sender_id, follower_id, request.payload.clone())?;
 897            }
 898        }
 899        Ok(())
 900    }
 901
 902    async fn get_channels(
 903        self: Arc<Server>,
 904        request: TypedEnvelope<proto::GetChannels>,
 905        response: Response<proto::GetChannels>,
 906    ) -> Result<()> {
 907        let user_id = self
 908            .store()
 909            .await
 910            .user_id_for_connection(request.sender_id)?;
 911        let channels = self.app_state.db.get_accessible_channels(user_id).await?;
 912        response.send(proto::GetChannelsResponse {
 913            channels: channels
 914                .into_iter()
 915                .map(|chan| proto::Channel {
 916                    id: chan.id.to_proto(),
 917                    name: chan.name,
 918                })
 919                .collect(),
 920        })?;
 921        Ok(())
 922    }
 923
 924    async fn get_users(
 925        self: Arc<Server>,
 926        request: TypedEnvelope<proto::GetUsers>,
 927        response: Response<proto::GetUsers>,
 928    ) -> Result<()> {
 929        let user_ids = request
 930            .payload
 931            .user_ids
 932            .into_iter()
 933            .map(UserId::from_proto)
 934            .collect();
 935        let users = self
 936            .app_state
 937            .db
 938            .get_users_by_ids(user_ids)
 939            .await?
 940            .into_iter()
 941            .map(|user| proto::User {
 942                id: user.id.to_proto(),
 943                avatar_url: format!("https://github.com/{}.png?size=128", user.github_login),
 944                github_login: user.github_login,
 945            })
 946            .collect();
 947        response.send(proto::UsersResponse { users })?;
 948        Ok(())
 949    }
 950
 951    async fn fuzzy_search_users(
 952        self: Arc<Server>,
 953        request: TypedEnvelope<proto::FuzzySearchUsers>,
 954        response: Response<proto::FuzzySearchUsers>,
 955    ) -> Result<()> {
 956        let user_id = self
 957            .store()
 958            .await
 959            .user_id_for_connection(request.sender_id)?;
 960        let query = request.payload.query;
 961        let db = &self.app_state.db;
 962        let users = match query.len() {
 963            0 => vec![],
 964            1 | 2 => db
 965                .get_user_by_github_login(&query)
 966                .await?
 967                .into_iter()
 968                .collect(),
 969            _ => db.fuzzy_search_users(&query, 10).await?,
 970        };
 971        let users = users
 972            .into_iter()
 973            .filter(|user| user.id != user_id)
 974            .map(|user| proto::User {
 975                id: user.id.to_proto(),
 976                avatar_url: format!("https://github.com/{}.png?size=128", user.github_login),
 977                github_login: user.github_login,
 978            })
 979            .collect();
 980        response.send(proto::UsersResponse { users })?;
 981        Ok(())
 982    }
 983
 984    async fn request_contact(
 985        self: Arc<Server>,
 986        request: TypedEnvelope<proto::RequestContact>,
 987        response: Response<proto::RequestContact>,
 988    ) -> Result<()> {
 989        let requester_id = self
 990            .store()
 991            .await
 992            .user_id_for_connection(request.sender_id)?;
 993        let responder_id = UserId::from_proto(request.payload.responder_id);
 994        if requester_id == responder_id {
 995            return Err(anyhow!("cannot add yourself as a contact"))?;
 996        }
 997
 998        self.app_state
 999            .db
1000            .send_contact_request(requester_id, responder_id)
1001            .await?;
1002
1003        // Update outgoing contact requests of requester
1004        let mut update = proto::UpdateContacts::default();
1005        update.outgoing_requests.push(responder_id.to_proto());
1006        for connection_id in self.store().await.connection_ids_for_user(requester_id) {
1007            self.peer.send(connection_id, update.clone())?;
1008        }
1009
1010        // Update incoming contact requests of responder
1011        let mut update = proto::UpdateContacts::default();
1012        update
1013            .incoming_requests
1014            .push(proto::IncomingContactRequest {
1015                requester_id: requester_id.to_proto(),
1016                should_notify: true,
1017            });
1018        for connection_id in self.store().await.connection_ids_for_user(responder_id) {
1019            self.peer.send(connection_id, update.clone())?;
1020        }
1021
1022        response.send(proto::Ack {})?;
1023        Ok(())
1024    }
1025
1026    async fn respond_to_contact_request(
1027        self: Arc<Server>,
1028        request: TypedEnvelope<proto::RespondToContactRequest>,
1029        response: Response<proto::RespondToContactRequest>,
1030    ) -> Result<()> {
1031        let responder_id = self
1032            .store()
1033            .await
1034            .user_id_for_connection(request.sender_id)?;
1035        let requester_id = UserId::from_proto(request.payload.requester_id);
1036        if request.payload.response == proto::ContactRequestResponse::Dismiss as i32 {
1037            self.app_state
1038                .db
1039                .dismiss_contact_notification(responder_id, requester_id)
1040                .await?;
1041        } else {
1042            let accept = request.payload.response == proto::ContactRequestResponse::Accept as i32;
1043            self.app_state
1044                .db
1045                .respond_to_contact_request(responder_id, requester_id, accept)
1046                .await?;
1047
1048            let store = self.store().await;
1049            // Update responder with new contact
1050            let mut update = proto::UpdateContacts::default();
1051            if accept {
1052                update
1053                    .contacts
1054                    .push(store.contact_for_user(requester_id, false));
1055            }
1056            update
1057                .remove_incoming_requests
1058                .push(requester_id.to_proto());
1059            for connection_id in store.connection_ids_for_user(responder_id) {
1060                self.peer.send(connection_id, update.clone())?;
1061            }
1062
1063            // Update requester with new contact
1064            let mut update = proto::UpdateContacts::default();
1065            if accept {
1066                update
1067                    .contacts
1068                    .push(store.contact_for_user(responder_id, true));
1069            }
1070            update
1071                .remove_outgoing_requests
1072                .push(responder_id.to_proto());
1073            for connection_id in store.connection_ids_for_user(requester_id) {
1074                self.peer.send(connection_id, update.clone())?;
1075            }
1076        }
1077
1078        response.send(proto::Ack {})?;
1079        Ok(())
1080    }
1081
1082    async fn remove_contact(
1083        self: Arc<Server>,
1084        request: TypedEnvelope<proto::RemoveContact>,
1085        response: Response<proto::RemoveContact>,
1086    ) -> Result<()> {
1087        let requester_id = self
1088            .store()
1089            .await
1090            .user_id_for_connection(request.sender_id)?;
1091        let responder_id = UserId::from_proto(request.payload.user_id);
1092        self.app_state
1093            .db
1094            .remove_contact(requester_id, responder_id)
1095            .await?;
1096
1097        // Update outgoing contact requests of requester
1098        let mut update = proto::UpdateContacts::default();
1099        update
1100            .remove_outgoing_requests
1101            .push(responder_id.to_proto());
1102        for connection_id in self.store().await.connection_ids_for_user(requester_id) {
1103            self.peer.send(connection_id, update.clone())?;
1104        }
1105
1106        // Update incoming contact requests of responder
1107        let mut update = proto::UpdateContacts::default();
1108        update
1109            .remove_incoming_requests
1110            .push(requester_id.to_proto());
1111        for connection_id in self.store().await.connection_ids_for_user(responder_id) {
1112            self.peer.send(connection_id, update.clone())?;
1113        }
1114
1115        response.send(proto::Ack {})?;
1116        Ok(())
1117    }
1118
1119    // #[instrument(skip(self, state, user_ids))]
1120    // fn update_contacts_for_users<'a>(
1121    //     self: &Arc<Self>,
1122    //     state: &Store,
1123    //     user_ids: impl IntoIterator<Item = &'a UserId>,
1124    // ) {
1125    //     for user_id in user_ids {
1126    //         let contacts = state.contacts_for_user(*user_id);
1127    //         for connection_id in state.connection_ids_for_user(*user_id) {
1128    //             self.peer
1129    //                 .send(
1130    //                     connection_id,
1131    //                     proto::UpdateContacts {
1132    //                         contacts: contacts.clone(),
1133    //                         pending_requests_from_user_ids: Default::default(),
1134    //                         pending_requests_to_user_ids: Default::default(),
1135    //                     },
1136    //                 )
1137    //                 .trace_err();
1138    //         }
1139    //     }
1140    // }
1141
1142    async fn join_channel(
1143        self: Arc<Self>,
1144        request: TypedEnvelope<proto::JoinChannel>,
1145        response: Response<proto::JoinChannel>,
1146    ) -> Result<()> {
1147        let user_id = self
1148            .store()
1149            .await
1150            .user_id_for_connection(request.sender_id)?;
1151        let channel_id = ChannelId::from_proto(request.payload.channel_id);
1152        if !self
1153            .app_state
1154            .db
1155            .can_user_access_channel(user_id, channel_id)
1156            .await?
1157        {
1158            Err(anyhow!("access denied"))?;
1159        }
1160
1161        self.store_mut()
1162            .await
1163            .join_channel(request.sender_id, channel_id);
1164        let messages = self
1165            .app_state
1166            .db
1167            .get_channel_messages(channel_id, MESSAGE_COUNT_PER_PAGE, None)
1168            .await?
1169            .into_iter()
1170            .map(|msg| proto::ChannelMessage {
1171                id: msg.id.to_proto(),
1172                body: msg.body,
1173                timestamp: msg.sent_at.unix_timestamp() as u64,
1174                sender_id: msg.sender_id.to_proto(),
1175                nonce: Some(msg.nonce.as_u128().into()),
1176            })
1177            .collect::<Vec<_>>();
1178        response.send(proto::JoinChannelResponse {
1179            done: messages.len() < MESSAGE_COUNT_PER_PAGE,
1180            messages,
1181        })?;
1182        Ok(())
1183    }
1184
1185    async fn leave_channel(
1186        self: Arc<Self>,
1187        request: TypedEnvelope<proto::LeaveChannel>,
1188    ) -> Result<()> {
1189        let user_id = self
1190            .store()
1191            .await
1192            .user_id_for_connection(request.sender_id)?;
1193        let channel_id = ChannelId::from_proto(request.payload.channel_id);
1194        if !self
1195            .app_state
1196            .db
1197            .can_user_access_channel(user_id, channel_id)
1198            .await?
1199        {
1200            Err(anyhow!("access denied"))?;
1201        }
1202
1203        self.store_mut()
1204            .await
1205            .leave_channel(request.sender_id, channel_id);
1206
1207        Ok(())
1208    }
1209
1210    async fn send_channel_message(
1211        self: Arc<Self>,
1212        request: TypedEnvelope<proto::SendChannelMessage>,
1213        response: Response<proto::SendChannelMessage>,
1214    ) -> Result<()> {
1215        let channel_id = ChannelId::from_proto(request.payload.channel_id);
1216        let user_id;
1217        let connection_ids;
1218        {
1219            let state = self.store().await;
1220            user_id = state.user_id_for_connection(request.sender_id)?;
1221            connection_ids = state.channel_connection_ids(channel_id)?;
1222        }
1223
1224        // Validate the message body.
1225        let body = request.payload.body.trim().to_string();
1226        if body.len() > MAX_MESSAGE_LEN {
1227            return Err(anyhow!("message is too long"))?;
1228        }
1229        if body.is_empty() {
1230            return Err(anyhow!("message can't be blank"))?;
1231        }
1232
1233        let timestamp = OffsetDateTime::now_utc();
1234        let nonce = request
1235            .payload
1236            .nonce
1237            .ok_or_else(|| anyhow!("nonce can't be blank"))?;
1238
1239        let message_id = self
1240            .app_state
1241            .db
1242            .create_channel_message(channel_id, user_id, &body, timestamp, nonce.clone().into())
1243            .await?
1244            .to_proto();
1245        let message = proto::ChannelMessage {
1246            sender_id: user_id.to_proto(),
1247            id: message_id,
1248            body,
1249            timestamp: timestamp.unix_timestamp() as u64,
1250            nonce: Some(nonce),
1251        };
1252        broadcast(request.sender_id, connection_ids, |conn_id| {
1253            self.peer.send(
1254                conn_id,
1255                proto::ChannelMessageSent {
1256                    channel_id: channel_id.to_proto(),
1257                    message: Some(message.clone()),
1258                },
1259            )
1260        });
1261        response.send(proto::SendChannelMessageResponse {
1262            message: Some(message),
1263        })?;
1264        Ok(())
1265    }
1266
1267    async fn get_channel_messages(
1268        self: Arc<Self>,
1269        request: TypedEnvelope<proto::GetChannelMessages>,
1270        response: Response<proto::GetChannelMessages>,
1271    ) -> Result<()> {
1272        let user_id = self
1273            .store()
1274            .await
1275            .user_id_for_connection(request.sender_id)?;
1276        let channel_id = ChannelId::from_proto(request.payload.channel_id);
1277        if !self
1278            .app_state
1279            .db
1280            .can_user_access_channel(user_id, channel_id)
1281            .await?
1282        {
1283            Err(anyhow!("access denied"))?;
1284        }
1285
1286        let messages = self
1287            .app_state
1288            .db
1289            .get_channel_messages(
1290                channel_id,
1291                MESSAGE_COUNT_PER_PAGE,
1292                Some(MessageId::from_proto(request.payload.before_message_id)),
1293            )
1294            .await?
1295            .into_iter()
1296            .map(|msg| proto::ChannelMessage {
1297                id: msg.id.to_proto(),
1298                body: msg.body,
1299                timestamp: msg.sent_at.unix_timestamp() as u64,
1300                sender_id: msg.sender_id.to_proto(),
1301                nonce: Some(msg.nonce.as_u128().into()),
1302            })
1303            .collect::<Vec<_>>();
1304        response.send(proto::GetChannelMessagesResponse {
1305            done: messages.len() < MESSAGE_COUNT_PER_PAGE,
1306            messages,
1307        })?;
1308        Ok(())
1309    }
1310
1311    async fn store<'a>(self: &'a Arc<Self>) -> StoreReadGuard<'a> {
1312        #[cfg(test)]
1313        tokio::task::yield_now().await;
1314        let guard = self.store.read().await;
1315        #[cfg(test)]
1316        tokio::task::yield_now().await;
1317        StoreReadGuard {
1318            guard,
1319            _not_send: PhantomData,
1320        }
1321    }
1322
1323    async fn store_mut<'a>(self: &'a Arc<Self>) -> StoreWriteGuard<'a> {
1324        #[cfg(test)]
1325        tokio::task::yield_now().await;
1326        let guard = self.store.write().await;
1327        #[cfg(test)]
1328        tokio::task::yield_now().await;
1329        StoreWriteGuard {
1330            guard,
1331            _not_send: PhantomData,
1332        }
1333    }
1334}
1335
1336impl<'a> Deref for StoreReadGuard<'a> {
1337    type Target = Store;
1338
1339    fn deref(&self) -> &Self::Target {
1340        &*self.guard
1341    }
1342}
1343
1344impl<'a> Deref for StoreWriteGuard<'a> {
1345    type Target = Store;
1346
1347    fn deref(&self) -> &Self::Target {
1348        &*self.guard
1349    }
1350}
1351
1352impl<'a> DerefMut for StoreWriteGuard<'a> {
1353    fn deref_mut(&mut self) -> &mut Self::Target {
1354        &mut *self.guard
1355    }
1356}
1357
1358impl<'a> Drop for StoreWriteGuard<'a> {
1359    fn drop(&mut self) {
1360        #[cfg(test)]
1361        self.check_invariants();
1362
1363        let metrics = self.metrics();
1364        tracing::info!(
1365            connections = metrics.connections,
1366            registered_projects = metrics.registered_projects,
1367            shared_projects = metrics.shared_projects,
1368            "metrics"
1369        );
1370    }
1371}
1372
1373impl Executor for RealExecutor {
1374    type Sleep = Sleep;
1375
1376    fn spawn_detached<F: 'static + Send + Future<Output = ()>>(&self, future: F) {
1377        tokio::task::spawn(future);
1378    }
1379
1380    fn sleep(&self, duration: Duration) -> Self::Sleep {
1381        tokio::time::sleep(duration)
1382    }
1383}
1384
1385fn broadcast<F>(
1386    sender_id: ConnectionId,
1387    receiver_ids: impl IntoIterator<Item = ConnectionId>,
1388    mut f: F,
1389) where
1390    F: FnMut(ConnectionId) -> anyhow::Result<()>,
1391{
1392    for receiver_id in receiver_ids {
1393        if receiver_id != sender_id {
1394            f(receiver_id).trace_err();
1395        }
1396    }
1397}
1398
1399lazy_static! {
1400    static ref ZED_PROTOCOL_VERSION: HeaderName = HeaderName::from_static("x-zed-protocol-version");
1401}
1402
1403pub struct ProtocolVersion(u32);
1404
1405impl Header for ProtocolVersion {
1406    fn name() -> &'static HeaderName {
1407        &ZED_PROTOCOL_VERSION
1408    }
1409
1410    fn decode<'i, I>(values: &mut I) -> Result<Self, axum::headers::Error>
1411    where
1412        Self: Sized,
1413        I: Iterator<Item = &'i axum::http::HeaderValue>,
1414    {
1415        let version = values
1416            .next()
1417            .ok_or_else(|| axum::headers::Error::invalid())?
1418            .to_str()
1419            .map_err(|_| axum::headers::Error::invalid())?
1420            .parse()
1421            .map_err(|_| axum::headers::Error::invalid())?;
1422        Ok(Self(version))
1423    }
1424
1425    fn encode<E: Extend<axum::http::HeaderValue>>(&self, values: &mut E) {
1426        values.extend([self.0.to_string().parse().unwrap()]);
1427    }
1428}
1429
1430pub fn routes(app_state: Arc<AppState>) -> Router<Body> {
1431    let server = Server::new(app_state.clone(), None);
1432    Router::new()
1433        .route("/rpc", get(handle_websocket_request))
1434        .layer(
1435            ServiceBuilder::new()
1436                .layer(Extension(app_state))
1437                .layer(middleware::from_fn(auth::validate_header))
1438                .layer(Extension(server)),
1439        )
1440}
1441
1442pub async fn handle_websocket_request(
1443    TypedHeader(ProtocolVersion(protocol_version)): TypedHeader<ProtocolVersion>,
1444    ConnectInfo(socket_address): ConnectInfo<SocketAddr>,
1445    Extension(server): Extension<Arc<Server>>,
1446    Extension(user_id): Extension<UserId>,
1447    ws: WebSocketUpgrade,
1448) -> axum::response::Response {
1449    if protocol_version != rpc::PROTOCOL_VERSION {
1450        return (
1451            StatusCode::UPGRADE_REQUIRED,
1452            "client must be upgraded".to_string(),
1453        )
1454            .into_response();
1455    }
1456    let socket_address = socket_address.to_string();
1457    ws.on_upgrade(move |socket| {
1458        use util::ResultExt;
1459        let socket = socket
1460            .map_ok(to_tungstenite_message)
1461            .err_into()
1462            .with(|message| async move { Ok(to_axum_message(message)) });
1463        let connection = Connection::new(Box::pin(socket));
1464        async move {
1465            server
1466                .handle_connection(connection, socket_address, user_id, None, RealExecutor)
1467                .await
1468                .log_err();
1469        }
1470    })
1471}
1472
1473fn to_axum_message(message: TungsteniteMessage) -> AxumMessage {
1474    match message {
1475        TungsteniteMessage::Text(payload) => AxumMessage::Text(payload),
1476        TungsteniteMessage::Binary(payload) => AxumMessage::Binary(payload),
1477        TungsteniteMessage::Ping(payload) => AxumMessage::Ping(payload),
1478        TungsteniteMessage::Pong(payload) => AxumMessage::Pong(payload),
1479        TungsteniteMessage::Close(frame) => AxumMessage::Close(frame.map(|frame| AxumCloseFrame {
1480            code: frame.code.into(),
1481            reason: frame.reason,
1482        })),
1483    }
1484}
1485
1486fn to_tungstenite_message(message: AxumMessage) -> TungsteniteMessage {
1487    match message {
1488        AxumMessage::Text(payload) => TungsteniteMessage::Text(payload),
1489        AxumMessage::Binary(payload) => TungsteniteMessage::Binary(payload),
1490        AxumMessage::Ping(payload) => TungsteniteMessage::Ping(payload),
1491        AxumMessage::Pong(payload) => TungsteniteMessage::Pong(payload),
1492        AxumMessage::Close(frame) => {
1493            TungsteniteMessage::Close(frame.map(|frame| TungsteniteCloseFrame {
1494                code: frame.code.into(),
1495                reason: frame.reason,
1496            }))
1497        }
1498    }
1499}
1500
1501pub trait ResultExt {
1502    type Ok;
1503
1504    fn trace_err(self) -> Option<Self::Ok>;
1505}
1506
1507impl<T, E> ResultExt for Result<T, E>
1508where
1509    E: std::fmt::Debug,
1510{
1511    type Ok = T;
1512
1513    fn trace_err(self) -> Option<T> {
1514        match self {
1515            Ok(value) => Some(value),
1516            Err(error) => {
1517                tracing::error!("{:?}", error);
1518                None
1519            }
1520        }
1521    }
1522}
1523
1524#[cfg(test)]
1525mod tests {
1526    use super::*;
1527    use crate::{
1528        db::{tests::TestDb, UserId},
1529        AppState,
1530    };
1531    use ::rpc::Peer;
1532    use client::{
1533        self, test::FakeHttpClient, Channel, ChannelDetails, ChannelList, Client, Credentials,
1534        EstablishConnectionError, UserStore, RECEIVE_TIMEOUT,
1535    };
1536    use collections::{BTreeMap, HashSet};
1537    use editor::{
1538        self, ConfirmCodeAction, ConfirmCompletion, ConfirmRename, Editor, Input, Redo, Rename,
1539        ToOffset, ToggleCodeActions, Undo,
1540    };
1541    use gpui::{
1542        executor::{self, Deterministic},
1543        geometry::vector::vec2f,
1544        ModelHandle, TestAppContext, ViewHandle,
1545    };
1546    use language::{
1547        range_to_lsp, tree_sitter_rust, Diagnostic, DiagnosticEntry, FakeLspAdapter, Language,
1548        LanguageConfig, LanguageRegistry, OffsetRangeExt, Point, Rope,
1549    };
1550    use lsp::{self, FakeLanguageServer};
1551    use parking_lot::Mutex;
1552    use project::{
1553        fs::{FakeFs, Fs as _},
1554        search::SearchQuery,
1555        worktree::WorktreeHandle,
1556        DiagnosticSummary, Project, ProjectPath, WorktreeId,
1557    };
1558    use rand::prelude::*;
1559    use rpc::PeerId;
1560    use serde_json::json;
1561    use settings::Settings;
1562    use sqlx::types::time::OffsetDateTime;
1563    use std::{
1564        env,
1565        ops::Deref,
1566        path::{Path, PathBuf},
1567        rc::Rc,
1568        sync::{
1569            atomic::{AtomicBool, Ordering::SeqCst},
1570            Arc,
1571        },
1572        time::Duration,
1573    };
1574    use theme::ThemeRegistry;
1575    use workspace::{Item, SplitDirection, ToggleFollow, Workspace, WorkspaceParams};
1576
1577    #[cfg(test)]
1578    #[ctor::ctor]
1579    fn init_logger() {
1580        if std::env::var("RUST_LOG").is_ok() {
1581            env_logger::init();
1582        }
1583    }
1584
1585    #[gpui::test(iterations = 10)]
1586    async fn test_share_project(cx_a: &mut TestAppContext, cx_b: &mut TestAppContext) {
1587        let (window_b, _) = cx_b.add_window(|_| EmptyView);
1588        let lang_registry = Arc::new(LanguageRegistry::test());
1589        let fs = FakeFs::new(cx_a.background());
1590        cx_a.foreground().forbid_parking();
1591
1592        // Connect to a server as 2 clients.
1593        let mut server = TestServer::start(cx_a.foreground(), cx_a.background()).await;
1594        let client_a = server.create_client(cx_a, "user_a").await;
1595        let client_b = server.create_client(cx_b, "user_b").await;
1596        server
1597            .make_contacts(vec![(&client_a, cx_a), (&client_b, cx_b)])
1598            .await;
1599
1600        // Share a project as client A
1601        fs.insert_tree(
1602            "/a",
1603            json!({
1604                "a.txt": "a-contents",
1605                "b.txt": "b-contents",
1606            }),
1607        )
1608        .await;
1609        let project_a = cx_a.update(|cx| {
1610            Project::local(
1611                client_a.clone(),
1612                client_a.user_store.clone(),
1613                lang_registry.clone(),
1614                fs.clone(),
1615                cx,
1616            )
1617        });
1618        let (worktree_a, _) = project_a
1619            .update(cx_a, |p, cx| {
1620                p.find_or_create_local_worktree("/a", true, cx)
1621            })
1622            .await
1623            .unwrap();
1624        let worktree_id = worktree_a.read_with(cx_a, |tree, _| tree.id());
1625        worktree_a
1626            .read_with(cx_a, |tree, _| tree.as_local().unwrap().scan_complete())
1627            .await;
1628        let project_id = project_a.update(cx_a, |p, _| p.next_remote_id()).await;
1629        project_a.update(cx_a, |p, cx| p.share(cx)).await.unwrap();
1630
1631        // Join that project as client B
1632        let project_b = Project::remote(
1633            project_id,
1634            client_b.clone(),
1635            client_b.user_store.clone(),
1636            lang_registry.clone(),
1637            fs.clone(),
1638            &mut cx_b.to_async(),
1639        )
1640        .await
1641        .unwrap();
1642
1643        let replica_id_b = project_b.read_with(cx_b, |project, _| {
1644            assert_eq!(
1645                project
1646                    .collaborators()
1647                    .get(&client_a.peer_id)
1648                    .unwrap()
1649                    .user
1650                    .github_login,
1651                "user_a"
1652            );
1653            project.replica_id()
1654        });
1655        project_a
1656            .condition(&cx_a, |tree, _| {
1657                tree.collaborators()
1658                    .get(&client_b.peer_id)
1659                    .map_or(false, |collaborator| {
1660                        collaborator.replica_id == replica_id_b
1661                            && collaborator.user.github_login == "user_b"
1662                    })
1663            })
1664            .await;
1665
1666        // Open the same file as client B and client A.
1667        let buffer_b = project_b
1668            .update(cx_b, |p, cx| p.open_buffer((worktree_id, "b.txt"), cx))
1669            .await
1670            .unwrap();
1671        buffer_b.read_with(cx_b, |buf, _| assert_eq!(buf.text(), "b-contents"));
1672        project_a.read_with(cx_a, |project, cx| {
1673            assert!(project.has_open_buffer((worktree_id, "b.txt"), cx))
1674        });
1675        let buffer_a = project_a
1676            .update(cx_a, |p, cx| p.open_buffer((worktree_id, "b.txt"), cx))
1677            .await
1678            .unwrap();
1679
1680        let editor_b = cx_b.add_view(window_b, |cx| Editor::for_buffer(buffer_b, None, cx));
1681
1682        // TODO
1683        // // Create a selection set as client B and see that selection set as client A.
1684        // buffer_a
1685        //     .condition(&cx_a, |buffer, _| buffer.selection_sets().count() == 1)
1686        //     .await;
1687
1688        // Edit the buffer as client B and see that edit as client A.
1689        editor_b.update(cx_b, |editor, cx| {
1690            editor.handle_input(&Input("ok, ".into()), cx)
1691        });
1692        buffer_a
1693            .condition(&cx_a, |buffer, _| buffer.text() == "ok, b-contents")
1694            .await;
1695
1696        // TODO
1697        // // Remove the selection set as client B, see those selections disappear as client A.
1698        cx_b.update(move |_| drop(editor_b));
1699        // buffer_a
1700        //     .condition(&cx_a, |buffer, _| buffer.selection_sets().count() == 0)
1701        //     .await;
1702
1703        // Dropping the client B's project removes client B from client A's collaborators.
1704        cx_b.update(move |_| drop(project_b));
1705        project_a
1706            .condition(&cx_a, |project, _| project.collaborators().is_empty())
1707            .await;
1708    }
1709
1710    #[gpui::test(iterations = 10)]
1711    async fn test_unshare_project(cx_a: &mut TestAppContext, cx_b: &mut TestAppContext) {
1712        let lang_registry = Arc::new(LanguageRegistry::test());
1713        let fs = FakeFs::new(cx_a.background());
1714        cx_a.foreground().forbid_parking();
1715
1716        // Connect to a server as 2 clients.
1717        let mut server = TestServer::start(cx_a.foreground(), cx_a.background()).await;
1718        let client_a = server.create_client(cx_a, "user_a").await;
1719        let client_b = server.create_client(cx_b, "user_b").await;
1720        server
1721            .make_contacts(vec![(&client_a, cx_a), (&client_b, cx_b)])
1722            .await;
1723
1724        // Share a project as client A
1725        fs.insert_tree(
1726            "/a",
1727            json!({
1728                "a.txt": "a-contents",
1729                "b.txt": "b-contents",
1730            }),
1731        )
1732        .await;
1733        let project_a = cx_a.update(|cx| {
1734            Project::local(
1735                client_a.clone(),
1736                client_a.user_store.clone(),
1737                lang_registry.clone(),
1738                fs.clone(),
1739                cx,
1740            )
1741        });
1742        let (worktree_a, _) = project_a
1743            .update(cx_a, |p, cx| {
1744                p.find_or_create_local_worktree("/a", true, cx)
1745            })
1746            .await
1747            .unwrap();
1748        worktree_a
1749            .read_with(cx_a, |tree, _| tree.as_local().unwrap().scan_complete())
1750            .await;
1751        let project_id = project_a.update(cx_a, |p, _| p.next_remote_id()).await;
1752        let worktree_id = worktree_a.read_with(cx_a, |tree, _| tree.id());
1753        project_a.update(cx_a, |p, cx| p.share(cx)).await.unwrap();
1754        assert!(worktree_a.read_with(cx_a, |tree, _| tree.as_local().unwrap().is_shared()));
1755
1756        // Join that project as client B
1757        let project_b = Project::remote(
1758            project_id,
1759            client_b.clone(),
1760            client_b.user_store.clone(),
1761            lang_registry.clone(),
1762            fs.clone(),
1763            &mut cx_b.to_async(),
1764        )
1765        .await
1766        .unwrap();
1767        project_b
1768            .update(cx_b, |p, cx| p.open_buffer((worktree_id, "a.txt"), cx))
1769            .await
1770            .unwrap();
1771
1772        // Unshare the project as client A
1773        project_a.update(cx_a, |project, cx| project.unshare(cx));
1774        project_b
1775            .condition(cx_b, |project, _| project.is_read_only())
1776            .await;
1777        assert!(worktree_a.read_with(cx_a, |tree, _| !tree.as_local().unwrap().is_shared()));
1778        cx_b.update(|_| {
1779            drop(project_b);
1780        });
1781
1782        // Share the project again and ensure guests can still join.
1783        project_a
1784            .update(cx_a, |project, cx| project.share(cx))
1785            .await
1786            .unwrap();
1787        assert!(worktree_a.read_with(cx_a, |tree, _| tree.as_local().unwrap().is_shared()));
1788
1789        let project_b2 = Project::remote(
1790            project_id,
1791            client_b.clone(),
1792            client_b.user_store.clone(),
1793            lang_registry.clone(),
1794            fs.clone(),
1795            &mut cx_b.to_async(),
1796        )
1797        .await
1798        .unwrap();
1799        project_b2
1800            .update(cx_b, |p, cx| p.open_buffer((worktree_id, "a.txt"), cx))
1801            .await
1802            .unwrap();
1803    }
1804
1805    #[gpui::test(iterations = 10)]
1806    async fn test_host_disconnect(cx_a: &mut TestAppContext, cx_b: &mut TestAppContext) {
1807        let lang_registry = Arc::new(LanguageRegistry::test());
1808        let fs = FakeFs::new(cx_a.background());
1809        cx_a.foreground().forbid_parking();
1810
1811        // Connect to a server as 2 clients.
1812        let mut server = TestServer::start(cx_a.foreground(), cx_a.background()).await;
1813        let client_a = server.create_client(cx_a, "user_a").await;
1814        let client_b = server.create_client(cx_b, "user_b").await;
1815        server
1816            .make_contacts(vec![(&client_a, cx_a), (&client_b, cx_b)])
1817            .await;
1818
1819        // Share a project as client A
1820        fs.insert_tree(
1821            "/a",
1822            json!({
1823                "a.txt": "a-contents",
1824                "b.txt": "b-contents",
1825            }),
1826        )
1827        .await;
1828        let project_a = cx_a.update(|cx| {
1829            Project::local(
1830                client_a.clone(),
1831                client_a.user_store.clone(),
1832                lang_registry.clone(),
1833                fs.clone(),
1834                cx,
1835            )
1836        });
1837        let (worktree_a, _) = project_a
1838            .update(cx_a, |p, cx| {
1839                p.find_or_create_local_worktree("/a", true, cx)
1840            })
1841            .await
1842            .unwrap();
1843        worktree_a
1844            .read_with(cx_a, |tree, _| tree.as_local().unwrap().scan_complete())
1845            .await;
1846        let project_id = project_a.update(cx_a, |p, _| p.next_remote_id()).await;
1847        let worktree_id = worktree_a.read_with(cx_a, |tree, _| tree.id());
1848        project_a.update(cx_a, |p, cx| p.share(cx)).await.unwrap();
1849        assert!(worktree_a.read_with(cx_a, |tree, _| tree.as_local().unwrap().is_shared()));
1850
1851        // Join that project as client B
1852        let project_b = Project::remote(
1853            project_id,
1854            client_b.clone(),
1855            client_b.user_store.clone(),
1856            lang_registry.clone(),
1857            fs.clone(),
1858            &mut cx_b.to_async(),
1859        )
1860        .await
1861        .unwrap();
1862        project_b
1863            .update(cx_b, |p, cx| p.open_buffer((worktree_id, "a.txt"), cx))
1864            .await
1865            .unwrap();
1866
1867        // Drop client A's connection. Collaborators should disappear and the project should not be shown as shared.
1868        server.disconnect_client(client_a.current_user_id(cx_a));
1869        cx_a.foreground().advance_clock(rpc::RECEIVE_TIMEOUT);
1870        project_a
1871            .condition(cx_a, |project, _| project.collaborators().is_empty())
1872            .await;
1873        project_a.read_with(cx_a, |project, _| assert!(!project.is_shared()));
1874        project_b
1875            .condition(cx_b, |project, _| project.is_read_only())
1876            .await;
1877        assert!(worktree_a.read_with(cx_a, |tree, _| !tree.as_local().unwrap().is_shared()));
1878        cx_b.update(|_| {
1879            drop(project_b);
1880        });
1881
1882        // Await reconnection
1883        let project_id = project_a.update(cx_a, |p, _| p.next_remote_id()).await;
1884
1885        // Share the project again and ensure guests can still join.
1886        project_a
1887            .update(cx_a, |project, cx| project.share(cx))
1888            .await
1889            .unwrap();
1890        assert!(worktree_a.read_with(cx_a, |tree, _| tree.as_local().unwrap().is_shared()));
1891
1892        let project_b2 = Project::remote(
1893            project_id,
1894            client_b.clone(),
1895            client_b.user_store.clone(),
1896            lang_registry.clone(),
1897            fs.clone(),
1898            &mut cx_b.to_async(),
1899        )
1900        .await
1901        .unwrap();
1902        project_b2
1903            .update(cx_b, |p, cx| p.open_buffer((worktree_id, "a.txt"), cx))
1904            .await
1905            .unwrap();
1906    }
1907
1908    #[gpui::test(iterations = 10)]
1909    async fn test_propagate_saves_and_fs_changes(
1910        cx_a: &mut TestAppContext,
1911        cx_b: &mut TestAppContext,
1912        cx_c: &mut TestAppContext,
1913    ) {
1914        let lang_registry = Arc::new(LanguageRegistry::test());
1915        let fs = FakeFs::new(cx_a.background());
1916        cx_a.foreground().forbid_parking();
1917
1918        // Connect to a server as 3 clients.
1919        let mut server = TestServer::start(cx_a.foreground(), cx_a.background()).await;
1920        let client_a = server.create_client(cx_a, "user_a").await;
1921        let client_b = server.create_client(cx_b, "user_b").await;
1922        let client_c = server.create_client(cx_c, "user_c").await;
1923        server
1924            .make_contacts(vec![
1925                (&client_a, cx_a),
1926                (&client_b, cx_b),
1927                (&client_c, cx_c),
1928            ])
1929            .await;
1930
1931        // Share a worktree as client A.
1932        fs.insert_tree(
1933            "/a",
1934            json!({
1935                "file1": "",
1936                "file2": ""
1937            }),
1938        )
1939        .await;
1940        let project_a = cx_a.update(|cx| {
1941            Project::local(
1942                client_a.clone(),
1943                client_a.user_store.clone(),
1944                lang_registry.clone(),
1945                fs.clone(),
1946                cx,
1947            )
1948        });
1949        let (worktree_a, _) = project_a
1950            .update(cx_a, |p, cx| {
1951                p.find_or_create_local_worktree("/a", true, cx)
1952            })
1953            .await
1954            .unwrap();
1955        worktree_a
1956            .read_with(cx_a, |tree, _| tree.as_local().unwrap().scan_complete())
1957            .await;
1958        let project_id = project_a.update(cx_a, |p, _| p.next_remote_id()).await;
1959        let worktree_id = worktree_a.read_with(cx_a, |tree, _| tree.id());
1960        project_a.update(cx_a, |p, cx| p.share(cx)).await.unwrap();
1961
1962        // Join that worktree as clients B and C.
1963        let project_b = Project::remote(
1964            project_id,
1965            client_b.clone(),
1966            client_b.user_store.clone(),
1967            lang_registry.clone(),
1968            fs.clone(),
1969            &mut cx_b.to_async(),
1970        )
1971        .await
1972        .unwrap();
1973        let project_c = Project::remote(
1974            project_id,
1975            client_c.clone(),
1976            client_c.user_store.clone(),
1977            lang_registry.clone(),
1978            fs.clone(),
1979            &mut cx_c.to_async(),
1980        )
1981        .await
1982        .unwrap();
1983        let worktree_b = project_b.read_with(cx_b, |p, cx| p.worktrees(cx).next().unwrap());
1984        let worktree_c = project_c.read_with(cx_c, |p, cx| p.worktrees(cx).next().unwrap());
1985
1986        // Open and edit a buffer as both guests B and C.
1987        let buffer_b = project_b
1988            .update(cx_b, |p, cx| p.open_buffer((worktree_id, "file1"), cx))
1989            .await
1990            .unwrap();
1991        let buffer_c = project_c
1992            .update(cx_c, |p, cx| p.open_buffer((worktree_id, "file1"), cx))
1993            .await
1994            .unwrap();
1995        buffer_b.update(cx_b, |buf, cx| buf.edit([(0..0, "i-am-b, ")], cx));
1996        buffer_c.update(cx_c, |buf, cx| buf.edit([(0..0, "i-am-c, ")], cx));
1997
1998        // Open and edit that buffer as the host.
1999        let buffer_a = project_a
2000            .update(cx_a, |p, cx| p.open_buffer((worktree_id, "file1"), cx))
2001            .await
2002            .unwrap();
2003
2004        buffer_a
2005            .condition(cx_a, |buf, _| buf.text() == "i-am-c, i-am-b, ")
2006            .await;
2007        buffer_a.update(cx_a, |buf, cx| {
2008            buf.edit([(buf.len()..buf.len(), "i-am-a")], cx)
2009        });
2010
2011        // Wait for edits to propagate
2012        buffer_a
2013            .condition(cx_a, |buf, _| buf.text() == "i-am-c, i-am-b, i-am-a")
2014            .await;
2015        buffer_b
2016            .condition(cx_b, |buf, _| buf.text() == "i-am-c, i-am-b, i-am-a")
2017            .await;
2018        buffer_c
2019            .condition(cx_c, |buf, _| buf.text() == "i-am-c, i-am-b, i-am-a")
2020            .await;
2021
2022        // Edit the buffer as the host and concurrently save as guest B.
2023        let save_b = buffer_b.update(cx_b, |buf, cx| buf.save(cx));
2024        buffer_a.update(cx_a, |buf, cx| buf.edit([(0..0, "hi-a, ")], cx));
2025        save_b.await.unwrap();
2026        assert_eq!(
2027            fs.load("/a/file1".as_ref()).await.unwrap(),
2028            "hi-a, i-am-c, i-am-b, i-am-a"
2029        );
2030        buffer_a.read_with(cx_a, |buf, _| assert!(!buf.is_dirty()));
2031        buffer_b.read_with(cx_b, |buf, _| assert!(!buf.is_dirty()));
2032        buffer_c.condition(cx_c, |buf, _| !buf.is_dirty()).await;
2033
2034        worktree_a.flush_fs_events(cx_a).await;
2035
2036        // Make changes on host's file system, see those changes on guest worktrees.
2037        fs.rename(
2038            "/a/file1".as_ref(),
2039            "/a/file1-renamed".as_ref(),
2040            Default::default(),
2041        )
2042        .await
2043        .unwrap();
2044
2045        fs.rename("/a/file2".as_ref(), "/a/file3".as_ref(), Default::default())
2046            .await
2047            .unwrap();
2048        fs.insert_file(Path::new("/a/file4"), "4".into()).await;
2049
2050        worktree_a
2051            .condition(&cx_a, |tree, _| {
2052                tree.paths()
2053                    .map(|p| p.to_string_lossy())
2054                    .collect::<Vec<_>>()
2055                    == ["file1-renamed", "file3", "file4"]
2056            })
2057            .await;
2058        worktree_b
2059            .condition(&cx_b, |tree, _| {
2060                tree.paths()
2061                    .map(|p| p.to_string_lossy())
2062                    .collect::<Vec<_>>()
2063                    == ["file1-renamed", "file3", "file4"]
2064            })
2065            .await;
2066        worktree_c
2067            .condition(&cx_c, |tree, _| {
2068                tree.paths()
2069                    .map(|p| p.to_string_lossy())
2070                    .collect::<Vec<_>>()
2071                    == ["file1-renamed", "file3", "file4"]
2072            })
2073            .await;
2074
2075        // Ensure buffer files are updated as well.
2076        buffer_a
2077            .condition(&cx_a, |buf, _| {
2078                buf.file().unwrap().path().to_str() == Some("file1-renamed")
2079            })
2080            .await;
2081        buffer_b
2082            .condition(&cx_b, |buf, _| {
2083                buf.file().unwrap().path().to_str() == Some("file1-renamed")
2084            })
2085            .await;
2086        buffer_c
2087            .condition(&cx_c, |buf, _| {
2088                buf.file().unwrap().path().to_str() == Some("file1-renamed")
2089            })
2090            .await;
2091    }
2092
2093    #[gpui::test(iterations = 10)]
2094    async fn test_fs_operations(
2095        executor: Arc<Deterministic>,
2096        cx_a: &mut TestAppContext,
2097        cx_b: &mut TestAppContext,
2098    ) {
2099        executor.forbid_parking();
2100        let fs = FakeFs::new(cx_a.background());
2101
2102        // Connect to a server as 2 clients.
2103        let mut server = TestServer::start(cx_a.foreground(), cx_a.background()).await;
2104        let mut client_a = server.create_client(cx_a, "user_a").await;
2105        let mut client_b = server.create_client(cx_b, "user_b").await;
2106        server
2107            .make_contacts(vec![(&client_a, cx_a), (&client_b, cx_b)])
2108            .await;
2109
2110        // Share a project as client A
2111        fs.insert_tree(
2112            "/dir",
2113            json!({
2114                "a.txt": "a-contents",
2115                "b.txt": "b-contents",
2116            }),
2117        )
2118        .await;
2119
2120        let (project_a, worktree_id) = client_a.build_local_project(fs, "/dir", cx_a).await;
2121        let project_id = project_a.read_with(cx_a, |project, _| project.remote_id().unwrap());
2122        project_a
2123            .update(cx_a, |project, cx| project.share(cx))
2124            .await
2125            .unwrap();
2126
2127        let project_b = client_b.build_remote_project(project_id, cx_b).await;
2128
2129        let worktree_a =
2130            project_a.read_with(cx_a, |project, cx| project.worktrees(cx).next().unwrap());
2131        let worktree_b =
2132            project_b.read_with(cx_b, |project, cx| project.worktrees(cx).next().unwrap());
2133
2134        let entry = project_b
2135            .update(cx_b, |project, cx| {
2136                project
2137                    .create_entry((worktree_id, "c.txt"), false, cx)
2138                    .unwrap()
2139            })
2140            .await
2141            .unwrap();
2142        worktree_a.read_with(cx_a, |worktree, _| {
2143            assert_eq!(
2144                worktree
2145                    .paths()
2146                    .map(|p| p.to_string_lossy())
2147                    .collect::<Vec<_>>(),
2148                ["a.txt", "b.txt", "c.txt"]
2149            );
2150        });
2151        worktree_b.read_with(cx_b, |worktree, _| {
2152            assert_eq!(
2153                worktree
2154                    .paths()
2155                    .map(|p| p.to_string_lossy())
2156                    .collect::<Vec<_>>(),
2157                ["a.txt", "b.txt", "c.txt"]
2158            );
2159        });
2160
2161        project_b
2162            .update(cx_b, |project, cx| {
2163                project.rename_entry(entry.id, Path::new("d.txt"), cx)
2164            })
2165            .unwrap()
2166            .await
2167            .unwrap();
2168        worktree_a.read_with(cx_a, |worktree, _| {
2169            assert_eq!(
2170                worktree
2171                    .paths()
2172                    .map(|p| p.to_string_lossy())
2173                    .collect::<Vec<_>>(),
2174                ["a.txt", "b.txt", "d.txt"]
2175            );
2176        });
2177        worktree_b.read_with(cx_b, |worktree, _| {
2178            assert_eq!(
2179                worktree
2180                    .paths()
2181                    .map(|p| p.to_string_lossy())
2182                    .collect::<Vec<_>>(),
2183                ["a.txt", "b.txt", "d.txt"]
2184            );
2185        });
2186
2187        let dir_entry = project_b
2188            .update(cx_b, |project, cx| {
2189                project
2190                    .create_entry((worktree_id, "DIR"), true, cx)
2191                    .unwrap()
2192            })
2193            .await
2194            .unwrap();
2195        worktree_a.read_with(cx_a, |worktree, _| {
2196            assert_eq!(
2197                worktree
2198                    .paths()
2199                    .map(|p| p.to_string_lossy())
2200                    .collect::<Vec<_>>(),
2201                ["DIR", "a.txt", "b.txt", "d.txt"]
2202            );
2203        });
2204        worktree_b.read_with(cx_b, |worktree, _| {
2205            assert_eq!(
2206                worktree
2207                    .paths()
2208                    .map(|p| p.to_string_lossy())
2209                    .collect::<Vec<_>>(),
2210                ["DIR", "a.txt", "b.txt", "d.txt"]
2211            );
2212        });
2213
2214        project_b
2215            .update(cx_b, |project, cx| {
2216                project.delete_entry(dir_entry.id, cx).unwrap()
2217            })
2218            .await
2219            .unwrap();
2220        worktree_a.read_with(cx_a, |worktree, _| {
2221            assert_eq!(
2222                worktree
2223                    .paths()
2224                    .map(|p| p.to_string_lossy())
2225                    .collect::<Vec<_>>(),
2226                ["a.txt", "b.txt", "d.txt"]
2227            );
2228        });
2229        worktree_b.read_with(cx_b, |worktree, _| {
2230            assert_eq!(
2231                worktree
2232                    .paths()
2233                    .map(|p| p.to_string_lossy())
2234                    .collect::<Vec<_>>(),
2235                ["a.txt", "b.txt", "d.txt"]
2236            );
2237        });
2238
2239        project_b
2240            .update(cx_b, |project, cx| {
2241                project.delete_entry(entry.id, cx).unwrap()
2242            })
2243            .await
2244            .unwrap();
2245        worktree_a.read_with(cx_a, |worktree, _| {
2246            assert_eq!(
2247                worktree
2248                    .paths()
2249                    .map(|p| p.to_string_lossy())
2250                    .collect::<Vec<_>>(),
2251                ["a.txt", "b.txt"]
2252            );
2253        });
2254        worktree_b.read_with(cx_b, |worktree, _| {
2255            assert_eq!(
2256                worktree
2257                    .paths()
2258                    .map(|p| p.to_string_lossy())
2259                    .collect::<Vec<_>>(),
2260                ["a.txt", "b.txt"]
2261            );
2262        });
2263    }
2264
2265    #[gpui::test(iterations = 10)]
2266    async fn test_buffer_conflict_after_save(cx_a: &mut TestAppContext, cx_b: &mut TestAppContext) {
2267        cx_a.foreground().forbid_parking();
2268        let lang_registry = Arc::new(LanguageRegistry::test());
2269        let fs = FakeFs::new(cx_a.background());
2270
2271        // Connect to a server as 2 clients.
2272        let mut server = TestServer::start(cx_a.foreground(), cx_a.background()).await;
2273        let client_a = server.create_client(cx_a, "user_a").await;
2274        let client_b = server.create_client(cx_b, "user_b").await;
2275        server
2276            .make_contacts(vec![(&client_a, cx_a), (&client_b, cx_b)])
2277            .await;
2278
2279        // Share a project as client A
2280        fs.insert_tree(
2281            "/dir",
2282            json!({
2283                "a.txt": "a-contents",
2284            }),
2285        )
2286        .await;
2287
2288        let project_a = cx_a.update(|cx| {
2289            Project::local(
2290                client_a.clone(),
2291                client_a.user_store.clone(),
2292                lang_registry.clone(),
2293                fs.clone(),
2294                cx,
2295            )
2296        });
2297        let (worktree_a, _) = project_a
2298            .update(cx_a, |p, cx| {
2299                p.find_or_create_local_worktree("/dir", true, cx)
2300            })
2301            .await
2302            .unwrap();
2303        worktree_a
2304            .read_with(cx_a, |tree, _| tree.as_local().unwrap().scan_complete())
2305            .await;
2306        let project_id = project_a.update(cx_a, |p, _| p.next_remote_id()).await;
2307        let worktree_id = worktree_a.read_with(cx_a, |tree, _| tree.id());
2308        project_a.update(cx_a, |p, cx| p.share(cx)).await.unwrap();
2309
2310        // Join that project as client B
2311        let project_b = Project::remote(
2312            project_id,
2313            client_b.clone(),
2314            client_b.user_store.clone(),
2315            lang_registry.clone(),
2316            fs.clone(),
2317            &mut cx_b.to_async(),
2318        )
2319        .await
2320        .unwrap();
2321
2322        // Open a buffer as client B
2323        let buffer_b = project_b
2324            .update(cx_b, |p, cx| p.open_buffer((worktree_id, "a.txt"), cx))
2325            .await
2326            .unwrap();
2327
2328        buffer_b.update(cx_b, |buf, cx| buf.edit([(0..0, "world ")], cx));
2329        buffer_b.read_with(cx_b, |buf, _| {
2330            assert!(buf.is_dirty());
2331            assert!(!buf.has_conflict());
2332        });
2333
2334        buffer_b.update(cx_b, |buf, cx| buf.save(cx)).await.unwrap();
2335        buffer_b
2336            .condition(&cx_b, |buffer_b, _| !buffer_b.is_dirty())
2337            .await;
2338        buffer_b.read_with(cx_b, |buf, _| {
2339            assert!(!buf.has_conflict());
2340        });
2341
2342        buffer_b.update(cx_b, |buf, cx| buf.edit([(0..0, "hello ")], cx));
2343        buffer_b.read_with(cx_b, |buf, _| {
2344            assert!(buf.is_dirty());
2345            assert!(!buf.has_conflict());
2346        });
2347    }
2348
2349    #[gpui::test(iterations = 10)]
2350    async fn test_buffer_reloading(cx_a: &mut TestAppContext, cx_b: &mut TestAppContext) {
2351        cx_a.foreground().forbid_parking();
2352        let lang_registry = Arc::new(LanguageRegistry::test());
2353        let fs = FakeFs::new(cx_a.background());
2354
2355        // Connect to a server as 2 clients.
2356        let mut server = TestServer::start(cx_a.foreground(), cx_a.background()).await;
2357        let client_a = server.create_client(cx_a, "user_a").await;
2358        let client_b = server.create_client(cx_b, "user_b").await;
2359        server
2360            .make_contacts(vec![(&client_a, cx_a), (&client_b, cx_b)])
2361            .await;
2362
2363        // Share a project as client A
2364        fs.insert_tree(
2365            "/dir",
2366            json!({
2367                "a.txt": "a-contents",
2368            }),
2369        )
2370        .await;
2371
2372        let project_a = cx_a.update(|cx| {
2373            Project::local(
2374                client_a.clone(),
2375                client_a.user_store.clone(),
2376                lang_registry.clone(),
2377                fs.clone(),
2378                cx,
2379            )
2380        });
2381        let (worktree_a, _) = project_a
2382            .update(cx_a, |p, cx| {
2383                p.find_or_create_local_worktree("/dir", true, cx)
2384            })
2385            .await
2386            .unwrap();
2387        worktree_a
2388            .read_with(cx_a, |tree, _| tree.as_local().unwrap().scan_complete())
2389            .await;
2390        let project_id = project_a.update(cx_a, |p, _| p.next_remote_id()).await;
2391        let worktree_id = worktree_a.read_with(cx_a, |tree, _| tree.id());
2392        project_a.update(cx_a, |p, cx| p.share(cx)).await.unwrap();
2393
2394        // Join that project as client B
2395        let project_b = Project::remote(
2396            project_id,
2397            client_b.clone(),
2398            client_b.user_store.clone(),
2399            lang_registry.clone(),
2400            fs.clone(),
2401            &mut cx_b.to_async(),
2402        )
2403        .await
2404        .unwrap();
2405        let _worktree_b = project_b.update(cx_b, |p, cx| p.worktrees(cx).next().unwrap());
2406
2407        // Open a buffer as client B
2408        let buffer_b = project_b
2409            .update(cx_b, |p, cx| p.open_buffer((worktree_id, "a.txt"), cx))
2410            .await
2411            .unwrap();
2412        buffer_b.read_with(cx_b, |buf, _| {
2413            assert!(!buf.is_dirty());
2414            assert!(!buf.has_conflict());
2415        });
2416
2417        fs.save(Path::new("/dir/a.txt"), &"new contents".into())
2418            .await
2419            .unwrap();
2420        buffer_b
2421            .condition(&cx_b, |buf, _| {
2422                buf.text() == "new contents" && !buf.is_dirty()
2423            })
2424            .await;
2425        buffer_b.read_with(cx_b, |buf, _| {
2426            assert!(!buf.has_conflict());
2427        });
2428    }
2429
2430    #[gpui::test(iterations = 10)]
2431    async fn test_editing_while_guest_opens_buffer(
2432        cx_a: &mut TestAppContext,
2433        cx_b: &mut TestAppContext,
2434    ) {
2435        cx_a.foreground().forbid_parking();
2436        let lang_registry = Arc::new(LanguageRegistry::test());
2437        let fs = FakeFs::new(cx_a.background());
2438
2439        // Connect to a server as 2 clients.
2440        let mut server = TestServer::start(cx_a.foreground(), cx_a.background()).await;
2441        let client_a = server.create_client(cx_a, "user_a").await;
2442        let client_b = server.create_client(cx_b, "user_b").await;
2443        server
2444            .make_contacts(vec![(&client_a, cx_a), (&client_b, cx_b)])
2445            .await;
2446
2447        // Share a project as client A
2448        fs.insert_tree(
2449            "/dir",
2450            json!({
2451                "a.txt": "a-contents",
2452            }),
2453        )
2454        .await;
2455        let project_a = cx_a.update(|cx| {
2456            Project::local(
2457                client_a.clone(),
2458                client_a.user_store.clone(),
2459                lang_registry.clone(),
2460                fs.clone(),
2461                cx,
2462            )
2463        });
2464        let (worktree_a, _) = project_a
2465            .update(cx_a, |p, cx| {
2466                p.find_or_create_local_worktree("/dir", true, cx)
2467            })
2468            .await
2469            .unwrap();
2470        worktree_a
2471            .read_with(cx_a, |tree, _| tree.as_local().unwrap().scan_complete())
2472            .await;
2473        let project_id = project_a.update(cx_a, |p, _| p.next_remote_id()).await;
2474        let worktree_id = worktree_a.read_with(cx_a, |tree, _| tree.id());
2475        project_a.update(cx_a, |p, cx| p.share(cx)).await.unwrap();
2476
2477        // Join that project as client B
2478        let project_b = Project::remote(
2479            project_id,
2480            client_b.clone(),
2481            client_b.user_store.clone(),
2482            lang_registry.clone(),
2483            fs.clone(),
2484            &mut cx_b.to_async(),
2485        )
2486        .await
2487        .unwrap();
2488
2489        // Open a buffer as client A
2490        let buffer_a = project_a
2491            .update(cx_a, |p, cx| p.open_buffer((worktree_id, "a.txt"), cx))
2492            .await
2493            .unwrap();
2494
2495        // Start opening the same buffer as client B
2496        let buffer_b = cx_b
2497            .background()
2498            .spawn(project_b.update(cx_b, |p, cx| p.open_buffer((worktree_id, "a.txt"), cx)));
2499
2500        // Edit the buffer as client A while client B is still opening it.
2501        cx_b.background().simulate_random_delay().await;
2502        buffer_a.update(cx_a, |buf, cx| buf.edit([(0..0, "X")], cx));
2503        cx_b.background().simulate_random_delay().await;
2504        buffer_a.update(cx_a, |buf, cx| buf.edit([(1..1, "Y")], cx));
2505
2506        let text = buffer_a.read_with(cx_a, |buf, _| buf.text());
2507        let buffer_b = buffer_b.await.unwrap();
2508        buffer_b.condition(&cx_b, |buf, _| buf.text() == text).await;
2509    }
2510
2511    #[gpui::test(iterations = 10)]
2512    async fn test_leaving_worktree_while_opening_buffer(
2513        cx_a: &mut TestAppContext,
2514        cx_b: &mut TestAppContext,
2515    ) {
2516        cx_a.foreground().forbid_parking();
2517        let lang_registry = Arc::new(LanguageRegistry::test());
2518        let fs = FakeFs::new(cx_a.background());
2519
2520        // Connect to a server as 2 clients.
2521        let mut server = TestServer::start(cx_a.foreground(), cx_a.background()).await;
2522        let client_a = server.create_client(cx_a, "user_a").await;
2523        let client_b = server.create_client(cx_b, "user_b").await;
2524        server
2525            .make_contacts(vec![(&client_a, cx_a), (&client_b, cx_b)])
2526            .await;
2527
2528        // Share a project as client A
2529        fs.insert_tree(
2530            "/dir",
2531            json!({
2532                "a.txt": "a-contents",
2533            }),
2534        )
2535        .await;
2536        let project_a = cx_a.update(|cx| {
2537            Project::local(
2538                client_a.clone(),
2539                client_a.user_store.clone(),
2540                lang_registry.clone(),
2541                fs.clone(),
2542                cx,
2543            )
2544        });
2545        let (worktree_a, _) = project_a
2546            .update(cx_a, |p, cx| {
2547                p.find_or_create_local_worktree("/dir", true, cx)
2548            })
2549            .await
2550            .unwrap();
2551        worktree_a
2552            .read_with(cx_a, |tree, _| tree.as_local().unwrap().scan_complete())
2553            .await;
2554        let project_id = project_a.update(cx_a, |p, _| p.next_remote_id()).await;
2555        let worktree_id = worktree_a.read_with(cx_a, |tree, _| tree.id());
2556        project_a.update(cx_a, |p, cx| p.share(cx)).await.unwrap();
2557
2558        // Join that project as client B
2559        let project_b = Project::remote(
2560            project_id,
2561            client_b.clone(),
2562            client_b.user_store.clone(),
2563            lang_registry.clone(),
2564            fs.clone(),
2565            &mut cx_b.to_async(),
2566        )
2567        .await
2568        .unwrap();
2569
2570        // See that a guest has joined as client A.
2571        project_a
2572            .condition(&cx_a, |p, _| p.collaborators().len() == 1)
2573            .await;
2574
2575        // Begin opening a buffer as client B, but leave the project before the open completes.
2576        let buffer_b = cx_b
2577            .background()
2578            .spawn(project_b.update(cx_b, |p, cx| p.open_buffer((worktree_id, "a.txt"), cx)));
2579        cx_b.update(|_| drop(project_b));
2580        drop(buffer_b);
2581
2582        // See that the guest has left.
2583        project_a
2584            .condition(&cx_a, |p, _| p.collaborators().len() == 0)
2585            .await;
2586    }
2587
2588    #[gpui::test(iterations = 10)]
2589    async fn test_leaving_project(cx_a: &mut TestAppContext, cx_b: &mut TestAppContext) {
2590        cx_a.foreground().forbid_parking();
2591        let lang_registry = Arc::new(LanguageRegistry::test());
2592        let fs = FakeFs::new(cx_a.background());
2593
2594        // Connect to a server as 2 clients.
2595        let mut server = TestServer::start(cx_a.foreground(), cx_a.background()).await;
2596        let client_a = server.create_client(cx_a, "user_a").await;
2597        let client_b = server.create_client(cx_b, "user_b").await;
2598        server
2599            .make_contacts(vec![(&client_a, cx_a), (&client_b, cx_b)])
2600            .await;
2601
2602        // Share a project as client A
2603        fs.insert_tree(
2604            "/a",
2605            json!({
2606                "a.txt": "a-contents",
2607                "b.txt": "b-contents",
2608            }),
2609        )
2610        .await;
2611        let project_a = cx_a.update(|cx| {
2612            Project::local(
2613                client_a.clone(),
2614                client_a.user_store.clone(),
2615                lang_registry.clone(),
2616                fs.clone(),
2617                cx,
2618            )
2619        });
2620        let (worktree_a, _) = project_a
2621            .update(cx_a, |p, cx| {
2622                p.find_or_create_local_worktree("/a", true, cx)
2623            })
2624            .await
2625            .unwrap();
2626        worktree_a
2627            .read_with(cx_a, |tree, _| tree.as_local().unwrap().scan_complete())
2628            .await;
2629        let project_id = project_a
2630            .update(cx_a, |project, _| project.next_remote_id())
2631            .await;
2632        project_a
2633            .update(cx_a, |project, cx| project.share(cx))
2634            .await
2635            .unwrap();
2636
2637        // Join that project as client B
2638        let _project_b = Project::remote(
2639            project_id,
2640            client_b.clone(),
2641            client_b.user_store.clone(),
2642            lang_registry.clone(),
2643            fs.clone(),
2644            &mut cx_b.to_async(),
2645        )
2646        .await
2647        .unwrap();
2648
2649        // Client A sees that a guest has joined.
2650        project_a
2651            .condition(cx_a, |p, _| p.collaborators().len() == 1)
2652            .await;
2653
2654        // Drop client B's connection and ensure client A observes client B leaving the project.
2655        client_b.disconnect(&cx_b.to_async()).unwrap();
2656        project_a
2657            .condition(cx_a, |p, _| p.collaborators().len() == 0)
2658            .await;
2659
2660        // Rejoin the project as client B
2661        let _project_b = Project::remote(
2662            project_id,
2663            client_b.clone(),
2664            client_b.user_store.clone(),
2665            lang_registry.clone(),
2666            fs.clone(),
2667            &mut cx_b.to_async(),
2668        )
2669        .await
2670        .unwrap();
2671
2672        // Client A sees that a guest has re-joined.
2673        project_a
2674            .condition(cx_a, |p, _| p.collaborators().len() == 1)
2675            .await;
2676
2677        // Simulate connection loss for client B and ensure client A observes client B leaving the project.
2678        client_b.wait_for_current_user(cx_b).await;
2679        server.disconnect_client(client_b.current_user_id(cx_b));
2680        cx_a.foreground().advance_clock(rpc::RECEIVE_TIMEOUT);
2681        project_a
2682            .condition(cx_a, |p, _| p.collaborators().len() == 0)
2683            .await;
2684    }
2685
2686    #[gpui::test(iterations = 10)]
2687    async fn test_collaborating_with_diagnostics(
2688        cx_a: &mut TestAppContext,
2689        cx_b: &mut TestAppContext,
2690    ) {
2691        cx_a.foreground().forbid_parking();
2692        let lang_registry = Arc::new(LanguageRegistry::test());
2693        let fs = FakeFs::new(cx_a.background());
2694
2695        // Set up a fake language server.
2696        let mut language = Language::new(
2697            LanguageConfig {
2698                name: "Rust".into(),
2699                path_suffixes: vec!["rs".to_string()],
2700                ..Default::default()
2701            },
2702            Some(tree_sitter_rust::language()),
2703        );
2704        let mut fake_language_servers = language.set_fake_lsp_adapter(Default::default());
2705        lang_registry.add(Arc::new(language));
2706
2707        // Connect to a server as 2 clients.
2708        let mut server = TestServer::start(cx_a.foreground(), cx_a.background()).await;
2709        let client_a = server.create_client(cx_a, "user_a").await;
2710        let client_b = server.create_client(cx_b, "user_b").await;
2711        server
2712            .make_contacts(vec![(&client_a, cx_a), (&client_b, cx_b)])
2713            .await;
2714
2715        // Share a project as client A
2716        fs.insert_tree(
2717            "/a",
2718            json!({
2719                "a.rs": "let one = two",
2720                "other.rs": "",
2721            }),
2722        )
2723        .await;
2724        let project_a = cx_a.update(|cx| {
2725            Project::local(
2726                client_a.clone(),
2727                client_a.user_store.clone(),
2728                lang_registry.clone(),
2729                fs.clone(),
2730                cx,
2731            )
2732        });
2733        let (worktree_a, _) = project_a
2734            .update(cx_a, |p, cx| {
2735                p.find_or_create_local_worktree("/a", true, cx)
2736            })
2737            .await
2738            .unwrap();
2739        worktree_a
2740            .read_with(cx_a, |tree, _| tree.as_local().unwrap().scan_complete())
2741            .await;
2742        let project_id = project_a.update(cx_a, |p, _| p.next_remote_id()).await;
2743        let worktree_id = worktree_a.read_with(cx_a, |tree, _| tree.id());
2744        project_a.update(cx_a, |p, cx| p.share(cx)).await.unwrap();
2745
2746        // Cause the language server to start.
2747        let _ = cx_a
2748            .background()
2749            .spawn(project_a.update(cx_a, |project, cx| {
2750                project.open_buffer(
2751                    ProjectPath {
2752                        worktree_id,
2753                        path: Path::new("other.rs").into(),
2754                    },
2755                    cx,
2756                )
2757            }))
2758            .await
2759            .unwrap();
2760
2761        // Simulate a language server reporting errors for a file.
2762        let mut fake_language_server = fake_language_servers.next().await.unwrap();
2763        fake_language_server
2764            .receive_notification::<lsp::notification::DidOpenTextDocument>()
2765            .await;
2766        fake_language_server.notify::<lsp::notification::PublishDiagnostics>(
2767            lsp::PublishDiagnosticsParams {
2768                uri: lsp::Url::from_file_path("/a/a.rs").unwrap(),
2769                version: None,
2770                diagnostics: vec![lsp::Diagnostic {
2771                    severity: Some(lsp::DiagnosticSeverity::ERROR),
2772                    range: lsp::Range::new(lsp::Position::new(0, 4), lsp::Position::new(0, 7)),
2773                    message: "message 1".to_string(),
2774                    ..Default::default()
2775                }],
2776            },
2777        );
2778
2779        // Wait for server to see the diagnostics update.
2780        server
2781            .condition(|store| {
2782                let worktree = store
2783                    .project(project_id)
2784                    .unwrap()
2785                    .share
2786                    .as_ref()
2787                    .unwrap()
2788                    .worktrees
2789                    .get(&worktree_id.to_proto())
2790                    .unwrap();
2791
2792                !worktree.diagnostic_summaries.is_empty()
2793            })
2794            .await;
2795
2796        // Join the worktree as client B.
2797        let project_b = Project::remote(
2798            project_id,
2799            client_b.clone(),
2800            client_b.user_store.clone(),
2801            lang_registry.clone(),
2802            fs.clone(),
2803            &mut cx_b.to_async(),
2804        )
2805        .await
2806        .unwrap();
2807
2808        project_b.read_with(cx_b, |project, cx| {
2809            assert_eq!(
2810                project.diagnostic_summaries(cx).collect::<Vec<_>>(),
2811                &[(
2812                    ProjectPath {
2813                        worktree_id,
2814                        path: Arc::from(Path::new("a.rs")),
2815                    },
2816                    DiagnosticSummary {
2817                        error_count: 1,
2818                        warning_count: 0,
2819                        ..Default::default()
2820                    },
2821                )]
2822            )
2823        });
2824
2825        // Simulate a language server reporting more errors for a file.
2826        fake_language_server.notify::<lsp::notification::PublishDiagnostics>(
2827            lsp::PublishDiagnosticsParams {
2828                uri: lsp::Url::from_file_path("/a/a.rs").unwrap(),
2829                version: None,
2830                diagnostics: vec![
2831                    lsp::Diagnostic {
2832                        severity: Some(lsp::DiagnosticSeverity::ERROR),
2833                        range: lsp::Range::new(lsp::Position::new(0, 4), lsp::Position::new(0, 7)),
2834                        message: "message 1".to_string(),
2835                        ..Default::default()
2836                    },
2837                    lsp::Diagnostic {
2838                        severity: Some(lsp::DiagnosticSeverity::WARNING),
2839                        range: lsp::Range::new(
2840                            lsp::Position::new(0, 10),
2841                            lsp::Position::new(0, 13),
2842                        ),
2843                        message: "message 2".to_string(),
2844                        ..Default::default()
2845                    },
2846                ],
2847            },
2848        );
2849
2850        // Client b gets the updated summaries
2851        project_b
2852            .condition(&cx_b, |project, cx| {
2853                project.diagnostic_summaries(cx).collect::<Vec<_>>()
2854                    == &[(
2855                        ProjectPath {
2856                            worktree_id,
2857                            path: Arc::from(Path::new("a.rs")),
2858                        },
2859                        DiagnosticSummary {
2860                            error_count: 1,
2861                            warning_count: 1,
2862                            ..Default::default()
2863                        },
2864                    )]
2865            })
2866            .await;
2867
2868        // Open the file with the errors on client B. They should be present.
2869        let buffer_b = cx_b
2870            .background()
2871            .spawn(project_b.update(cx_b, |p, cx| p.open_buffer((worktree_id, "a.rs"), cx)))
2872            .await
2873            .unwrap();
2874
2875        buffer_b.read_with(cx_b, |buffer, _| {
2876            assert_eq!(
2877                buffer
2878                    .snapshot()
2879                    .diagnostics_in_range::<_, Point>(0..buffer.len(), false)
2880                    .map(|entry| entry)
2881                    .collect::<Vec<_>>(),
2882                &[
2883                    DiagnosticEntry {
2884                        range: Point::new(0, 4)..Point::new(0, 7),
2885                        diagnostic: Diagnostic {
2886                            group_id: 0,
2887                            message: "message 1".to_string(),
2888                            severity: lsp::DiagnosticSeverity::ERROR,
2889                            is_primary: true,
2890                            ..Default::default()
2891                        }
2892                    },
2893                    DiagnosticEntry {
2894                        range: Point::new(0, 10)..Point::new(0, 13),
2895                        diagnostic: Diagnostic {
2896                            group_id: 1,
2897                            severity: lsp::DiagnosticSeverity::WARNING,
2898                            message: "message 2".to_string(),
2899                            is_primary: true,
2900                            ..Default::default()
2901                        }
2902                    }
2903                ]
2904            );
2905        });
2906
2907        // Simulate a language server reporting no errors for a file.
2908        fake_language_server.notify::<lsp::notification::PublishDiagnostics>(
2909            lsp::PublishDiagnosticsParams {
2910                uri: lsp::Url::from_file_path("/a/a.rs").unwrap(),
2911                version: None,
2912                diagnostics: vec![],
2913            },
2914        );
2915        project_a
2916            .condition(cx_a, |project, cx| {
2917                project.diagnostic_summaries(cx).collect::<Vec<_>>() == &[]
2918            })
2919            .await;
2920        project_b
2921            .condition(cx_b, |project, cx| {
2922                project.diagnostic_summaries(cx).collect::<Vec<_>>() == &[]
2923            })
2924            .await;
2925    }
2926
2927    #[gpui::test(iterations = 10)]
2928    async fn test_collaborating_with_completion(
2929        cx_a: &mut TestAppContext,
2930        cx_b: &mut TestAppContext,
2931    ) {
2932        cx_a.foreground().forbid_parking();
2933        let lang_registry = Arc::new(LanguageRegistry::test());
2934        let fs = FakeFs::new(cx_a.background());
2935
2936        // Set up a fake language server.
2937        let mut language = Language::new(
2938            LanguageConfig {
2939                name: "Rust".into(),
2940                path_suffixes: vec!["rs".to_string()],
2941                ..Default::default()
2942            },
2943            Some(tree_sitter_rust::language()),
2944        );
2945        let mut fake_language_servers = language.set_fake_lsp_adapter(FakeLspAdapter {
2946            capabilities: lsp::ServerCapabilities {
2947                completion_provider: Some(lsp::CompletionOptions {
2948                    trigger_characters: Some(vec![".".to_string()]),
2949                    ..Default::default()
2950                }),
2951                ..Default::default()
2952            },
2953            ..Default::default()
2954        });
2955        lang_registry.add(Arc::new(language));
2956
2957        // Connect to a server as 2 clients.
2958        let mut server = TestServer::start(cx_a.foreground(), cx_a.background()).await;
2959        let client_a = server.create_client(cx_a, "user_a").await;
2960        let client_b = server.create_client(cx_b, "user_b").await;
2961        server
2962            .make_contacts(vec![(&client_a, cx_a), (&client_b, cx_b)])
2963            .await;
2964
2965        // Share a project as client A
2966        fs.insert_tree(
2967            "/a",
2968            json!({
2969                "main.rs": "fn main() { a }",
2970                "other.rs": "",
2971            }),
2972        )
2973        .await;
2974        let project_a = cx_a.update(|cx| {
2975            Project::local(
2976                client_a.clone(),
2977                client_a.user_store.clone(),
2978                lang_registry.clone(),
2979                fs.clone(),
2980                cx,
2981            )
2982        });
2983        let (worktree_a, _) = project_a
2984            .update(cx_a, |p, cx| {
2985                p.find_or_create_local_worktree("/a", true, cx)
2986            })
2987            .await
2988            .unwrap();
2989        worktree_a
2990            .read_with(cx_a, |tree, _| tree.as_local().unwrap().scan_complete())
2991            .await;
2992        let project_id = project_a.update(cx_a, |p, _| p.next_remote_id()).await;
2993        let worktree_id = worktree_a.read_with(cx_a, |tree, _| tree.id());
2994        project_a.update(cx_a, |p, cx| p.share(cx)).await.unwrap();
2995
2996        // Join the worktree as client B.
2997        let project_b = Project::remote(
2998            project_id,
2999            client_b.clone(),
3000            client_b.user_store.clone(),
3001            lang_registry.clone(),
3002            fs.clone(),
3003            &mut cx_b.to_async(),
3004        )
3005        .await
3006        .unwrap();
3007
3008        // Open a file in an editor as the guest.
3009        let buffer_b = project_b
3010            .update(cx_b, |p, cx| p.open_buffer((worktree_id, "main.rs"), cx))
3011            .await
3012            .unwrap();
3013        let (window_b, _) = cx_b.add_window(|_| EmptyView);
3014        let editor_b = cx_b.add_view(window_b, |cx| {
3015            Editor::for_buffer(buffer_b.clone(), Some(project_b.clone()), cx)
3016        });
3017
3018        let fake_language_server = fake_language_servers.next().await.unwrap();
3019        buffer_b
3020            .condition(&cx_b, |buffer, _| !buffer.completion_triggers().is_empty())
3021            .await;
3022
3023        // Type a completion trigger character as the guest.
3024        editor_b.update(cx_b, |editor, cx| {
3025            editor.select_ranges([13..13], None, cx);
3026            editor.handle_input(&Input(".".into()), cx);
3027            cx.focus(&editor_b);
3028        });
3029
3030        // Receive a completion request as the host's language server.
3031        // Return some completions from the host's language server.
3032        cx_a.foreground().start_waiting();
3033        fake_language_server
3034            .handle_request::<lsp::request::Completion, _, _>(|params, _| async move {
3035                assert_eq!(
3036                    params.text_document_position.text_document.uri,
3037                    lsp::Url::from_file_path("/a/main.rs").unwrap(),
3038                );
3039                assert_eq!(
3040                    params.text_document_position.position,
3041                    lsp::Position::new(0, 14),
3042                );
3043
3044                Ok(Some(lsp::CompletionResponse::Array(vec![
3045                    lsp::CompletionItem {
3046                        label: "first_method(…)".into(),
3047                        detail: Some("fn(&mut self, B) -> C".into()),
3048                        text_edit: Some(lsp::CompletionTextEdit::Edit(lsp::TextEdit {
3049                            new_text: "first_method($1)".to_string(),
3050                            range: lsp::Range::new(
3051                                lsp::Position::new(0, 14),
3052                                lsp::Position::new(0, 14),
3053                            ),
3054                        })),
3055                        insert_text_format: Some(lsp::InsertTextFormat::SNIPPET),
3056                        ..Default::default()
3057                    },
3058                    lsp::CompletionItem {
3059                        label: "second_method(…)".into(),
3060                        detail: Some("fn(&mut self, C) -> D<E>".into()),
3061                        text_edit: Some(lsp::CompletionTextEdit::Edit(lsp::TextEdit {
3062                            new_text: "second_method()".to_string(),
3063                            range: lsp::Range::new(
3064                                lsp::Position::new(0, 14),
3065                                lsp::Position::new(0, 14),
3066                            ),
3067                        })),
3068                        insert_text_format: Some(lsp::InsertTextFormat::SNIPPET),
3069                        ..Default::default()
3070                    },
3071                ])))
3072            })
3073            .next()
3074            .await
3075            .unwrap();
3076        cx_a.foreground().finish_waiting();
3077
3078        // Open the buffer on the host.
3079        let buffer_a = project_a
3080            .update(cx_a, |p, cx| p.open_buffer((worktree_id, "main.rs"), cx))
3081            .await
3082            .unwrap();
3083        buffer_a
3084            .condition(&cx_a, |buffer, _| buffer.text() == "fn main() { a. }")
3085            .await;
3086
3087        // Confirm a completion on the guest.
3088        editor_b
3089            .condition(&cx_b, |editor, _| editor.context_menu_visible())
3090            .await;
3091        editor_b.update(cx_b, |editor, cx| {
3092            editor.confirm_completion(&ConfirmCompletion { item_ix: Some(0) }, cx);
3093            assert_eq!(editor.text(cx), "fn main() { a.first_method() }");
3094        });
3095
3096        // Return a resolved completion from the host's language server.
3097        // The resolved completion has an additional text edit.
3098        fake_language_server.handle_request::<lsp::request::ResolveCompletionItem, _, _>(
3099            |params, _| async move {
3100                assert_eq!(params.label, "first_method(…)");
3101                Ok(lsp::CompletionItem {
3102                    label: "first_method(…)".into(),
3103                    detail: Some("fn(&mut self, B) -> C".into()),
3104                    text_edit: Some(lsp::CompletionTextEdit::Edit(lsp::TextEdit {
3105                        new_text: "first_method($1)".to_string(),
3106                        range: lsp::Range::new(
3107                            lsp::Position::new(0, 14),
3108                            lsp::Position::new(0, 14),
3109                        ),
3110                    })),
3111                    additional_text_edits: Some(vec![lsp::TextEdit {
3112                        new_text: "use d::SomeTrait;\n".to_string(),
3113                        range: lsp::Range::new(lsp::Position::new(0, 0), lsp::Position::new(0, 0)),
3114                    }]),
3115                    insert_text_format: Some(lsp::InsertTextFormat::SNIPPET),
3116                    ..Default::default()
3117                })
3118            },
3119        );
3120
3121        // The additional edit is applied.
3122        buffer_a
3123            .condition(&cx_a, |buffer, _| {
3124                buffer.text() == "use d::SomeTrait;\nfn main() { a.first_method() }"
3125            })
3126            .await;
3127        buffer_b
3128            .condition(&cx_b, |buffer, _| {
3129                buffer.text() == "use d::SomeTrait;\nfn main() { a.first_method() }"
3130            })
3131            .await;
3132    }
3133
3134    #[gpui::test(iterations = 10)]
3135    async fn test_reloading_buffer_manually(cx_a: &mut TestAppContext, cx_b: &mut TestAppContext) {
3136        cx_a.foreground().forbid_parking();
3137        let lang_registry = Arc::new(LanguageRegistry::test());
3138        let fs = FakeFs::new(cx_a.background());
3139
3140        // Connect to a server as 2 clients.
3141        let mut server = TestServer::start(cx_a.foreground(), cx_a.background()).await;
3142        let client_a = server.create_client(cx_a, "user_a").await;
3143        let client_b = server.create_client(cx_b, "user_b").await;
3144        server
3145            .make_contacts(vec![(&client_a, cx_a), (&client_b, cx_b)])
3146            .await;
3147
3148        // Share a project as client A
3149        fs.insert_tree(
3150            "/a",
3151            json!({
3152                "a.rs": "let one = 1;",
3153            }),
3154        )
3155        .await;
3156        let project_a = cx_a.update(|cx| {
3157            Project::local(
3158                client_a.clone(),
3159                client_a.user_store.clone(),
3160                lang_registry.clone(),
3161                fs.clone(),
3162                cx,
3163            )
3164        });
3165        let (worktree_a, _) = project_a
3166            .update(cx_a, |p, cx| {
3167                p.find_or_create_local_worktree("/a", true, cx)
3168            })
3169            .await
3170            .unwrap();
3171        worktree_a
3172            .read_with(cx_a, |tree, _| tree.as_local().unwrap().scan_complete())
3173            .await;
3174        let project_id = project_a.update(cx_a, |p, _| p.next_remote_id()).await;
3175        let worktree_id = worktree_a.read_with(cx_a, |tree, _| tree.id());
3176        project_a.update(cx_a, |p, cx| p.share(cx)).await.unwrap();
3177        let buffer_a = project_a
3178            .update(cx_a, |p, cx| p.open_buffer((worktree_id, "a.rs"), cx))
3179            .await
3180            .unwrap();
3181
3182        // Join the worktree as client B.
3183        let project_b = Project::remote(
3184            project_id,
3185            client_b.clone(),
3186            client_b.user_store.clone(),
3187            lang_registry.clone(),
3188            fs.clone(),
3189            &mut cx_b.to_async(),
3190        )
3191        .await
3192        .unwrap();
3193
3194        let buffer_b = cx_b
3195            .background()
3196            .spawn(project_b.update(cx_b, |p, cx| p.open_buffer((worktree_id, "a.rs"), cx)))
3197            .await
3198            .unwrap();
3199        buffer_b.update(cx_b, |buffer, cx| {
3200            buffer.edit([(4..7, "six")], cx);
3201            buffer.edit([(10..11, "6")], cx);
3202            assert_eq!(buffer.text(), "let six = 6;");
3203            assert!(buffer.is_dirty());
3204            assert!(!buffer.has_conflict());
3205        });
3206        buffer_a
3207            .condition(cx_a, |buffer, _| buffer.text() == "let six = 6;")
3208            .await;
3209
3210        fs.save(Path::new("/a/a.rs"), &Rope::from("let seven = 7;"))
3211            .await
3212            .unwrap();
3213        buffer_a
3214            .condition(cx_a, |buffer, _| buffer.has_conflict())
3215            .await;
3216        buffer_b
3217            .condition(cx_b, |buffer, _| buffer.has_conflict())
3218            .await;
3219
3220        project_b
3221            .update(cx_b, |project, cx| {
3222                project.reload_buffers(HashSet::from_iter([buffer_b.clone()]), true, cx)
3223            })
3224            .await
3225            .unwrap();
3226        buffer_a.read_with(cx_a, |buffer, _| {
3227            assert_eq!(buffer.text(), "let seven = 7;");
3228            assert!(!buffer.is_dirty());
3229            assert!(!buffer.has_conflict());
3230        });
3231        buffer_b.read_with(cx_b, |buffer, _| {
3232            assert_eq!(buffer.text(), "let seven = 7;");
3233            assert!(!buffer.is_dirty());
3234            assert!(!buffer.has_conflict());
3235        });
3236
3237        buffer_a.update(cx_a, |buffer, cx| {
3238            // Undoing on the host is a no-op when the reload was initiated by the guest.
3239            buffer.undo(cx);
3240            assert_eq!(buffer.text(), "let seven = 7;");
3241            assert!(!buffer.is_dirty());
3242            assert!(!buffer.has_conflict());
3243        });
3244        buffer_b.update(cx_b, |buffer, cx| {
3245            // Undoing on the guest rolls back the buffer to before it was reloaded but the conflict gets cleared.
3246            buffer.undo(cx);
3247            assert_eq!(buffer.text(), "let six = 6;");
3248            assert!(buffer.is_dirty());
3249            assert!(!buffer.has_conflict());
3250        });
3251    }
3252
3253    #[gpui::test(iterations = 10)]
3254    async fn test_formatting_buffer(cx_a: &mut TestAppContext, cx_b: &mut TestAppContext) {
3255        cx_a.foreground().forbid_parking();
3256        let lang_registry = Arc::new(LanguageRegistry::test());
3257        let fs = FakeFs::new(cx_a.background());
3258
3259        // Set up a fake language server.
3260        let mut language = Language::new(
3261            LanguageConfig {
3262                name: "Rust".into(),
3263                path_suffixes: vec!["rs".to_string()],
3264                ..Default::default()
3265            },
3266            Some(tree_sitter_rust::language()),
3267        );
3268        let mut fake_language_servers = language.set_fake_lsp_adapter(Default::default());
3269        lang_registry.add(Arc::new(language));
3270
3271        // Connect to a server as 2 clients.
3272        let mut server = TestServer::start(cx_a.foreground(), cx_a.background()).await;
3273        let client_a = server.create_client(cx_a, "user_a").await;
3274        let client_b = server.create_client(cx_b, "user_b").await;
3275        server
3276            .make_contacts(vec![(&client_a, cx_a), (&client_b, cx_b)])
3277            .await;
3278
3279        // Share a project as client A
3280        fs.insert_tree(
3281            "/a",
3282            json!({
3283                "a.rs": "let one = two",
3284            }),
3285        )
3286        .await;
3287        let project_a = cx_a.update(|cx| {
3288            Project::local(
3289                client_a.clone(),
3290                client_a.user_store.clone(),
3291                lang_registry.clone(),
3292                fs.clone(),
3293                cx,
3294            )
3295        });
3296        let (worktree_a, _) = project_a
3297            .update(cx_a, |p, cx| {
3298                p.find_or_create_local_worktree("/a", true, cx)
3299            })
3300            .await
3301            .unwrap();
3302        worktree_a
3303            .read_with(cx_a, |tree, _| tree.as_local().unwrap().scan_complete())
3304            .await;
3305        let project_id = project_a.update(cx_a, |p, _| p.next_remote_id()).await;
3306        let worktree_id = worktree_a.read_with(cx_a, |tree, _| tree.id());
3307        project_a.update(cx_a, |p, cx| p.share(cx)).await.unwrap();
3308
3309        // Join the worktree as client B.
3310        let project_b = Project::remote(
3311            project_id,
3312            client_b.clone(),
3313            client_b.user_store.clone(),
3314            lang_registry.clone(),
3315            fs.clone(),
3316            &mut cx_b.to_async(),
3317        )
3318        .await
3319        .unwrap();
3320
3321        let buffer_b = cx_b
3322            .background()
3323            .spawn(project_b.update(cx_b, |p, cx| p.open_buffer((worktree_id, "a.rs"), cx)))
3324            .await
3325            .unwrap();
3326
3327        let fake_language_server = fake_language_servers.next().await.unwrap();
3328        fake_language_server.handle_request::<lsp::request::Formatting, _, _>(|_, _| async move {
3329            Ok(Some(vec![
3330                lsp::TextEdit {
3331                    range: lsp::Range::new(lsp::Position::new(0, 4), lsp::Position::new(0, 4)),
3332                    new_text: "h".to_string(),
3333                },
3334                lsp::TextEdit {
3335                    range: lsp::Range::new(lsp::Position::new(0, 7), lsp::Position::new(0, 7)),
3336                    new_text: "y".to_string(),
3337                },
3338            ]))
3339        });
3340
3341        project_b
3342            .update(cx_b, |project, cx| {
3343                project.format(HashSet::from_iter([buffer_b.clone()]), true, cx)
3344            })
3345            .await
3346            .unwrap();
3347        assert_eq!(
3348            buffer_b.read_with(cx_b, |buffer, _| buffer.text()),
3349            "let honey = two"
3350        );
3351    }
3352
3353    #[gpui::test(iterations = 10)]
3354    async fn test_definition(cx_a: &mut TestAppContext, cx_b: &mut TestAppContext) {
3355        cx_a.foreground().forbid_parking();
3356        let lang_registry = Arc::new(LanguageRegistry::test());
3357        let fs = FakeFs::new(cx_a.background());
3358        fs.insert_tree(
3359            "/root-1",
3360            json!({
3361                "a.rs": "const ONE: usize = b::TWO + b::THREE;",
3362            }),
3363        )
3364        .await;
3365        fs.insert_tree(
3366            "/root-2",
3367            json!({
3368                "b.rs": "const TWO: usize = 2;\nconst THREE: usize = 3;",
3369            }),
3370        )
3371        .await;
3372
3373        // Set up a fake language server.
3374        let mut language = Language::new(
3375            LanguageConfig {
3376                name: "Rust".into(),
3377                path_suffixes: vec!["rs".to_string()],
3378                ..Default::default()
3379            },
3380            Some(tree_sitter_rust::language()),
3381        );
3382        let mut fake_language_servers = language.set_fake_lsp_adapter(Default::default());
3383        lang_registry.add(Arc::new(language));
3384
3385        // Connect to a server as 2 clients.
3386        let mut server = TestServer::start(cx_a.foreground(), cx_a.background()).await;
3387        let client_a = server.create_client(cx_a, "user_a").await;
3388        let client_b = server.create_client(cx_b, "user_b").await;
3389        server
3390            .make_contacts(vec![(&client_a, cx_a), (&client_b, cx_b)])
3391            .await;
3392
3393        // Share a project as client A
3394        let project_a = cx_a.update(|cx| {
3395            Project::local(
3396                client_a.clone(),
3397                client_a.user_store.clone(),
3398                lang_registry.clone(),
3399                fs.clone(),
3400                cx,
3401            )
3402        });
3403        let (worktree_a, _) = project_a
3404            .update(cx_a, |p, cx| {
3405                p.find_or_create_local_worktree("/root-1", true, cx)
3406            })
3407            .await
3408            .unwrap();
3409        worktree_a
3410            .read_with(cx_a, |tree, _| tree.as_local().unwrap().scan_complete())
3411            .await;
3412        let project_id = project_a.update(cx_a, |p, _| p.next_remote_id()).await;
3413        let worktree_id = worktree_a.read_with(cx_a, |tree, _| tree.id());
3414        project_a.update(cx_a, |p, cx| p.share(cx)).await.unwrap();
3415
3416        // Join the worktree as client B.
3417        let project_b = Project::remote(
3418            project_id,
3419            client_b.clone(),
3420            client_b.user_store.clone(),
3421            lang_registry.clone(),
3422            fs.clone(),
3423            &mut cx_b.to_async(),
3424        )
3425        .await
3426        .unwrap();
3427
3428        // Open the file on client B.
3429        let buffer_b = cx_b
3430            .background()
3431            .spawn(project_b.update(cx_b, |p, cx| p.open_buffer((worktree_id, "a.rs"), cx)))
3432            .await
3433            .unwrap();
3434
3435        // Request the definition of a symbol as the guest.
3436        let fake_language_server = fake_language_servers.next().await.unwrap();
3437        fake_language_server.handle_request::<lsp::request::GotoDefinition, _, _>(
3438            |_, _| async move {
3439                Ok(Some(lsp::GotoDefinitionResponse::Scalar(
3440                    lsp::Location::new(
3441                        lsp::Url::from_file_path("/root-2/b.rs").unwrap(),
3442                        lsp::Range::new(lsp::Position::new(0, 6), lsp::Position::new(0, 9)),
3443                    ),
3444                )))
3445            },
3446        );
3447
3448        let definitions_1 = project_b
3449            .update(cx_b, |p, cx| p.definition(&buffer_b, 23, cx))
3450            .await
3451            .unwrap();
3452        cx_b.read(|cx| {
3453            assert_eq!(definitions_1.len(), 1);
3454            assert_eq!(project_b.read(cx).worktrees(cx).count(), 2);
3455            let target_buffer = definitions_1[0].buffer.read(cx);
3456            assert_eq!(
3457                target_buffer.text(),
3458                "const TWO: usize = 2;\nconst THREE: usize = 3;"
3459            );
3460            assert_eq!(
3461                definitions_1[0].range.to_point(target_buffer),
3462                Point::new(0, 6)..Point::new(0, 9)
3463            );
3464        });
3465
3466        // Try getting more definitions for the same buffer, ensuring the buffer gets reused from
3467        // the previous call to `definition`.
3468        fake_language_server.handle_request::<lsp::request::GotoDefinition, _, _>(
3469            |_, _| async move {
3470                Ok(Some(lsp::GotoDefinitionResponse::Scalar(
3471                    lsp::Location::new(
3472                        lsp::Url::from_file_path("/root-2/b.rs").unwrap(),
3473                        lsp::Range::new(lsp::Position::new(1, 6), lsp::Position::new(1, 11)),
3474                    ),
3475                )))
3476            },
3477        );
3478
3479        let definitions_2 = project_b
3480            .update(cx_b, |p, cx| p.definition(&buffer_b, 33, cx))
3481            .await
3482            .unwrap();
3483        cx_b.read(|cx| {
3484            assert_eq!(definitions_2.len(), 1);
3485            assert_eq!(project_b.read(cx).worktrees(cx).count(), 2);
3486            let target_buffer = definitions_2[0].buffer.read(cx);
3487            assert_eq!(
3488                target_buffer.text(),
3489                "const TWO: usize = 2;\nconst THREE: usize = 3;"
3490            );
3491            assert_eq!(
3492                definitions_2[0].range.to_point(target_buffer),
3493                Point::new(1, 6)..Point::new(1, 11)
3494            );
3495        });
3496        assert_eq!(definitions_1[0].buffer, definitions_2[0].buffer);
3497    }
3498
3499    #[gpui::test(iterations = 10)]
3500    async fn test_references(cx_a: &mut TestAppContext, cx_b: &mut TestAppContext) {
3501        cx_a.foreground().forbid_parking();
3502        let lang_registry = Arc::new(LanguageRegistry::test());
3503        let fs = FakeFs::new(cx_a.background());
3504        fs.insert_tree(
3505            "/root-1",
3506            json!({
3507                "one.rs": "const ONE: usize = 1;",
3508                "two.rs": "const TWO: usize = one::ONE + one::ONE;",
3509            }),
3510        )
3511        .await;
3512        fs.insert_tree(
3513            "/root-2",
3514            json!({
3515                "three.rs": "const THREE: usize = two::TWO + one::ONE;",
3516            }),
3517        )
3518        .await;
3519
3520        // Set up a fake language server.
3521        let mut language = Language::new(
3522            LanguageConfig {
3523                name: "Rust".into(),
3524                path_suffixes: vec!["rs".to_string()],
3525                ..Default::default()
3526            },
3527            Some(tree_sitter_rust::language()),
3528        );
3529        let mut fake_language_servers = language.set_fake_lsp_adapter(Default::default());
3530        lang_registry.add(Arc::new(language));
3531
3532        // Connect to a server as 2 clients.
3533        let mut server = TestServer::start(cx_a.foreground(), cx_a.background()).await;
3534        let client_a = server.create_client(cx_a, "user_a").await;
3535        let client_b = server.create_client(cx_b, "user_b").await;
3536        server
3537            .make_contacts(vec![(&client_a, cx_a), (&client_b, cx_b)])
3538            .await;
3539
3540        // Share a project as client A
3541        let project_a = cx_a.update(|cx| {
3542            Project::local(
3543                client_a.clone(),
3544                client_a.user_store.clone(),
3545                lang_registry.clone(),
3546                fs.clone(),
3547                cx,
3548            )
3549        });
3550        let (worktree_a, _) = project_a
3551            .update(cx_a, |p, cx| {
3552                p.find_or_create_local_worktree("/root-1", true, cx)
3553            })
3554            .await
3555            .unwrap();
3556        worktree_a
3557            .read_with(cx_a, |tree, _| tree.as_local().unwrap().scan_complete())
3558            .await;
3559        let project_id = project_a.update(cx_a, |p, _| p.next_remote_id()).await;
3560        let worktree_id = worktree_a.read_with(cx_a, |tree, _| tree.id());
3561        project_a.update(cx_a, |p, cx| p.share(cx)).await.unwrap();
3562
3563        // Join the worktree as client B.
3564        let project_b = Project::remote(
3565            project_id,
3566            client_b.clone(),
3567            client_b.user_store.clone(),
3568            lang_registry.clone(),
3569            fs.clone(),
3570            &mut cx_b.to_async(),
3571        )
3572        .await
3573        .unwrap();
3574
3575        // Open the file on client B.
3576        let buffer_b = cx_b
3577            .background()
3578            .spawn(project_b.update(cx_b, |p, cx| p.open_buffer((worktree_id, "one.rs"), cx)))
3579            .await
3580            .unwrap();
3581
3582        // Request references to a symbol as the guest.
3583        let fake_language_server = fake_language_servers.next().await.unwrap();
3584        fake_language_server.handle_request::<lsp::request::References, _, _>(
3585            |params, _| async move {
3586                assert_eq!(
3587                    params.text_document_position.text_document.uri.as_str(),
3588                    "file:///root-1/one.rs"
3589                );
3590                Ok(Some(vec![
3591                    lsp::Location {
3592                        uri: lsp::Url::from_file_path("/root-1/two.rs").unwrap(),
3593                        range: lsp::Range::new(
3594                            lsp::Position::new(0, 24),
3595                            lsp::Position::new(0, 27),
3596                        ),
3597                    },
3598                    lsp::Location {
3599                        uri: lsp::Url::from_file_path("/root-1/two.rs").unwrap(),
3600                        range: lsp::Range::new(
3601                            lsp::Position::new(0, 35),
3602                            lsp::Position::new(0, 38),
3603                        ),
3604                    },
3605                    lsp::Location {
3606                        uri: lsp::Url::from_file_path("/root-2/three.rs").unwrap(),
3607                        range: lsp::Range::new(
3608                            lsp::Position::new(0, 37),
3609                            lsp::Position::new(0, 40),
3610                        ),
3611                    },
3612                ]))
3613            },
3614        );
3615
3616        let references = project_b
3617            .update(cx_b, |p, cx| p.references(&buffer_b, 7, cx))
3618            .await
3619            .unwrap();
3620        cx_b.read(|cx| {
3621            assert_eq!(references.len(), 3);
3622            assert_eq!(project_b.read(cx).worktrees(cx).count(), 2);
3623
3624            let two_buffer = references[0].buffer.read(cx);
3625            let three_buffer = references[2].buffer.read(cx);
3626            assert_eq!(
3627                two_buffer.file().unwrap().path().as_ref(),
3628                Path::new("two.rs")
3629            );
3630            assert_eq!(references[1].buffer, references[0].buffer);
3631            assert_eq!(
3632                three_buffer.file().unwrap().full_path(cx),
3633                Path::new("three.rs")
3634            );
3635
3636            assert_eq!(references[0].range.to_offset(&two_buffer), 24..27);
3637            assert_eq!(references[1].range.to_offset(&two_buffer), 35..38);
3638            assert_eq!(references[2].range.to_offset(&three_buffer), 37..40);
3639        });
3640    }
3641
3642    #[gpui::test(iterations = 10)]
3643    async fn test_project_search(cx_a: &mut TestAppContext, cx_b: &mut TestAppContext) {
3644        cx_a.foreground().forbid_parking();
3645        let lang_registry = Arc::new(LanguageRegistry::test());
3646        let fs = FakeFs::new(cx_a.background());
3647        fs.insert_tree(
3648            "/root-1",
3649            json!({
3650                "a": "hello world",
3651                "b": "goodnight moon",
3652                "c": "a world of goo",
3653                "d": "world champion of clown world",
3654            }),
3655        )
3656        .await;
3657        fs.insert_tree(
3658            "/root-2",
3659            json!({
3660                "e": "disney world is fun",
3661            }),
3662        )
3663        .await;
3664
3665        // Connect to a server as 2 clients.
3666        let mut server = TestServer::start(cx_a.foreground(), cx_a.background()).await;
3667        let client_a = server.create_client(cx_a, "user_a").await;
3668        let client_b = server.create_client(cx_b, "user_b").await;
3669        server
3670            .make_contacts(vec![(&client_a, cx_a), (&client_b, cx_b)])
3671            .await;
3672
3673        // Share a project as client A
3674        let project_a = cx_a.update(|cx| {
3675            Project::local(
3676                client_a.clone(),
3677                client_a.user_store.clone(),
3678                lang_registry.clone(),
3679                fs.clone(),
3680                cx,
3681            )
3682        });
3683        let project_id = project_a.update(cx_a, |p, _| p.next_remote_id()).await;
3684
3685        let (worktree_1, _) = project_a
3686            .update(cx_a, |p, cx| {
3687                p.find_or_create_local_worktree("/root-1", true, cx)
3688            })
3689            .await
3690            .unwrap();
3691        worktree_1
3692            .read_with(cx_a, |tree, _| tree.as_local().unwrap().scan_complete())
3693            .await;
3694        let (worktree_2, _) = project_a
3695            .update(cx_a, |p, cx| {
3696                p.find_or_create_local_worktree("/root-2", true, cx)
3697            })
3698            .await
3699            .unwrap();
3700        worktree_2
3701            .read_with(cx_a, |tree, _| tree.as_local().unwrap().scan_complete())
3702            .await;
3703
3704        project_a.update(cx_a, |p, cx| p.share(cx)).await.unwrap();
3705
3706        // Join the worktree as client B.
3707        let project_b = Project::remote(
3708            project_id,
3709            client_b.clone(),
3710            client_b.user_store.clone(),
3711            lang_registry.clone(),
3712            fs.clone(),
3713            &mut cx_b.to_async(),
3714        )
3715        .await
3716        .unwrap();
3717
3718        let results = project_b
3719            .update(cx_b, |project, cx| {
3720                project.search(SearchQuery::text("world", false, false), cx)
3721            })
3722            .await
3723            .unwrap();
3724
3725        let mut ranges_by_path = results
3726            .into_iter()
3727            .map(|(buffer, ranges)| {
3728                buffer.read_with(cx_b, |buffer, cx| {
3729                    let path = buffer.file().unwrap().full_path(cx);
3730                    let offset_ranges = ranges
3731                        .into_iter()
3732                        .map(|range| range.to_offset(buffer))
3733                        .collect::<Vec<_>>();
3734                    (path, offset_ranges)
3735                })
3736            })
3737            .collect::<Vec<_>>();
3738        ranges_by_path.sort_by_key(|(path, _)| path.clone());
3739
3740        assert_eq!(
3741            ranges_by_path,
3742            &[
3743                (PathBuf::from("root-1/a"), vec![6..11]),
3744                (PathBuf::from("root-1/c"), vec![2..7]),
3745                (PathBuf::from("root-1/d"), vec![0..5, 24..29]),
3746                (PathBuf::from("root-2/e"), vec![7..12]),
3747            ]
3748        );
3749    }
3750
3751    #[gpui::test(iterations = 10)]
3752    async fn test_document_highlights(cx_a: &mut TestAppContext, cx_b: &mut TestAppContext) {
3753        cx_a.foreground().forbid_parking();
3754        let lang_registry = Arc::new(LanguageRegistry::test());
3755        let fs = FakeFs::new(cx_a.background());
3756        fs.insert_tree(
3757            "/root-1",
3758            json!({
3759                "main.rs": "fn double(number: i32) -> i32 { number + number }",
3760            }),
3761        )
3762        .await;
3763
3764        // Set up a fake language server.
3765        let mut language = Language::new(
3766            LanguageConfig {
3767                name: "Rust".into(),
3768                path_suffixes: vec!["rs".to_string()],
3769                ..Default::default()
3770            },
3771            Some(tree_sitter_rust::language()),
3772        );
3773        let mut fake_language_servers = language.set_fake_lsp_adapter(Default::default());
3774        lang_registry.add(Arc::new(language));
3775
3776        // Connect to a server as 2 clients.
3777        let mut server = TestServer::start(cx_a.foreground(), cx_a.background()).await;
3778        let client_a = server.create_client(cx_a, "user_a").await;
3779        let client_b = server.create_client(cx_b, "user_b").await;
3780        server
3781            .make_contacts(vec![(&client_a, cx_a), (&client_b, cx_b)])
3782            .await;
3783
3784        // Share a project as client A
3785        let project_a = cx_a.update(|cx| {
3786            Project::local(
3787                client_a.clone(),
3788                client_a.user_store.clone(),
3789                lang_registry.clone(),
3790                fs.clone(),
3791                cx,
3792            )
3793        });
3794        let (worktree_a, _) = project_a
3795            .update(cx_a, |p, cx| {
3796                p.find_or_create_local_worktree("/root-1", true, cx)
3797            })
3798            .await
3799            .unwrap();
3800        worktree_a
3801            .read_with(cx_a, |tree, _| tree.as_local().unwrap().scan_complete())
3802            .await;
3803        let project_id = project_a.update(cx_a, |p, _| p.next_remote_id()).await;
3804        let worktree_id = worktree_a.read_with(cx_a, |tree, _| tree.id());
3805        project_a.update(cx_a, |p, cx| p.share(cx)).await.unwrap();
3806
3807        // Join the worktree as client B.
3808        let project_b = Project::remote(
3809            project_id,
3810            client_b.clone(),
3811            client_b.user_store.clone(),
3812            lang_registry.clone(),
3813            fs.clone(),
3814            &mut cx_b.to_async(),
3815        )
3816        .await
3817        .unwrap();
3818
3819        // Open the file on client B.
3820        let buffer_b = cx_b
3821            .background()
3822            .spawn(project_b.update(cx_b, |p, cx| p.open_buffer((worktree_id, "main.rs"), cx)))
3823            .await
3824            .unwrap();
3825
3826        // Request document highlights as the guest.
3827        let fake_language_server = fake_language_servers.next().await.unwrap();
3828        fake_language_server.handle_request::<lsp::request::DocumentHighlightRequest, _, _>(
3829            |params, _| async move {
3830                assert_eq!(
3831                    params
3832                        .text_document_position_params
3833                        .text_document
3834                        .uri
3835                        .as_str(),
3836                    "file:///root-1/main.rs"
3837                );
3838                assert_eq!(
3839                    params.text_document_position_params.position,
3840                    lsp::Position::new(0, 34)
3841                );
3842                Ok(Some(vec![
3843                    lsp::DocumentHighlight {
3844                        kind: Some(lsp::DocumentHighlightKind::WRITE),
3845                        range: lsp::Range::new(
3846                            lsp::Position::new(0, 10),
3847                            lsp::Position::new(0, 16),
3848                        ),
3849                    },
3850                    lsp::DocumentHighlight {
3851                        kind: Some(lsp::DocumentHighlightKind::READ),
3852                        range: lsp::Range::new(
3853                            lsp::Position::new(0, 32),
3854                            lsp::Position::new(0, 38),
3855                        ),
3856                    },
3857                    lsp::DocumentHighlight {
3858                        kind: Some(lsp::DocumentHighlightKind::READ),
3859                        range: lsp::Range::new(
3860                            lsp::Position::new(0, 41),
3861                            lsp::Position::new(0, 47),
3862                        ),
3863                    },
3864                ]))
3865            },
3866        );
3867
3868        let highlights = project_b
3869            .update(cx_b, |p, cx| p.document_highlights(&buffer_b, 34, cx))
3870            .await
3871            .unwrap();
3872        buffer_b.read_with(cx_b, |buffer, _| {
3873            let snapshot = buffer.snapshot();
3874
3875            let highlights = highlights
3876                .into_iter()
3877                .map(|highlight| (highlight.kind, highlight.range.to_offset(&snapshot)))
3878                .collect::<Vec<_>>();
3879            assert_eq!(
3880                highlights,
3881                &[
3882                    (lsp::DocumentHighlightKind::WRITE, 10..16),
3883                    (lsp::DocumentHighlightKind::READ, 32..38),
3884                    (lsp::DocumentHighlightKind::READ, 41..47)
3885                ]
3886            )
3887        });
3888    }
3889
3890    #[gpui::test(iterations = 10)]
3891    async fn test_project_symbols(cx_a: &mut TestAppContext, cx_b: &mut TestAppContext) {
3892        cx_a.foreground().forbid_parking();
3893        let lang_registry = Arc::new(LanguageRegistry::test());
3894        let fs = FakeFs::new(cx_a.background());
3895        fs.insert_tree(
3896            "/code",
3897            json!({
3898                "crate-1": {
3899                    "one.rs": "const ONE: usize = 1;",
3900                },
3901                "crate-2": {
3902                    "two.rs": "const TWO: usize = 2; const THREE: usize = 3;",
3903                },
3904                "private": {
3905                    "passwords.txt": "the-password",
3906                }
3907            }),
3908        )
3909        .await;
3910
3911        // Set up a fake language server.
3912        let mut language = Language::new(
3913            LanguageConfig {
3914                name: "Rust".into(),
3915                path_suffixes: vec!["rs".to_string()],
3916                ..Default::default()
3917            },
3918            Some(tree_sitter_rust::language()),
3919        );
3920        let mut fake_language_servers = language.set_fake_lsp_adapter(Default::default());
3921        lang_registry.add(Arc::new(language));
3922
3923        // Connect to a server as 2 clients.
3924        let mut server = TestServer::start(cx_a.foreground(), cx_a.background()).await;
3925        let client_a = server.create_client(cx_a, "user_a").await;
3926        let client_b = server.create_client(cx_b, "user_b").await;
3927        server
3928            .make_contacts(vec![(&client_a, cx_a), (&client_b, cx_b)])
3929            .await;
3930
3931        // Share a project as client A
3932        let project_a = cx_a.update(|cx| {
3933            Project::local(
3934                client_a.clone(),
3935                client_a.user_store.clone(),
3936                lang_registry.clone(),
3937                fs.clone(),
3938                cx,
3939            )
3940        });
3941        let (worktree_a, _) = project_a
3942            .update(cx_a, |p, cx| {
3943                p.find_or_create_local_worktree("/code/crate-1", true, cx)
3944            })
3945            .await
3946            .unwrap();
3947        worktree_a
3948            .read_with(cx_a, |tree, _| tree.as_local().unwrap().scan_complete())
3949            .await;
3950        let project_id = project_a.update(cx_a, |p, _| p.next_remote_id()).await;
3951        let worktree_id = worktree_a.read_with(cx_a, |tree, _| tree.id());
3952        project_a.update(cx_a, |p, cx| p.share(cx)).await.unwrap();
3953
3954        // Join the worktree as client B.
3955        let project_b = Project::remote(
3956            project_id,
3957            client_b.clone(),
3958            client_b.user_store.clone(),
3959            lang_registry.clone(),
3960            fs.clone(),
3961            &mut cx_b.to_async(),
3962        )
3963        .await
3964        .unwrap();
3965
3966        // Cause the language server to start.
3967        let _buffer = cx_b
3968            .background()
3969            .spawn(project_b.update(cx_b, |p, cx| p.open_buffer((worktree_id, "one.rs"), cx)))
3970            .await
3971            .unwrap();
3972
3973        let fake_language_server = fake_language_servers.next().await.unwrap();
3974        fake_language_server.handle_request::<lsp::request::WorkspaceSymbol, _, _>(
3975            |_, _| async move {
3976                #[allow(deprecated)]
3977                Ok(Some(vec![lsp::SymbolInformation {
3978                    name: "TWO".into(),
3979                    location: lsp::Location {
3980                        uri: lsp::Url::from_file_path("/code/crate-2/two.rs").unwrap(),
3981                        range: lsp::Range::new(lsp::Position::new(0, 6), lsp::Position::new(0, 9)),
3982                    },
3983                    kind: lsp::SymbolKind::CONSTANT,
3984                    tags: None,
3985                    container_name: None,
3986                    deprecated: None,
3987                }]))
3988            },
3989        );
3990
3991        // Request the definition of a symbol as the guest.
3992        let symbols = project_b
3993            .update(cx_b, |p, cx| p.symbols("two", cx))
3994            .await
3995            .unwrap();
3996        assert_eq!(symbols.len(), 1);
3997        assert_eq!(symbols[0].name, "TWO");
3998
3999        // Open one of the returned symbols.
4000        let buffer_b_2 = project_b
4001            .update(cx_b, |project, cx| {
4002                project.open_buffer_for_symbol(&symbols[0], cx)
4003            })
4004            .await
4005            .unwrap();
4006        buffer_b_2.read_with(cx_b, |buffer, _| {
4007            assert_eq!(
4008                buffer.file().unwrap().path().as_ref(),
4009                Path::new("../crate-2/two.rs")
4010            );
4011        });
4012
4013        // Attempt to craft a symbol and violate host's privacy by opening an arbitrary file.
4014        let mut fake_symbol = symbols[0].clone();
4015        fake_symbol.path = Path::new("/code/secrets").into();
4016        let error = project_b
4017            .update(cx_b, |project, cx| {
4018                project.open_buffer_for_symbol(&fake_symbol, cx)
4019            })
4020            .await
4021            .unwrap_err();
4022        assert!(error.to_string().contains("invalid symbol signature"));
4023    }
4024
4025    #[gpui::test(iterations = 10)]
4026    async fn test_open_buffer_while_getting_definition_pointing_to_it(
4027        cx_a: &mut TestAppContext,
4028        cx_b: &mut TestAppContext,
4029        mut rng: StdRng,
4030    ) {
4031        cx_a.foreground().forbid_parking();
4032        let lang_registry = Arc::new(LanguageRegistry::test());
4033        let fs = FakeFs::new(cx_a.background());
4034        fs.insert_tree(
4035            "/root",
4036            json!({
4037                "a.rs": "const ONE: usize = b::TWO;",
4038                "b.rs": "const TWO: usize = 2",
4039            }),
4040        )
4041        .await;
4042
4043        // Set up a fake language server.
4044        let mut language = Language::new(
4045            LanguageConfig {
4046                name: "Rust".into(),
4047                path_suffixes: vec!["rs".to_string()],
4048                ..Default::default()
4049            },
4050            Some(tree_sitter_rust::language()),
4051        );
4052        let mut fake_language_servers = language.set_fake_lsp_adapter(Default::default());
4053        lang_registry.add(Arc::new(language));
4054
4055        // Connect to a server as 2 clients.
4056        let mut server = TestServer::start(cx_a.foreground(), cx_a.background()).await;
4057        let client_a = server.create_client(cx_a, "user_a").await;
4058        let client_b = server.create_client(cx_b, "user_b").await;
4059        server
4060            .make_contacts(vec![(&client_a, cx_a), (&client_b, cx_b)])
4061            .await;
4062
4063        // Share a project as client A
4064        let project_a = cx_a.update(|cx| {
4065            Project::local(
4066                client_a.clone(),
4067                client_a.user_store.clone(),
4068                lang_registry.clone(),
4069                fs.clone(),
4070                cx,
4071            )
4072        });
4073
4074        let (worktree_a, _) = project_a
4075            .update(cx_a, |p, cx| {
4076                p.find_or_create_local_worktree("/root", true, cx)
4077            })
4078            .await
4079            .unwrap();
4080        worktree_a
4081            .read_with(cx_a, |tree, _| tree.as_local().unwrap().scan_complete())
4082            .await;
4083        let project_id = project_a.update(cx_a, |p, _| p.next_remote_id()).await;
4084        let worktree_id = worktree_a.read_with(cx_a, |tree, _| tree.id());
4085        project_a.update(cx_a, |p, cx| p.share(cx)).await.unwrap();
4086
4087        // Join the worktree as client B.
4088        let project_b = Project::remote(
4089            project_id,
4090            client_b.clone(),
4091            client_b.user_store.clone(),
4092            lang_registry.clone(),
4093            fs.clone(),
4094            &mut cx_b.to_async(),
4095        )
4096        .await
4097        .unwrap();
4098
4099        let buffer_b1 = cx_b
4100            .background()
4101            .spawn(project_b.update(cx_b, |p, cx| p.open_buffer((worktree_id, "a.rs"), cx)))
4102            .await
4103            .unwrap();
4104
4105        let fake_language_server = fake_language_servers.next().await.unwrap();
4106        fake_language_server.handle_request::<lsp::request::GotoDefinition, _, _>(
4107            |_, _| async move {
4108                Ok(Some(lsp::GotoDefinitionResponse::Scalar(
4109                    lsp::Location::new(
4110                        lsp::Url::from_file_path("/root/b.rs").unwrap(),
4111                        lsp::Range::new(lsp::Position::new(0, 6), lsp::Position::new(0, 9)),
4112                    ),
4113                )))
4114            },
4115        );
4116
4117        let definitions;
4118        let buffer_b2;
4119        if rng.gen() {
4120            definitions = project_b.update(cx_b, |p, cx| p.definition(&buffer_b1, 23, cx));
4121            buffer_b2 = project_b.update(cx_b, |p, cx| p.open_buffer((worktree_id, "b.rs"), cx));
4122        } else {
4123            buffer_b2 = project_b.update(cx_b, |p, cx| p.open_buffer((worktree_id, "b.rs"), cx));
4124            definitions = project_b.update(cx_b, |p, cx| p.definition(&buffer_b1, 23, cx));
4125        }
4126
4127        let buffer_b2 = buffer_b2.await.unwrap();
4128        let definitions = definitions.await.unwrap();
4129        assert_eq!(definitions.len(), 1);
4130        assert_eq!(definitions[0].buffer, buffer_b2);
4131    }
4132
4133    #[gpui::test(iterations = 10)]
4134    async fn test_collaborating_with_code_actions(
4135        cx_a: &mut TestAppContext,
4136        cx_b: &mut TestAppContext,
4137    ) {
4138        cx_a.foreground().forbid_parking();
4139        let lang_registry = Arc::new(LanguageRegistry::test());
4140        let fs = FakeFs::new(cx_a.background());
4141        cx_b.update(|cx| editor::init(cx));
4142
4143        // Set up a fake language server.
4144        let mut language = Language::new(
4145            LanguageConfig {
4146                name: "Rust".into(),
4147                path_suffixes: vec!["rs".to_string()],
4148                ..Default::default()
4149            },
4150            Some(tree_sitter_rust::language()),
4151        );
4152        let mut fake_language_servers = language.set_fake_lsp_adapter(Default::default());
4153        lang_registry.add(Arc::new(language));
4154
4155        // Connect to a server as 2 clients.
4156        let mut server = TestServer::start(cx_a.foreground(), cx_a.background()).await;
4157        let client_a = server.create_client(cx_a, "user_a").await;
4158        let client_b = server.create_client(cx_b, "user_b").await;
4159        server
4160            .make_contacts(vec![(&client_a, cx_a), (&client_b, cx_b)])
4161            .await;
4162
4163        // Share a project as client A
4164        fs.insert_tree(
4165            "/a",
4166            json!({
4167                "main.rs": "mod other;\nfn main() { let foo = other::foo(); }",
4168                "other.rs": "pub fn foo() -> usize { 4 }",
4169            }),
4170        )
4171        .await;
4172        let project_a = cx_a.update(|cx| {
4173            Project::local(
4174                client_a.clone(),
4175                client_a.user_store.clone(),
4176                lang_registry.clone(),
4177                fs.clone(),
4178                cx,
4179            )
4180        });
4181        let (worktree_a, _) = project_a
4182            .update(cx_a, |p, cx| {
4183                p.find_or_create_local_worktree("/a", true, cx)
4184            })
4185            .await
4186            .unwrap();
4187        worktree_a
4188            .read_with(cx_a, |tree, _| tree.as_local().unwrap().scan_complete())
4189            .await;
4190        let project_id = project_a.update(cx_a, |p, _| p.next_remote_id()).await;
4191        let worktree_id = worktree_a.read_with(cx_a, |tree, _| tree.id());
4192        project_a.update(cx_a, |p, cx| p.share(cx)).await.unwrap();
4193
4194        // Join the worktree as client B.
4195        let project_b = Project::remote(
4196            project_id,
4197            client_b.clone(),
4198            client_b.user_store.clone(),
4199            lang_registry.clone(),
4200            fs.clone(),
4201            &mut cx_b.to_async(),
4202        )
4203        .await
4204        .unwrap();
4205        let mut params = cx_b.update(WorkspaceParams::test);
4206        params.languages = lang_registry.clone();
4207        params.client = client_b.client.clone();
4208        params.user_store = client_b.user_store.clone();
4209        params.project = project_b;
4210
4211        let (_window_b, workspace_b) = cx_b.add_window(|cx| Workspace::new(&params, cx));
4212        let editor_b = workspace_b
4213            .update(cx_b, |workspace, cx| {
4214                workspace.open_path((worktree_id, "main.rs"), true, cx)
4215            })
4216            .await
4217            .unwrap()
4218            .downcast::<Editor>()
4219            .unwrap();
4220
4221        let mut fake_language_server = fake_language_servers.next().await.unwrap();
4222        fake_language_server
4223            .handle_request::<lsp::request::CodeActionRequest, _, _>(|params, _| async move {
4224                assert_eq!(
4225                    params.text_document.uri,
4226                    lsp::Url::from_file_path("/a/main.rs").unwrap(),
4227                );
4228                assert_eq!(params.range.start, lsp::Position::new(0, 0));
4229                assert_eq!(params.range.end, lsp::Position::new(0, 0));
4230                Ok(None)
4231            })
4232            .next()
4233            .await;
4234
4235        // Move cursor to a location that contains code actions.
4236        editor_b.update(cx_b, |editor, cx| {
4237            editor.select_ranges([Point::new(1, 31)..Point::new(1, 31)], None, cx);
4238            cx.focus(&editor_b);
4239        });
4240
4241        fake_language_server
4242            .handle_request::<lsp::request::CodeActionRequest, _, _>(|params, _| async move {
4243                assert_eq!(
4244                    params.text_document.uri,
4245                    lsp::Url::from_file_path("/a/main.rs").unwrap(),
4246                );
4247                assert_eq!(params.range.start, lsp::Position::new(1, 31));
4248                assert_eq!(params.range.end, lsp::Position::new(1, 31));
4249
4250                Ok(Some(vec![lsp::CodeActionOrCommand::CodeAction(
4251                    lsp::CodeAction {
4252                        title: "Inline into all callers".to_string(),
4253                        edit: Some(lsp::WorkspaceEdit {
4254                            changes: Some(
4255                                [
4256                                    (
4257                                        lsp::Url::from_file_path("/a/main.rs").unwrap(),
4258                                        vec![lsp::TextEdit::new(
4259                                            lsp::Range::new(
4260                                                lsp::Position::new(1, 22),
4261                                                lsp::Position::new(1, 34),
4262                                            ),
4263                                            "4".to_string(),
4264                                        )],
4265                                    ),
4266                                    (
4267                                        lsp::Url::from_file_path("/a/other.rs").unwrap(),
4268                                        vec![lsp::TextEdit::new(
4269                                            lsp::Range::new(
4270                                                lsp::Position::new(0, 0),
4271                                                lsp::Position::new(0, 27),
4272                                            ),
4273                                            "".to_string(),
4274                                        )],
4275                                    ),
4276                                ]
4277                                .into_iter()
4278                                .collect(),
4279                            ),
4280                            ..Default::default()
4281                        }),
4282                        data: Some(json!({
4283                            "codeActionParams": {
4284                                "range": {
4285                                    "start": {"line": 1, "column": 31},
4286                                    "end": {"line": 1, "column": 31},
4287                                }
4288                            }
4289                        })),
4290                        ..Default::default()
4291                    },
4292                )]))
4293            })
4294            .next()
4295            .await;
4296
4297        // Toggle code actions and wait for them to display.
4298        editor_b.update(cx_b, |editor, cx| {
4299            editor.toggle_code_actions(
4300                &ToggleCodeActions {
4301                    deployed_from_indicator: false,
4302                },
4303                cx,
4304            );
4305        });
4306        editor_b
4307            .condition(&cx_b, |editor, _| editor.context_menu_visible())
4308            .await;
4309
4310        fake_language_server.remove_request_handler::<lsp::request::CodeActionRequest>();
4311
4312        // Confirming the code action will trigger a resolve request.
4313        let confirm_action = workspace_b
4314            .update(cx_b, |workspace, cx| {
4315                Editor::confirm_code_action(workspace, &ConfirmCodeAction { item_ix: Some(0) }, cx)
4316            })
4317            .unwrap();
4318        fake_language_server.handle_request::<lsp::request::CodeActionResolveRequest, _, _>(
4319            |_, _| async move {
4320                Ok(lsp::CodeAction {
4321                    title: "Inline into all callers".to_string(),
4322                    edit: Some(lsp::WorkspaceEdit {
4323                        changes: Some(
4324                            [
4325                                (
4326                                    lsp::Url::from_file_path("/a/main.rs").unwrap(),
4327                                    vec![lsp::TextEdit::new(
4328                                        lsp::Range::new(
4329                                            lsp::Position::new(1, 22),
4330                                            lsp::Position::new(1, 34),
4331                                        ),
4332                                        "4".to_string(),
4333                                    )],
4334                                ),
4335                                (
4336                                    lsp::Url::from_file_path("/a/other.rs").unwrap(),
4337                                    vec![lsp::TextEdit::new(
4338                                        lsp::Range::new(
4339                                            lsp::Position::new(0, 0),
4340                                            lsp::Position::new(0, 27),
4341                                        ),
4342                                        "".to_string(),
4343                                    )],
4344                                ),
4345                            ]
4346                            .into_iter()
4347                            .collect(),
4348                        ),
4349                        ..Default::default()
4350                    }),
4351                    ..Default::default()
4352                })
4353            },
4354        );
4355
4356        // After the action is confirmed, an editor containing both modified files is opened.
4357        confirm_action.await.unwrap();
4358        let code_action_editor = workspace_b.read_with(cx_b, |workspace, cx| {
4359            workspace
4360                .active_item(cx)
4361                .unwrap()
4362                .downcast::<Editor>()
4363                .unwrap()
4364        });
4365        code_action_editor.update(cx_b, |editor, cx| {
4366            assert_eq!(editor.text(cx), "mod other;\nfn main() { let foo = 4; }\n");
4367            editor.undo(&Undo, cx);
4368            assert_eq!(
4369                editor.text(cx),
4370                "mod other;\nfn main() { let foo = other::foo(); }\npub fn foo() -> usize { 4 }"
4371            );
4372            editor.redo(&Redo, cx);
4373            assert_eq!(editor.text(cx), "mod other;\nfn main() { let foo = 4; }\n");
4374        });
4375    }
4376
4377    #[gpui::test(iterations = 10)]
4378    async fn test_collaborating_with_renames(cx_a: &mut TestAppContext, cx_b: &mut TestAppContext) {
4379        cx_a.foreground().forbid_parking();
4380        let lang_registry = Arc::new(LanguageRegistry::test());
4381        let fs = FakeFs::new(cx_a.background());
4382        cx_b.update(|cx| editor::init(cx));
4383
4384        // Set up a fake language server.
4385        let mut language = Language::new(
4386            LanguageConfig {
4387                name: "Rust".into(),
4388                path_suffixes: vec!["rs".to_string()],
4389                ..Default::default()
4390            },
4391            Some(tree_sitter_rust::language()),
4392        );
4393        let mut fake_language_servers = language.set_fake_lsp_adapter(FakeLspAdapter {
4394            capabilities: lsp::ServerCapabilities {
4395                rename_provider: Some(lsp::OneOf::Right(lsp::RenameOptions {
4396                    prepare_provider: Some(true),
4397                    work_done_progress_options: Default::default(),
4398                })),
4399                ..Default::default()
4400            },
4401            ..Default::default()
4402        });
4403        lang_registry.add(Arc::new(language));
4404
4405        // Connect to a server as 2 clients.
4406        let mut server = TestServer::start(cx_a.foreground(), cx_a.background()).await;
4407        let client_a = server.create_client(cx_a, "user_a").await;
4408        let client_b = server.create_client(cx_b, "user_b").await;
4409        server
4410            .make_contacts(vec![(&client_a, cx_a), (&client_b, cx_b)])
4411            .await;
4412
4413        // Share a project as client A
4414        fs.insert_tree(
4415            "/dir",
4416            json!({
4417                "one.rs": "const ONE: usize = 1;",
4418                "two.rs": "const TWO: usize = one::ONE + one::ONE;"
4419            }),
4420        )
4421        .await;
4422        let project_a = cx_a.update(|cx| {
4423            Project::local(
4424                client_a.clone(),
4425                client_a.user_store.clone(),
4426                lang_registry.clone(),
4427                fs.clone(),
4428                cx,
4429            )
4430        });
4431        let (worktree_a, _) = project_a
4432            .update(cx_a, |p, cx| {
4433                p.find_or_create_local_worktree("/dir", true, cx)
4434            })
4435            .await
4436            .unwrap();
4437        worktree_a
4438            .read_with(cx_a, |tree, _| tree.as_local().unwrap().scan_complete())
4439            .await;
4440        let project_id = project_a.update(cx_a, |p, _| p.next_remote_id()).await;
4441        let worktree_id = worktree_a.read_with(cx_a, |tree, _| tree.id());
4442        project_a.update(cx_a, |p, cx| p.share(cx)).await.unwrap();
4443
4444        // Join the worktree as client B.
4445        let project_b = Project::remote(
4446            project_id,
4447            client_b.clone(),
4448            client_b.user_store.clone(),
4449            lang_registry.clone(),
4450            fs.clone(),
4451            &mut cx_b.to_async(),
4452        )
4453        .await
4454        .unwrap();
4455        let mut params = cx_b.update(WorkspaceParams::test);
4456        params.languages = lang_registry.clone();
4457        params.client = client_b.client.clone();
4458        params.user_store = client_b.user_store.clone();
4459        params.project = project_b;
4460
4461        let (_window_b, workspace_b) = cx_b.add_window(|cx| Workspace::new(&params, cx));
4462        let editor_b = workspace_b
4463            .update(cx_b, |workspace, cx| {
4464                workspace.open_path((worktree_id, "one.rs"), true, cx)
4465            })
4466            .await
4467            .unwrap()
4468            .downcast::<Editor>()
4469            .unwrap();
4470        let fake_language_server = fake_language_servers.next().await.unwrap();
4471
4472        // Move cursor to a location that can be renamed.
4473        let prepare_rename = editor_b.update(cx_b, |editor, cx| {
4474            editor.select_ranges([7..7], None, cx);
4475            editor.rename(&Rename, cx).unwrap()
4476        });
4477
4478        fake_language_server
4479            .handle_request::<lsp::request::PrepareRenameRequest, _, _>(|params, _| async move {
4480                assert_eq!(params.text_document.uri.as_str(), "file:///dir/one.rs");
4481                assert_eq!(params.position, lsp::Position::new(0, 7));
4482                Ok(Some(lsp::PrepareRenameResponse::Range(lsp::Range::new(
4483                    lsp::Position::new(0, 6),
4484                    lsp::Position::new(0, 9),
4485                ))))
4486            })
4487            .next()
4488            .await
4489            .unwrap();
4490        prepare_rename.await.unwrap();
4491        editor_b.update(cx_b, |editor, cx| {
4492            let rename = editor.pending_rename().unwrap();
4493            let buffer = editor.buffer().read(cx).snapshot(cx);
4494            assert_eq!(
4495                rename.range.start.to_offset(&buffer)..rename.range.end.to_offset(&buffer),
4496                6..9
4497            );
4498            rename.editor.update(cx, |rename_editor, cx| {
4499                rename_editor.buffer().update(cx, |rename_buffer, cx| {
4500                    rename_buffer.edit([(0..3, "THREE")], cx);
4501                });
4502            });
4503        });
4504
4505        let confirm_rename = workspace_b.update(cx_b, |workspace, cx| {
4506            Editor::confirm_rename(workspace, &ConfirmRename, cx).unwrap()
4507        });
4508        fake_language_server
4509            .handle_request::<lsp::request::Rename, _, _>(|params, _| async move {
4510                assert_eq!(
4511                    params.text_document_position.text_document.uri.as_str(),
4512                    "file:///dir/one.rs"
4513                );
4514                assert_eq!(
4515                    params.text_document_position.position,
4516                    lsp::Position::new(0, 6)
4517                );
4518                assert_eq!(params.new_name, "THREE");
4519                Ok(Some(lsp::WorkspaceEdit {
4520                    changes: Some(
4521                        [
4522                            (
4523                                lsp::Url::from_file_path("/dir/one.rs").unwrap(),
4524                                vec![lsp::TextEdit::new(
4525                                    lsp::Range::new(
4526                                        lsp::Position::new(0, 6),
4527                                        lsp::Position::new(0, 9),
4528                                    ),
4529                                    "THREE".to_string(),
4530                                )],
4531                            ),
4532                            (
4533                                lsp::Url::from_file_path("/dir/two.rs").unwrap(),
4534                                vec![
4535                                    lsp::TextEdit::new(
4536                                        lsp::Range::new(
4537                                            lsp::Position::new(0, 24),
4538                                            lsp::Position::new(0, 27),
4539                                        ),
4540                                        "THREE".to_string(),
4541                                    ),
4542                                    lsp::TextEdit::new(
4543                                        lsp::Range::new(
4544                                            lsp::Position::new(0, 35),
4545                                            lsp::Position::new(0, 38),
4546                                        ),
4547                                        "THREE".to_string(),
4548                                    ),
4549                                ],
4550                            ),
4551                        ]
4552                        .into_iter()
4553                        .collect(),
4554                    ),
4555                    ..Default::default()
4556                }))
4557            })
4558            .next()
4559            .await
4560            .unwrap();
4561        confirm_rename.await.unwrap();
4562
4563        let rename_editor = workspace_b.read_with(cx_b, |workspace, cx| {
4564            workspace
4565                .active_item(cx)
4566                .unwrap()
4567                .downcast::<Editor>()
4568                .unwrap()
4569        });
4570        rename_editor.update(cx_b, |editor, cx| {
4571            assert_eq!(
4572                editor.text(cx),
4573                "const THREE: usize = 1;\nconst TWO: usize = one::THREE + one::THREE;"
4574            );
4575            editor.undo(&Undo, cx);
4576            assert_eq!(
4577                editor.text(cx),
4578                "const ONE: usize = 1;\nconst TWO: usize = one::ONE + one::ONE;"
4579            );
4580            editor.redo(&Redo, cx);
4581            assert_eq!(
4582                editor.text(cx),
4583                "const THREE: usize = 1;\nconst TWO: usize = one::THREE + one::THREE;"
4584            );
4585        });
4586
4587        // Ensure temporary rename edits cannot be undone/redone.
4588        editor_b.update(cx_b, |editor, cx| {
4589            editor.undo(&Undo, cx);
4590            assert_eq!(editor.text(cx), "const ONE: usize = 1;");
4591            editor.undo(&Undo, cx);
4592            assert_eq!(editor.text(cx), "const ONE: usize = 1;");
4593            editor.redo(&Redo, cx);
4594            assert_eq!(editor.text(cx), "const THREE: usize = 1;");
4595        })
4596    }
4597
4598    #[gpui::test(iterations = 10)]
4599    async fn test_basic_chat(cx_a: &mut TestAppContext, cx_b: &mut TestAppContext) {
4600        cx_a.foreground().forbid_parking();
4601
4602        // Connect to a server as 2 clients.
4603        let mut server = TestServer::start(cx_a.foreground(), cx_a.background()).await;
4604        let client_a = server.create_client(cx_a, "user_a").await;
4605        let client_b = server.create_client(cx_b, "user_b").await;
4606
4607        // Create an org that includes these 2 users.
4608        let db = &server.app_state.db;
4609        let org_id = db.create_org("Test Org", "test-org").await.unwrap();
4610        db.add_org_member(org_id, client_a.current_user_id(&cx_a), false)
4611            .await
4612            .unwrap();
4613        db.add_org_member(org_id, client_b.current_user_id(&cx_b), false)
4614            .await
4615            .unwrap();
4616
4617        // Create a channel that includes all the users.
4618        let channel_id = db.create_org_channel(org_id, "test-channel").await.unwrap();
4619        db.add_channel_member(channel_id, client_a.current_user_id(&cx_a), false)
4620            .await
4621            .unwrap();
4622        db.add_channel_member(channel_id, client_b.current_user_id(&cx_b), false)
4623            .await
4624            .unwrap();
4625        db.create_channel_message(
4626            channel_id,
4627            client_b.current_user_id(&cx_b),
4628            "hello A, it's B.",
4629            OffsetDateTime::now_utc(),
4630            1,
4631        )
4632        .await
4633        .unwrap();
4634
4635        let channels_a = cx_a
4636            .add_model(|cx| ChannelList::new(client_a.user_store.clone(), client_a.clone(), cx));
4637        channels_a
4638            .condition(cx_a, |list, _| list.available_channels().is_some())
4639            .await;
4640        channels_a.read_with(cx_a, |list, _| {
4641            assert_eq!(
4642                list.available_channels().unwrap(),
4643                &[ChannelDetails {
4644                    id: channel_id.to_proto(),
4645                    name: "test-channel".to_string()
4646                }]
4647            )
4648        });
4649        let channel_a = channels_a.update(cx_a, |this, cx| {
4650            this.get_channel(channel_id.to_proto(), cx).unwrap()
4651        });
4652        channel_a.read_with(cx_a, |channel, _| assert!(channel.messages().is_empty()));
4653        channel_a
4654            .condition(&cx_a, |channel, _| {
4655                channel_messages(channel)
4656                    == [("user_b".to_string(), "hello A, it's B.".to_string(), false)]
4657            })
4658            .await;
4659
4660        let channels_b = cx_b
4661            .add_model(|cx| ChannelList::new(client_b.user_store.clone(), client_b.clone(), cx));
4662        channels_b
4663            .condition(cx_b, |list, _| list.available_channels().is_some())
4664            .await;
4665        channels_b.read_with(cx_b, |list, _| {
4666            assert_eq!(
4667                list.available_channels().unwrap(),
4668                &[ChannelDetails {
4669                    id: channel_id.to_proto(),
4670                    name: "test-channel".to_string()
4671                }]
4672            )
4673        });
4674
4675        let channel_b = channels_b.update(cx_b, |this, cx| {
4676            this.get_channel(channel_id.to_proto(), cx).unwrap()
4677        });
4678        channel_b.read_with(cx_b, |channel, _| assert!(channel.messages().is_empty()));
4679        channel_b
4680            .condition(&cx_b, |channel, _| {
4681                channel_messages(channel)
4682                    == [("user_b".to_string(), "hello A, it's B.".to_string(), false)]
4683            })
4684            .await;
4685
4686        channel_a
4687            .update(cx_a, |channel, cx| {
4688                channel
4689                    .send_message("oh, hi B.".to_string(), cx)
4690                    .unwrap()
4691                    .detach();
4692                let task = channel.send_message("sup".to_string(), cx).unwrap();
4693                assert_eq!(
4694                    channel_messages(channel),
4695                    &[
4696                        ("user_b".to_string(), "hello A, it's B.".to_string(), false),
4697                        ("user_a".to_string(), "oh, hi B.".to_string(), true),
4698                        ("user_a".to_string(), "sup".to_string(), true)
4699                    ]
4700                );
4701                task
4702            })
4703            .await
4704            .unwrap();
4705
4706        channel_b
4707            .condition(&cx_b, |channel, _| {
4708                channel_messages(channel)
4709                    == [
4710                        ("user_b".to_string(), "hello A, it's B.".to_string(), false),
4711                        ("user_a".to_string(), "oh, hi B.".to_string(), false),
4712                        ("user_a".to_string(), "sup".to_string(), false),
4713                    ]
4714            })
4715            .await;
4716
4717        assert_eq!(
4718            server
4719                .state()
4720                .await
4721                .channel(channel_id)
4722                .unwrap()
4723                .connection_ids
4724                .len(),
4725            2
4726        );
4727        cx_b.update(|_| drop(channel_b));
4728        server
4729            .condition(|state| state.channel(channel_id).unwrap().connection_ids.len() == 1)
4730            .await;
4731
4732        cx_a.update(|_| drop(channel_a));
4733        server
4734            .condition(|state| state.channel(channel_id).is_none())
4735            .await;
4736    }
4737
4738    #[gpui::test(iterations = 10)]
4739    async fn test_chat_message_validation(cx_a: &mut TestAppContext) {
4740        cx_a.foreground().forbid_parking();
4741
4742        let mut server = TestServer::start(cx_a.foreground(), cx_a.background()).await;
4743        let client_a = server.create_client(cx_a, "user_a").await;
4744
4745        let db = &server.app_state.db;
4746        let org_id = db.create_org("Test Org", "test-org").await.unwrap();
4747        let channel_id = db.create_org_channel(org_id, "test-channel").await.unwrap();
4748        db.add_org_member(org_id, client_a.current_user_id(&cx_a), false)
4749            .await
4750            .unwrap();
4751        db.add_channel_member(channel_id, client_a.current_user_id(&cx_a), false)
4752            .await
4753            .unwrap();
4754
4755        let channels_a = cx_a
4756            .add_model(|cx| ChannelList::new(client_a.user_store.clone(), client_a.clone(), cx));
4757        channels_a
4758            .condition(cx_a, |list, _| list.available_channels().is_some())
4759            .await;
4760        let channel_a = channels_a.update(cx_a, |this, cx| {
4761            this.get_channel(channel_id.to_proto(), cx).unwrap()
4762        });
4763
4764        // Messages aren't allowed to be too long.
4765        channel_a
4766            .update(cx_a, |channel, cx| {
4767                let long_body = "this is long.\n".repeat(1024);
4768                channel.send_message(long_body, cx).unwrap()
4769            })
4770            .await
4771            .unwrap_err();
4772
4773        // Messages aren't allowed to be blank.
4774        channel_a.update(cx_a, |channel, cx| {
4775            channel.send_message(String::new(), cx).unwrap_err()
4776        });
4777
4778        // Leading and trailing whitespace are trimmed.
4779        channel_a
4780            .update(cx_a, |channel, cx| {
4781                channel
4782                    .send_message("\n surrounded by whitespace  \n".to_string(), cx)
4783                    .unwrap()
4784            })
4785            .await
4786            .unwrap();
4787        assert_eq!(
4788            db.get_channel_messages(channel_id, 10, None)
4789                .await
4790                .unwrap()
4791                .iter()
4792                .map(|m| &m.body)
4793                .collect::<Vec<_>>(),
4794            &["surrounded by whitespace"]
4795        );
4796    }
4797
4798    #[gpui::test(iterations = 10)]
4799    async fn test_chat_reconnection(cx_a: &mut TestAppContext, cx_b: &mut TestAppContext) {
4800        cx_a.foreground().forbid_parking();
4801
4802        // Connect to a server as 2 clients.
4803        let mut server = TestServer::start(cx_a.foreground(), cx_a.background()).await;
4804        let client_a = server.create_client(cx_a, "user_a").await;
4805        let client_b = server.create_client(cx_b, "user_b").await;
4806        let mut status_b = client_b.status();
4807
4808        // Create an org that includes these 2 users.
4809        let db = &server.app_state.db;
4810        let org_id = db.create_org("Test Org", "test-org").await.unwrap();
4811        db.add_org_member(org_id, client_a.current_user_id(&cx_a), false)
4812            .await
4813            .unwrap();
4814        db.add_org_member(org_id, client_b.current_user_id(&cx_b), false)
4815            .await
4816            .unwrap();
4817
4818        // Create a channel that includes all the users.
4819        let channel_id = db.create_org_channel(org_id, "test-channel").await.unwrap();
4820        db.add_channel_member(channel_id, client_a.current_user_id(&cx_a), false)
4821            .await
4822            .unwrap();
4823        db.add_channel_member(channel_id, client_b.current_user_id(&cx_b), false)
4824            .await
4825            .unwrap();
4826        db.create_channel_message(
4827            channel_id,
4828            client_b.current_user_id(&cx_b),
4829            "hello A, it's B.",
4830            OffsetDateTime::now_utc(),
4831            2,
4832        )
4833        .await
4834        .unwrap();
4835
4836        let channels_a = cx_a
4837            .add_model(|cx| ChannelList::new(client_a.user_store.clone(), client_a.clone(), cx));
4838        channels_a
4839            .condition(cx_a, |list, _| list.available_channels().is_some())
4840            .await;
4841
4842        channels_a.read_with(cx_a, |list, _| {
4843            assert_eq!(
4844                list.available_channels().unwrap(),
4845                &[ChannelDetails {
4846                    id: channel_id.to_proto(),
4847                    name: "test-channel".to_string()
4848                }]
4849            )
4850        });
4851        let channel_a = channels_a.update(cx_a, |this, cx| {
4852            this.get_channel(channel_id.to_proto(), cx).unwrap()
4853        });
4854        channel_a.read_with(cx_a, |channel, _| assert!(channel.messages().is_empty()));
4855        channel_a
4856            .condition(&cx_a, |channel, _| {
4857                channel_messages(channel)
4858                    == [("user_b".to_string(), "hello A, it's B.".to_string(), false)]
4859            })
4860            .await;
4861
4862        let channels_b = cx_b
4863            .add_model(|cx| ChannelList::new(client_b.user_store.clone(), client_b.clone(), cx));
4864        channels_b
4865            .condition(cx_b, |list, _| list.available_channels().is_some())
4866            .await;
4867        channels_b.read_with(cx_b, |list, _| {
4868            assert_eq!(
4869                list.available_channels().unwrap(),
4870                &[ChannelDetails {
4871                    id: channel_id.to_proto(),
4872                    name: "test-channel".to_string()
4873                }]
4874            )
4875        });
4876
4877        let channel_b = channels_b.update(cx_b, |this, cx| {
4878            this.get_channel(channel_id.to_proto(), cx).unwrap()
4879        });
4880        channel_b.read_with(cx_b, |channel, _| assert!(channel.messages().is_empty()));
4881        channel_b
4882            .condition(&cx_b, |channel, _| {
4883                channel_messages(channel)
4884                    == [("user_b".to_string(), "hello A, it's B.".to_string(), false)]
4885            })
4886            .await;
4887
4888        // Disconnect client B, ensuring we can still access its cached channel data.
4889        server.forbid_connections();
4890        server.disconnect_client(client_b.current_user_id(&cx_b));
4891        cx_b.foreground().advance_clock(rpc::RECEIVE_TIMEOUT);
4892        while !matches!(
4893            status_b.next().await,
4894            Some(client::Status::ReconnectionError { .. })
4895        ) {}
4896
4897        channels_b.read_with(cx_b, |channels, _| {
4898            assert_eq!(
4899                channels.available_channels().unwrap(),
4900                [ChannelDetails {
4901                    id: channel_id.to_proto(),
4902                    name: "test-channel".to_string()
4903                }]
4904            )
4905        });
4906        channel_b.read_with(cx_b, |channel, _| {
4907            assert_eq!(
4908                channel_messages(channel),
4909                [("user_b".to_string(), "hello A, it's B.".to_string(), false)]
4910            )
4911        });
4912
4913        // Send a message from client B while it is disconnected.
4914        channel_b
4915            .update(cx_b, |channel, cx| {
4916                let task = channel
4917                    .send_message("can you see this?".to_string(), cx)
4918                    .unwrap();
4919                assert_eq!(
4920                    channel_messages(channel),
4921                    &[
4922                        ("user_b".to_string(), "hello A, it's B.".to_string(), false),
4923                        ("user_b".to_string(), "can you see this?".to_string(), true)
4924                    ]
4925                );
4926                task
4927            })
4928            .await
4929            .unwrap_err();
4930
4931        // Send a message from client A while B is disconnected.
4932        channel_a
4933            .update(cx_a, |channel, cx| {
4934                channel
4935                    .send_message("oh, hi B.".to_string(), cx)
4936                    .unwrap()
4937                    .detach();
4938                let task = channel.send_message("sup".to_string(), cx).unwrap();
4939                assert_eq!(
4940                    channel_messages(channel),
4941                    &[
4942                        ("user_b".to_string(), "hello A, it's B.".to_string(), false),
4943                        ("user_a".to_string(), "oh, hi B.".to_string(), true),
4944                        ("user_a".to_string(), "sup".to_string(), true)
4945                    ]
4946                );
4947                task
4948            })
4949            .await
4950            .unwrap();
4951
4952        // Give client B a chance to reconnect.
4953        server.allow_connections();
4954        cx_b.foreground().advance_clock(Duration::from_secs(10));
4955
4956        // Verify that B sees the new messages upon reconnection, as well as the message client B
4957        // sent while offline.
4958        channel_b
4959            .condition(&cx_b, |channel, _| {
4960                channel_messages(channel)
4961                    == [
4962                        ("user_b".to_string(), "hello A, it's B.".to_string(), false),
4963                        ("user_a".to_string(), "oh, hi B.".to_string(), false),
4964                        ("user_a".to_string(), "sup".to_string(), false),
4965                        ("user_b".to_string(), "can you see this?".to_string(), false),
4966                    ]
4967            })
4968            .await;
4969
4970        // Ensure client A and B can communicate normally after reconnection.
4971        channel_a
4972            .update(cx_a, |channel, cx| {
4973                channel.send_message("you online?".to_string(), cx).unwrap()
4974            })
4975            .await
4976            .unwrap();
4977        channel_b
4978            .condition(&cx_b, |channel, _| {
4979                channel_messages(channel)
4980                    == [
4981                        ("user_b".to_string(), "hello A, it's B.".to_string(), false),
4982                        ("user_a".to_string(), "oh, hi B.".to_string(), false),
4983                        ("user_a".to_string(), "sup".to_string(), false),
4984                        ("user_b".to_string(), "can you see this?".to_string(), false),
4985                        ("user_a".to_string(), "you online?".to_string(), false),
4986                    ]
4987            })
4988            .await;
4989
4990        channel_b
4991            .update(cx_b, |channel, cx| {
4992                channel.send_message("yep".to_string(), cx).unwrap()
4993            })
4994            .await
4995            .unwrap();
4996        channel_a
4997            .condition(&cx_a, |channel, _| {
4998                channel_messages(channel)
4999                    == [
5000                        ("user_b".to_string(), "hello A, it's B.".to_string(), false),
5001                        ("user_a".to_string(), "oh, hi B.".to_string(), false),
5002                        ("user_a".to_string(), "sup".to_string(), false),
5003                        ("user_b".to_string(), "can you see this?".to_string(), false),
5004                        ("user_a".to_string(), "you online?".to_string(), false),
5005                        ("user_b".to_string(), "yep".to_string(), false),
5006                    ]
5007            })
5008            .await;
5009    }
5010
5011    #[gpui::test(iterations = 10)]
5012    async fn test_contacts(
5013        deterministic: Arc<Deterministic>,
5014        cx_a: &mut TestAppContext,
5015        cx_b: &mut TestAppContext,
5016        cx_c: &mut TestAppContext,
5017    ) {
5018        cx_a.foreground().forbid_parking();
5019        let lang_registry = Arc::new(LanguageRegistry::test());
5020        let fs = FakeFs::new(cx_a.background());
5021
5022        // Connect to a server as 3 clients.
5023        let mut server = TestServer::start(cx_a.foreground(), cx_a.background()).await;
5024        let client_a = server.create_client(cx_a, "user_a").await;
5025        let client_b = server.create_client(cx_b, "user_b").await;
5026        let client_c = server.create_client(cx_c, "user_c").await;
5027        server
5028            .make_contacts(vec![
5029                (&client_a, cx_a),
5030                (&client_b, cx_b),
5031                (&client_c, cx_c),
5032            ])
5033            .await;
5034
5035        deterministic.run_until_parked();
5036        for (client, cx) in [(&client_a, &cx_a), (&client_b, &cx_b), (&client_c, &cx_c)] {
5037            client.user_store.read_with(*cx, |store, _| {
5038                assert_eq!(
5039                    contacts(store),
5040                    [
5041                        ("user_a", true, vec![]),
5042                        ("user_b", true, vec![]),
5043                        ("user_c", true, vec![])
5044                    ]
5045                )
5046            });
5047        }
5048
5049        // Share a worktree as client A.
5050        fs.create_dir(Path::new("/a")).await.unwrap();
5051
5052        let project_a = cx_a.update(|cx| {
5053            Project::local(
5054                client_a.clone(),
5055                client_a.user_store.clone(),
5056                lang_registry.clone(),
5057                fs.clone(),
5058                cx,
5059            )
5060        });
5061        let (worktree_a, _) = project_a
5062            .update(cx_a, |p, cx| {
5063                p.find_or_create_local_worktree("/a", true, cx)
5064            })
5065            .await
5066            .unwrap();
5067        worktree_a
5068            .read_with(cx_a, |tree, _| tree.as_local().unwrap().scan_complete())
5069            .await;
5070
5071        deterministic.run_until_parked();
5072        for (client, cx) in [(&client_a, &cx_a), (&client_b, &cx_b), (&client_c, &cx_c)] {
5073            client.user_store.read_with(*cx, |store, _| {
5074                assert_eq!(
5075                    contacts(store),
5076                    [
5077                        ("user_a", true, vec![("a", false, vec![])]),
5078                        ("user_b", true, vec![]),
5079                        ("user_c", true, vec![])
5080                    ]
5081                )
5082            });
5083        }
5084
5085        let project_id = project_a
5086            .update(cx_a, |project, _| project.next_remote_id())
5087            .await;
5088        project_a
5089            .update(cx_a, |project, cx| project.share(cx))
5090            .await
5091            .unwrap();
5092
5093        deterministic.run_until_parked();
5094        for (client, cx) in [(&client_a, &cx_a), (&client_b, &cx_b), (&client_c, &cx_c)] {
5095            client.user_store.read_with(*cx, |store, _| {
5096                assert_eq!(
5097                    contacts(store),
5098                    [
5099                        ("user_a", true, vec![("a", true, vec![])]),
5100                        ("user_b", true, vec![]),
5101                        ("user_c", true, vec![])
5102                    ]
5103                )
5104            });
5105        }
5106
5107        let _project_b = Project::remote(
5108            project_id,
5109            client_b.clone(),
5110            client_b.user_store.clone(),
5111            lang_registry.clone(),
5112            fs.clone(),
5113            &mut cx_b.to_async(),
5114        )
5115        .await
5116        .unwrap();
5117
5118        deterministic.run_until_parked();
5119        for (client, cx) in [(&client_a, &cx_a), (&client_b, &cx_b), (&client_c, &cx_c)] {
5120            client.user_store.read_with(*cx, |store, _| {
5121                assert_eq!(
5122                    contacts(store),
5123                    [
5124                        ("user_a", true, vec![("a", true, vec!["user_b"])]),
5125                        ("user_b", true, vec![]),
5126                        ("user_c", true, vec![])
5127                    ]
5128                )
5129            });
5130        }
5131
5132        project_a
5133            .condition(&cx_a, |project, _| {
5134                project.collaborators().contains_key(&client_b.peer_id)
5135            })
5136            .await;
5137
5138        cx_a.update(move |_| drop(project_a));
5139        deterministic.run_until_parked();
5140        for (client, cx) in [(&client_a, &cx_a), (&client_b, &cx_b), (&client_c, &cx_c)] {
5141            client.user_store.read_with(*cx, |store, _| {
5142                assert_eq!(
5143                    contacts(store),
5144                    [
5145                        ("user_a", true, vec![]),
5146                        ("user_b", true, vec![]),
5147                        ("user_c", true, vec![])
5148                    ]
5149                )
5150            });
5151        }
5152
5153        server.disconnect_client(client_c.current_user_id(cx_c));
5154        server.forbid_connections();
5155        deterministic.advance_clock(rpc::RECEIVE_TIMEOUT);
5156        for (client, cx) in [(&client_a, &cx_a), (&client_b, &cx_b)] {
5157            client.user_store.read_with(*cx, |store, _| {
5158                assert_eq!(
5159                    contacts(store),
5160                    [
5161                        ("user_a", true, vec![]),
5162                        ("user_b", true, vec![]),
5163                        ("user_c", false, vec![])
5164                    ]
5165                )
5166            });
5167        }
5168        client_c
5169            .user_store
5170            .read_with(cx_c, |store, _| assert_eq!(contacts(store), []));
5171
5172        server.allow_connections();
5173        client_c
5174            .authenticate_and_connect(false, &cx_c.to_async())
5175            .await
5176            .unwrap();
5177
5178        deterministic.run_until_parked();
5179        for (client, cx) in [(&client_a, &cx_a), (&client_b, &cx_b), (&client_c, &cx_c)] {
5180            client.user_store.read_with(*cx, |store, _| {
5181                assert_eq!(
5182                    contacts(store),
5183                    [
5184                        ("user_a", true, vec![]),
5185                        ("user_b", true, vec![]),
5186                        ("user_c", true, vec![])
5187                    ]
5188                )
5189            });
5190        }
5191
5192        fn contacts(user_store: &UserStore) -> Vec<(&str, bool, Vec<(&str, bool, Vec<&str>)>)> {
5193            user_store
5194                .contacts()
5195                .iter()
5196                .map(|contact| {
5197                    let worktrees = contact
5198                        .projects
5199                        .iter()
5200                        .map(|p| {
5201                            (
5202                                p.worktree_root_names[0].as_str(),
5203                                p.is_shared,
5204                                p.guests.iter().map(|p| p.github_login.as_str()).collect(),
5205                            )
5206                        })
5207                        .collect();
5208                    (
5209                        contact.user.github_login.as_str(),
5210                        contact.online,
5211                        worktrees,
5212                    )
5213                })
5214                .collect()
5215        }
5216    }
5217
5218    #[gpui::test(iterations = 10)]
5219    async fn test_contact_requests(
5220        executor: Arc<Deterministic>,
5221        cx_a: &mut TestAppContext,
5222        cx_a2: &mut TestAppContext,
5223        cx_b: &mut TestAppContext,
5224        cx_b2: &mut TestAppContext,
5225        cx_c: &mut TestAppContext,
5226        cx_c2: &mut TestAppContext,
5227    ) {
5228        cx_a.foreground().forbid_parking();
5229
5230        // Connect to a server as 3 clients.
5231        let mut server = TestServer::start(cx_a.foreground(), cx_a.background()).await;
5232        let client_a = server.create_client(cx_a, "user_a").await;
5233        let client_a2 = server.create_client(cx_a2, "user_a").await;
5234        let client_b = server.create_client(cx_b, "user_b").await;
5235        let client_b2 = server.create_client(cx_b2, "user_b").await;
5236        let client_c = server.create_client(cx_c, "user_c").await;
5237        let client_c2 = server.create_client(cx_c2, "user_c").await;
5238
5239        assert_eq!(client_a.user_id().unwrap(), client_a2.user_id().unwrap());
5240        assert_eq!(client_b.user_id().unwrap(), client_b2.user_id().unwrap());
5241        assert_eq!(client_c.user_id().unwrap(), client_c2.user_id().unwrap());
5242
5243        // User A and User C request that user B become their contact.
5244        client_a
5245            .user_store
5246            .update(cx_a, |store, cx| {
5247                store.request_contact(client_b.user_id().unwrap(), cx)
5248            })
5249            .await
5250            .unwrap();
5251        client_c
5252            .user_store
5253            .update(cx_c, |store, cx| {
5254                store.request_contact(client_b.user_id().unwrap(), cx)
5255            })
5256            .await
5257            .unwrap();
5258        executor.run_until_parked();
5259
5260        // All users see the pending request appear in all their clients.
5261        assert_eq!(
5262            client_a.summarize_contacts(&cx_a).outgoing_requests,
5263            &["user_b"]
5264        );
5265        assert_eq!(
5266            client_a2.summarize_contacts(&cx_a2).outgoing_requests,
5267            &["user_b"]
5268        );
5269        assert_eq!(
5270            client_b.summarize_contacts(&cx_b).incoming_requests,
5271            &["user_a", "user_c"]
5272        );
5273        assert_eq!(
5274            client_b2.summarize_contacts(&cx_b2).incoming_requests,
5275            &["user_a", "user_c"]
5276        );
5277        assert_eq!(
5278            client_c.summarize_contacts(&cx_c).outgoing_requests,
5279            &["user_b"]
5280        );
5281        assert_eq!(
5282            client_c2.summarize_contacts(&cx_c2).outgoing_requests,
5283            &["user_b"]
5284        );
5285
5286        // Contact requests are present upon connecting (tested here via disconnect/reconnect)
5287        disconnect_and_reconnect(&client_a, cx_a).await;
5288        disconnect_and_reconnect(&client_b, cx_b).await;
5289        disconnect_and_reconnect(&client_c, cx_c).await;
5290        executor.run_until_parked();
5291        assert_eq!(
5292            client_a.summarize_contacts(&cx_a).outgoing_requests,
5293            &["user_b"]
5294        );
5295        assert_eq!(
5296            client_b.summarize_contacts(&cx_b).incoming_requests,
5297            &["user_a", "user_c"]
5298        );
5299        assert_eq!(
5300            client_c.summarize_contacts(&cx_c).outgoing_requests,
5301            &["user_b"]
5302        );
5303
5304        // User B accepts the request from user A.
5305        client_b
5306            .user_store
5307            .update(cx_b, |store, cx| {
5308                store.respond_to_contact_request(client_a.user_id().unwrap(), true, cx)
5309            })
5310            .await
5311            .unwrap();
5312
5313        executor.run_until_parked();
5314
5315        // User B sees user A as their contact now in all client, and the incoming request from them is removed.
5316        let contacts_b = client_b.summarize_contacts(&cx_b);
5317        assert_eq!(contacts_b.current, &["user_a", "user_b"]);
5318        assert_eq!(contacts_b.incoming_requests, &["user_c"]);
5319        let contacts_b2 = client_b2.summarize_contacts(&cx_b2);
5320        assert_eq!(contacts_b2.current, &["user_a", "user_b"]);
5321        assert_eq!(contacts_b2.incoming_requests, &["user_c"]);
5322
5323        // User A sees user B as their contact now in all clients, and the outgoing request to them is removed.
5324        let contacts_a = client_a.summarize_contacts(&cx_a);
5325        assert_eq!(contacts_a.current, &["user_a", "user_b"]);
5326        assert!(contacts_a.outgoing_requests.is_empty());
5327        let contacts_a2 = client_a2.summarize_contacts(&cx_a2);
5328        assert_eq!(contacts_a2.current, &["user_a", "user_b"]);
5329        assert!(contacts_a2.outgoing_requests.is_empty());
5330
5331        // Contacts are present upon connecting (tested here via disconnect/reconnect)
5332        disconnect_and_reconnect(&client_a, cx_a).await;
5333        disconnect_and_reconnect(&client_b, cx_b).await;
5334        disconnect_and_reconnect(&client_c, cx_c).await;
5335        executor.run_until_parked();
5336        assert_eq!(
5337            client_a.summarize_contacts(&cx_a).current,
5338            &["user_a", "user_b"]
5339        );
5340        assert_eq!(
5341            client_b.summarize_contacts(&cx_b).current,
5342            &["user_a", "user_b"]
5343        );
5344        assert_eq!(
5345            client_b.summarize_contacts(&cx_b).incoming_requests,
5346            &["user_c"]
5347        );
5348        assert_eq!(client_c.summarize_contacts(&cx_c).current, &["user_c"]);
5349        assert_eq!(
5350            client_c.summarize_contacts(&cx_c).outgoing_requests,
5351            &["user_b"]
5352        );
5353
5354        // User B rejects the request from user C.
5355        client_b
5356            .user_store
5357            .update(cx_b, |store, cx| {
5358                store.respond_to_contact_request(client_c.user_id().unwrap(), false, cx)
5359            })
5360            .await
5361            .unwrap();
5362
5363        executor.run_until_parked();
5364
5365        // User B doesn't see user C as their contact, and the incoming request from them is removed.
5366        let contacts_b = client_b.summarize_contacts(&cx_b);
5367        assert_eq!(contacts_b.current, &["user_a", "user_b"]);
5368        assert!(contacts_b.incoming_requests.is_empty());
5369        let contacts_b2 = client_b2.summarize_contacts(&cx_b2);
5370        assert_eq!(contacts_b2.current, &["user_a", "user_b"]);
5371        assert!(contacts_b2.incoming_requests.is_empty());
5372
5373        // User C doesn't see user B as their contact, and the outgoing request to them is removed.
5374        let contacts_c = client_c.summarize_contacts(&cx_c);
5375        assert_eq!(contacts_c.current, &["user_c"]);
5376        assert!(contacts_c.outgoing_requests.is_empty());
5377        let contacts_c2 = client_c2.summarize_contacts(&cx_c2);
5378        assert_eq!(contacts_c2.current, &["user_c"]);
5379        assert!(contacts_c2.outgoing_requests.is_empty());
5380
5381        // Incoming/outgoing requests are not present upon connecting (tested here via disconnect/reconnect)
5382        disconnect_and_reconnect(&client_a, cx_a).await;
5383        disconnect_and_reconnect(&client_b, cx_b).await;
5384        disconnect_and_reconnect(&client_c, cx_c).await;
5385        executor.run_until_parked();
5386        assert_eq!(
5387            client_a.summarize_contacts(&cx_a).current,
5388            &["user_a", "user_b"]
5389        );
5390        assert_eq!(
5391            client_b.summarize_contacts(&cx_b).current,
5392            &["user_a", "user_b"]
5393        );
5394        assert!(client_b
5395            .summarize_contacts(&cx_b)
5396            .incoming_requests
5397            .is_empty());
5398        assert_eq!(client_c.summarize_contacts(&cx_c).current, &["user_c"]);
5399        assert!(client_c
5400            .summarize_contacts(&cx_c)
5401            .outgoing_requests
5402            .is_empty());
5403
5404        async fn disconnect_and_reconnect(client: &TestClient, cx: &mut TestAppContext) {
5405            client.disconnect(&cx.to_async()).unwrap();
5406            client.clear_contacts(cx).await;
5407            client
5408                .authenticate_and_connect(false, &cx.to_async())
5409                .await
5410                .unwrap();
5411        }
5412    }
5413
5414    #[gpui::test(iterations = 10)]
5415    async fn test_following(cx_a: &mut TestAppContext, cx_b: &mut TestAppContext) {
5416        cx_a.foreground().forbid_parking();
5417        let fs = FakeFs::new(cx_a.background());
5418
5419        // 2 clients connect to a server.
5420        let mut server = TestServer::start(cx_a.foreground(), cx_a.background()).await;
5421        let mut client_a = server.create_client(cx_a, "user_a").await;
5422        let mut client_b = server.create_client(cx_b, "user_b").await;
5423        server
5424            .make_contacts(vec![(&client_a, cx_a), (&client_b, cx_b)])
5425            .await;
5426        cx_a.update(editor::init);
5427        cx_b.update(editor::init);
5428
5429        // Client A shares a project.
5430        fs.insert_tree(
5431            "/a",
5432            json!({
5433                "1.txt": "one",
5434                "2.txt": "two",
5435                "3.txt": "three",
5436            }),
5437        )
5438        .await;
5439        let (project_a, worktree_id) = client_a.build_local_project(fs.clone(), "/a", cx_a).await;
5440        project_a
5441            .update(cx_a, |project, cx| project.share(cx))
5442            .await
5443            .unwrap();
5444
5445        // Client B joins the project.
5446        let project_b = client_b
5447            .build_remote_project(
5448                project_a
5449                    .read_with(cx_a, |project, _| project.remote_id())
5450                    .unwrap(),
5451                cx_b,
5452            )
5453            .await;
5454
5455        // Client A opens some editors.
5456        let workspace_a = client_a.build_workspace(&project_a, cx_a);
5457        let pane_a = workspace_a.read_with(cx_a, |workspace, _| workspace.active_pane().clone());
5458        let editor_a1 = workspace_a
5459            .update(cx_a, |workspace, cx| {
5460                workspace.open_path((worktree_id, "1.txt"), true, cx)
5461            })
5462            .await
5463            .unwrap()
5464            .downcast::<Editor>()
5465            .unwrap();
5466        let editor_a2 = workspace_a
5467            .update(cx_a, |workspace, cx| {
5468                workspace.open_path((worktree_id, "2.txt"), true, cx)
5469            })
5470            .await
5471            .unwrap()
5472            .downcast::<Editor>()
5473            .unwrap();
5474
5475        // Client B opens an editor.
5476        let workspace_b = client_b.build_workspace(&project_b, cx_b);
5477        let editor_b1 = workspace_b
5478            .update(cx_b, |workspace, cx| {
5479                workspace.open_path((worktree_id, "1.txt"), true, cx)
5480            })
5481            .await
5482            .unwrap()
5483            .downcast::<Editor>()
5484            .unwrap();
5485
5486        let client_a_id = project_b.read_with(cx_b, |project, _| {
5487            project.collaborators().values().next().unwrap().peer_id
5488        });
5489        let client_b_id = project_a.read_with(cx_a, |project, _| {
5490            project.collaborators().values().next().unwrap().peer_id
5491        });
5492
5493        // When client B starts following client A, all visible view states are replicated to client B.
5494        editor_a1.update(cx_a, |editor, cx| editor.select_ranges([0..1], None, cx));
5495        editor_a2.update(cx_a, |editor, cx| editor.select_ranges([2..3], None, cx));
5496        workspace_b
5497            .update(cx_b, |workspace, cx| {
5498                workspace
5499                    .toggle_follow(&ToggleFollow(client_a_id), cx)
5500                    .unwrap()
5501            })
5502            .await
5503            .unwrap();
5504        let editor_b2 = workspace_b.read_with(cx_b, |workspace, cx| {
5505            workspace
5506                .active_item(cx)
5507                .unwrap()
5508                .downcast::<Editor>()
5509                .unwrap()
5510        });
5511        assert!(cx_b.read(|cx| editor_b2.is_focused(cx)));
5512        assert_eq!(
5513            editor_b2.read_with(cx_b, |editor, cx| editor.project_path(cx)),
5514            Some((worktree_id, "2.txt").into())
5515        );
5516        assert_eq!(
5517            editor_b2.read_with(cx_b, |editor, cx| editor.selected_ranges(cx)),
5518            vec![2..3]
5519        );
5520        assert_eq!(
5521            editor_b1.read_with(cx_b, |editor, cx| editor.selected_ranges(cx)),
5522            vec![0..1]
5523        );
5524
5525        // When client A activates a different editor, client B does so as well.
5526        workspace_a.update(cx_a, |workspace, cx| {
5527            workspace.activate_item(&editor_a1, cx)
5528        });
5529        workspace_b
5530            .condition(cx_b, |workspace, cx| {
5531                workspace.active_item(cx).unwrap().id() == editor_b1.id()
5532            })
5533            .await;
5534
5535        // When client A navigates back and forth, client B does so as well.
5536        workspace_a
5537            .update(cx_a, |workspace, cx| {
5538                workspace::Pane::go_back(workspace, None, cx)
5539            })
5540            .await;
5541        workspace_b
5542            .condition(cx_b, |workspace, cx| {
5543                workspace.active_item(cx).unwrap().id() == editor_b2.id()
5544            })
5545            .await;
5546
5547        workspace_a
5548            .update(cx_a, |workspace, cx| {
5549                workspace::Pane::go_forward(workspace, None, cx)
5550            })
5551            .await;
5552        workspace_b
5553            .condition(cx_b, |workspace, cx| {
5554                workspace.active_item(cx).unwrap().id() == editor_b1.id()
5555            })
5556            .await;
5557
5558        // Changes to client A's editor are reflected on client B.
5559        editor_a1.update(cx_a, |editor, cx| {
5560            editor.select_ranges([1..1, 2..2], None, cx);
5561        });
5562        editor_b1
5563            .condition(cx_b, |editor, cx| {
5564                editor.selected_ranges(cx) == vec![1..1, 2..2]
5565            })
5566            .await;
5567
5568        editor_a1.update(cx_a, |editor, cx| editor.set_text("TWO", cx));
5569        editor_b1
5570            .condition(cx_b, |editor, cx| editor.text(cx) == "TWO")
5571            .await;
5572
5573        editor_a1.update(cx_a, |editor, cx| {
5574            editor.select_ranges([3..3], None, cx);
5575            editor.set_scroll_position(vec2f(0., 100.), cx);
5576        });
5577        editor_b1
5578            .condition(cx_b, |editor, cx| editor.selected_ranges(cx) == vec![3..3])
5579            .await;
5580
5581        // After unfollowing, client B stops receiving updates from client A.
5582        workspace_b.update(cx_b, |workspace, cx| {
5583            workspace.unfollow(&workspace.active_pane().clone(), cx)
5584        });
5585        workspace_a.update(cx_a, |workspace, cx| {
5586            workspace.activate_item(&editor_a2, cx)
5587        });
5588        cx_a.foreground().run_until_parked();
5589        assert_eq!(
5590            workspace_b.read_with(cx_b, |workspace, cx| workspace
5591                .active_item(cx)
5592                .unwrap()
5593                .id()),
5594            editor_b1.id()
5595        );
5596
5597        // Client A starts following client B.
5598        workspace_a
5599            .update(cx_a, |workspace, cx| {
5600                workspace
5601                    .toggle_follow(&ToggleFollow(client_b_id), cx)
5602                    .unwrap()
5603            })
5604            .await
5605            .unwrap();
5606        assert_eq!(
5607            workspace_a.read_with(cx_a, |workspace, _| workspace.leader_for_pane(&pane_a)),
5608            Some(client_b_id)
5609        );
5610        assert_eq!(
5611            workspace_a.read_with(cx_a, |workspace, cx| workspace
5612                .active_item(cx)
5613                .unwrap()
5614                .id()),
5615            editor_a1.id()
5616        );
5617
5618        // Following interrupts when client B disconnects.
5619        client_b.disconnect(&cx_b.to_async()).unwrap();
5620        cx_a.foreground().run_until_parked();
5621        assert_eq!(
5622            workspace_a.read_with(cx_a, |workspace, _| workspace.leader_for_pane(&pane_a)),
5623            None
5624        );
5625    }
5626
5627    #[gpui::test(iterations = 10)]
5628    async fn test_peers_following_each_other(cx_a: &mut TestAppContext, cx_b: &mut TestAppContext) {
5629        cx_a.foreground().forbid_parking();
5630        let fs = FakeFs::new(cx_a.background());
5631
5632        // 2 clients connect to a server.
5633        let mut server = TestServer::start(cx_a.foreground(), cx_a.background()).await;
5634        let mut client_a = server.create_client(cx_a, "user_a").await;
5635        let mut client_b = server.create_client(cx_b, "user_b").await;
5636        server
5637            .make_contacts(vec![(&client_a, cx_a), (&client_b, cx_b)])
5638            .await;
5639        cx_a.update(editor::init);
5640        cx_b.update(editor::init);
5641
5642        // Client A shares a project.
5643        fs.insert_tree(
5644            "/a",
5645            json!({
5646                "1.txt": "one",
5647                "2.txt": "two",
5648                "3.txt": "three",
5649                "4.txt": "four",
5650            }),
5651        )
5652        .await;
5653        let (project_a, worktree_id) = client_a.build_local_project(fs.clone(), "/a", cx_a).await;
5654        project_a
5655            .update(cx_a, |project, cx| project.share(cx))
5656            .await
5657            .unwrap();
5658
5659        // Client B joins the project.
5660        let project_b = client_b
5661            .build_remote_project(
5662                project_a
5663                    .read_with(cx_a, |project, _| project.remote_id())
5664                    .unwrap(),
5665                cx_b,
5666            )
5667            .await;
5668
5669        // Client A opens some editors.
5670        let workspace_a = client_a.build_workspace(&project_a, cx_a);
5671        let pane_a1 = workspace_a.read_with(cx_a, |workspace, _| workspace.active_pane().clone());
5672        let _editor_a1 = workspace_a
5673            .update(cx_a, |workspace, cx| {
5674                workspace.open_path((worktree_id, "1.txt"), true, cx)
5675            })
5676            .await
5677            .unwrap()
5678            .downcast::<Editor>()
5679            .unwrap();
5680
5681        // Client B opens an editor.
5682        let workspace_b = client_b.build_workspace(&project_b, cx_b);
5683        let pane_b1 = workspace_b.read_with(cx_b, |workspace, _| workspace.active_pane().clone());
5684        let _editor_b1 = workspace_b
5685            .update(cx_b, |workspace, cx| {
5686                workspace.open_path((worktree_id, "2.txt"), true, cx)
5687            })
5688            .await
5689            .unwrap()
5690            .downcast::<Editor>()
5691            .unwrap();
5692
5693        // Clients A and B follow each other in split panes
5694        workspace_a
5695            .update(cx_a, |workspace, cx| {
5696                workspace.split_pane(workspace.active_pane().clone(), SplitDirection::Right, cx);
5697                assert_ne!(*workspace.active_pane(), pane_a1);
5698                let leader_id = *project_a.read(cx).collaborators().keys().next().unwrap();
5699                workspace
5700                    .toggle_follow(&workspace::ToggleFollow(leader_id), cx)
5701                    .unwrap()
5702            })
5703            .await
5704            .unwrap();
5705        workspace_b
5706            .update(cx_b, |workspace, cx| {
5707                workspace.split_pane(workspace.active_pane().clone(), SplitDirection::Right, cx);
5708                assert_ne!(*workspace.active_pane(), pane_b1);
5709                let leader_id = *project_b.read(cx).collaborators().keys().next().unwrap();
5710                workspace
5711                    .toggle_follow(&workspace::ToggleFollow(leader_id), cx)
5712                    .unwrap()
5713            })
5714            .await
5715            .unwrap();
5716
5717        workspace_a
5718            .update(cx_a, |workspace, cx| {
5719                workspace.activate_next_pane(cx);
5720                assert_eq!(*workspace.active_pane(), pane_a1);
5721                workspace.open_path((worktree_id, "3.txt"), true, cx)
5722            })
5723            .await
5724            .unwrap();
5725        workspace_b
5726            .update(cx_b, |workspace, cx| {
5727                workspace.activate_next_pane(cx);
5728                assert_eq!(*workspace.active_pane(), pane_b1);
5729                workspace.open_path((worktree_id, "4.txt"), true, cx)
5730            })
5731            .await
5732            .unwrap();
5733        cx_a.foreground().run_until_parked();
5734
5735        // Ensure leader updates don't change the active pane of followers
5736        workspace_a.read_with(cx_a, |workspace, _| {
5737            assert_eq!(*workspace.active_pane(), pane_a1);
5738        });
5739        workspace_b.read_with(cx_b, |workspace, _| {
5740            assert_eq!(*workspace.active_pane(), pane_b1);
5741        });
5742
5743        // Ensure peers following each other doesn't cause an infinite loop.
5744        assert_eq!(
5745            workspace_a.read_with(cx_a, |workspace, cx| workspace
5746                .active_item(cx)
5747                .unwrap()
5748                .project_path(cx)),
5749            Some((worktree_id, "3.txt").into())
5750        );
5751        workspace_a.update(cx_a, |workspace, cx| {
5752            assert_eq!(
5753                workspace.active_item(cx).unwrap().project_path(cx),
5754                Some((worktree_id, "3.txt").into())
5755            );
5756            workspace.activate_next_pane(cx);
5757            assert_eq!(
5758                workspace.active_item(cx).unwrap().project_path(cx),
5759                Some((worktree_id, "4.txt").into())
5760            );
5761        });
5762        workspace_b.update(cx_b, |workspace, cx| {
5763            assert_eq!(
5764                workspace.active_item(cx).unwrap().project_path(cx),
5765                Some((worktree_id, "4.txt").into())
5766            );
5767            workspace.activate_next_pane(cx);
5768            assert_eq!(
5769                workspace.active_item(cx).unwrap().project_path(cx),
5770                Some((worktree_id, "3.txt").into())
5771            );
5772        });
5773    }
5774
5775    #[gpui::test(iterations = 10)]
5776    async fn test_auto_unfollowing(cx_a: &mut TestAppContext, cx_b: &mut TestAppContext) {
5777        cx_a.foreground().forbid_parking();
5778        let fs = FakeFs::new(cx_a.background());
5779
5780        // 2 clients connect to a server.
5781        let mut server = TestServer::start(cx_a.foreground(), cx_a.background()).await;
5782        let mut client_a = server.create_client(cx_a, "user_a").await;
5783        let mut client_b = server.create_client(cx_b, "user_b").await;
5784        server
5785            .make_contacts(vec![(&client_a, cx_a), (&client_b, cx_b)])
5786            .await;
5787        cx_a.update(editor::init);
5788        cx_b.update(editor::init);
5789
5790        // Client A shares a project.
5791        fs.insert_tree(
5792            "/a",
5793            json!({
5794                "1.txt": "one",
5795                "2.txt": "two",
5796                "3.txt": "three",
5797            }),
5798        )
5799        .await;
5800        let (project_a, worktree_id) = client_a.build_local_project(fs.clone(), "/a", cx_a).await;
5801        project_a
5802            .update(cx_a, |project, cx| project.share(cx))
5803            .await
5804            .unwrap();
5805
5806        // Client B joins the project.
5807        let project_b = client_b
5808            .build_remote_project(
5809                project_a
5810                    .read_with(cx_a, |project, _| project.remote_id())
5811                    .unwrap(),
5812                cx_b,
5813            )
5814            .await;
5815
5816        // Client A opens some editors.
5817        let workspace_a = client_a.build_workspace(&project_a, cx_a);
5818        let _editor_a1 = workspace_a
5819            .update(cx_a, |workspace, cx| {
5820                workspace.open_path((worktree_id, "1.txt"), true, cx)
5821            })
5822            .await
5823            .unwrap()
5824            .downcast::<Editor>()
5825            .unwrap();
5826
5827        // Client B starts following client A.
5828        let workspace_b = client_b.build_workspace(&project_b, cx_b);
5829        let pane_b = workspace_b.read_with(cx_b, |workspace, _| workspace.active_pane().clone());
5830        let leader_id = project_b.read_with(cx_b, |project, _| {
5831            project.collaborators().values().next().unwrap().peer_id
5832        });
5833        workspace_b
5834            .update(cx_b, |workspace, cx| {
5835                workspace
5836                    .toggle_follow(&ToggleFollow(leader_id), cx)
5837                    .unwrap()
5838            })
5839            .await
5840            .unwrap();
5841        assert_eq!(
5842            workspace_b.read_with(cx_b, |workspace, _| workspace.leader_for_pane(&pane_b)),
5843            Some(leader_id)
5844        );
5845        let editor_b2 = workspace_b.read_with(cx_b, |workspace, cx| {
5846            workspace
5847                .active_item(cx)
5848                .unwrap()
5849                .downcast::<Editor>()
5850                .unwrap()
5851        });
5852
5853        // When client B moves, it automatically stops following client A.
5854        editor_b2.update(cx_b, |editor, cx| editor.move_right(&editor::MoveRight, cx));
5855        assert_eq!(
5856            workspace_b.read_with(cx_b, |workspace, _| workspace.leader_for_pane(&pane_b)),
5857            None
5858        );
5859
5860        workspace_b
5861            .update(cx_b, |workspace, cx| {
5862                workspace
5863                    .toggle_follow(&ToggleFollow(leader_id), cx)
5864                    .unwrap()
5865            })
5866            .await
5867            .unwrap();
5868        assert_eq!(
5869            workspace_b.read_with(cx_b, |workspace, _| workspace.leader_for_pane(&pane_b)),
5870            Some(leader_id)
5871        );
5872
5873        // When client B edits, it automatically stops following client A.
5874        editor_b2.update(cx_b, |editor, cx| editor.insert("X", cx));
5875        assert_eq!(
5876            workspace_b.read_with(cx_b, |workspace, _| workspace.leader_for_pane(&pane_b)),
5877            None
5878        );
5879
5880        workspace_b
5881            .update(cx_b, |workspace, cx| {
5882                workspace
5883                    .toggle_follow(&ToggleFollow(leader_id), cx)
5884                    .unwrap()
5885            })
5886            .await
5887            .unwrap();
5888        assert_eq!(
5889            workspace_b.read_with(cx_b, |workspace, _| workspace.leader_for_pane(&pane_b)),
5890            Some(leader_id)
5891        );
5892
5893        // When client B scrolls, it automatically stops following client A.
5894        editor_b2.update(cx_b, |editor, cx| {
5895            editor.set_scroll_position(vec2f(0., 3.), cx)
5896        });
5897        assert_eq!(
5898            workspace_b.read_with(cx_b, |workspace, _| workspace.leader_for_pane(&pane_b)),
5899            None
5900        );
5901
5902        workspace_b
5903            .update(cx_b, |workspace, cx| {
5904                workspace
5905                    .toggle_follow(&ToggleFollow(leader_id), cx)
5906                    .unwrap()
5907            })
5908            .await
5909            .unwrap();
5910        assert_eq!(
5911            workspace_b.read_with(cx_b, |workspace, _| workspace.leader_for_pane(&pane_b)),
5912            Some(leader_id)
5913        );
5914
5915        // When client B activates a different pane, it continues following client A in the original pane.
5916        workspace_b.update(cx_b, |workspace, cx| {
5917            workspace.split_pane(pane_b.clone(), SplitDirection::Right, cx)
5918        });
5919        assert_eq!(
5920            workspace_b.read_with(cx_b, |workspace, _| workspace.leader_for_pane(&pane_b)),
5921            Some(leader_id)
5922        );
5923
5924        workspace_b.update(cx_b, |workspace, cx| workspace.activate_next_pane(cx));
5925        assert_eq!(
5926            workspace_b.read_with(cx_b, |workspace, _| workspace.leader_for_pane(&pane_b)),
5927            Some(leader_id)
5928        );
5929
5930        // When client B activates a different item in the original pane, it automatically stops following client A.
5931        workspace_b
5932            .update(cx_b, |workspace, cx| {
5933                workspace.open_path((worktree_id, "2.txt"), true, cx)
5934            })
5935            .await
5936            .unwrap();
5937        assert_eq!(
5938            workspace_b.read_with(cx_b, |workspace, _| workspace.leader_for_pane(&pane_b)),
5939            None
5940        );
5941    }
5942
5943    #[gpui::test(iterations = 100)]
5944    async fn test_random_collaboration(
5945        cx: &mut TestAppContext,
5946        deterministic: Arc<Deterministic>,
5947        rng: StdRng,
5948    ) {
5949        cx.foreground().forbid_parking();
5950        let max_peers = env::var("MAX_PEERS")
5951            .map(|i| i.parse().expect("invalid `MAX_PEERS` variable"))
5952            .unwrap_or(5);
5953        assert!(max_peers <= 5);
5954
5955        let max_operations = env::var("OPERATIONS")
5956            .map(|i| i.parse().expect("invalid `OPERATIONS` variable"))
5957            .unwrap_or(10);
5958
5959        let rng = Arc::new(Mutex::new(rng));
5960
5961        let guest_lang_registry = Arc::new(LanguageRegistry::test());
5962        let host_language_registry = Arc::new(LanguageRegistry::test());
5963
5964        let fs = FakeFs::new(cx.background());
5965        fs.insert_tree("/_collab", json!({"init": ""})).await;
5966
5967        let mut server = TestServer::start(cx.foreground(), cx.background()).await;
5968        let db = server.app_state.db.clone();
5969        let host_user_id = db.create_user("host", false).await.unwrap();
5970        for username in ["guest-1", "guest-2", "guest-3", "guest-4"] {
5971            let guest_user_id = db.create_user(username, false).await.unwrap();
5972            server
5973                .app_state
5974                .db
5975                .send_contact_request(guest_user_id, host_user_id)
5976                .await
5977                .unwrap();
5978            server
5979                .app_state
5980                .db
5981                .respond_to_contact_request(host_user_id, guest_user_id, true)
5982                .await
5983                .unwrap();
5984        }
5985
5986        let mut clients = Vec::new();
5987        let mut user_ids = Vec::new();
5988        let mut op_start_signals = Vec::new();
5989
5990        let mut next_entity_id = 100000;
5991        let mut host_cx = TestAppContext::new(
5992            cx.foreground_platform(),
5993            cx.platform(),
5994            deterministic.build_foreground(next_entity_id),
5995            deterministic.build_background(),
5996            cx.font_cache(),
5997            cx.leak_detector(),
5998            next_entity_id,
5999        );
6000        let host = server.create_client(&mut host_cx, "host").await;
6001        let host_project = host_cx.update(|cx| {
6002            Project::local(
6003                host.client.clone(),
6004                host.user_store.clone(),
6005                host_language_registry.clone(),
6006                fs.clone(),
6007                cx,
6008            )
6009        });
6010        let host_project_id = host_project
6011            .update(&mut host_cx, |p, _| p.next_remote_id())
6012            .await;
6013
6014        let (collab_worktree, _) = host_project
6015            .update(&mut host_cx, |project, cx| {
6016                project.find_or_create_local_worktree("/_collab", true, cx)
6017            })
6018            .await
6019            .unwrap();
6020        collab_worktree
6021            .read_with(&host_cx, |tree, _| tree.as_local().unwrap().scan_complete())
6022            .await;
6023        host_project
6024            .update(&mut host_cx, |project, cx| project.share(cx))
6025            .await
6026            .unwrap();
6027
6028        // Set up fake language servers.
6029        let mut language = Language::new(
6030            LanguageConfig {
6031                name: "Rust".into(),
6032                path_suffixes: vec!["rs".to_string()],
6033                ..Default::default()
6034            },
6035            None,
6036        );
6037        let _fake_servers = language.set_fake_lsp_adapter(FakeLspAdapter {
6038            name: "the-fake-language-server",
6039            capabilities: lsp::LanguageServer::full_capabilities(),
6040            initializer: Some(Box::new({
6041                let rng = rng.clone();
6042                let fs = fs.clone();
6043                let project = host_project.downgrade();
6044                move |fake_server: &mut FakeLanguageServer| {
6045                    fake_server.handle_request::<lsp::request::Completion, _, _>(
6046                        |_, _| async move {
6047                            Ok(Some(lsp::CompletionResponse::Array(vec![
6048                                lsp::CompletionItem {
6049                                    text_edit: Some(lsp::CompletionTextEdit::Edit(lsp::TextEdit {
6050                                        range: lsp::Range::new(
6051                                            lsp::Position::new(0, 0),
6052                                            lsp::Position::new(0, 0),
6053                                        ),
6054                                        new_text: "the-new-text".to_string(),
6055                                    })),
6056                                    ..Default::default()
6057                                },
6058                            ])))
6059                        },
6060                    );
6061
6062                    fake_server.handle_request::<lsp::request::CodeActionRequest, _, _>(
6063                        |_, _| async move {
6064                            Ok(Some(vec![lsp::CodeActionOrCommand::CodeAction(
6065                                lsp::CodeAction {
6066                                    title: "the-code-action".to_string(),
6067                                    ..Default::default()
6068                                },
6069                            )]))
6070                        },
6071                    );
6072
6073                    fake_server.handle_request::<lsp::request::PrepareRenameRequest, _, _>(
6074                        |params, _| async move {
6075                            Ok(Some(lsp::PrepareRenameResponse::Range(lsp::Range::new(
6076                                params.position,
6077                                params.position,
6078                            ))))
6079                        },
6080                    );
6081
6082                    fake_server.handle_request::<lsp::request::GotoDefinition, _, _>({
6083                        let fs = fs.clone();
6084                        let rng = rng.clone();
6085                        move |_, _| {
6086                            let fs = fs.clone();
6087                            let rng = rng.clone();
6088                            async move {
6089                                let files = fs.files().await;
6090                                let mut rng = rng.lock();
6091                                let count = rng.gen_range::<usize, _>(1..3);
6092                                let files = (0..count)
6093                                    .map(|_| files.choose(&mut *rng).unwrap())
6094                                    .collect::<Vec<_>>();
6095                                log::info!("LSP: Returning definitions in files {:?}", &files);
6096                                Ok(Some(lsp::GotoDefinitionResponse::Array(
6097                                    files
6098                                        .into_iter()
6099                                        .map(|file| lsp::Location {
6100                                            uri: lsp::Url::from_file_path(file).unwrap(),
6101                                            range: Default::default(),
6102                                        })
6103                                        .collect(),
6104                                )))
6105                            }
6106                        }
6107                    });
6108
6109                    fake_server.handle_request::<lsp::request::DocumentHighlightRequest, _, _>({
6110                        let rng = rng.clone();
6111                        let project = project.clone();
6112                        move |params, mut cx| {
6113                            let highlights = if let Some(project) = project.upgrade(&cx) {
6114                                project.update(&mut cx, |project, cx| {
6115                                    let path = params
6116                                        .text_document_position_params
6117                                        .text_document
6118                                        .uri
6119                                        .to_file_path()
6120                                        .unwrap();
6121                                    let (worktree, relative_path) =
6122                                        project.find_local_worktree(&path, cx)?;
6123                                    let project_path =
6124                                        ProjectPath::from((worktree.read(cx).id(), relative_path));
6125                                    let buffer =
6126                                        project.get_open_buffer(&project_path, cx)?.read(cx);
6127
6128                                    let mut highlights = Vec::new();
6129                                    let highlight_count = rng.lock().gen_range(1..=5);
6130                                    let mut prev_end = 0;
6131                                    for _ in 0..highlight_count {
6132                                        let range =
6133                                            buffer.random_byte_range(prev_end, &mut *rng.lock());
6134
6135                                        highlights.push(lsp::DocumentHighlight {
6136                                            range: range_to_lsp(range.to_point_utf16(buffer)),
6137                                            kind: Some(lsp::DocumentHighlightKind::READ),
6138                                        });
6139                                        prev_end = range.end;
6140                                    }
6141                                    Some(highlights)
6142                                })
6143                            } else {
6144                                None
6145                            };
6146                            async move { Ok(highlights) }
6147                        }
6148                    });
6149                }
6150            })),
6151            ..Default::default()
6152        });
6153        host_language_registry.add(Arc::new(language));
6154
6155        let op_start_signal = futures::channel::mpsc::unbounded();
6156        user_ids.push(host.current_user_id(&host_cx));
6157        op_start_signals.push(op_start_signal.0);
6158        clients.push(host_cx.foreground().spawn(host.simulate_host(
6159            host_project,
6160            op_start_signal.1,
6161            rng.clone(),
6162            host_cx,
6163        )));
6164
6165        let disconnect_host_at = if rng.lock().gen_bool(0.2) {
6166            rng.lock().gen_range(0..max_operations)
6167        } else {
6168            max_operations
6169        };
6170        let mut available_guests = vec![
6171            "guest-1".to_string(),
6172            "guest-2".to_string(),
6173            "guest-3".to_string(),
6174            "guest-4".to_string(),
6175        ];
6176        let mut operations = 0;
6177        while operations < max_operations {
6178            if operations == disconnect_host_at {
6179                server.disconnect_client(user_ids[0]);
6180                cx.foreground().advance_clock(RECEIVE_TIMEOUT);
6181                drop(op_start_signals);
6182                let mut clients = futures::future::join_all(clients).await;
6183                cx.foreground().run_until_parked();
6184
6185                let (host, mut host_cx, host_err) = clients.remove(0);
6186                if let Some(host_err) = host_err {
6187                    log::error!("host error - {}", host_err);
6188                }
6189                host.project
6190                    .as_ref()
6191                    .unwrap()
6192                    .read_with(&host_cx, |project, _| assert!(!project.is_shared()));
6193                for (guest, mut guest_cx, guest_err) in clients {
6194                    if let Some(guest_err) = guest_err {
6195                        log::error!("{} error - {}", guest.username, guest_err);
6196                    }
6197                    // TODO
6198                    // let contacts = server
6199                    //     .store
6200                    //     .read()
6201                    //     .await
6202                    //     .contacts_for_user(guest.current_user_id(&guest_cx));
6203                    // assert!(!contacts
6204                    //     .iter()
6205                    //     .flat_map(|contact| &contact.projects)
6206                    //     .any(|project| project.id == host_project_id));
6207                    guest
6208                        .project
6209                        .as_ref()
6210                        .unwrap()
6211                        .read_with(&guest_cx, |project, _| assert!(project.is_read_only()));
6212                    guest_cx.update(|_| drop(guest));
6213                }
6214                host_cx.update(|_| drop(host));
6215
6216                return;
6217            }
6218
6219            let distribution = rng.lock().gen_range(0..100);
6220            match distribution {
6221                0..=19 if !available_guests.is_empty() => {
6222                    let guest_ix = rng.lock().gen_range(0..available_guests.len());
6223                    let guest_username = available_guests.remove(guest_ix);
6224                    log::info!("Adding new connection for {}", guest_username);
6225                    next_entity_id += 100000;
6226                    let mut guest_cx = TestAppContext::new(
6227                        cx.foreground_platform(),
6228                        cx.platform(),
6229                        deterministic.build_foreground(next_entity_id),
6230                        deterministic.build_background(),
6231                        cx.font_cache(),
6232                        cx.leak_detector(),
6233                        next_entity_id,
6234                    );
6235                    let guest = server.create_client(&mut guest_cx, &guest_username).await;
6236                    let guest_project = Project::remote(
6237                        host_project_id,
6238                        guest.client.clone(),
6239                        guest.user_store.clone(),
6240                        guest_lang_registry.clone(),
6241                        FakeFs::new(cx.background()),
6242                        &mut guest_cx.to_async(),
6243                    )
6244                    .await
6245                    .unwrap();
6246                    let op_start_signal = futures::channel::mpsc::unbounded();
6247                    user_ids.push(guest.current_user_id(&guest_cx));
6248                    op_start_signals.push(op_start_signal.0);
6249                    clients.push(guest_cx.foreground().spawn(guest.simulate_guest(
6250                        guest_username.clone(),
6251                        guest_project,
6252                        op_start_signal.1,
6253                        rng.clone(),
6254                        guest_cx,
6255                    )));
6256
6257                    log::info!("Added connection for {}", guest_username);
6258                    operations += 1;
6259                }
6260                20..=29 if clients.len() > 1 => {
6261                    let guest_ix = rng.lock().gen_range(1..clients.len());
6262                    log::info!("Removing guest {}", user_ids[guest_ix]);
6263                    let removed_guest_id = user_ids.remove(guest_ix);
6264                    let guest = clients.remove(guest_ix);
6265                    op_start_signals.remove(guest_ix);
6266                    server.disconnect_client(removed_guest_id);
6267                    cx.foreground().advance_clock(RECEIVE_TIMEOUT);
6268                    let (guest, mut guest_cx, guest_err) = guest.await;
6269                    if let Some(guest_err) = guest_err {
6270                        log::error!("{} error - {}", guest.username, guest_err);
6271                    }
6272                    guest
6273                        .project
6274                        .as_ref()
6275                        .unwrap()
6276                        .read_with(&guest_cx, |project, _| assert!(project.is_read_only()));
6277                    // TODO
6278                    // for user_id in &user_ids {
6279                    //     for contact in server.store.read().await.contacts_for_user(*user_id) {
6280                    //         assert_ne!(
6281                    //             contact.user_id, removed_guest_id.0 as u64,
6282                    //             "removed guest is still a contact of another peer"
6283                    //         );
6284                    //         for project in contact.projects {
6285                    //             for project_guest_id in project.guests {
6286                    //                 assert_ne!(
6287                    //                     project_guest_id, removed_guest_id.0 as u64,
6288                    //                     "removed guest appears as still participating on a project"
6289                    //                 );
6290                    //             }
6291                    //         }
6292                    //     }
6293                    // }
6294
6295                    log::info!("{} removed", guest.username);
6296                    available_guests.push(guest.username.clone());
6297                    guest_cx.update(|_| drop(guest));
6298
6299                    operations += 1;
6300                }
6301                _ => {
6302                    while operations < max_operations && rng.lock().gen_bool(0.7) {
6303                        op_start_signals
6304                            .choose(&mut *rng.lock())
6305                            .unwrap()
6306                            .unbounded_send(())
6307                            .unwrap();
6308                        operations += 1;
6309                    }
6310
6311                    if rng.lock().gen_bool(0.8) {
6312                        cx.foreground().run_until_parked();
6313                    }
6314                }
6315            }
6316        }
6317
6318        drop(op_start_signals);
6319        let mut clients = futures::future::join_all(clients).await;
6320        cx.foreground().run_until_parked();
6321
6322        let (host_client, mut host_cx, host_err) = clients.remove(0);
6323        if let Some(host_err) = host_err {
6324            panic!("host error - {}", host_err);
6325        }
6326        let host_project = host_client.project.as_ref().unwrap();
6327        let host_worktree_snapshots = host_project.read_with(&host_cx, |project, cx| {
6328            project
6329                .worktrees(cx)
6330                .map(|worktree| {
6331                    let snapshot = worktree.read(cx).snapshot();
6332                    (snapshot.id(), snapshot)
6333                })
6334                .collect::<BTreeMap<_, _>>()
6335        });
6336
6337        host_client
6338            .project
6339            .as_ref()
6340            .unwrap()
6341            .read_with(&host_cx, |project, cx| project.check_invariants(cx));
6342
6343        for (guest_client, mut guest_cx, guest_err) in clients.into_iter() {
6344            if let Some(guest_err) = guest_err {
6345                panic!("{} error - {}", guest_client.username, guest_err);
6346            }
6347            let worktree_snapshots =
6348                guest_client
6349                    .project
6350                    .as_ref()
6351                    .unwrap()
6352                    .read_with(&guest_cx, |project, cx| {
6353                        project
6354                            .worktrees(cx)
6355                            .map(|worktree| {
6356                                let worktree = worktree.read(cx);
6357                                (worktree.id(), worktree.snapshot())
6358                            })
6359                            .collect::<BTreeMap<_, _>>()
6360                    });
6361
6362            assert_eq!(
6363                worktree_snapshots.keys().collect::<Vec<_>>(),
6364                host_worktree_snapshots.keys().collect::<Vec<_>>(),
6365                "{} has different worktrees than the host",
6366                guest_client.username
6367            );
6368            for (id, host_snapshot) in &host_worktree_snapshots {
6369                let guest_snapshot = &worktree_snapshots[id];
6370                assert_eq!(
6371                    guest_snapshot.root_name(),
6372                    host_snapshot.root_name(),
6373                    "{} has different root name than the host for worktree {}",
6374                    guest_client.username,
6375                    id
6376                );
6377                assert_eq!(
6378                    guest_snapshot.entries(false).collect::<Vec<_>>(),
6379                    host_snapshot.entries(false).collect::<Vec<_>>(),
6380                    "{} has different snapshot than the host for worktree {}",
6381                    guest_client.username,
6382                    id
6383                );
6384                assert_eq!(guest_snapshot.scan_id(), host_snapshot.scan_id());
6385            }
6386
6387            guest_client
6388                .project
6389                .as_ref()
6390                .unwrap()
6391                .read_with(&guest_cx, |project, cx| project.check_invariants(cx));
6392
6393            for guest_buffer in &guest_client.buffers {
6394                let buffer_id = guest_buffer.read_with(&guest_cx, |buffer, _| buffer.remote_id());
6395                let host_buffer = host_project.read_with(&host_cx, |project, cx| {
6396                    project.buffer_for_id(buffer_id, cx).expect(&format!(
6397                        "host does not have buffer for guest:{}, peer:{}, id:{}",
6398                        guest_client.username, guest_client.peer_id, buffer_id
6399                    ))
6400                });
6401                let path = host_buffer
6402                    .read_with(&host_cx, |buffer, cx| buffer.file().unwrap().full_path(cx));
6403
6404                assert_eq!(
6405                    guest_buffer.read_with(&guest_cx, |buffer, _| buffer.deferred_ops_len()),
6406                    0,
6407                    "{}, buffer {}, path {:?} has deferred operations",
6408                    guest_client.username,
6409                    buffer_id,
6410                    path,
6411                );
6412                assert_eq!(
6413                    guest_buffer.read_with(&guest_cx, |buffer, _| buffer.text()),
6414                    host_buffer.read_with(&host_cx, |buffer, _| buffer.text()),
6415                    "{}, buffer {}, path {:?}, differs from the host's buffer",
6416                    guest_client.username,
6417                    buffer_id,
6418                    path
6419                );
6420            }
6421
6422            guest_cx.update(|_| drop(guest_client));
6423        }
6424
6425        host_cx.update(|_| drop(host_client));
6426    }
6427
6428    struct TestServer {
6429        peer: Arc<Peer>,
6430        app_state: Arc<AppState>,
6431        server: Arc<Server>,
6432        foreground: Rc<executor::Foreground>,
6433        notifications: mpsc::UnboundedReceiver<()>,
6434        connection_killers: Arc<Mutex<HashMap<UserId, Arc<AtomicBool>>>>,
6435        forbid_connections: Arc<AtomicBool>,
6436        _test_db: TestDb,
6437    }
6438
6439    impl TestServer {
6440        async fn start(
6441            foreground: Rc<executor::Foreground>,
6442            background: Arc<executor::Background>,
6443        ) -> Self {
6444            let test_db = TestDb::fake(background);
6445            let app_state = Self::build_app_state(&test_db).await;
6446            let peer = Peer::new();
6447            let notifications = mpsc::unbounded();
6448            let server = Server::new(app_state.clone(), Some(notifications.0));
6449            Self {
6450                peer,
6451                app_state,
6452                server,
6453                foreground,
6454                notifications: notifications.1,
6455                connection_killers: Default::default(),
6456                forbid_connections: Default::default(),
6457                _test_db: test_db,
6458            }
6459        }
6460
6461        async fn create_client(&mut self, cx: &mut TestAppContext, name: &str) -> TestClient {
6462            cx.update(|cx| {
6463                let settings = Settings::test(cx);
6464                cx.set_global(settings);
6465            });
6466
6467            let http = FakeHttpClient::with_404_response();
6468            let user_id =
6469                if let Ok(Some(user)) = self.app_state.db.get_user_by_github_login(name).await {
6470                    user.id
6471                } else {
6472                    self.app_state.db.create_user(name, false).await.unwrap()
6473                };
6474            let client_name = name.to_string();
6475            let mut client = Client::new(http.clone());
6476            let server = self.server.clone();
6477            let connection_killers = self.connection_killers.clone();
6478            let forbid_connections = self.forbid_connections.clone();
6479            let (connection_id_tx, mut connection_id_rx) = mpsc::channel(16);
6480
6481            Arc::get_mut(&mut client)
6482                .unwrap()
6483                .override_authenticate(move |cx| {
6484                    cx.spawn(|_| async move {
6485                        let access_token = "the-token".to_string();
6486                        Ok(Credentials {
6487                            user_id: user_id.0 as u64,
6488                            access_token,
6489                        })
6490                    })
6491                })
6492                .override_establish_connection(move |credentials, cx| {
6493                    assert_eq!(credentials.user_id, user_id.0 as u64);
6494                    assert_eq!(credentials.access_token, "the-token");
6495
6496                    let server = server.clone();
6497                    let connection_killers = connection_killers.clone();
6498                    let forbid_connections = forbid_connections.clone();
6499                    let client_name = client_name.clone();
6500                    let connection_id_tx = connection_id_tx.clone();
6501                    cx.spawn(move |cx| async move {
6502                        if forbid_connections.load(SeqCst) {
6503                            Err(EstablishConnectionError::other(anyhow!(
6504                                "server is forbidding connections"
6505                            )))
6506                        } else {
6507                            let (client_conn, server_conn, killed) =
6508                                Connection::in_memory(cx.background());
6509                            connection_killers.lock().insert(user_id, killed);
6510                            cx.background()
6511                                .spawn(server.handle_connection(
6512                                    server_conn,
6513                                    client_name,
6514                                    user_id,
6515                                    Some(connection_id_tx),
6516                                    cx.background(),
6517                                ))
6518                                .detach();
6519                            Ok(client_conn)
6520                        }
6521                    })
6522                });
6523
6524            client
6525                .authenticate_and_connect(false, &cx.to_async())
6526                .await
6527                .unwrap();
6528
6529            Channel::init(&client);
6530            Project::init(&client);
6531            cx.update(|cx| {
6532                workspace::init(&client, cx);
6533            });
6534
6535            let peer_id = PeerId(connection_id_rx.next().await.unwrap().0);
6536            let user_store = cx.add_model(|cx| UserStore::new(client.clone(), http, cx));
6537
6538            let client = TestClient {
6539                client,
6540                peer_id,
6541                username: name.to_string(),
6542                user_store,
6543                language_registry: Arc::new(LanguageRegistry::test()),
6544                project: Default::default(),
6545                buffers: Default::default(),
6546            };
6547            client.wait_for_current_user(cx).await;
6548            client
6549        }
6550
6551        fn disconnect_client(&self, user_id: UserId) {
6552            self.connection_killers
6553                .lock()
6554                .remove(&user_id)
6555                .unwrap()
6556                .store(true, SeqCst);
6557        }
6558
6559        fn forbid_connections(&self) {
6560            self.forbid_connections.store(true, SeqCst);
6561        }
6562
6563        fn allow_connections(&self) {
6564            self.forbid_connections.store(false, SeqCst);
6565        }
6566
6567        async fn make_contacts(&self, mut clients: Vec<(&TestClient, &mut TestAppContext)>) {
6568            while let Some((client_a, cx_a)) = clients.pop() {
6569                for (client_b, cx_b) in &mut clients {
6570                    client_a
6571                        .user_store
6572                        .update(cx_a, |store, cx| {
6573                            store.request_contact(client_b.user_id().unwrap(), cx)
6574                        })
6575                        .await
6576                        .unwrap();
6577                    cx_a.foreground().run_until_parked();
6578                    client_b
6579                        .user_store
6580                        .update(*cx_b, |store, cx| {
6581                            store.respond_to_contact_request(client_a.user_id().unwrap(), true, cx)
6582                        })
6583                        .await
6584                        .unwrap();
6585                }
6586            }
6587        }
6588
6589        async fn build_app_state(test_db: &TestDb) -> Arc<AppState> {
6590            Arc::new(AppState {
6591                db: test_db.db().clone(),
6592                api_token: Default::default(),
6593            })
6594        }
6595
6596        async fn state<'a>(&'a self) -> RwLockReadGuard<'a, Store> {
6597            self.server.store.read().await
6598        }
6599
6600        async fn condition<F>(&mut self, mut predicate: F)
6601        where
6602            F: FnMut(&Store) -> bool,
6603        {
6604            assert!(
6605                self.foreground.parking_forbidden(),
6606                "you must call forbid_parking to use server conditions so we don't block indefinitely"
6607            );
6608            while !(predicate)(&*self.server.store.read().await) {
6609                self.foreground.start_waiting();
6610                self.notifications.next().await;
6611                self.foreground.finish_waiting();
6612            }
6613        }
6614    }
6615
6616    impl Deref for TestServer {
6617        type Target = Server;
6618
6619        fn deref(&self) -> &Self::Target {
6620            &self.server
6621        }
6622    }
6623
6624    impl Drop for TestServer {
6625        fn drop(&mut self) {
6626            self.peer.reset();
6627        }
6628    }
6629
6630    struct TestClient {
6631        client: Arc<Client>,
6632        username: String,
6633        pub peer_id: PeerId,
6634        pub user_store: ModelHandle<UserStore>,
6635        language_registry: Arc<LanguageRegistry>,
6636        project: Option<ModelHandle<Project>>,
6637        buffers: HashSet<ModelHandle<language::Buffer>>,
6638    }
6639
6640    impl Deref for TestClient {
6641        type Target = Arc<Client>;
6642
6643        fn deref(&self) -> &Self::Target {
6644            &self.client
6645        }
6646    }
6647
6648    struct ContactsSummary {
6649        pub current: Vec<String>,
6650        pub outgoing_requests: Vec<String>,
6651        pub incoming_requests: Vec<String>,
6652    }
6653
6654    impl TestClient {
6655        pub fn current_user_id(&self, cx: &TestAppContext) -> UserId {
6656            UserId::from_proto(
6657                self.user_store
6658                    .read_with(cx, |user_store, _| user_store.current_user().unwrap().id),
6659            )
6660        }
6661
6662        async fn wait_for_current_user(&self, cx: &TestAppContext) {
6663            let mut authed_user = self
6664                .user_store
6665                .read_with(cx, |user_store, _| user_store.watch_current_user());
6666            while authed_user.next().await.unwrap().is_none() {}
6667        }
6668
6669        async fn clear_contacts(&self, cx: &mut TestAppContext) {
6670            self.user_store
6671                .update(cx, |store, _| store.clear_contacts())
6672                .await;
6673        }
6674
6675        fn summarize_contacts(&self, cx: &TestAppContext) -> ContactsSummary {
6676            self.user_store.read_with(cx, |store, _| ContactsSummary {
6677                current: store
6678                    .contacts()
6679                    .iter()
6680                    .map(|contact| contact.user.github_login.clone())
6681                    .collect(),
6682                outgoing_requests: store
6683                    .outgoing_contact_requests()
6684                    .iter()
6685                    .map(|user| user.github_login.clone())
6686                    .collect(),
6687                incoming_requests: store
6688                    .incoming_contact_requests()
6689                    .iter()
6690                    .map(|user| user.github_login.clone())
6691                    .collect(),
6692            })
6693        }
6694
6695        async fn build_local_project(
6696            &mut self,
6697            fs: Arc<FakeFs>,
6698            root_path: impl AsRef<Path>,
6699            cx: &mut TestAppContext,
6700        ) -> (ModelHandle<Project>, WorktreeId) {
6701            let project = cx.update(|cx| {
6702                Project::local(
6703                    self.client.clone(),
6704                    self.user_store.clone(),
6705                    self.language_registry.clone(),
6706                    fs,
6707                    cx,
6708                )
6709            });
6710            self.project = Some(project.clone());
6711            let (worktree, _) = project
6712                .update(cx, |p, cx| {
6713                    p.find_or_create_local_worktree(root_path, true, cx)
6714                })
6715                .await
6716                .unwrap();
6717            worktree
6718                .read_with(cx, |tree, _| tree.as_local().unwrap().scan_complete())
6719                .await;
6720            project
6721                .update(cx, |project, _| project.next_remote_id())
6722                .await;
6723            (project, worktree.read_with(cx, |tree, _| tree.id()))
6724        }
6725
6726        async fn build_remote_project(
6727            &mut self,
6728            project_id: u64,
6729            cx: &mut TestAppContext,
6730        ) -> ModelHandle<Project> {
6731            let project = Project::remote(
6732                project_id,
6733                self.client.clone(),
6734                self.user_store.clone(),
6735                self.language_registry.clone(),
6736                FakeFs::new(cx.background()),
6737                &mut cx.to_async(),
6738            )
6739            .await
6740            .unwrap();
6741            self.project = Some(project.clone());
6742            project
6743        }
6744
6745        fn build_workspace(
6746            &self,
6747            project: &ModelHandle<Project>,
6748            cx: &mut TestAppContext,
6749        ) -> ViewHandle<Workspace> {
6750            let (window_id, _) = cx.add_window(|_| EmptyView);
6751            cx.add_view(window_id, |cx| {
6752                let fs = project.read(cx).fs().clone();
6753                Workspace::new(
6754                    &WorkspaceParams {
6755                        fs,
6756                        project: project.clone(),
6757                        user_store: self.user_store.clone(),
6758                        languages: self.language_registry.clone(),
6759                        themes: ThemeRegistry::new((), cx.font_cache().clone()),
6760                        channel_list: cx.add_model(|cx| {
6761                            ChannelList::new(self.user_store.clone(), self.client.clone(), cx)
6762                        }),
6763                        client: self.client.clone(),
6764                    },
6765                    cx,
6766                )
6767            })
6768        }
6769
6770        async fn simulate_host(
6771            mut self,
6772            project: ModelHandle<Project>,
6773            op_start_signal: futures::channel::mpsc::UnboundedReceiver<()>,
6774            rng: Arc<Mutex<StdRng>>,
6775            mut cx: TestAppContext,
6776        ) -> (Self, TestAppContext, Option<anyhow::Error>) {
6777            async fn simulate_host_internal(
6778                client: &mut TestClient,
6779                project: ModelHandle<Project>,
6780                mut op_start_signal: futures::channel::mpsc::UnboundedReceiver<()>,
6781                rng: Arc<Mutex<StdRng>>,
6782                cx: &mut TestAppContext,
6783            ) -> anyhow::Result<()> {
6784                let fs = project.read_with(cx, |project, _| project.fs().clone());
6785
6786                while op_start_signal.next().await.is_some() {
6787                    let distribution = rng.lock().gen_range::<usize, _>(0..100);
6788                    let files = fs.as_fake().files().await;
6789                    match distribution {
6790                        0..=20 if !files.is_empty() => {
6791                            let path = files.choose(&mut *rng.lock()).unwrap();
6792                            let mut path = path.as_path();
6793                            while let Some(parent_path) = path.parent() {
6794                                path = parent_path;
6795                                if rng.lock().gen() {
6796                                    break;
6797                                }
6798                            }
6799
6800                            log::info!("Host: find/create local worktree {:?}", path);
6801                            let find_or_create_worktree = project.update(cx, |project, cx| {
6802                                project.find_or_create_local_worktree(path, true, cx)
6803                            });
6804                            if rng.lock().gen() {
6805                                cx.background().spawn(find_or_create_worktree).detach();
6806                            } else {
6807                                find_or_create_worktree.await?;
6808                            }
6809                        }
6810                        10..=80 if !files.is_empty() => {
6811                            let buffer = if client.buffers.is_empty() || rng.lock().gen() {
6812                                let file = files.choose(&mut *rng.lock()).unwrap();
6813                                let (worktree, path) = project
6814                                    .update(cx, |project, cx| {
6815                                        project.find_or_create_local_worktree(
6816                                            file.clone(),
6817                                            true,
6818                                            cx,
6819                                        )
6820                                    })
6821                                    .await?;
6822                                let project_path =
6823                                    worktree.read_with(cx, |worktree, _| (worktree.id(), path));
6824                                log::info!(
6825                                    "Host: opening path {:?}, worktree {}, relative_path {:?}",
6826                                    file,
6827                                    project_path.0,
6828                                    project_path.1
6829                                );
6830                                let buffer = project
6831                                    .update(cx, |project, cx| project.open_buffer(project_path, cx))
6832                                    .await
6833                                    .unwrap();
6834                                client.buffers.insert(buffer.clone());
6835                                buffer
6836                            } else {
6837                                client
6838                                    .buffers
6839                                    .iter()
6840                                    .choose(&mut *rng.lock())
6841                                    .unwrap()
6842                                    .clone()
6843                            };
6844
6845                            if rng.lock().gen_bool(0.1) {
6846                                cx.update(|cx| {
6847                                    log::info!(
6848                                        "Host: dropping buffer {:?}",
6849                                        buffer.read(cx).file().unwrap().full_path(cx)
6850                                    );
6851                                    client.buffers.remove(&buffer);
6852                                    drop(buffer);
6853                                });
6854                            } else {
6855                                buffer.update(cx, |buffer, cx| {
6856                                    log::info!(
6857                                        "Host: updating buffer {:?} ({})",
6858                                        buffer.file().unwrap().full_path(cx),
6859                                        buffer.remote_id()
6860                                    );
6861
6862                                    if rng.lock().gen_bool(0.7) {
6863                                        buffer.randomly_edit(&mut *rng.lock(), 5, cx);
6864                                    } else {
6865                                        buffer.randomly_undo_redo(&mut *rng.lock(), cx);
6866                                    }
6867                                });
6868                            }
6869                        }
6870                        _ => loop {
6871                            let path_component_count = rng.lock().gen_range::<usize, _>(1..=5);
6872                            let mut path = PathBuf::new();
6873                            path.push("/");
6874                            for _ in 0..path_component_count {
6875                                let letter = rng.lock().gen_range(b'a'..=b'z');
6876                                path.push(std::str::from_utf8(&[letter]).unwrap());
6877                            }
6878                            path.set_extension("rs");
6879                            let parent_path = path.parent().unwrap();
6880
6881                            log::info!("Host: creating file {:?}", path,);
6882
6883                            if fs.create_dir(&parent_path).await.is_ok()
6884                                && fs.create_file(&path, Default::default()).await.is_ok()
6885                            {
6886                                break;
6887                            } else {
6888                                log::info!("Host: cannot create file");
6889                            }
6890                        },
6891                    }
6892
6893                    cx.background().simulate_random_delay().await;
6894                }
6895
6896                Ok(())
6897            }
6898
6899            let result =
6900                simulate_host_internal(&mut self, project.clone(), op_start_signal, rng, &mut cx)
6901                    .await;
6902            log::info!("Host done");
6903            self.project = Some(project);
6904            (self, cx, result.err())
6905        }
6906
6907        pub async fn simulate_guest(
6908            mut self,
6909            guest_username: String,
6910            project: ModelHandle<Project>,
6911            op_start_signal: futures::channel::mpsc::UnboundedReceiver<()>,
6912            rng: Arc<Mutex<StdRng>>,
6913            mut cx: TestAppContext,
6914        ) -> (Self, TestAppContext, Option<anyhow::Error>) {
6915            async fn simulate_guest_internal(
6916                client: &mut TestClient,
6917                guest_username: &str,
6918                project: ModelHandle<Project>,
6919                mut op_start_signal: futures::channel::mpsc::UnboundedReceiver<()>,
6920                rng: Arc<Mutex<StdRng>>,
6921                cx: &mut TestAppContext,
6922            ) -> anyhow::Result<()> {
6923                while op_start_signal.next().await.is_some() {
6924                    let buffer = if client.buffers.is_empty() || rng.lock().gen() {
6925                        let worktree = if let Some(worktree) =
6926                            project.read_with(cx, |project, cx| {
6927                                project
6928                                    .worktrees(&cx)
6929                                    .filter(|worktree| {
6930                                        let worktree = worktree.read(cx);
6931                                        worktree.is_visible()
6932                                            && worktree.entries(false).any(|e| e.is_file())
6933                                    })
6934                                    .choose(&mut *rng.lock())
6935                            }) {
6936                            worktree
6937                        } else {
6938                            cx.background().simulate_random_delay().await;
6939                            continue;
6940                        };
6941
6942                        let (worktree_root_name, project_path) =
6943                            worktree.read_with(cx, |worktree, _| {
6944                                let entry = worktree
6945                                    .entries(false)
6946                                    .filter(|e| e.is_file())
6947                                    .choose(&mut *rng.lock())
6948                                    .unwrap();
6949                                (
6950                                    worktree.root_name().to_string(),
6951                                    (worktree.id(), entry.path.clone()),
6952                                )
6953                            });
6954                        log::info!(
6955                            "{}: opening path {:?} in worktree {} ({})",
6956                            guest_username,
6957                            project_path.1,
6958                            project_path.0,
6959                            worktree_root_name,
6960                        );
6961                        let buffer = project
6962                            .update(cx, |project, cx| {
6963                                project.open_buffer(project_path.clone(), cx)
6964                            })
6965                            .await?;
6966                        log::info!(
6967                            "{}: opened path {:?} in worktree {} ({}) with buffer id {}",
6968                            guest_username,
6969                            project_path.1,
6970                            project_path.0,
6971                            worktree_root_name,
6972                            buffer.read_with(cx, |buffer, _| buffer.remote_id())
6973                        );
6974                        client.buffers.insert(buffer.clone());
6975                        buffer
6976                    } else {
6977                        client
6978                            .buffers
6979                            .iter()
6980                            .choose(&mut *rng.lock())
6981                            .unwrap()
6982                            .clone()
6983                    };
6984
6985                    let choice = rng.lock().gen_range(0..100);
6986                    match choice {
6987                        0..=9 => {
6988                            cx.update(|cx| {
6989                                log::info!(
6990                                    "{}: dropping buffer {:?}",
6991                                    guest_username,
6992                                    buffer.read(cx).file().unwrap().full_path(cx)
6993                                );
6994                                client.buffers.remove(&buffer);
6995                                drop(buffer);
6996                            });
6997                        }
6998                        10..=19 => {
6999                            let completions = project.update(cx, |project, cx| {
7000                                log::info!(
7001                                    "{}: requesting completions for buffer {} ({:?})",
7002                                    guest_username,
7003                                    buffer.read(cx).remote_id(),
7004                                    buffer.read(cx).file().unwrap().full_path(cx)
7005                                );
7006                                let offset = rng.lock().gen_range(0..=buffer.read(cx).len());
7007                                project.completions(&buffer, offset, cx)
7008                            });
7009                            let completions = cx.background().spawn(async move {
7010                                completions
7011                                    .await
7012                                    .map_err(|err| anyhow!("completions request failed: {:?}", err))
7013                            });
7014                            if rng.lock().gen_bool(0.3) {
7015                                log::info!("{}: detaching completions request", guest_username);
7016                                cx.update(|cx| completions.detach_and_log_err(cx));
7017                            } else {
7018                                completions.await?;
7019                            }
7020                        }
7021                        20..=29 => {
7022                            let code_actions = project.update(cx, |project, cx| {
7023                                log::info!(
7024                                    "{}: requesting code actions for buffer {} ({:?})",
7025                                    guest_username,
7026                                    buffer.read(cx).remote_id(),
7027                                    buffer.read(cx).file().unwrap().full_path(cx)
7028                                );
7029                                let range = buffer.read(cx).random_byte_range(0, &mut *rng.lock());
7030                                project.code_actions(&buffer, range, cx)
7031                            });
7032                            let code_actions = cx.background().spawn(async move {
7033                                code_actions.await.map_err(|err| {
7034                                    anyhow!("code actions request failed: {:?}", err)
7035                                })
7036                            });
7037                            if rng.lock().gen_bool(0.3) {
7038                                log::info!("{}: detaching code actions request", guest_username);
7039                                cx.update(|cx| code_actions.detach_and_log_err(cx));
7040                            } else {
7041                                code_actions.await?;
7042                            }
7043                        }
7044                        30..=39 if buffer.read_with(cx, |buffer, _| buffer.is_dirty()) => {
7045                            let (requested_version, save) = buffer.update(cx, |buffer, cx| {
7046                                log::info!(
7047                                    "{}: saving buffer {} ({:?})",
7048                                    guest_username,
7049                                    buffer.remote_id(),
7050                                    buffer.file().unwrap().full_path(cx)
7051                                );
7052                                (buffer.version(), buffer.save(cx))
7053                            });
7054                            let save = cx.background().spawn(async move {
7055                                let (saved_version, _) = save
7056                                    .await
7057                                    .map_err(|err| anyhow!("save request failed: {:?}", err))?;
7058                                assert!(saved_version.observed_all(&requested_version));
7059                                Ok::<_, anyhow::Error>(())
7060                            });
7061                            if rng.lock().gen_bool(0.3) {
7062                                log::info!("{}: detaching save request", guest_username);
7063                                cx.update(|cx| save.detach_and_log_err(cx));
7064                            } else {
7065                                save.await?;
7066                            }
7067                        }
7068                        40..=44 => {
7069                            let prepare_rename = project.update(cx, |project, cx| {
7070                                log::info!(
7071                                    "{}: preparing rename for buffer {} ({:?})",
7072                                    guest_username,
7073                                    buffer.read(cx).remote_id(),
7074                                    buffer.read(cx).file().unwrap().full_path(cx)
7075                                );
7076                                let offset = rng.lock().gen_range(0..=buffer.read(cx).len());
7077                                project.prepare_rename(buffer, offset, cx)
7078                            });
7079                            let prepare_rename = cx.background().spawn(async move {
7080                                prepare_rename.await.map_err(|err| {
7081                                    anyhow!("prepare rename request failed: {:?}", err)
7082                                })
7083                            });
7084                            if rng.lock().gen_bool(0.3) {
7085                                log::info!("{}: detaching prepare rename request", guest_username);
7086                                cx.update(|cx| prepare_rename.detach_and_log_err(cx));
7087                            } else {
7088                                prepare_rename.await?;
7089                            }
7090                        }
7091                        45..=49 => {
7092                            let definitions = project.update(cx, |project, cx| {
7093                                log::info!(
7094                                    "{}: requesting definitions for buffer {} ({:?})",
7095                                    guest_username,
7096                                    buffer.read(cx).remote_id(),
7097                                    buffer.read(cx).file().unwrap().full_path(cx)
7098                                );
7099                                let offset = rng.lock().gen_range(0..=buffer.read(cx).len());
7100                                project.definition(&buffer, offset, cx)
7101                            });
7102                            let definitions = cx.background().spawn(async move {
7103                                definitions
7104                                    .await
7105                                    .map_err(|err| anyhow!("definitions request failed: {:?}", err))
7106                            });
7107                            if rng.lock().gen_bool(0.3) {
7108                                log::info!("{}: detaching definitions request", guest_username);
7109                                cx.update(|cx| definitions.detach_and_log_err(cx));
7110                            } else {
7111                                client
7112                                    .buffers
7113                                    .extend(definitions.await?.into_iter().map(|loc| loc.buffer));
7114                            }
7115                        }
7116                        50..=54 => {
7117                            let highlights = project.update(cx, |project, cx| {
7118                                log::info!(
7119                                    "{}: requesting highlights for buffer {} ({:?})",
7120                                    guest_username,
7121                                    buffer.read(cx).remote_id(),
7122                                    buffer.read(cx).file().unwrap().full_path(cx)
7123                                );
7124                                let offset = rng.lock().gen_range(0..=buffer.read(cx).len());
7125                                project.document_highlights(&buffer, offset, cx)
7126                            });
7127                            let highlights = cx.background().spawn(async move {
7128                                highlights
7129                                    .await
7130                                    .map_err(|err| anyhow!("highlights request failed: {:?}", err))
7131                            });
7132                            if rng.lock().gen_bool(0.3) {
7133                                log::info!("{}: detaching highlights request", guest_username);
7134                                cx.update(|cx| highlights.detach_and_log_err(cx));
7135                            } else {
7136                                highlights.await?;
7137                            }
7138                        }
7139                        55..=59 => {
7140                            let search = project.update(cx, |project, cx| {
7141                                let query = rng.lock().gen_range('a'..='z');
7142                                log::info!("{}: project-wide search {:?}", guest_username, query);
7143                                project.search(SearchQuery::text(query, false, false), cx)
7144                            });
7145                            let search = cx.background().spawn(async move {
7146                                search
7147                                    .await
7148                                    .map_err(|err| anyhow!("search request failed: {:?}", err))
7149                            });
7150                            if rng.lock().gen_bool(0.3) {
7151                                log::info!("{}: detaching search request", guest_username);
7152                                cx.update(|cx| search.detach_and_log_err(cx));
7153                            } else {
7154                                client.buffers.extend(search.await?.into_keys());
7155                            }
7156                        }
7157                        60..=69 => {
7158                            let worktree = project
7159                                .read_with(cx, |project, cx| {
7160                                    project
7161                                        .worktrees(&cx)
7162                                        .filter(|worktree| {
7163                                            let worktree = worktree.read(cx);
7164                                            worktree.is_visible()
7165                                                && worktree.entries(false).any(|e| e.is_file())
7166                                                && worktree
7167                                                    .root_entry()
7168                                                    .map_or(false, |e| e.is_dir())
7169                                        })
7170                                        .choose(&mut *rng.lock())
7171                                })
7172                                .unwrap();
7173                            let (worktree_id, worktree_root_name) = worktree
7174                                .read_with(cx, |worktree, _| {
7175                                    (worktree.id(), worktree.root_name().to_string())
7176                                });
7177
7178                            let mut new_name = String::new();
7179                            for _ in 0..10 {
7180                                let letter = rng.lock().gen_range('a'..='z');
7181                                new_name.push(letter);
7182                            }
7183                            let mut new_path = PathBuf::new();
7184                            new_path.push(new_name);
7185                            new_path.set_extension("rs");
7186                            log::info!(
7187                                "{}: creating {:?} in worktree {} ({})",
7188                                guest_username,
7189                                new_path,
7190                                worktree_id,
7191                                worktree_root_name,
7192                            );
7193                            project
7194                                .update(cx, |project, cx| {
7195                                    project.create_entry((worktree_id, new_path), false, cx)
7196                                })
7197                                .unwrap()
7198                                .await?;
7199                        }
7200                        _ => {
7201                            buffer.update(cx, |buffer, cx| {
7202                                log::info!(
7203                                    "{}: updating buffer {} ({:?})",
7204                                    guest_username,
7205                                    buffer.remote_id(),
7206                                    buffer.file().unwrap().full_path(cx)
7207                                );
7208                                if rng.lock().gen_bool(0.7) {
7209                                    buffer.randomly_edit(&mut *rng.lock(), 5, cx);
7210                                } else {
7211                                    buffer.randomly_undo_redo(&mut *rng.lock(), cx);
7212                                }
7213                            });
7214                        }
7215                    }
7216                    cx.background().simulate_random_delay().await;
7217                }
7218                Ok(())
7219            }
7220
7221            let result = simulate_guest_internal(
7222                &mut self,
7223                &guest_username,
7224                project.clone(),
7225                op_start_signal,
7226                rng,
7227                &mut cx,
7228            )
7229            .await;
7230            log::info!("{}: done", guest_username);
7231
7232            self.project = Some(project);
7233            (self, cx, result.err())
7234        }
7235    }
7236
7237    impl Drop for TestClient {
7238        fn drop(&mut self) {
7239            self.client.tear_down();
7240        }
7241    }
7242
7243    impl Executor for Arc<gpui::executor::Background> {
7244        type Sleep = gpui::executor::Timer;
7245
7246        fn spawn_detached<F: 'static + Send + Future<Output = ()>>(&self, future: F) {
7247            self.spawn(future).detach();
7248        }
7249
7250        fn sleep(&self, duration: Duration) -> Self::Sleep {
7251            self.as_ref().timer(duration)
7252        }
7253    }
7254
7255    fn channel_messages(channel: &Channel) -> Vec<(String, String, bool)> {
7256        channel
7257            .messages()
7258            .cursor::<()>()
7259            .map(|m| {
7260                (
7261                    m.sender.github_login.clone(),
7262                    m.body.clone(),
7263                    m.is_pending(),
7264                )
7265            })
7266            .collect()
7267    }
7268
7269    struct EmptyView;
7270
7271    impl gpui::Entity for EmptyView {
7272        type Event = ();
7273    }
7274
7275    impl gpui::View for EmptyView {
7276        fn ui_name() -> &'static str {
7277            "empty view"
7278        }
7279
7280        fn render(&mut self, _: &mut gpui::RenderContext<Self>) -> gpui::ElementBox {
7281            gpui::Element::boxed(gpui::elements::Empty::new())
7282        }
7283    }
7284}