rpc.rs

   1mod store;
   2
   3use crate::{
   4    auth,
   5    db::{ChannelId, MessageId, UserId},
   6    AppState, Result,
   7};
   8use anyhow::anyhow;
   9use async_tungstenite::tungstenite::{
  10    protocol::CloseFrame as TungsteniteCloseFrame, Message as TungsteniteMessage,
  11};
  12use axum::{
  13    body::Body,
  14    extract::{
  15        ws::{CloseFrame as AxumCloseFrame, Message as AxumMessage},
  16        ConnectInfo, WebSocketUpgrade,
  17    },
  18    headers::{Header, HeaderName},
  19    http::StatusCode,
  20    middleware,
  21    response::IntoResponse,
  22    routing::get,
  23    Extension, Router, TypedHeader,
  24};
  25use collections::HashMap;
  26use futures::{channel::mpsc, future::BoxFuture, FutureExt, SinkExt, StreamExt, TryStreamExt};
  27use lazy_static::lazy_static;
  28use rpc::{
  29    proto::{self, AnyTypedEnvelope, EntityMessage, EnvelopedMessage, RequestMessage},
  30    Connection, ConnectionId, Peer, Receipt, TypedEnvelope,
  31};
  32use std::{
  33    any::TypeId,
  34    future::Future,
  35    marker::PhantomData,
  36    net::SocketAddr,
  37    ops::{Deref, DerefMut},
  38    rc::Rc,
  39    sync::{
  40        atomic::{AtomicBool, Ordering::SeqCst},
  41        Arc,
  42    },
  43    time::Duration,
  44};
  45use store::{Store, Worktree};
  46use time::OffsetDateTime;
  47use tokio::{
  48    sync::{RwLock, RwLockReadGuard, RwLockWriteGuard},
  49    time::Sleep,
  50};
  51use tower::ServiceBuilder;
  52use tracing::{info_span, Instrument};
  53
  54type MessageHandler =
  55    Box<dyn Send + Sync + Fn(Arc<Server>, Box<dyn AnyTypedEnvelope>) -> BoxFuture<'static, ()>>;
  56
  57struct Response<R> {
  58    server: Arc<Server>,
  59    receipt: Receipt<R>,
  60    responded: Arc<AtomicBool>,
  61}
  62
  63impl<R: RequestMessage> Response<R> {
  64    fn send(self, payload: R::Response) -> Result<()> {
  65        self.responded.store(true, SeqCst);
  66        self.server.peer.respond(self.receipt, payload)?;
  67        Ok(())
  68    }
  69}
  70
  71pub struct Server {
  72    peer: Arc<Peer>,
  73    store: RwLock<Store>,
  74    app_state: Arc<AppState>,
  75    handlers: HashMap<TypeId, MessageHandler>,
  76    notifications: Option<mpsc::UnboundedSender<()>>,
  77}
  78
  79pub trait Executor: Send + Clone {
  80    type Sleep: Send + Future;
  81    fn spawn_detached<F: 'static + Send + Future<Output = ()>>(&self, future: F);
  82    fn sleep(&self, duration: Duration) -> Self::Sleep;
  83}
  84
  85#[derive(Clone)]
  86pub struct RealExecutor;
  87
  88const MESSAGE_COUNT_PER_PAGE: usize = 100;
  89const MAX_MESSAGE_LEN: usize = 1024;
  90
  91struct StoreReadGuard<'a> {
  92    guard: RwLockReadGuard<'a, Store>,
  93    _not_send: PhantomData<Rc<()>>,
  94}
  95
  96struct StoreWriteGuard<'a> {
  97    guard: RwLockWriteGuard<'a, Store>,
  98    _not_send: PhantomData<Rc<()>>,
  99}
 100
 101impl Server {
 102    pub fn new(
 103        app_state: Arc<AppState>,
 104        notifications: Option<mpsc::UnboundedSender<()>>,
 105    ) -> Arc<Self> {
 106        let mut server = Self {
 107            peer: Peer::new(),
 108            app_state,
 109            store: Default::default(),
 110            handlers: Default::default(),
 111            notifications,
 112        };
 113
 114        server
 115            .add_request_handler(Server::ping)
 116            .add_request_handler(Server::register_project)
 117            .add_message_handler(Server::unregister_project)
 118            .add_request_handler(Server::share_project)
 119            .add_message_handler(Server::unshare_project)
 120            .add_request_handler(Server::join_project)
 121            .add_message_handler(Server::leave_project)
 122            .add_request_handler(Server::register_worktree)
 123            .add_message_handler(Server::unregister_worktree)
 124            .add_request_handler(Server::update_worktree)
 125            .add_message_handler(Server::start_language_server)
 126            .add_message_handler(Server::update_language_server)
 127            .add_message_handler(Server::update_diagnostic_summary)
 128            .add_request_handler(Server::forward_project_request::<proto::GetDefinition>)
 129            .add_request_handler(Server::forward_project_request::<proto::GetReferences>)
 130            .add_request_handler(Server::forward_project_request::<proto::SearchProject>)
 131            .add_request_handler(Server::forward_project_request::<proto::GetDocumentHighlights>)
 132            .add_request_handler(Server::forward_project_request::<proto::GetProjectSymbols>)
 133            .add_request_handler(Server::forward_project_request::<proto::OpenBufferForSymbol>)
 134            .add_request_handler(Server::forward_project_request::<proto::OpenBufferById>)
 135            .add_request_handler(Server::forward_project_request::<proto::OpenBufferByPath>)
 136            .add_request_handler(Server::forward_project_request::<proto::GetCompletions>)
 137            .add_request_handler(
 138                Server::forward_project_request::<proto::ApplyCompletionAdditionalEdits>,
 139            )
 140            .add_request_handler(Server::forward_project_request::<proto::GetCodeActions>)
 141            .add_request_handler(Server::forward_project_request::<proto::ApplyCodeAction>)
 142            .add_request_handler(Server::forward_project_request::<proto::PrepareRename>)
 143            .add_request_handler(Server::forward_project_request::<proto::PerformRename>)
 144            .add_request_handler(Server::forward_project_request::<proto::ReloadBuffers>)
 145            .add_request_handler(Server::forward_project_request::<proto::FormatBuffers>)
 146            .add_request_handler(Server::forward_project_request::<proto::CreateProjectEntry>)
 147            .add_request_handler(Server::forward_project_request::<proto::RenameProjectEntry>)
 148            .add_request_handler(Server::forward_project_request::<proto::DeleteProjectEntry>)
 149            .add_request_handler(Server::update_buffer)
 150            .add_message_handler(Server::update_buffer_file)
 151            .add_message_handler(Server::buffer_reloaded)
 152            .add_message_handler(Server::buffer_saved)
 153            .add_request_handler(Server::save_buffer)
 154            .add_request_handler(Server::get_channels)
 155            .add_request_handler(Server::get_users)
 156            .add_request_handler(Server::fuzzy_search_users)
 157            .add_request_handler(Server::request_contact)
 158            .add_request_handler(Server::remove_contact)
 159            .add_request_handler(Server::respond_to_contact_request)
 160            .add_request_handler(Server::join_channel)
 161            .add_message_handler(Server::leave_channel)
 162            .add_request_handler(Server::send_channel_message)
 163            .add_request_handler(Server::follow)
 164            .add_message_handler(Server::unfollow)
 165            .add_message_handler(Server::update_followers)
 166            .add_request_handler(Server::get_channel_messages);
 167
 168        Arc::new(server)
 169    }
 170
 171    fn add_message_handler<F, Fut, M>(&mut self, handler: F) -> &mut Self
 172    where
 173        F: 'static + Send + Sync + Fn(Arc<Self>, TypedEnvelope<M>) -> Fut,
 174        Fut: 'static + Send + Future<Output = Result<()>>,
 175        M: EnvelopedMessage,
 176    {
 177        let prev_handler = self.handlers.insert(
 178            TypeId::of::<M>(),
 179            Box::new(move |server, envelope| {
 180                let envelope = envelope.into_any().downcast::<TypedEnvelope<M>>().unwrap();
 181                let span = info_span!(
 182                    "handle message",
 183                    payload_type = envelope.payload_type_name(),
 184                    payload = format!("{:?}", envelope.payload).as_str(),
 185                );
 186                let future = (handler)(server, *envelope);
 187                async move {
 188                    if let Err(error) = future.await {
 189                        tracing::error!(%error, "error handling message");
 190                    }
 191                }
 192                .instrument(span)
 193                .boxed()
 194            }),
 195        );
 196        if prev_handler.is_some() {
 197            panic!("registered a handler for the same message twice");
 198        }
 199        self
 200    }
 201
 202    /// Handle a request while holding a lock to the store. This is useful when we're registering
 203    /// a connection but we want to respond on the connection before anybody else can send on it.
 204    fn add_request_handler<F, Fut, M>(&mut self, handler: F) -> &mut Self
 205    where
 206        F: 'static + Send + Sync + Fn(Arc<Self>, TypedEnvelope<M>, Response<M>) -> Fut,
 207        Fut: Send + Future<Output = Result<()>>,
 208        M: RequestMessage,
 209    {
 210        let handler = Arc::new(handler);
 211        self.add_message_handler(move |server, envelope| {
 212            let receipt = envelope.receipt();
 213            let handler = handler.clone();
 214            async move {
 215                let responded = Arc::new(AtomicBool::default());
 216                let response = Response {
 217                    server: server.clone(),
 218                    responded: responded.clone(),
 219                    receipt: envelope.receipt(),
 220                };
 221                match (handler)(server.clone(), envelope, response).await {
 222                    Ok(()) => {
 223                        if responded.load(std::sync::atomic::Ordering::SeqCst) {
 224                            Ok(())
 225                        } else {
 226                            Err(anyhow!("handler did not send a response"))?
 227                        }
 228                    }
 229                    Err(error) => {
 230                        server.peer.respond_with_error(
 231                            receipt,
 232                            proto::Error {
 233                                message: error.to_string(),
 234                            },
 235                        )?;
 236                        Err(error)
 237                    }
 238                }
 239            }
 240        })
 241    }
 242
 243    pub fn handle_connection<E: Executor>(
 244        self: &Arc<Self>,
 245        connection: Connection,
 246        address: String,
 247        user_id: UserId,
 248        mut send_connection_id: Option<mpsc::Sender<ConnectionId>>,
 249        executor: E,
 250    ) -> impl Future<Output = Result<()>> {
 251        let mut this = self.clone();
 252        let span = info_span!("handle connection", %user_id, %address);
 253        async move {
 254            let (connection_id, handle_io, mut incoming_rx) = this
 255                .peer
 256                .add_connection(connection, {
 257                    let executor = executor.clone();
 258                    move |duration| {
 259                        let timer = executor.sleep(duration);
 260                        async move {
 261                            timer.await;
 262                        }
 263                    }
 264                })
 265                .await;
 266
 267            tracing::info!(%user_id, %connection_id, %address, "connection opened");
 268
 269            if let Some(send_connection_id) = send_connection_id.as_mut() {
 270                let _ = send_connection_id.send(connection_id).await;
 271            }
 272
 273            let contacts = this.app_state.db.get_contacts(user_id).await?;
 274
 275            {
 276                let mut store = this.store_mut().await;
 277                store.add_connection(connection_id, user_id);
 278                this.peer.send(connection_id, store.build_initial_contacts_update(contacts))?;
 279            }
 280            this.update_user_contacts(user_id).await?;
 281
 282            let handle_io = handle_io.fuse();
 283            futures::pin_mut!(handle_io);
 284            loop {
 285                let next_message = incoming_rx.next().fuse();
 286                futures::pin_mut!(next_message);
 287                futures::select_biased! {
 288                    result = handle_io => {
 289                        if let Err(error) = result {
 290                            tracing::error!(%error, "error handling I/O");
 291                        }
 292                        break;
 293                    }
 294                    message = next_message => {
 295                        if let Some(message) = message {
 296                            let type_name = message.payload_type_name();
 297                            let span = tracing::info_span!("receive message", %user_id, %connection_id, %address, type_name);
 298                            async {
 299                                if let Some(handler) = this.handlers.get(&message.payload_type_id()) {
 300                                    let notifications = this.notifications.clone();
 301                                    let is_background = message.is_background();
 302                                    let handle_message = (handler)(this.clone(), message);
 303                                    let handle_message = async move {
 304                                        handle_message.await;
 305                                        if let Some(mut notifications) = notifications {
 306                                            let _ = notifications.send(()).await;
 307                                        }
 308                                    };
 309                                    if is_background {
 310                                        executor.spawn_detached(handle_message);
 311                                    } else {
 312                                        handle_message.await;
 313                                    }
 314                                } else {
 315                                    tracing::error!("no message handler");
 316                                }
 317                            }.instrument(span).await;
 318                        } else {
 319                            tracing::info!(%user_id, %connection_id, %address, "connection closed");
 320                            break;
 321                        }
 322                    }
 323                }
 324            }
 325
 326            if let Err(error) = this.sign_out(connection_id).await {
 327                tracing::error!(%error, "error signing out");
 328            }
 329
 330            Ok(())
 331        }.instrument(span)
 332    }
 333
 334    async fn sign_out(self: &mut Arc<Self>, connection_id: ConnectionId) -> Result<()> {
 335        self.peer.disconnect(connection_id);
 336        let removed_connection = self.store_mut().await.remove_connection(connection_id)?;
 337
 338        for (project_id, project) in removed_connection.hosted_projects {
 339            if let Some(share) = project.share {
 340                broadcast(connection_id, share.guests.keys().copied(), |conn_id| {
 341                    self.peer
 342                        .send(conn_id, proto::UnshareProject { project_id })
 343                });
 344            }
 345        }
 346
 347        for (project_id, peer_ids) in removed_connection.guest_project_ids {
 348            broadcast(connection_id, peer_ids, |conn_id| {
 349                self.peer.send(
 350                    conn_id,
 351                    proto::RemoveProjectCollaborator {
 352                        project_id,
 353                        peer_id: connection_id.0,
 354                    },
 355                )
 356            });
 357        }
 358
 359        self.update_user_contacts(removed_connection.user_id)
 360            .await?;
 361
 362        Ok(())
 363    }
 364
 365    async fn ping(
 366        self: Arc<Server>,
 367        _: TypedEnvelope<proto::Ping>,
 368        response: Response<proto::Ping>,
 369    ) -> Result<()> {
 370        response.send(proto::Ack {})?;
 371        Ok(())
 372    }
 373
 374    async fn register_project(
 375        self: Arc<Server>,
 376        request: TypedEnvelope<proto::RegisterProject>,
 377        response: Response<proto::RegisterProject>,
 378    ) -> Result<()> {
 379        let user_id;
 380        let project_id;
 381        {
 382            let mut state = self.store_mut().await;
 383            user_id = state.user_id_for_connection(request.sender_id)?;
 384            project_id = state.register_project(request.sender_id, user_id);
 385        };
 386        self.update_user_contacts(user_id).await?;
 387        response.send(proto::RegisterProjectResponse { project_id })?;
 388        Ok(())
 389    }
 390
 391    async fn unregister_project(
 392        self: Arc<Server>,
 393        request: TypedEnvelope<proto::UnregisterProject>,
 394    ) -> Result<()> {
 395        let user_id = {
 396            let mut state = self.store_mut().await;
 397            state.unregister_project(request.payload.project_id, request.sender_id)?;
 398            state.user_id_for_connection(request.sender_id)?
 399        };
 400
 401        self.update_user_contacts(user_id).await?;
 402        Ok(())
 403    }
 404
 405    async fn share_project(
 406        self: Arc<Server>,
 407        request: TypedEnvelope<proto::ShareProject>,
 408        response: Response<proto::ShareProject>,
 409    ) -> Result<()> {
 410        let user_id = {
 411            let mut state = self.store_mut().await;
 412            state.share_project(request.payload.project_id, request.sender_id)?;
 413            state.user_id_for_connection(request.sender_id)?
 414        };
 415        self.update_user_contacts(user_id).await?;
 416        response.send(proto::Ack {})?;
 417        Ok(())
 418    }
 419
 420    async fn update_user_contacts(self: &Arc<Server>, user_id: UserId) -> Result<()> {
 421        let contacts = self.app_state.db.get_contacts(user_id).await?;
 422        let store = self.store().await;
 423        let updated_contact = store.contact_for_user(user_id);
 424        for contact_user_id in contacts.current {
 425            for contact_conn_id in store.connection_ids_for_user(contact_user_id) {
 426                self.peer
 427                    .send(
 428                        contact_conn_id,
 429                        proto::UpdateContacts {
 430                            contacts: vec![updated_contact.clone()],
 431                            remove_contacts: Default::default(),
 432                            incoming_requests: Default::default(),
 433                            remove_incoming_requests: Default::default(),
 434                            outgoing_requests: Default::default(),
 435                            remove_outgoing_requests: Default::default(),
 436                        },
 437                    )
 438                    .trace_err();
 439            }
 440        }
 441        Ok(())
 442    }
 443
 444    async fn unshare_project(
 445        self: Arc<Server>,
 446        request: TypedEnvelope<proto::UnshareProject>,
 447    ) -> Result<()> {
 448        let project_id = request.payload.project_id;
 449        let project;
 450        {
 451            let mut state = self.store_mut().await;
 452            project = state.unshare_project(project_id, request.sender_id)?;
 453            broadcast(request.sender_id, project.connection_ids, |conn_id| {
 454                self.peer
 455                    .send(conn_id, proto::UnshareProject { project_id })
 456            });
 457        }
 458        self.update_user_contacts(project.host_user_id).await?;
 459        Ok(())
 460    }
 461
 462    async fn join_project(
 463        self: Arc<Server>,
 464        request: TypedEnvelope<proto::JoinProject>,
 465        response: Response<proto::JoinProject>,
 466    ) -> Result<()> {
 467        let project_id = request.payload.project_id;
 468        let host_user_id;
 469        let guest_user_id;
 470        {
 471            let state = self.store().await;
 472            host_user_id = state.project(project_id)?.host_user_id;
 473            guest_user_id = state.user_id_for_connection(request.sender_id)?;
 474        };
 475
 476        let guest_contacts = self.app_state.db.get_contacts(guest_user_id).await?;
 477        if !guest_contacts.current.contains(&host_user_id) {
 478            return Err(anyhow!("no such project"))?;
 479        }
 480
 481        {
 482            let state = &mut *self.store_mut().await;
 483            let joined = state.join_project(request.sender_id, guest_user_id, project_id)?;
 484            let share = joined.project.share()?;
 485            let peer_count = share.guests.len();
 486            let mut collaborators = Vec::with_capacity(peer_count);
 487            collaborators.push(proto::Collaborator {
 488                peer_id: joined.project.host_connection_id.0,
 489                replica_id: 0,
 490                user_id: joined.project.host_user_id.to_proto(),
 491            });
 492            let worktrees = share
 493                .worktrees
 494                .iter()
 495                .filter_map(|(id, shared_worktree)| {
 496                    let worktree = joined.project.worktrees.get(&id)?;
 497                    Some(proto::Worktree {
 498                        id: *id,
 499                        root_name: worktree.root_name.clone(),
 500                        entries: shared_worktree.entries.values().cloned().collect(),
 501                        diagnostic_summaries: shared_worktree
 502                            .diagnostic_summaries
 503                            .values()
 504                            .cloned()
 505                            .collect(),
 506                        visible: worktree.visible,
 507                        scan_id: shared_worktree.scan_id,
 508                    })
 509                })
 510                .collect();
 511            for (peer_conn_id, (peer_replica_id, peer_user_id)) in &share.guests {
 512                if *peer_conn_id != request.sender_id {
 513                    collaborators.push(proto::Collaborator {
 514                        peer_id: peer_conn_id.0,
 515                        replica_id: *peer_replica_id as u32,
 516                        user_id: peer_user_id.to_proto(),
 517                    });
 518                }
 519            }
 520            broadcast(
 521                request.sender_id,
 522                joined.project.connection_ids(),
 523                |conn_id| {
 524                    self.peer.send(
 525                        conn_id,
 526                        proto::AddProjectCollaborator {
 527                            project_id,
 528                            collaborator: Some(proto::Collaborator {
 529                                peer_id: request.sender_id.0,
 530                                replica_id: joined.replica_id as u32,
 531                                user_id: guest_user_id.to_proto(),
 532                            }),
 533                        },
 534                    )
 535                },
 536            );
 537            response.send(proto::JoinProjectResponse {
 538                worktrees,
 539                replica_id: joined.replica_id as u32,
 540                collaborators,
 541                language_servers: joined.project.language_servers.clone(),
 542            })?;
 543        }
 544        self.update_user_contacts(host_user_id).await?;
 545        Ok(())
 546    }
 547
 548    async fn leave_project(
 549        self: Arc<Server>,
 550        request: TypedEnvelope<proto::LeaveProject>,
 551    ) -> Result<()> {
 552        let sender_id = request.sender_id;
 553        let project_id = request.payload.project_id;
 554        let project;
 555        {
 556            let mut state = self.store_mut().await;
 557            project = state.leave_project(sender_id, project_id)?;
 558            broadcast(sender_id, project.connection_ids, |conn_id| {
 559                self.peer.send(
 560                    conn_id,
 561                    proto::RemoveProjectCollaborator {
 562                        project_id,
 563                        peer_id: sender_id.0,
 564                    },
 565                )
 566            });
 567        }
 568        self.update_user_contacts(project.host_user_id).await?;
 569        Ok(())
 570    }
 571
 572    async fn register_worktree(
 573        self: Arc<Server>,
 574        request: TypedEnvelope<proto::RegisterWorktree>,
 575        response: Response<proto::RegisterWorktree>,
 576    ) -> Result<()> {
 577        let host_user_id;
 578        {
 579            let mut state = self.store_mut().await;
 580            host_user_id = state.user_id_for_connection(request.sender_id)?;
 581
 582            let guest_connection_ids = state
 583                .read_project(request.payload.project_id, request.sender_id)?
 584                .guest_connection_ids();
 585            state.register_worktree(
 586                request.payload.project_id,
 587                request.payload.worktree_id,
 588                request.sender_id,
 589                Worktree {
 590                    root_name: request.payload.root_name.clone(),
 591                    visible: request.payload.visible,
 592                },
 593            )?;
 594
 595            broadcast(request.sender_id, guest_connection_ids, |connection_id| {
 596                self.peer
 597                    .forward_send(request.sender_id, connection_id, request.payload.clone())
 598            });
 599        }
 600        self.update_user_contacts(host_user_id).await?;
 601        response.send(proto::Ack {})?;
 602        Ok(())
 603    }
 604
 605    async fn unregister_worktree(
 606        self: Arc<Server>,
 607        request: TypedEnvelope<proto::UnregisterWorktree>,
 608    ) -> Result<()> {
 609        let host_user_id;
 610        let project_id = request.payload.project_id;
 611        let worktree_id = request.payload.worktree_id;
 612        {
 613            let mut state = self.store_mut().await;
 614            let (_, guest_connection_ids) =
 615                state.unregister_worktree(project_id, worktree_id, request.sender_id)?;
 616            host_user_id = state.user_id_for_connection(request.sender_id)?;
 617            broadcast(request.sender_id, guest_connection_ids, |conn_id| {
 618                self.peer.send(
 619                    conn_id,
 620                    proto::UnregisterWorktree {
 621                        project_id,
 622                        worktree_id,
 623                    },
 624                )
 625            });
 626        }
 627        self.update_user_contacts(host_user_id).await?;
 628        Ok(())
 629    }
 630
 631    async fn update_worktree(
 632        self: Arc<Server>,
 633        request: TypedEnvelope<proto::UpdateWorktree>,
 634        response: Response<proto::UpdateWorktree>,
 635    ) -> Result<()> {
 636        let connection_ids = self.store_mut().await.update_worktree(
 637            request.sender_id,
 638            request.payload.project_id,
 639            request.payload.worktree_id,
 640            &request.payload.removed_entries,
 641            &request.payload.updated_entries,
 642            request.payload.scan_id,
 643        )?;
 644
 645        broadcast(request.sender_id, connection_ids, |connection_id| {
 646            self.peer
 647                .forward_send(request.sender_id, connection_id, request.payload.clone())
 648        });
 649        response.send(proto::Ack {})?;
 650        Ok(())
 651    }
 652
 653    async fn update_diagnostic_summary(
 654        self: Arc<Server>,
 655        request: TypedEnvelope<proto::UpdateDiagnosticSummary>,
 656    ) -> Result<()> {
 657        let summary = request
 658            .payload
 659            .summary
 660            .clone()
 661            .ok_or_else(|| anyhow!("invalid summary"))?;
 662        let receiver_ids = self.store_mut().await.update_diagnostic_summary(
 663            request.payload.project_id,
 664            request.payload.worktree_id,
 665            request.sender_id,
 666            summary,
 667        )?;
 668
 669        broadcast(request.sender_id, receiver_ids, |connection_id| {
 670            self.peer
 671                .forward_send(request.sender_id, connection_id, request.payload.clone())
 672        });
 673        Ok(())
 674    }
 675
 676    async fn start_language_server(
 677        self: Arc<Server>,
 678        request: TypedEnvelope<proto::StartLanguageServer>,
 679    ) -> Result<()> {
 680        let receiver_ids = self.store_mut().await.start_language_server(
 681            request.payload.project_id,
 682            request.sender_id,
 683            request
 684                .payload
 685                .server
 686                .clone()
 687                .ok_or_else(|| anyhow!("invalid language server"))?,
 688        )?;
 689        broadcast(request.sender_id, receiver_ids, |connection_id| {
 690            self.peer
 691                .forward_send(request.sender_id, connection_id, request.payload.clone())
 692        });
 693        Ok(())
 694    }
 695
 696    async fn update_language_server(
 697        self: Arc<Server>,
 698        request: TypedEnvelope<proto::UpdateLanguageServer>,
 699    ) -> Result<()> {
 700        let receiver_ids = self
 701            .store()
 702            .await
 703            .project_connection_ids(request.payload.project_id, request.sender_id)?;
 704        broadcast(request.sender_id, receiver_ids, |connection_id| {
 705            self.peer
 706                .forward_send(request.sender_id, connection_id, request.payload.clone())
 707        });
 708        Ok(())
 709    }
 710
 711    async fn forward_project_request<T>(
 712        self: Arc<Server>,
 713        request: TypedEnvelope<T>,
 714        response: Response<T>,
 715    ) -> Result<()>
 716    where
 717        T: EntityMessage + RequestMessage,
 718    {
 719        let host_connection_id = self
 720            .store()
 721            .await
 722            .read_project(request.payload.remote_entity_id(), request.sender_id)?
 723            .host_connection_id;
 724
 725        response.send(
 726            self.peer
 727                .forward_request(request.sender_id, host_connection_id, request.payload)
 728                .await?,
 729        )?;
 730        Ok(())
 731    }
 732
 733    async fn save_buffer(
 734        self: Arc<Server>,
 735        request: TypedEnvelope<proto::SaveBuffer>,
 736        response: Response<proto::SaveBuffer>,
 737    ) -> Result<()> {
 738        let host = self
 739            .store()
 740            .await
 741            .read_project(request.payload.project_id, request.sender_id)?
 742            .host_connection_id;
 743        let response_payload = self
 744            .peer
 745            .forward_request(request.sender_id, host, request.payload.clone())
 746            .await?;
 747
 748        let mut guests = self
 749            .store()
 750            .await
 751            .read_project(request.payload.project_id, request.sender_id)?
 752            .connection_ids();
 753        guests.retain(|guest_connection_id| *guest_connection_id != request.sender_id);
 754        broadcast(host, guests, |conn_id| {
 755            self.peer
 756                .forward_send(host, conn_id, response_payload.clone())
 757        });
 758        response.send(response_payload)?;
 759        Ok(())
 760    }
 761
 762    async fn update_buffer(
 763        self: Arc<Server>,
 764        request: TypedEnvelope<proto::UpdateBuffer>,
 765        response: Response<proto::UpdateBuffer>,
 766    ) -> Result<()> {
 767        let receiver_ids = self
 768            .store()
 769            .await
 770            .project_connection_ids(request.payload.project_id, request.sender_id)?;
 771        broadcast(request.sender_id, receiver_ids, |connection_id| {
 772            self.peer
 773                .forward_send(request.sender_id, connection_id, request.payload.clone())
 774        });
 775        response.send(proto::Ack {})?;
 776        Ok(())
 777    }
 778
 779    async fn update_buffer_file(
 780        self: Arc<Server>,
 781        request: TypedEnvelope<proto::UpdateBufferFile>,
 782    ) -> Result<()> {
 783        let receiver_ids = self
 784            .store()
 785            .await
 786            .project_connection_ids(request.payload.project_id, request.sender_id)?;
 787        broadcast(request.sender_id, receiver_ids, |connection_id| {
 788            self.peer
 789                .forward_send(request.sender_id, connection_id, request.payload.clone())
 790        });
 791        Ok(())
 792    }
 793
 794    async fn buffer_reloaded(
 795        self: Arc<Server>,
 796        request: TypedEnvelope<proto::BufferReloaded>,
 797    ) -> Result<()> {
 798        let receiver_ids = self
 799            .store()
 800            .await
 801            .project_connection_ids(request.payload.project_id, request.sender_id)?;
 802        broadcast(request.sender_id, receiver_ids, |connection_id| {
 803            self.peer
 804                .forward_send(request.sender_id, connection_id, request.payload.clone())
 805        });
 806        Ok(())
 807    }
 808
 809    async fn buffer_saved(
 810        self: Arc<Server>,
 811        request: TypedEnvelope<proto::BufferSaved>,
 812    ) -> Result<()> {
 813        let receiver_ids = self
 814            .store()
 815            .await
 816            .project_connection_ids(request.payload.project_id, request.sender_id)?;
 817        broadcast(request.sender_id, receiver_ids, |connection_id| {
 818            self.peer
 819                .forward_send(request.sender_id, connection_id, request.payload.clone())
 820        });
 821        Ok(())
 822    }
 823
 824    async fn follow(
 825        self: Arc<Self>,
 826        request: TypedEnvelope<proto::Follow>,
 827        response: Response<proto::Follow>,
 828    ) -> Result<()> {
 829        let leader_id = ConnectionId(request.payload.leader_id);
 830        let follower_id = request.sender_id;
 831        if !self
 832            .store()
 833            .await
 834            .project_connection_ids(request.payload.project_id, follower_id)?
 835            .contains(&leader_id)
 836        {
 837            Err(anyhow!("no such peer"))?;
 838        }
 839        let mut response_payload = self
 840            .peer
 841            .forward_request(request.sender_id, leader_id, request.payload)
 842            .await?;
 843        response_payload
 844            .views
 845            .retain(|view| view.leader_id != Some(follower_id.0));
 846        response.send(response_payload)?;
 847        Ok(())
 848    }
 849
 850    async fn unfollow(self: Arc<Self>, request: TypedEnvelope<proto::Unfollow>) -> Result<()> {
 851        let leader_id = ConnectionId(request.payload.leader_id);
 852        if !self
 853            .store()
 854            .await
 855            .project_connection_ids(request.payload.project_id, request.sender_id)?
 856            .contains(&leader_id)
 857        {
 858            Err(anyhow!("no such peer"))?;
 859        }
 860        self.peer
 861            .forward_send(request.sender_id, leader_id, request.payload)?;
 862        Ok(())
 863    }
 864
 865    async fn update_followers(
 866        self: Arc<Self>,
 867        request: TypedEnvelope<proto::UpdateFollowers>,
 868    ) -> Result<()> {
 869        let connection_ids = self
 870            .store()
 871            .await
 872            .project_connection_ids(request.payload.project_id, request.sender_id)?;
 873        let leader_id = request
 874            .payload
 875            .variant
 876            .as_ref()
 877            .and_then(|variant| match variant {
 878                proto::update_followers::Variant::CreateView(payload) => payload.leader_id,
 879                proto::update_followers::Variant::UpdateView(payload) => payload.leader_id,
 880                proto::update_followers::Variant::UpdateActiveView(payload) => payload.leader_id,
 881            });
 882        for follower_id in &request.payload.follower_ids {
 883            let follower_id = ConnectionId(*follower_id);
 884            if connection_ids.contains(&follower_id) && Some(follower_id.0) != leader_id {
 885                self.peer
 886                    .forward_send(request.sender_id, follower_id, request.payload.clone())?;
 887            }
 888        }
 889        Ok(())
 890    }
 891
 892    async fn get_channels(
 893        self: Arc<Server>,
 894        request: TypedEnvelope<proto::GetChannels>,
 895        response: Response<proto::GetChannels>,
 896    ) -> Result<()> {
 897        let user_id = self
 898            .store()
 899            .await
 900            .user_id_for_connection(request.sender_id)?;
 901        let channels = self.app_state.db.get_accessible_channels(user_id).await?;
 902        response.send(proto::GetChannelsResponse {
 903            channels: channels
 904                .into_iter()
 905                .map(|chan| proto::Channel {
 906                    id: chan.id.to_proto(),
 907                    name: chan.name,
 908                })
 909                .collect(),
 910        })?;
 911        Ok(())
 912    }
 913
 914    async fn get_users(
 915        self: Arc<Server>,
 916        request: TypedEnvelope<proto::GetUsers>,
 917        response: Response<proto::GetUsers>,
 918    ) -> Result<()> {
 919        let user_ids = request
 920            .payload
 921            .user_ids
 922            .into_iter()
 923            .map(UserId::from_proto)
 924            .collect();
 925        let users = self
 926            .app_state
 927            .db
 928            .get_users_by_ids(user_ids)
 929            .await?
 930            .into_iter()
 931            .map(|user| proto::User {
 932                id: user.id.to_proto(),
 933                avatar_url: format!("https://github.com/{}.png?size=128", user.github_login),
 934                github_login: user.github_login,
 935            })
 936            .collect();
 937        response.send(proto::UsersResponse { users })?;
 938        Ok(())
 939    }
 940
 941    async fn fuzzy_search_users(
 942        self: Arc<Server>,
 943        request: TypedEnvelope<proto::FuzzySearchUsers>,
 944        response: Response<proto::FuzzySearchUsers>,
 945    ) -> Result<()> {
 946        let user_id = self
 947            .store()
 948            .await
 949            .user_id_for_connection(request.sender_id)?;
 950        let query = request.payload.query;
 951        let db = &self.app_state.db;
 952        let users = match query.len() {
 953            0 => vec![],
 954            1 | 2 => db
 955                .get_user_by_github_login(&query)
 956                .await?
 957                .into_iter()
 958                .collect(),
 959            _ => db.fuzzy_search_users(&query, 10).await?,
 960        };
 961        let users = users
 962            .into_iter()
 963            .filter(|user| user.id != user_id)
 964            .map(|user| proto::User {
 965                id: user.id.to_proto(),
 966                avatar_url: format!("https://github.com/{}.png?size=128", user.github_login),
 967                github_login: user.github_login,
 968            })
 969            .collect();
 970        response.send(proto::UsersResponse { users })?;
 971        Ok(())
 972    }
 973
 974    async fn request_contact(
 975        self: Arc<Server>,
 976        request: TypedEnvelope<proto::RequestContact>,
 977        response: Response<proto::RequestContact>,
 978    ) -> Result<()> {
 979        let requester_id = self
 980            .store()
 981            .await
 982            .user_id_for_connection(request.sender_id)?;
 983        let responder_id = UserId::from_proto(request.payload.responder_id);
 984        if requester_id == responder_id {
 985            return Err(anyhow!("cannot add yourself as a contact"))?;
 986        }
 987
 988        self.app_state
 989            .db
 990            .send_contact_request(requester_id, responder_id)
 991            .await?;
 992
 993        // Update outgoing contact requests of requester
 994        let mut update = proto::UpdateContacts::default();
 995        update.outgoing_requests.push(responder_id.to_proto());
 996        for connection_id in self.store().await.connection_ids_for_user(requester_id) {
 997            self.peer.send(connection_id, update.clone())?;
 998        }
 999
1000        // Update incoming contact requests of responder
1001        let mut update = proto::UpdateContacts::default();
1002        update
1003            .incoming_requests
1004            .push(proto::IncomingContactRequest {
1005                requester_id: requester_id.to_proto(),
1006                should_notify: true,
1007            });
1008        for connection_id in self.store().await.connection_ids_for_user(responder_id) {
1009            self.peer.send(connection_id, update.clone())?;
1010        }
1011
1012        response.send(proto::Ack {})?;
1013        Ok(())
1014    }
1015
1016    async fn respond_to_contact_request(
1017        self: Arc<Server>,
1018        request: TypedEnvelope<proto::RespondToContactRequest>,
1019        response: Response<proto::RespondToContactRequest>,
1020    ) -> Result<()> {
1021        let responder_id = self
1022            .store()
1023            .await
1024            .user_id_for_connection(request.sender_id)?;
1025        let requester_id = UserId::from_proto(request.payload.requester_id);
1026        if request.payload.response == proto::ContactRequestResponse::Dismiss as i32 {
1027            self.app_state
1028                .db
1029                .dismiss_contact_request(responder_id, requester_id)
1030                .await?;
1031        } else {
1032            let accept = request.payload.response == proto::ContactRequestResponse::Accept as i32;
1033            self.app_state
1034                .db
1035                .respond_to_contact_request(responder_id, requester_id, accept)
1036                .await?;
1037
1038            let store = self.store().await;
1039            // Update responder with new contact
1040            let mut update = proto::UpdateContacts::default();
1041            if accept {
1042                update.contacts.push(store.contact_for_user(requester_id));
1043            }
1044            update
1045                .remove_incoming_requests
1046                .push(requester_id.to_proto());
1047            for connection_id in store.connection_ids_for_user(responder_id) {
1048                self.peer.send(connection_id, update.clone())?;
1049            }
1050
1051            // Update requester with new contact
1052            let mut update = proto::UpdateContacts::default();
1053            if accept {
1054                update.contacts.push(store.contact_for_user(responder_id));
1055            }
1056            update
1057                .remove_outgoing_requests
1058                .push(responder_id.to_proto());
1059            for connection_id in store.connection_ids_for_user(requester_id) {
1060                self.peer.send(connection_id, update.clone())?;
1061            }
1062        }
1063
1064        response.send(proto::Ack {})?;
1065        Ok(())
1066    }
1067
1068    async fn remove_contact(
1069        self: Arc<Server>,
1070        request: TypedEnvelope<proto::RemoveContact>,
1071        response: Response<proto::RemoveContact>,
1072    ) -> Result<()> {
1073        let requester_id = self
1074            .store()
1075            .await
1076            .user_id_for_connection(request.sender_id)?;
1077        let responder_id = UserId::from_proto(request.payload.user_id);
1078        self.app_state
1079            .db
1080            .remove_contact(requester_id, responder_id)
1081            .await?;
1082
1083        // Update outgoing contact requests of requester
1084        let mut update = proto::UpdateContacts::default();
1085        update
1086            .remove_outgoing_requests
1087            .push(responder_id.to_proto());
1088        for connection_id in self.store().await.connection_ids_for_user(requester_id) {
1089            self.peer.send(connection_id, update.clone())?;
1090        }
1091
1092        // Update incoming contact requests of responder
1093        let mut update = proto::UpdateContacts::default();
1094        update
1095            .remove_incoming_requests
1096            .push(requester_id.to_proto());
1097        for connection_id in self.store().await.connection_ids_for_user(responder_id) {
1098            self.peer.send(connection_id, update.clone())?;
1099        }
1100
1101        response.send(proto::Ack {})?;
1102        Ok(())
1103    }
1104
1105    // #[instrument(skip(self, state, user_ids))]
1106    // fn update_contacts_for_users<'a>(
1107    //     self: &Arc<Self>,
1108    //     state: &Store,
1109    //     user_ids: impl IntoIterator<Item = &'a UserId>,
1110    // ) {
1111    //     for user_id in user_ids {
1112    //         let contacts = state.contacts_for_user(*user_id);
1113    //         for connection_id in state.connection_ids_for_user(*user_id) {
1114    //             self.peer
1115    //                 .send(
1116    //                     connection_id,
1117    //                     proto::UpdateContacts {
1118    //                         contacts: contacts.clone(),
1119    //                         pending_requests_from_user_ids: Default::default(),
1120    //                         pending_requests_to_user_ids: Default::default(),
1121    //                     },
1122    //                 )
1123    //                 .trace_err();
1124    //         }
1125    //     }
1126    // }
1127
1128    async fn join_channel(
1129        self: Arc<Self>,
1130        request: TypedEnvelope<proto::JoinChannel>,
1131        response: Response<proto::JoinChannel>,
1132    ) -> Result<()> {
1133        let user_id = self
1134            .store()
1135            .await
1136            .user_id_for_connection(request.sender_id)?;
1137        let channel_id = ChannelId::from_proto(request.payload.channel_id);
1138        if !self
1139            .app_state
1140            .db
1141            .can_user_access_channel(user_id, channel_id)
1142            .await?
1143        {
1144            Err(anyhow!("access denied"))?;
1145        }
1146
1147        self.store_mut()
1148            .await
1149            .join_channel(request.sender_id, channel_id);
1150        let messages = self
1151            .app_state
1152            .db
1153            .get_channel_messages(channel_id, MESSAGE_COUNT_PER_PAGE, None)
1154            .await?
1155            .into_iter()
1156            .map(|msg| proto::ChannelMessage {
1157                id: msg.id.to_proto(),
1158                body: msg.body,
1159                timestamp: msg.sent_at.unix_timestamp() as u64,
1160                sender_id: msg.sender_id.to_proto(),
1161                nonce: Some(msg.nonce.as_u128().into()),
1162            })
1163            .collect::<Vec<_>>();
1164        response.send(proto::JoinChannelResponse {
1165            done: messages.len() < MESSAGE_COUNT_PER_PAGE,
1166            messages,
1167        })?;
1168        Ok(())
1169    }
1170
1171    async fn leave_channel(
1172        self: Arc<Self>,
1173        request: TypedEnvelope<proto::LeaveChannel>,
1174    ) -> Result<()> {
1175        let user_id = self
1176            .store()
1177            .await
1178            .user_id_for_connection(request.sender_id)?;
1179        let channel_id = ChannelId::from_proto(request.payload.channel_id);
1180        if !self
1181            .app_state
1182            .db
1183            .can_user_access_channel(user_id, channel_id)
1184            .await?
1185        {
1186            Err(anyhow!("access denied"))?;
1187        }
1188
1189        self.store_mut()
1190            .await
1191            .leave_channel(request.sender_id, channel_id);
1192
1193        Ok(())
1194    }
1195
1196    async fn send_channel_message(
1197        self: Arc<Self>,
1198        request: TypedEnvelope<proto::SendChannelMessage>,
1199        response: Response<proto::SendChannelMessage>,
1200    ) -> Result<()> {
1201        let channel_id = ChannelId::from_proto(request.payload.channel_id);
1202        let user_id;
1203        let connection_ids;
1204        {
1205            let state = self.store().await;
1206            user_id = state.user_id_for_connection(request.sender_id)?;
1207            connection_ids = state.channel_connection_ids(channel_id)?;
1208        }
1209
1210        // Validate the message body.
1211        let body = request.payload.body.trim().to_string();
1212        if body.len() > MAX_MESSAGE_LEN {
1213            return Err(anyhow!("message is too long"))?;
1214        }
1215        if body.is_empty() {
1216            return Err(anyhow!("message can't be blank"))?;
1217        }
1218
1219        let timestamp = OffsetDateTime::now_utc();
1220        let nonce = request
1221            .payload
1222            .nonce
1223            .ok_or_else(|| anyhow!("nonce can't be blank"))?;
1224
1225        let message_id = self
1226            .app_state
1227            .db
1228            .create_channel_message(channel_id, user_id, &body, timestamp, nonce.clone().into())
1229            .await?
1230            .to_proto();
1231        let message = proto::ChannelMessage {
1232            sender_id: user_id.to_proto(),
1233            id: message_id,
1234            body,
1235            timestamp: timestamp.unix_timestamp() as u64,
1236            nonce: Some(nonce),
1237        };
1238        broadcast(request.sender_id, connection_ids, |conn_id| {
1239            self.peer.send(
1240                conn_id,
1241                proto::ChannelMessageSent {
1242                    channel_id: channel_id.to_proto(),
1243                    message: Some(message.clone()),
1244                },
1245            )
1246        });
1247        response.send(proto::SendChannelMessageResponse {
1248            message: Some(message),
1249        })?;
1250        Ok(())
1251    }
1252
1253    async fn get_channel_messages(
1254        self: Arc<Self>,
1255        request: TypedEnvelope<proto::GetChannelMessages>,
1256        response: Response<proto::GetChannelMessages>,
1257    ) -> Result<()> {
1258        let user_id = self
1259            .store()
1260            .await
1261            .user_id_for_connection(request.sender_id)?;
1262        let channel_id = ChannelId::from_proto(request.payload.channel_id);
1263        if !self
1264            .app_state
1265            .db
1266            .can_user_access_channel(user_id, channel_id)
1267            .await?
1268        {
1269            Err(anyhow!("access denied"))?;
1270        }
1271
1272        let messages = self
1273            .app_state
1274            .db
1275            .get_channel_messages(
1276                channel_id,
1277                MESSAGE_COUNT_PER_PAGE,
1278                Some(MessageId::from_proto(request.payload.before_message_id)),
1279            )
1280            .await?
1281            .into_iter()
1282            .map(|msg| proto::ChannelMessage {
1283                id: msg.id.to_proto(),
1284                body: msg.body,
1285                timestamp: msg.sent_at.unix_timestamp() as u64,
1286                sender_id: msg.sender_id.to_proto(),
1287                nonce: Some(msg.nonce.as_u128().into()),
1288            })
1289            .collect::<Vec<_>>();
1290        response.send(proto::GetChannelMessagesResponse {
1291            done: messages.len() < MESSAGE_COUNT_PER_PAGE,
1292            messages,
1293        })?;
1294        Ok(())
1295    }
1296
    /// Acquires read access to the server's `Store` and wraps the lock guard
    /// in a `StoreReadGuard`.
    ///
    /// In test builds the task additionally yields to the scheduler once
    /// before and once after taking the lock.
    /// NOTE(review): presumably this is to exercise more task interleavings
    /// around lock acquisition in tests — confirm.
    async fn store<'a>(self: &'a Arc<Self>) -> StoreReadGuard<'a> {
        #[cfg(test)]
        tokio::task::yield_now().await;
        let guard = self.store.read().await;
        #[cfg(test)]
        tokio::task::yield_now().await;
        StoreReadGuard {
            guard,
            // NOTE(review): the field name suggests this marker makes the
            // guard `!Send`; verify against the struct definition.
            _not_send: PhantomData,
        }
    }
1308
    /// Acquires write access to the server's `Store` and wraps the lock guard
    /// in a `StoreWriteGuard`.
    ///
    /// In test builds the task additionally yields to the scheduler once
    /// before and once after taking the lock.
    /// NOTE(review): presumably this is to exercise more task interleavings
    /// around lock acquisition in tests — confirm.
    async fn store_mut<'a>(self: &'a Arc<Self>) -> StoreWriteGuard<'a> {
        #[cfg(test)]
        tokio::task::yield_now().await;
        let guard = self.store.write().await;
        #[cfg(test)]
        tokio::task::yield_now().await;
        StoreWriteGuard {
            guard,
            // NOTE(review): the field name suggests this marker makes the
            // guard `!Send`; verify against the struct definition.
            _not_send: PhantomData,
        }
    }
1320}
1321
1322impl<'a> Deref for StoreReadGuard<'a> {
1323    type Target = Store;
1324
1325    fn deref(&self) -> &Self::Target {
1326        &*self.guard
1327    }
1328}
1329
1330impl<'a> Deref for StoreWriteGuard<'a> {
1331    type Target = Store;
1332
1333    fn deref(&self) -> &Self::Target {
1334        &*self.guard
1335    }
1336}
1337
1338impl<'a> DerefMut for StoreWriteGuard<'a> {
1339    fn deref_mut(&mut self) -> &mut Self::Target {
1340        &mut *self.guard
1341    }
1342}
1343
1344impl<'a> Drop for StoreWriteGuard<'a> {
1345    fn drop(&mut self) {
1346        #[cfg(test)]
1347        self.check_invariants();
1348
1349        let metrics = self.metrics();
1350        tracing::info!(
1351            connections = metrics.connections,
1352            registered_projects = metrics.registered_projects,
1353            shared_projects = metrics.shared_projects,
1354            "metrics"
1355        );
1356    }
1357}
1358
1359impl Executor for RealExecutor {
1360    type Sleep = Sleep;
1361
1362    fn spawn_detached<F: 'static + Send + Future<Output = ()>>(&self, future: F) {
1363        tokio::task::spawn(future);
1364    }
1365
1366    fn sleep(&self, duration: Duration) -> Self::Sleep {
1367        tokio::time::sleep(duration)
1368    }
1369}
1370
1371fn broadcast<F>(
1372    sender_id: ConnectionId,
1373    receiver_ids: impl IntoIterator<Item = ConnectionId>,
1374    mut f: F,
1375) where
1376    F: FnMut(ConnectionId) -> anyhow::Result<()>,
1377{
1378    for receiver_id in receiver_ids {
1379        if receiver_id != sender_id {
1380            f(receiver_id).trace_err();
1381        }
1382    }
1383}
1384
// Name of the HTTP header that carries the RPC protocol version; returned by
// `ProtocolVersion`'s `Header::name`.
lazy_static! {
    static ref ZED_PROTOCOL_VERSION: HeaderName = HeaderName::from_static("x-zed-protocol-version");
}
1388
/// Typed value of the `x-zed-protocol-version` header: the protocol version
/// as an unsigned 32-bit integer.
pub struct ProtocolVersion(u32);
1390
1391impl Header for ProtocolVersion {
1392    fn name() -> &'static HeaderName {
1393        &ZED_PROTOCOL_VERSION
1394    }
1395
1396    fn decode<'i, I>(values: &mut I) -> Result<Self, axum::headers::Error>
1397    where
1398        Self: Sized,
1399        I: Iterator<Item = &'i axum::http::HeaderValue>,
1400    {
1401        let version = values
1402            .next()
1403            .ok_or_else(|| axum::headers::Error::invalid())?
1404            .to_str()
1405            .map_err(|_| axum::headers::Error::invalid())?
1406            .parse()
1407            .map_err(|_| axum::headers::Error::invalid())?;
1408        Ok(Self(version))
1409    }
1410
1411    fn encode<E: Extend<axum::http::HeaderValue>>(&self, values: &mut E) {
1412        values.extend([self.0.to_string().parse().unwrap()]);
1413    }
1414}
1415
1416pub fn routes(app_state: Arc<AppState>) -> Router<Body> {
1417    let server = Server::new(app_state.clone(), None);
1418    Router::new()
1419        .route("/rpc", get(handle_websocket_request))
1420        .layer(
1421            ServiceBuilder::new()
1422                .layer(Extension(app_state))
1423                .layer(middleware::from_fn(auth::validate_header))
1424                .layer(Extension(server)),
1425        )
1426}
1427
/// Axum handler that upgrades an HTTP request to a WebSocket and hands the
/// resulting connection to the RPC `Server`.
///
/// Responds with `426 Upgrade Required` and the body
/// "client must be upgraded" when the version in the
/// `x-zed-protocol-version` header differs from `rpc::PROTOCOL_VERSION`.
///
/// NOTE(review): the `UserId` extension is presumably inserted by the
/// `auth::validate_header` middleware — confirm.
pub async fn handle_websocket_request(
    TypedHeader(ProtocolVersion(protocol_version)): TypedHeader<ProtocolVersion>,
    ConnectInfo(socket_address): ConnectInfo<SocketAddr>,
    Extension(server): Extension<Arc<Server>>,
    Extension(user_id): Extension<UserId>,
    ws: WebSocketUpgrade,
) -> axum::response::Response {
    if protocol_version != rpc::PROTOCOL_VERSION {
        return (
            StatusCode::UPGRADE_REQUIRED,
            "client must be upgraded".to_string(),
        )
            .into_response();
    }
    let socket_address = socket_address.to_string();
    ws.on_upgrade(move |socket| {
        // Brings `log_err` into scope for the result of `handle_connection`.
        use util::ResultExt;
        // Adapt the axum socket in both directions so `Connection` works with
        // tungstenite messages: incoming items go through
        // `to_tungstenite_message`, outgoing items through `to_axum_message`.
        let socket = socket
            .map_ok(to_tungstenite_message)
            .err_into()
            .with(|message| async move { Ok(to_axum_message(message)) });
        let connection = Connection::new(Box::pin(socket));
        async move {
            // An error from the connection handler is logged, not propagated.
            server
                .handle_connection(connection, socket_address, user_id, None, RealExecutor)
                .await
                .log_err();
        }
    })
}
1458
1459fn to_axum_message(message: TungsteniteMessage) -> AxumMessage {
1460    match message {
1461        TungsteniteMessage::Text(payload) => AxumMessage::Text(payload),
1462        TungsteniteMessage::Binary(payload) => AxumMessage::Binary(payload),
1463        TungsteniteMessage::Ping(payload) => AxumMessage::Ping(payload),
1464        TungsteniteMessage::Pong(payload) => AxumMessage::Pong(payload),
1465        TungsteniteMessage::Close(frame) => AxumMessage::Close(frame.map(|frame| AxumCloseFrame {
1466            code: frame.code.into(),
1467            reason: frame.reason,
1468        })),
1469    }
1470}
1471
1472fn to_tungstenite_message(message: AxumMessage) -> TungsteniteMessage {
1473    match message {
1474        AxumMessage::Text(payload) => TungsteniteMessage::Text(payload),
1475        AxumMessage::Binary(payload) => TungsteniteMessage::Binary(payload),
1476        AxumMessage::Ping(payload) => TungsteniteMessage::Ping(payload),
1477        AxumMessage::Pong(payload) => TungsteniteMessage::Pong(payload),
1478        AxumMessage::Close(frame) => {
1479            TungsteniteMessage::Close(frame.map(|frame| TungsteniteCloseFrame {
1480                code: frame.code.into(),
1481                reason: frame.reason,
1482            }))
1483        }
1484    }
1485}
1486
/// Extension trait for turning a fallible value into an `Option` while
/// reporting the failure instead of propagating it.
pub trait ResultExt {
    /// The success type produced when there is no error.
    type Ok;

    /// Consumes `self` and returns the success value, if any; implementations
    /// are expected to report the error case rather than return it.
    fn trace_err(self) -> Option<Self::Ok>;
}
1492
1493impl<T, E> ResultExt for Result<T, E>
1494where
1495    E: std::fmt::Debug,
1496{
1497    type Ok = T;
1498
1499    fn trace_err(self) -> Option<T> {
1500        match self {
1501            Ok(value) => Some(value),
1502            Err(error) => {
1503                tracing::error!("{:?}", error);
1504                None
1505            }
1506        }
1507    }
1508}
1509
1510#[cfg(test)]
1511mod tests {
1512    use super::*;
1513    use crate::{
1514        db::{tests::TestDb, UserId},
1515        AppState,
1516    };
1517    use ::rpc::Peer;
1518    use client::{
1519        self, test::FakeHttpClient, Channel, ChannelDetails, ChannelList, Client, Credentials,
1520        EstablishConnectionError, UserStore, RECEIVE_TIMEOUT,
1521    };
1522    use collections::{BTreeMap, HashSet};
1523    use editor::{
1524        self, ConfirmCodeAction, ConfirmCompletion, ConfirmRename, Editor, Input, Redo, Rename,
1525        ToOffset, ToggleCodeActions, Undo,
1526    };
1527    use gpui::{
1528        executor::{self, Deterministic},
1529        geometry::vector::vec2f,
1530        ModelHandle, TestAppContext, ViewHandle,
1531    };
1532    use language::{
1533        range_to_lsp, tree_sitter_rust, Diagnostic, DiagnosticEntry, FakeLspAdapter, Language,
1534        LanguageConfig, LanguageRegistry, OffsetRangeExt, Point, Rope,
1535    };
1536    use lsp::{self, FakeLanguageServer};
1537    use parking_lot::Mutex;
1538    use project::{
1539        fs::{FakeFs, Fs as _},
1540        search::SearchQuery,
1541        worktree::WorktreeHandle,
1542        DiagnosticSummary, Project, ProjectPath, WorktreeId,
1543    };
1544    use rand::prelude::*;
1545    use rpc::PeerId;
1546    use serde_json::json;
1547    use settings::Settings;
1548    use sqlx::types::time::OffsetDateTime;
1549    use std::{
1550        env,
1551        ops::Deref,
1552        path::{Path, PathBuf},
1553        rc::Rc,
1554        sync::{
1555            atomic::{AtomicBool, Ordering::SeqCst},
1556            Arc,
1557        },
1558        time::Duration,
1559    };
1560    use theme::ThemeRegistry;
1561    use workspace::{Item, SplitDirection, ToggleFollow, Workspace, WorkspaceParams};
1562
1563    #[cfg(test)]
1564    #[ctor::ctor]
1565    fn init_logger() {
1566        if std::env::var("RUST_LOG").is_ok() {
1567            env_logger::init();
1568        }
1569    }
1570
1571    #[gpui::test(iterations = 10)]
1572    async fn test_share_project(cx_a: &mut TestAppContext, cx_b: &mut TestAppContext) {
1573        let (window_b, _) = cx_b.add_window(|_| EmptyView);
1574        let lang_registry = Arc::new(LanguageRegistry::test());
1575        let fs = FakeFs::new(cx_a.background());
1576        cx_a.foreground().forbid_parking();
1577
1578        // Connect to a server as 2 clients.
1579        let mut server = TestServer::start(cx_a.foreground(), cx_a.background()).await;
1580        let client_a = server.create_client(cx_a, "user_a").await;
1581        let client_b = server.create_client(cx_b, "user_b").await;
1582        server
1583            .make_contacts(vec![(&client_a, cx_a), (&client_b, cx_b)])
1584            .await;
1585
1586        // Share a project as client A
1587        fs.insert_tree(
1588            "/a",
1589            json!({
1590                "a.txt": "a-contents",
1591                "b.txt": "b-contents",
1592            }),
1593        )
1594        .await;
1595        let project_a = cx_a.update(|cx| {
1596            Project::local(
1597                client_a.clone(),
1598                client_a.user_store.clone(),
1599                lang_registry.clone(),
1600                fs.clone(),
1601                cx,
1602            )
1603        });
1604        let (worktree_a, _) = project_a
1605            .update(cx_a, |p, cx| {
1606                p.find_or_create_local_worktree("/a", true, cx)
1607            })
1608            .await
1609            .unwrap();
1610        let worktree_id = worktree_a.read_with(cx_a, |tree, _| tree.id());
1611        worktree_a
1612            .read_with(cx_a, |tree, _| tree.as_local().unwrap().scan_complete())
1613            .await;
1614        let project_id = project_a.update(cx_a, |p, _| p.next_remote_id()).await;
1615        project_a.update(cx_a, |p, cx| p.share(cx)).await.unwrap();
1616
1617        // Join that project as client B
1618        let project_b = Project::remote(
1619            project_id,
1620            client_b.clone(),
1621            client_b.user_store.clone(),
1622            lang_registry.clone(),
1623            fs.clone(),
1624            &mut cx_b.to_async(),
1625        )
1626        .await
1627        .unwrap();
1628
1629        let replica_id_b = project_b.read_with(cx_b, |project, _| {
1630            assert_eq!(
1631                project
1632                    .collaborators()
1633                    .get(&client_a.peer_id)
1634                    .unwrap()
1635                    .user
1636                    .github_login,
1637                "user_a"
1638            );
1639            project.replica_id()
1640        });
1641        project_a
1642            .condition(&cx_a, |tree, _| {
1643                tree.collaborators()
1644                    .get(&client_b.peer_id)
1645                    .map_or(false, |collaborator| {
1646                        collaborator.replica_id == replica_id_b
1647                            && collaborator.user.github_login == "user_b"
1648                    })
1649            })
1650            .await;
1651
1652        // Open the same file as client B and client A.
1653        let buffer_b = project_b
1654            .update(cx_b, |p, cx| p.open_buffer((worktree_id, "b.txt"), cx))
1655            .await
1656            .unwrap();
1657        buffer_b.read_with(cx_b, |buf, _| assert_eq!(buf.text(), "b-contents"));
1658        project_a.read_with(cx_a, |project, cx| {
1659            assert!(project.has_open_buffer((worktree_id, "b.txt"), cx))
1660        });
1661        let buffer_a = project_a
1662            .update(cx_a, |p, cx| p.open_buffer((worktree_id, "b.txt"), cx))
1663            .await
1664            .unwrap();
1665
1666        let editor_b = cx_b.add_view(window_b, |cx| Editor::for_buffer(buffer_b, None, cx));
1667
1668        // TODO
1669        // // Create a selection set as client B and see that selection set as client A.
1670        // buffer_a
1671        //     .condition(&cx_a, |buffer, _| buffer.selection_sets().count() == 1)
1672        //     .await;
1673
1674        // Edit the buffer as client B and see that edit as client A.
1675        editor_b.update(cx_b, |editor, cx| {
1676            editor.handle_input(&Input("ok, ".into()), cx)
1677        });
1678        buffer_a
1679            .condition(&cx_a, |buffer, _| buffer.text() == "ok, b-contents")
1680            .await;
1681
1682        // TODO
1683        // // Remove the selection set as client B, see those selections disappear as client A.
1684        cx_b.update(move |_| drop(editor_b));
1685        // buffer_a
1686        //     .condition(&cx_a, |buffer, _| buffer.selection_sets().count() == 0)
1687        //     .await;
1688
1689        // Dropping the client B's project removes client B from client A's collaborators.
1690        cx_b.update(move |_| drop(project_b));
1691        project_a
1692            .condition(&cx_a, |project, _| project.collaborators().is_empty())
1693            .await;
1694    }
1695
1696    #[gpui::test(iterations = 10)]
        // Exercises unsharing and re-sharing a project. Client A (host) shares a local project,
        // client B (guest) joins it and opens a buffer, then A unshares: B's project must become
        // read-only and A's worktree must no longer report itself as shared. After A shares
        // again, B must be able to join with the same `project_id` and open a buffer once more.
1697    async fn test_unshare_project(cx_a: &mut TestAppContext, cx_b: &mut TestAppContext) {
1698        let lang_registry = Arc::new(LanguageRegistry::test());
1699        let fs = FakeFs::new(cx_a.background());
            // NOTE(review): presumably makes the test fail rather than block if the executor
            // runs out of runnable work — confirm against gpui's executor.
1700        cx_a.foreground().forbid_parking();
1701
1702        // Connect to a server as 2 clients.
1703        let mut server = TestServer::start(cx_a.foreground(), cx_a.background()).await;
1704        let client_a = server.create_client(cx_a, "user_a").await;
1705        let client_b = server.create_client(cx_b, "user_b").await;
1706        server
1707            .make_contacts(vec![(&client_a, cx_a), (&client_b, cx_b)])
1708            .await;
1709
1710        // Share a project as client A
1711        fs.insert_tree(
1712            "/a",
1713            json!({
1714                "a.txt": "a-contents",
1715                "b.txt": "b-contents",
1716            }),
1717        )
1718        .await;
1719        let project_a = cx_a.update(|cx| {
1720            Project::local(
1721                client_a.clone(),
1722                client_a.user_store.clone(),
1723                lang_registry.clone(),
1724                fs.clone(),
1725                cx,
1726            )
1727        });
1728        let (worktree_a, _) = project_a
1729            .update(cx_a, |p, cx| {
1730                p.find_or_create_local_worktree("/a", true, cx)
1731            })
1732            .await
1733            .unwrap();
            // Wait for the initial scan of "/a" to finish before sharing.
1734        worktree_a
1735            .read_with(cx_a, |tree, _| tree.as_local().unwrap().scan_complete())
1736            .await;
            // Wait for the project's remote id; it is what client B passes to `Project::remote`.
1737        let project_id = project_a.update(cx_a, |p, _| p.next_remote_id()).await;
1738        let worktree_id = worktree_a.read_with(cx_a, |tree, _| tree.id());
1739        project_a.update(cx_a, |p, cx| p.share(cx)).await.unwrap();
1740        assert!(worktree_a.read_with(cx_a, |tree, _| tree.as_local().unwrap().is_shared()));
1741
1742        // Join that project as client B
1743        let project_b = Project::remote(
1744            project_id,
1745            client_b.clone(),
1746            client_b.user_store.clone(),
1747            lang_registry.clone(),
1748            fs.clone(),
1749            &mut cx_b.to_async(),
1750        )
1751        .await
1752        .unwrap();
1753        project_b
1754            .update(cx_b, |p, cx| p.open_buffer((worktree_id, "a.txt"), cx))
1755            .await
1756            .unwrap();
1757
1758        // Unshare the project as client A
1759        project_a.update(cx_a, |project, cx| project.unshare(cx));
            // The guest observes the unshare as its project turning read-only.
1760        project_b
1761            .condition(cx_b, |project, _| project.is_read_only())
1762            .await;
1763        assert!(worktree_a.read_with(cx_a, |tree, _| !tree.as_local().unwrap().is_shared()));
            // Release the guest's handle to the now read-only project before re-sharing.
1764        cx_b.update(|_| {
1765            drop(project_b);
1766        });
1767
1768        // Share the project again and ensure guests can still join.
1769        project_a
1770            .update(cx_a, |project, cx| project.share(cx))
1771            .await
1772            .unwrap();
1773        assert!(worktree_a.read_with(cx_a, |tree, _| tree.as_local().unwrap().is_shared()));
1774
            // Join again as client B, reusing the `project_id` obtained before the first share.
1775        let project_b2 = Project::remote(
1776            project_id,
1777            client_b.clone(),
1778            client_b.user_store.clone(),
1779            lang_registry.clone(),
1780            fs.clone(),
1781            &mut cx_b.to_async(),
1782        )
1783        .await
1784        .unwrap();
1785        project_b2
1786            .update(cx_b, |p, cx| p.open_buffer((worktree_id, "a.txt"), cx))
1787            .await
1788            .unwrap();
1789    }
1790
1791    #[gpui::test(iterations = 10)]
        // Exercises a host losing its server connection while a guest is joined. After client
        // A's connection is dropped, A's project must have no collaborators and must not be
        // shared, and client B's project must become read-only. A then obtains a new remote id,
        // shares again, and B must be able to join and open a buffer with that new id.
1792    async fn test_host_disconnect(cx_a: &mut TestAppContext, cx_b: &mut TestAppContext) {
1793        let lang_registry = Arc::new(LanguageRegistry::test());
1794        let fs = FakeFs::new(cx_a.background());
            // NOTE(review): presumably makes the test fail rather than block if the executor
            // runs out of runnable work — confirm against gpui's executor.
1795        cx_a.foreground().forbid_parking();
1796
1797        // Connect to a server as 2 clients.
1798        let mut server = TestServer::start(cx_a.foreground(), cx_a.background()).await;
1799        let client_a = server.create_client(cx_a, "user_a").await;
1800        let client_b = server.create_client(cx_b, "user_b").await;
1801        server
1802            .make_contacts(vec![(&client_a, cx_a), (&client_b, cx_b)])
1803            .await;
1804
1805        // Share a project as client A
1806        fs.insert_tree(
1807            "/a",
1808            json!({
1809                "a.txt": "a-contents",
1810                "b.txt": "b-contents",
1811            }),
1812        )
1813        .await;
1814        let project_a = cx_a.update(|cx| {
1815            Project::local(
1816                client_a.clone(),
1817                client_a.user_store.clone(),
1818                lang_registry.clone(),
1819                fs.clone(),
1820                cx,
1821            )
1822        });
1823        let (worktree_a, _) = project_a
1824            .update(cx_a, |p, cx| {
1825                p.find_or_create_local_worktree("/a", true, cx)
1826            })
1827            .await
1828            .unwrap();
            // Wait for the initial scan of "/a" to finish before sharing.
1829        worktree_a
1830            .read_with(cx_a, |tree, _| tree.as_local().unwrap().scan_complete())
1831            .await;
            // Remote id used for the first share/join; a second one is awaited after the
            // disconnect below.
1832        let project_id = project_a.update(cx_a, |p, _| p.next_remote_id()).await;
1833        let worktree_id = worktree_a.read_with(cx_a, |tree, _| tree.id());
1834        project_a.update(cx_a, |p, cx| p.share(cx)).await.unwrap();
1835        assert!(worktree_a.read_with(cx_a, |tree, _| tree.as_local().unwrap().is_shared()));
1836
1837        // Join that project as client B
1838        let project_b = Project::remote(
1839            project_id,
1840            client_b.clone(),
1841            client_b.user_store.clone(),
1842            lang_registry.clone(),
1843            fs.clone(),
1844            &mut cx_b.to_async(),
1845        )
1846        .await
1847        .unwrap();
1848        project_b
1849            .update(cx_b, |p, cx| p.open_buffer((worktree_id, "a.txt"), cx))
1850            .await
1851            .unwrap();
1852
1853        // Drop client A's connection. Collaborators should disappear and the project should not be shown as shared.
1854        server.disconnect_client(client_a.current_user_id(cx_a));
            // Advance the executor's clock by `rpc::RECEIVE_TIMEOUT`.
            // NOTE(review): presumably this is what lets the lost connection be detected — confirm.
1855        cx_a.foreground().advance_clock(rpc::RECEIVE_TIMEOUT);
1856        project_a
1857            .condition(cx_a, |project, _| project.collaborators().is_empty())
1858            .await;
1859        project_a.read_with(cx_a, |project, _| assert!(!project.is_shared()));
            // The guest observes the host's disconnect as its project turning read-only.
1860        project_b
1861            .condition(cx_b, |project, _| project.is_read_only())
1862            .await;
1863        assert!(worktree_a.read_with(cx_a, |tree, _| !tree.as_local().unwrap().is_shared()));
            // Release the guest's handle to the now read-only project before re-sharing.
1864        cx_b.update(|_| {
1865            drop(project_b);
1866        });
1867
1868        // Await reconnection
            // This shadows the earlier `project_id`: the id awaited here is the one passed to
            // `Project::remote` for the second join below.
1869        let project_id = project_a.update(cx_a, |p, _| p.next_remote_id()).await;
1870
1871        // Share the project again and ensure guests can still join.
1872        project_a
1873            .update(cx_a, |project, cx| project.share(cx))
1874            .await
1875            .unwrap();
1876        assert!(worktree_a.read_with(cx_a, |tree, _| tree.as_local().unwrap().is_shared()));
1877
1878        let project_b2 = Project::remote(
1879            project_id,
1880            client_b.clone(),
1881            client_b.user_store.clone(),
1882            lang_registry.clone(),
1883            fs.clone(),
1884            &mut cx_b.to_async(),
1885        )
1886        .await
1887        .unwrap();
1888        project_b2
1889            .update(cx_b, |p, cx| p.open_buffer((worktree_id, "a.txt"), cx))
1890            .await
1891            .unwrap();
1892    }
1893
1894    #[gpui::test(iterations = 10)]
        // Exercises propagation of edits, saves and file-system changes between a host (client
        // A) and two guests (clients B and C) that share one worktree:
        //   1. both guests and the host edit the same buffer and all three converge on one text;
        //   2. guest B saves while the host edits concurrently, and the saved file and the dirty
        //      flags of all three buffers are checked;
        //   3. files are renamed/created on the host's file system, and the new paths must show
        //      up in every worktree and in every open buffer's file.
1895    async fn test_propagate_saves_and_fs_changes(
1896        cx_a: &mut TestAppContext,
1897        cx_b: &mut TestAppContext,
1898        cx_c: &mut TestAppContext,
1899    ) {
1900        let lang_registry = Arc::new(LanguageRegistry::test());
1901        let fs = FakeFs::new(cx_a.background());
            // NOTE(review): presumably makes the test fail rather than block if the executor
            // runs out of runnable work — confirm against gpui's executor.
1902        cx_a.foreground().forbid_parking();
1903
1904        // Connect to a server as 3 clients.
1905        let mut server = TestServer::start(cx_a.foreground(), cx_a.background()).await;
1906        let client_a = server.create_client(cx_a, "user_a").await;
1907        let client_b = server.create_client(cx_b, "user_b").await;
1908        let client_c = server.create_client(cx_c, "user_c").await;
1909        server
1910            .make_contacts(vec![
1911                (&client_a, cx_a),
1912                (&client_b, cx_b),
1913                (&client_c, cx_c),
1914            ])
1915            .await;
1916
1917        // Share a worktree as client A.
1918        fs.insert_tree(
1919            "/a",
1920            json!({
1921                "file1": "",
1922                "file2": ""
1923            }),
1924        )
1925        .await;
1926        let project_a = cx_a.update(|cx| {
1927            Project::local(
1928                client_a.clone(),
1929                client_a.user_store.clone(),
1930                lang_registry.clone(),
1931                fs.clone(),
1932                cx,
1933            )
1934        });
1935        let (worktree_a, _) = project_a
1936            .update(cx_a, |p, cx| {
1937                p.find_or_create_local_worktree("/a", true, cx)
1938            })
1939            .await
1940            .unwrap();
            // Wait for the initial scan of "/a" to finish before sharing.
1941        worktree_a
1942            .read_with(cx_a, |tree, _| tree.as_local().unwrap().scan_complete())
1943            .await;
            // Wait for the project's remote id; both guests pass it to `Project::remote`.
1944        let project_id = project_a.update(cx_a, |p, _| p.next_remote_id()).await;
1945        let worktree_id = worktree_a.read_with(cx_a, |tree, _| tree.id());
1946        project_a.update(cx_a, |p, cx| p.share(cx)).await.unwrap();
1947
1948        // Join that worktree as clients B and C.
1949        let project_b = Project::remote(
1950            project_id,
1951            client_b.clone(),
1952            client_b.user_store.clone(),
1953            lang_registry.clone(),
1954            fs.clone(),
1955            &mut cx_b.to_async(),
1956        )
1957        .await
1958        .unwrap();
1959        let project_c = Project::remote(
1960            project_id,
1961            client_c.clone(),
1962            client_c.user_store.clone(),
1963            lang_registry.clone(),
1964            fs.clone(),
1965            &mut cx_c.to_async(),
1966        )
1967        .await
1968        .unwrap();
            // Each guest project is expected to expose at least one worktree; take the first.
1969        let worktree_b = project_b.read_with(cx_b, |p, cx| p.worktrees(cx).next().unwrap());
1970        let worktree_c = project_c.read_with(cx_c, |p, cx| p.worktrees(cx).next().unwrap());
1971
1972        // Open and edit a buffer as both guests B and C.
1973        let buffer_b = project_b
1974            .update(cx_b, |p, cx| p.open_buffer((worktree_id, "file1"), cx))
1975            .await
1976            .unwrap();
1977        let buffer_c = project_c
1978            .update(cx_c, |p, cx| p.open_buffer((worktree_id, "file1"), cx))
1979            .await
1980            .unwrap();
1981        buffer_b.update(cx_b, |buf, cx| buf.edit([(0..0, "i-am-b, ")], cx));
1982        buffer_c.update(cx_c, |buf, cx| buf.edit([(0..0, "i-am-c, ")], cx));
1983
1984        // Open and edit that buffer as the host.
1985        let buffer_a = project_a
1986            .update(cx_a, |p, cx| p.open_buffer((worktree_id, "file1"), cx))
1987            .await
1988            .unwrap();
1989
            // Both guests inserted at offset 0. The text the host is expected to converge on
            // places C's insertion before B's.
            // NOTE(review): presumably this order is decided by replica ordering — confirm.
1990        buffer_a
1991            .condition(cx_a, |buf, _| buf.text() == "i-am-c, i-am-b, ")
1992            .await;
            // The host appends at the current end of the buffer.
1993        buffer_a.update(cx_a, |buf, cx| {
1994            buf.edit([(buf.len()..buf.len(), "i-am-a")], cx)
1995        });
1996
1997        // Wait for edits to propagate
1998        buffer_a
1999            .condition(cx_a, |buf, _| buf.text() == "i-am-c, i-am-b, i-am-a")
2000            .await;
2001        buffer_b
2002            .condition(cx_b, |buf, _| buf.text() == "i-am-c, i-am-b, i-am-a")
2003            .await;
2004        buffer_c
2005            .condition(cx_c, |buf, _| buf.text() == "i-am-c, i-am-b, i-am-a")
2006            .await;
2007
2008        // Edit the buffer as the host and concurrently save as guest B.
            // `save_b` is created before the host's edit but awaited after it; the assertion
            // below expects the file on disk to contain the host's "hi-a, " edit as well.
2009        let save_b = buffer_b.update(cx_b, |buf, cx| buf.save(cx));
2010        buffer_a.update(cx_a, |buf, cx| buf.edit([(0..0, "hi-a, ")], cx));
2011        save_b.await.unwrap();
2012        assert_eq!(
2013            fs.load("/a/file1".as_ref()).await.unwrap(),
2014            "hi-a, i-am-c, i-am-b, i-am-a"
2015        );
            // A and B are asserted clean immediately; C is only required to become clean
            // eventually.
2016        buffer_a.read_with(cx_a, |buf, _| assert!(!buf.is_dirty()));
2017        buffer_b.read_with(cx_b, |buf, _| assert!(!buf.is_dirty()));
2018        buffer_c.condition(cx_c, |buf, _| !buf.is_dirty()).await;
2019
            // NOTE(review): presumably waits until pending file-system events (e.g. from the
            // save above) have been processed before the renames below — confirm.
2020        worktree_a.flush_fs_events(cx_a).await;
2021
2022        // Make changes on host's file system, see those changes on guest worktrees.
2023        fs.rename(
2024            "/a/file1".as_ref(),
2025            "/a/file1-renamed".as_ref(),
2026            Default::default(),
2027        )
2028        .await
2029        .unwrap();
2030
2031        fs.rename("/a/file2".as_ref(), "/a/file3".as_ref(), Default::default())
2032            .await
2033            .unwrap();
2034        fs.insert_file(Path::new("/a/file4"), "4".into()).await;
2035
            // Host and both guests must all end up listing exactly the three new paths.
2036        worktree_a
2037            .condition(&cx_a, |tree, _| {
2038                tree.paths()
2039                    .map(|p| p.to_string_lossy())
2040                    .collect::<Vec<_>>()
2041                    == ["file1-renamed", "file3", "file4"]
2042            })
2043            .await;
2044        worktree_b
2045            .condition(&cx_b, |tree, _| {
2046                tree.paths()
2047                    .map(|p| p.to_string_lossy())
2048                    .collect::<Vec<_>>()
2049                    == ["file1-renamed", "file3", "file4"]
2050            })
2051            .await;
2052        worktree_c
2053            .condition(&cx_c, |tree, _| {
2054                tree.paths()
2055                    .map(|p| p.to_string_lossy())
2056                    .collect::<Vec<_>>()
2057                    == ["file1-renamed", "file3", "file4"]
2058            })
2059            .await;
2060
2061        // Ensure buffer files are updated as well.
2062        buffer_a
2063            .condition(&cx_a, |buf, _| {
2064                buf.file().unwrap().path().to_str() == Some("file1-renamed")
2065            })
2066            .await;
2067        buffer_b
2068            .condition(&cx_b, |buf, _| {
2069                buf.file().unwrap().path().to_str() == Some("file1-renamed")
2070            })
2071            .await;
2072        buffer_c
2073            .condition(&cx_c, |buf, _| {
2074                buf.file().unwrap().path().to_str() == Some("file1-renamed")
2075            })
2076            .await;
2077    }
2078
2079    #[gpui::test(iterations = 10)]
        // Exercises file-system operations issued by a guest. Client B creates a file, renames
        // it, creates a directory, deletes the directory and finally deletes the file, all
        // through its remote project. After each operation the path lists of both the host's
        // and the guest's worktree are asserted to be equal to the expected listing. The
        // assertions run immediately after the awaited operation, without waiting on a
        // condition.
2080    async fn test_fs_operations(
2081        executor: Arc<Deterministic>,
2082        cx_a: &mut TestAppContext,
2083        cx_b: &mut TestAppContext,
2084    ) {
            // NOTE(review): presumably makes the test fail rather than block if the executor
            // runs out of runnable work — confirm against gpui's executor.
2085        executor.forbid_parking();
2086        let fs = FakeFs::new(cx_a.background());
2087
2088        // Connect to a server as 2 clients.
2089        let mut server = TestServer::start(cx_a.foreground(), cx_a.background()).await;
2090        let mut client_a = server.create_client(cx_a, "user_a").await;
2091        let mut client_b = server.create_client(cx_b, "user_b").await;
2092        server
2093            .make_contacts(vec![(&client_a, cx_a), (&client_b, cx_b)])
2094            .await;
2095
2096        // Share a project as client A
2097        fs.insert_tree(
2098            "/dir",
2099            json!({
2100                "a.txt": "a-contents",
2101                "b.txt": "b-contents",
2102            }),
2103        )
2104        .await;
2105
            // Unlike the neighbouring tests, project setup goes through the test client's
            // `build_local_project` / `build_remote_project` helpers.
2106        let (project_a, worktree_id) = client_a.build_local_project(fs, "/dir", cx_a).await;
2107        let project_id = project_a.read_with(cx_a, |project, _| project.remote_id().unwrap());
2108        project_a
2109            .update(cx_a, |project, cx| project.share(cx))
2110            .await
2111            .unwrap();
2112
2113        let project_b = client_b.build_remote_project(project_id, cx_b).await;
2114
2115        let worktree_a =
2116            project_a.read_with(cx_a, |project, cx| project.worktrees(cx).next().unwrap());
2117        let worktree_b =
2118            project_b.read_with(cx_b, |project, cx| project.worktrees(cx).next().unwrap());
2119
            // Step 1: the guest creates "c.txt".
            // NOTE(review): the boolean argument is `false` here and `true` for "DIR" below;
            // presumably it selects directory creation — confirm against `create_entry`.
2120        let entry = project_b
2121            .update(cx_b, |project, cx| {
2122                project
2123                    .create_entry((worktree_id, "c.txt"), false, cx)
2124                    .unwrap()
2125            })
2126            .await
2127            .unwrap();
2128        worktree_a.read_with(cx_a, |worktree, _| {
2129            assert_eq!(
2130                worktree
2131                    .paths()
2132                    .map(|p| p.to_string_lossy())
2133                    .collect::<Vec<_>>(),
2134                ["a.txt", "b.txt", "c.txt"]
2135            );
2136        });
2137        worktree_b.read_with(cx_b, |worktree, _| {
2138            assert_eq!(
2139                worktree
2140                    .paths()
2141                    .map(|p| p.to_string_lossy())
2142                    .collect::<Vec<_>>(),
2143                ["a.txt", "b.txt", "c.txt"]
2144            );
2145        });
2146
            // Step 2: the guest renames the entry it just created to "d.txt".
2147        project_b
2148            .update(cx_b, |project, cx| {
2149                project.rename_entry(entry.id, Path::new("d.txt"), cx)
2150            })
2151            .unwrap()
2152            .await
2153            .unwrap();
2154        worktree_a.read_with(cx_a, |worktree, _| {
2155            assert_eq!(
2156                worktree
2157                    .paths()
2158                    .map(|p| p.to_string_lossy())
2159                    .collect::<Vec<_>>(),
2160                ["a.txt", "b.txt", "d.txt"]
2161            );
2162        });
2163        worktree_b.read_with(cx_b, |worktree, _| {
2164            assert_eq!(
2165                worktree
2166                    .paths()
2167                    .map(|p| p.to_string_lossy())
2168                    .collect::<Vec<_>>(),
2169                ["a.txt", "b.txt", "d.txt"]
2170            );
2171        });
2172
            // Step 3: the guest creates the entry "DIR"; it is expected first in the listing.
2173        let dir_entry = project_b
2174            .update(cx_b, |project, cx| {
2175                project
2176                    .create_entry((worktree_id, "DIR"), true, cx)
2177                    .unwrap()
2178            })
2179            .await
2180            .unwrap();
2181        worktree_a.read_with(cx_a, |worktree, _| {
2182            assert_eq!(
2183                worktree
2184                    .paths()
2185                    .map(|p| p.to_string_lossy())
2186                    .collect::<Vec<_>>(),
2187                ["DIR", "a.txt", "b.txt", "d.txt"]
2188            );
2189        });
2190        worktree_b.read_with(cx_b, |worktree, _| {
2191            assert_eq!(
2192                worktree
2193                    .paths()
2194                    .map(|p| p.to_string_lossy())
2195                    .collect::<Vec<_>>(),
2196                ["DIR", "a.txt", "b.txt", "d.txt"]
2197            );
2198        });
2199
            // Step 4: the guest deletes "DIR" again.
2200        project_b
2201            .update(cx_b, |project, cx| {
2202                project.delete_entry(dir_entry.id, cx).unwrap()
2203            })
2204            .await
2205            .unwrap();
2206        worktree_a.read_with(cx_a, |worktree, _| {
2207            assert_eq!(
2208                worktree
2209                    .paths()
2210                    .map(|p| p.to_string_lossy())
2211                    .collect::<Vec<_>>(),
2212                ["a.txt", "b.txt", "d.txt"]
2213            );
2214        });
2215        worktree_b.read_with(cx_b, |worktree, _| {
2216            assert_eq!(
2217                worktree
2218                    .paths()
2219                    .map(|p| p.to_string_lossy())
2220                    .collect::<Vec<_>>(),
2221                ["a.txt", "b.txt", "d.txt"]
2222            );
2223        });
2224
            // Step 5: the guest deletes the file using the id returned by the original
            // `create_entry` call; the assertions below show this removes "d.txt", i.e. the id
            // is still valid after the rename.
2225        project_b
2226            .update(cx_b, |project, cx| {
2227                project.delete_entry(entry.id, cx).unwrap()
2228            })
2229            .await
2230            .unwrap();
2231        worktree_a.read_with(cx_a, |worktree, _| {
2232            assert_eq!(
2233                worktree
2234                    .paths()
2235                    .map(|p| p.to_string_lossy())
2236                    .collect::<Vec<_>>(),
2237                ["a.txt", "b.txt"]
2238            );
2239        });
2240        worktree_b.read_with(cx_b, |worktree, _| {
2241            assert_eq!(
2242                worktree
2243                    .paths()
2244                    .map(|p| p.to_string_lossy())
2245                    .collect::<Vec<_>>(),
2246                ["a.txt", "b.txt"]
2247            );
2248        });
2249    }
2250
2251    #[gpui::test(iterations = 10)]
        // Exercises the dirty/conflict state of a guest's buffer around a save. Client B edits a
        // buffer of client A's shared project, saves it, and edits it again. The buffer must be
        // dirty after each edit, clean after the save, and must never report a conflict.
2252    async fn test_buffer_conflict_after_save(cx_a: &mut TestAppContext, cx_b: &mut TestAppContext) {
            // NOTE(review): presumably makes the test fail rather than block if the executor
            // runs out of runnable work — confirm against gpui's executor.
2253        cx_a.foreground().forbid_parking();
2254        let lang_registry = Arc::new(LanguageRegistry::test());
2255        let fs = FakeFs::new(cx_a.background());
2256
2257        // Connect to a server as 2 clients.
2258        let mut server = TestServer::start(cx_a.foreground(), cx_a.background()).await;
2259        let client_a = server.create_client(cx_a, "user_a").await;
2260        let client_b = server.create_client(cx_b, "user_b").await;
2261        server
2262            .make_contacts(vec![(&client_a, cx_a), (&client_b, cx_b)])
2263            .await;
2264
2265        // Share a project as client A
2266        fs.insert_tree(
2267            "/dir",
2268            json!({
2269                "a.txt": "a-contents",
2270            }),
2271        )
2272        .await;
2273
2274        let project_a = cx_a.update(|cx| {
2275            Project::local(
2276                client_a.clone(),
2277                client_a.user_store.clone(),
2278                lang_registry.clone(),
2279                fs.clone(),
2280                cx,
2281            )
2282        });
2283        let (worktree_a, _) = project_a
2284            .update(cx_a, |p, cx| {
2285                p.find_or_create_local_worktree("/dir", true, cx)
2286            })
2287            .await
2288            .unwrap();
            // Wait for the initial scan of "/dir" to finish before sharing.
2289        worktree_a
2290            .read_with(cx_a, |tree, _| tree.as_local().unwrap().scan_complete())
2291            .await;
            // Wait for the project's remote id; it is what client B passes to `Project::remote`.
2292        let project_id = project_a.update(cx_a, |p, _| p.next_remote_id()).await;
2293        let worktree_id = worktree_a.read_with(cx_a, |tree, _| tree.id());
2294        project_a.update(cx_a, |p, cx| p.share(cx)).await.unwrap();
2295
2296        // Join that project as client B
2297        let project_b = Project::remote(
2298            project_id,
2299            client_b.clone(),
2300            client_b.user_store.clone(),
2301            lang_registry.clone(),
2302            fs.clone(),
2303            &mut cx_b.to_async(),
2304        )
2305        .await
2306        .unwrap();
2307
2308        // Open a buffer as client B
2309        let buffer_b = project_b
2310            .update(cx_b, |p, cx| p.open_buffer((worktree_id, "a.txt"), cx))
2311            .await
2312            .unwrap();
2313
            // First edit: the buffer becomes dirty but must not report a conflict.
2314        buffer_b.update(cx_b, |buf, cx| buf.edit([(0..0, "world ")], cx));
2315        buffer_b.read_with(cx_b, |buf, _| {
2316            assert!(buf.is_dirty());
2317            assert!(!buf.has_conflict());
2318        });
2319
            // Save, then wait for the buffer to become clean again; still no conflict.
2320        buffer_b.update(cx_b, |buf, cx| buf.save(cx)).await.unwrap();
2321        buffer_b
2322            .condition(&cx_b, |buffer_b, _| !buffer_b.is_dirty())
2323            .await;
2324        buffer_b.read_with(cx_b, |buf, _| {
2325            assert!(!buf.has_conflict());
2326        });
2327
            // Editing again after the save must dirty the buffer without producing a conflict.
2328        buffer_b.update(cx_b, |buf, cx| buf.edit([(0..0, "hello ")], cx));
2329        buffer_b.read_with(cx_b, |buf, _| {
2330            assert!(buf.is_dirty());
2331            assert!(!buf.has_conflict());
2332        });
2333    }
2334
2335    #[gpui::test(iterations = 10)]
        // Exercises reloading of a guest's buffer when the underlying file changes. Client B
        // opens "a.txt" from client A's shared project; the file is then overwritten through the
        // fake file system. B's buffer must end up with the new text while being neither dirty
        // nor in conflict.
2336    async fn test_buffer_reloading(cx_a: &mut TestAppContext, cx_b: &mut TestAppContext) {
            // NOTE(review): presumably makes the test fail rather than block if the executor
            // runs out of runnable work — confirm against gpui's executor.
2337        cx_a.foreground().forbid_parking();
2338        let lang_registry = Arc::new(LanguageRegistry::test());
2339        let fs = FakeFs::new(cx_a.background());
2340
2341        // Connect to a server as 2 clients.
2342        let mut server = TestServer::start(cx_a.foreground(), cx_a.background()).await;
2343        let client_a = server.create_client(cx_a, "user_a").await;
2344        let client_b = server.create_client(cx_b, "user_b").await;
2345        server
2346            .make_contacts(vec![(&client_a, cx_a), (&client_b, cx_b)])
2347            .await;
2348
2349        // Share a project as client A
2350        fs.insert_tree(
2351            "/dir",
2352            json!({
2353                "a.txt": "a-contents",
2354            }),
2355        )
2356        .await;
2357
2358        let project_a = cx_a.update(|cx| {
2359            Project::local(
2360                client_a.clone(),
2361                client_a.user_store.clone(),
2362                lang_registry.clone(),
2363                fs.clone(),
2364                cx,
2365            )
2366        });
2367        let (worktree_a, _) = project_a
2368            .update(cx_a, |p, cx| {
2369                p.find_or_create_local_worktree("/dir", true, cx)
2370            })
2371            .await
2372            .unwrap();
            // Wait for the initial scan of "/dir" to finish before sharing.
2373        worktree_a
2374            .read_with(cx_a, |tree, _| tree.as_local().unwrap().scan_complete())
2375            .await;
            // Wait for the project's remote id; it is what client B passes to `Project::remote`.
2376        let project_id = project_a.update(cx_a, |p, _| p.next_remote_id()).await;
2377        let worktree_id = worktree_a.read_with(cx_a, |tree, _| tree.id());
2378        project_a.update(cx_a, |p, cx| p.share(cx)).await.unwrap();
2379
2380        // Join that project as client B
2381        let project_b = Project::remote(
2382            project_id,
2383            client_b.clone(),
2384            client_b.user_store.clone(),
2385            lang_registry.clone(),
2386            fs.clone(),
2387            &mut cx_b.to_async(),
2388        )
2389        .await
2390        .unwrap();
            // NOTE(review): bound but never read; presumably held so the guest's worktree handle
            // stays alive for the duration of the test — confirm.
2391        let _worktree_b = project_b.update(cx_b, |p, cx| p.worktrees(cx).next().unwrap());
2392
2393        // Open a buffer as client B
2394        let buffer_b = project_b
2395            .update(cx_b, |p, cx| p.open_buffer((worktree_id, "a.txt"), cx))
2396            .await
2397            .unwrap();
2398        buffer_b.read_with(cx_b, |buf, _| {
2399            assert!(!buf.is_dirty());
2400            assert!(!buf.has_conflict());
2401        });
2402
            // Overwrite the file directly through the file system (not through any buffer), then
            // wait until the guest's buffer shows the new text and is clean.
2403        fs.save(Path::new("/dir/a.txt"), &"new contents".into())
2404            .await
2405            .unwrap();
2406        buffer_b
2407            .condition(&cx_b, |buf, _| {
2408                buf.text() == "new contents" && !buf.is_dirty()
2409            })
2410            .await;
2411        buffer_b.read_with(cx_b, |buf, _| {
2412            assert!(!buf.has_conflict());
2413        });
2414    }
2415
2416    #[gpui::test(iterations = 10)]
        // Exercises a race between a guest opening a buffer and the host editing it. Client A
        // has "a.txt" open; client B starts opening the same buffer in a spawned task while A
        // makes two edits separated by simulated random delays. Once B's open completes, B's
        // buffer must eventually contain the same text as A's.
2417    async fn test_editing_while_guest_opens_buffer(
2418        cx_a: &mut TestAppContext,
2419        cx_b: &mut TestAppContext,
2420    ) {
            // NOTE(review): presumably makes the test fail rather than block if the executor
            // runs out of runnable work — confirm against gpui's executor.
2421        cx_a.foreground().forbid_parking();
2422        let lang_registry = Arc::new(LanguageRegistry::test());
2423        let fs = FakeFs::new(cx_a.background());
2424
2425        // Connect to a server as 2 clients.
2426        let mut server = TestServer::start(cx_a.foreground(), cx_a.background()).await;
2427        let client_a = server.create_client(cx_a, "user_a").await;
2428        let client_b = server.create_client(cx_b, "user_b").await;
2429        server
2430            .make_contacts(vec![(&client_a, cx_a), (&client_b, cx_b)])
2431            .await;
2432
2433        // Share a project as client A
2434        fs.insert_tree(
2435            "/dir",
2436            json!({
2437                "a.txt": "a-contents",
2438            }),
2439        )
2440        .await;
2441        let project_a = cx_a.update(|cx| {
2442            Project::local(
2443                client_a.clone(),
2444                client_a.user_store.clone(),
2445                lang_registry.clone(),
2446                fs.clone(),
2447                cx,
2448            )
2449        });
2450        let (worktree_a, _) = project_a
2451            .update(cx_a, |p, cx| {
2452                p.find_or_create_local_worktree("/dir", true, cx)
2453            })
2454            .await
2455            .unwrap();
            // Wait for the initial scan of "/dir" to finish before sharing.
2456        worktree_a
2457            .read_with(cx_a, |tree, _| tree.as_local().unwrap().scan_complete())
2458            .await;
            // Wait for the project's remote id; it is what client B passes to `Project::remote`.
2459        let project_id = project_a.update(cx_a, |p, _| p.next_remote_id()).await;
2460        let worktree_id = worktree_a.read_with(cx_a, |tree, _| tree.id());
2461        project_a.update(cx_a, |p, cx| p.share(cx)).await.unwrap();
2462
2463        // Join that project as client B
2464        let project_b = Project::remote(
2465            project_id,
2466            client_b.clone(),
2467            client_b.user_store.clone(),
2468            lang_registry.clone(),
2469            fs.clone(),
2470            &mut cx_b.to_async(),
2471        )
2472        .await
2473        .unwrap();
2474
2475        // Open a buffer as client A
2476        let buffer_a = project_a
2477            .update(cx_a, |p, cx| p.open_buffer((worktree_id, "a.txt"), cx))
2478            .await
2479            .unwrap();
2480
2481        // Start opening the same buffer as client B
            // The open is spawned on B's background executor and only awaited after the host's
            // edits below, so the two can interleave.
2482        let buffer_b = cx_b
2483            .background()
2484            .spawn(project_b.update(cx_b, |p, cx| p.open_buffer((worktree_id, "a.txt"), cx)));
2485
2486        // Edit the buffer as client A while client B is still opening it.
2487        cx_b.background().simulate_random_delay().await;
2488        buffer_a.update(cx_a, |buf, cx| buf.edit([(0..0, "X")], cx));
2489        cx_b.background().simulate_random_delay().await;
2490        buffer_a.update(cx_a, |buf, cx| buf.edit([(1..1, "Y")], cx));
2491
            // Snapshot the host's text after both edits; the guest's buffer must eventually
            // equal it, whichever edits had already been applied when the open completed.
2492        let text = buffer_a.read_with(cx_a, |buf, _| buf.text());
2493        let buffer_b = buffer_b.await.unwrap();
2494        buffer_b.condition(&cx_b, |buf, _| buf.text() == text).await;
2495    }
2496
2497    #[gpui::test(iterations = 10)]
2498    async fn test_leaving_worktree_while_opening_buffer(
2499        cx_a: &mut TestAppContext,
2500        cx_b: &mut TestAppContext,
2501    ) {
2502        cx_a.foreground().forbid_parking();
2503        let lang_registry = Arc::new(LanguageRegistry::test());
2504        let fs = FakeFs::new(cx_a.background());
2505
2506        // Connect to a server as 2 clients.
2507        let mut server = TestServer::start(cx_a.foreground(), cx_a.background()).await;
2508        let client_a = server.create_client(cx_a, "user_a").await;
2509        let client_b = server.create_client(cx_b, "user_b").await;
2510        server
2511            .make_contacts(vec![(&client_a, cx_a), (&client_b, cx_b)])
2512            .await;
2513
2514        // Share a project as client A
2515        fs.insert_tree(
2516            "/dir",
2517            json!({
2518                "a.txt": "a-contents",
2519            }),
2520        )
2521        .await;
2522        let project_a = cx_a.update(|cx| {
2523            Project::local(
2524                client_a.clone(),
2525                client_a.user_store.clone(),
2526                lang_registry.clone(),
2527                fs.clone(),
2528                cx,
2529            )
2530        });
2531        let (worktree_a, _) = project_a
2532            .update(cx_a, |p, cx| {
2533                p.find_or_create_local_worktree("/dir", true, cx)
2534            })
2535            .await
2536            .unwrap();
2537        worktree_a
2538            .read_with(cx_a, |tree, _| tree.as_local().unwrap().scan_complete())
2539            .await;
2540        let project_id = project_a.update(cx_a, |p, _| p.next_remote_id()).await;
2541        let worktree_id = worktree_a.read_with(cx_a, |tree, _| tree.id());
2542        project_a.update(cx_a, |p, cx| p.share(cx)).await.unwrap();
2543
2544        // Join that project as client B
2545        let project_b = Project::remote(
2546            project_id,
2547            client_b.clone(),
2548            client_b.user_store.clone(),
2549            lang_registry.clone(),
2550            fs.clone(),
2551            &mut cx_b.to_async(),
2552        )
2553        .await
2554        .unwrap();
2555
2556        // See that a guest has joined as client A.
2557        project_a
2558            .condition(&cx_a, |p, _| p.collaborators().len() == 1)
2559            .await;
2560
2561        // Begin opening a buffer as client B, but leave the project before the open completes.
2562        let buffer_b = cx_b
2563            .background()
2564            .spawn(project_b.update(cx_b, |p, cx| p.open_buffer((worktree_id, "a.txt"), cx)));
2565        cx_b.update(|_| drop(project_b));
2566        drop(buffer_b);
2567
2568        // See that the guest has left.
2569        project_a
2570            .condition(&cx_a, |p, _| p.collaborators().len() == 0)
2571            .await;
2572    }
2573
2574    #[gpui::test(iterations = 10)]
2575    async fn test_leaving_project(cx_a: &mut TestAppContext, cx_b: &mut TestAppContext) {
2576        cx_a.foreground().forbid_parking();
2577        let lang_registry = Arc::new(LanguageRegistry::test());
2578        let fs = FakeFs::new(cx_a.background());
2579
2580        // Connect to a server as 2 clients.
2581        let mut server = TestServer::start(cx_a.foreground(), cx_a.background()).await;
2582        let client_a = server.create_client(cx_a, "user_a").await;
2583        let client_b = server.create_client(cx_b, "user_b").await;
2584        server
2585            .make_contacts(vec![(&client_a, cx_a), (&client_b, cx_b)])
2586            .await;
2587
2588        // Share a project as client A
2589        fs.insert_tree(
2590            "/a",
2591            json!({
2592                "a.txt": "a-contents",
2593                "b.txt": "b-contents",
2594            }),
2595        )
2596        .await;
2597        let project_a = cx_a.update(|cx| {
2598            Project::local(
2599                client_a.clone(),
2600                client_a.user_store.clone(),
2601                lang_registry.clone(),
2602                fs.clone(),
2603                cx,
2604            )
2605        });
2606        let (worktree_a, _) = project_a
2607            .update(cx_a, |p, cx| {
2608                p.find_or_create_local_worktree("/a", true, cx)
2609            })
2610            .await
2611            .unwrap();
2612        worktree_a
2613            .read_with(cx_a, |tree, _| tree.as_local().unwrap().scan_complete())
2614            .await;
2615        let project_id = project_a
2616            .update(cx_a, |project, _| project.next_remote_id())
2617            .await;
2618        project_a
2619            .update(cx_a, |project, cx| project.share(cx))
2620            .await
2621            .unwrap();
2622
2623        // Join that project as client B
2624        let _project_b = Project::remote(
2625            project_id,
2626            client_b.clone(),
2627            client_b.user_store.clone(),
2628            lang_registry.clone(),
2629            fs.clone(),
2630            &mut cx_b.to_async(),
2631        )
2632        .await
2633        .unwrap();
2634
2635        // Client A sees that a guest has joined.
2636        project_a
2637            .condition(cx_a, |p, _| p.collaborators().len() == 1)
2638            .await;
2639
2640        // Drop client B's connection and ensure client A observes client B leaving the project.
2641        client_b.disconnect(&cx_b.to_async()).unwrap();
2642        project_a
2643            .condition(cx_a, |p, _| p.collaborators().len() == 0)
2644            .await;
2645
2646        // Rejoin the project as client B
2647        let _project_b = Project::remote(
2648            project_id,
2649            client_b.clone(),
2650            client_b.user_store.clone(),
2651            lang_registry.clone(),
2652            fs.clone(),
2653            &mut cx_b.to_async(),
2654        )
2655        .await
2656        .unwrap();
2657
2658        // Client A sees that a guest has re-joined.
2659        project_a
2660            .condition(cx_a, |p, _| p.collaborators().len() == 1)
2661            .await;
2662
2663        // Simulate connection loss for client B and ensure client A observes client B leaving the project.
2664        client_b.wait_for_current_user(cx_b).await;
2665        server.disconnect_client(client_b.current_user_id(cx_b));
2666        cx_a.foreground().advance_clock(rpc::RECEIVE_TIMEOUT);
2667        project_a
2668            .condition(cx_a, |p, _| p.collaborators().len() == 0)
2669            .await;
2670    }
2671
2672    #[gpui::test(iterations = 10)]
2673    async fn test_collaborating_with_diagnostics(
2674        cx_a: &mut TestAppContext,
2675        cx_b: &mut TestAppContext,
2676    ) {
2677        cx_a.foreground().forbid_parking();
2678        let lang_registry = Arc::new(LanguageRegistry::test());
2679        let fs = FakeFs::new(cx_a.background());
2680
2681        // Set up a fake language server.
2682        let mut language = Language::new(
2683            LanguageConfig {
2684                name: "Rust".into(),
2685                path_suffixes: vec!["rs".to_string()],
2686                ..Default::default()
2687            },
2688            Some(tree_sitter_rust::language()),
2689        );
2690        let mut fake_language_servers = language.set_fake_lsp_adapter(Default::default());
2691        lang_registry.add(Arc::new(language));
2692
2693        // Connect to a server as 2 clients.
2694        let mut server = TestServer::start(cx_a.foreground(), cx_a.background()).await;
2695        let client_a = server.create_client(cx_a, "user_a").await;
2696        let client_b = server.create_client(cx_b, "user_b").await;
2697        server
2698            .make_contacts(vec![(&client_a, cx_a), (&client_b, cx_b)])
2699            .await;
2700
2701        // Share a project as client A
2702        fs.insert_tree(
2703            "/a",
2704            json!({
2705                "a.rs": "let one = two",
2706                "other.rs": "",
2707            }),
2708        )
2709        .await;
2710        let project_a = cx_a.update(|cx| {
2711            Project::local(
2712                client_a.clone(),
2713                client_a.user_store.clone(),
2714                lang_registry.clone(),
2715                fs.clone(),
2716                cx,
2717            )
2718        });
2719        let (worktree_a, _) = project_a
2720            .update(cx_a, |p, cx| {
2721                p.find_or_create_local_worktree("/a", true, cx)
2722            })
2723            .await
2724            .unwrap();
2725        worktree_a
2726            .read_with(cx_a, |tree, _| tree.as_local().unwrap().scan_complete())
2727            .await;
2728        let project_id = project_a.update(cx_a, |p, _| p.next_remote_id()).await;
2729        let worktree_id = worktree_a.read_with(cx_a, |tree, _| tree.id());
2730        project_a.update(cx_a, |p, cx| p.share(cx)).await.unwrap();
2731
2732        // Cause the language server to start.
2733        let _ = cx_a
2734            .background()
2735            .spawn(project_a.update(cx_a, |project, cx| {
2736                project.open_buffer(
2737                    ProjectPath {
2738                        worktree_id,
2739                        path: Path::new("other.rs").into(),
2740                    },
2741                    cx,
2742                )
2743            }))
2744            .await
2745            .unwrap();
2746
2747        // Simulate a language server reporting errors for a file.
2748        let mut fake_language_server = fake_language_servers.next().await.unwrap();
2749        fake_language_server
2750            .receive_notification::<lsp::notification::DidOpenTextDocument>()
2751            .await;
2752        fake_language_server.notify::<lsp::notification::PublishDiagnostics>(
2753            lsp::PublishDiagnosticsParams {
2754                uri: lsp::Url::from_file_path("/a/a.rs").unwrap(),
2755                version: None,
2756                diagnostics: vec![lsp::Diagnostic {
2757                    severity: Some(lsp::DiagnosticSeverity::ERROR),
2758                    range: lsp::Range::new(lsp::Position::new(0, 4), lsp::Position::new(0, 7)),
2759                    message: "message 1".to_string(),
2760                    ..Default::default()
2761                }],
2762            },
2763        );
2764
2765        // Wait for server to see the diagnostics update.
2766        server
2767            .condition(|store| {
2768                let worktree = store
2769                    .project(project_id)
2770                    .unwrap()
2771                    .share
2772                    .as_ref()
2773                    .unwrap()
2774                    .worktrees
2775                    .get(&worktree_id.to_proto())
2776                    .unwrap();
2777
2778                !worktree.diagnostic_summaries.is_empty()
2779            })
2780            .await;
2781
2782        // Join the worktree as client B.
2783        let project_b = Project::remote(
2784            project_id,
2785            client_b.clone(),
2786            client_b.user_store.clone(),
2787            lang_registry.clone(),
2788            fs.clone(),
2789            &mut cx_b.to_async(),
2790        )
2791        .await
2792        .unwrap();
2793
2794        project_b.read_with(cx_b, |project, cx| {
2795            assert_eq!(
2796                project.diagnostic_summaries(cx).collect::<Vec<_>>(),
2797                &[(
2798                    ProjectPath {
2799                        worktree_id,
2800                        path: Arc::from(Path::new("a.rs")),
2801                    },
2802                    DiagnosticSummary {
2803                        error_count: 1,
2804                        warning_count: 0,
2805                        ..Default::default()
2806                    },
2807                )]
2808            )
2809        });
2810
2811        // Simulate a language server reporting more errors for a file.
2812        fake_language_server.notify::<lsp::notification::PublishDiagnostics>(
2813            lsp::PublishDiagnosticsParams {
2814                uri: lsp::Url::from_file_path("/a/a.rs").unwrap(),
2815                version: None,
2816                diagnostics: vec![
2817                    lsp::Diagnostic {
2818                        severity: Some(lsp::DiagnosticSeverity::ERROR),
2819                        range: lsp::Range::new(lsp::Position::new(0, 4), lsp::Position::new(0, 7)),
2820                        message: "message 1".to_string(),
2821                        ..Default::default()
2822                    },
2823                    lsp::Diagnostic {
2824                        severity: Some(lsp::DiagnosticSeverity::WARNING),
2825                        range: lsp::Range::new(
2826                            lsp::Position::new(0, 10),
2827                            lsp::Position::new(0, 13),
2828                        ),
2829                        message: "message 2".to_string(),
2830                        ..Default::default()
2831                    },
2832                ],
2833            },
2834        );
2835
2836        // Client b gets the updated summaries
2837        project_b
2838            .condition(&cx_b, |project, cx| {
2839                project.diagnostic_summaries(cx).collect::<Vec<_>>()
2840                    == &[(
2841                        ProjectPath {
2842                            worktree_id,
2843                            path: Arc::from(Path::new("a.rs")),
2844                        },
2845                        DiagnosticSummary {
2846                            error_count: 1,
2847                            warning_count: 1,
2848                            ..Default::default()
2849                        },
2850                    )]
2851            })
2852            .await;
2853
2854        // Open the file with the errors on client B. They should be present.
2855        let buffer_b = cx_b
2856            .background()
2857            .spawn(project_b.update(cx_b, |p, cx| p.open_buffer((worktree_id, "a.rs"), cx)))
2858            .await
2859            .unwrap();
2860
2861        buffer_b.read_with(cx_b, |buffer, _| {
2862            assert_eq!(
2863                buffer
2864                    .snapshot()
2865                    .diagnostics_in_range::<_, Point>(0..buffer.len(), false)
2866                    .map(|entry| entry)
2867                    .collect::<Vec<_>>(),
2868                &[
2869                    DiagnosticEntry {
2870                        range: Point::new(0, 4)..Point::new(0, 7),
2871                        diagnostic: Diagnostic {
2872                            group_id: 0,
2873                            message: "message 1".to_string(),
2874                            severity: lsp::DiagnosticSeverity::ERROR,
2875                            is_primary: true,
2876                            ..Default::default()
2877                        }
2878                    },
2879                    DiagnosticEntry {
2880                        range: Point::new(0, 10)..Point::new(0, 13),
2881                        diagnostic: Diagnostic {
2882                            group_id: 1,
2883                            severity: lsp::DiagnosticSeverity::WARNING,
2884                            message: "message 2".to_string(),
2885                            is_primary: true,
2886                            ..Default::default()
2887                        }
2888                    }
2889                ]
2890            );
2891        });
2892
2893        // Simulate a language server reporting no errors for a file.
2894        fake_language_server.notify::<lsp::notification::PublishDiagnostics>(
2895            lsp::PublishDiagnosticsParams {
2896                uri: lsp::Url::from_file_path("/a/a.rs").unwrap(),
2897                version: None,
2898                diagnostics: vec![],
2899            },
2900        );
2901        project_a
2902            .condition(cx_a, |project, cx| {
2903                project.diagnostic_summaries(cx).collect::<Vec<_>>() == &[]
2904            })
2905            .await;
2906        project_b
2907            .condition(cx_b, |project, cx| {
2908                project.diagnostic_summaries(cx).collect::<Vec<_>>() == &[]
2909            })
2910            .await;
2911    }
2912
2913    #[gpui::test(iterations = 10)]
2914    async fn test_collaborating_with_completion(
2915        cx_a: &mut TestAppContext,
2916        cx_b: &mut TestAppContext,
2917    ) {
2918        cx_a.foreground().forbid_parking();
2919        let lang_registry = Arc::new(LanguageRegistry::test());
2920        let fs = FakeFs::new(cx_a.background());
2921
2922        // Set up a fake language server.
2923        let mut language = Language::new(
2924            LanguageConfig {
2925                name: "Rust".into(),
2926                path_suffixes: vec!["rs".to_string()],
2927                ..Default::default()
2928            },
2929            Some(tree_sitter_rust::language()),
2930        );
2931        let mut fake_language_servers = language.set_fake_lsp_adapter(FakeLspAdapter {
2932            capabilities: lsp::ServerCapabilities {
2933                completion_provider: Some(lsp::CompletionOptions {
2934                    trigger_characters: Some(vec![".".to_string()]),
2935                    ..Default::default()
2936                }),
2937                ..Default::default()
2938            },
2939            ..Default::default()
2940        });
2941        lang_registry.add(Arc::new(language));
2942
2943        // Connect to a server as 2 clients.
2944        let mut server = TestServer::start(cx_a.foreground(), cx_a.background()).await;
2945        let client_a = server.create_client(cx_a, "user_a").await;
2946        let client_b = server.create_client(cx_b, "user_b").await;
2947        server
2948            .make_contacts(vec![(&client_a, cx_a), (&client_b, cx_b)])
2949            .await;
2950
2951        // Share a project as client A
2952        fs.insert_tree(
2953            "/a",
2954            json!({
2955                "main.rs": "fn main() { a }",
2956                "other.rs": "",
2957            }),
2958        )
2959        .await;
2960        let project_a = cx_a.update(|cx| {
2961            Project::local(
2962                client_a.clone(),
2963                client_a.user_store.clone(),
2964                lang_registry.clone(),
2965                fs.clone(),
2966                cx,
2967            )
2968        });
2969        let (worktree_a, _) = project_a
2970            .update(cx_a, |p, cx| {
2971                p.find_or_create_local_worktree("/a", true, cx)
2972            })
2973            .await
2974            .unwrap();
2975        worktree_a
2976            .read_with(cx_a, |tree, _| tree.as_local().unwrap().scan_complete())
2977            .await;
2978        let project_id = project_a.update(cx_a, |p, _| p.next_remote_id()).await;
2979        let worktree_id = worktree_a.read_with(cx_a, |tree, _| tree.id());
2980        project_a.update(cx_a, |p, cx| p.share(cx)).await.unwrap();
2981
2982        // Join the worktree as client B.
2983        let project_b = Project::remote(
2984            project_id,
2985            client_b.clone(),
2986            client_b.user_store.clone(),
2987            lang_registry.clone(),
2988            fs.clone(),
2989            &mut cx_b.to_async(),
2990        )
2991        .await
2992        .unwrap();
2993
2994        // Open a file in an editor as the guest.
2995        let buffer_b = project_b
2996            .update(cx_b, |p, cx| p.open_buffer((worktree_id, "main.rs"), cx))
2997            .await
2998            .unwrap();
2999        let (window_b, _) = cx_b.add_window(|_| EmptyView);
3000        let editor_b = cx_b.add_view(window_b, |cx| {
3001            Editor::for_buffer(buffer_b.clone(), Some(project_b.clone()), cx)
3002        });
3003
3004        let fake_language_server = fake_language_servers.next().await.unwrap();
3005        buffer_b
3006            .condition(&cx_b, |buffer, _| !buffer.completion_triggers().is_empty())
3007            .await;
3008
3009        // Type a completion trigger character as the guest.
3010        editor_b.update(cx_b, |editor, cx| {
3011            editor.select_ranges([13..13], None, cx);
3012            editor.handle_input(&Input(".".into()), cx);
3013            cx.focus(&editor_b);
3014        });
3015
3016        // Receive a completion request as the host's language server.
3017        // Return some completions from the host's language server.
3018        cx_a.foreground().start_waiting();
3019        fake_language_server
3020            .handle_request::<lsp::request::Completion, _, _>(|params, _| async move {
3021                assert_eq!(
3022                    params.text_document_position.text_document.uri,
3023                    lsp::Url::from_file_path("/a/main.rs").unwrap(),
3024                );
3025                assert_eq!(
3026                    params.text_document_position.position,
3027                    lsp::Position::new(0, 14),
3028                );
3029
3030                Ok(Some(lsp::CompletionResponse::Array(vec![
3031                    lsp::CompletionItem {
3032                        label: "first_method(…)".into(),
3033                        detail: Some("fn(&mut self, B) -> C".into()),
3034                        text_edit: Some(lsp::CompletionTextEdit::Edit(lsp::TextEdit {
3035                            new_text: "first_method($1)".to_string(),
3036                            range: lsp::Range::new(
3037                                lsp::Position::new(0, 14),
3038                                lsp::Position::new(0, 14),
3039                            ),
3040                        })),
3041                        insert_text_format: Some(lsp::InsertTextFormat::SNIPPET),
3042                        ..Default::default()
3043                    },
3044                    lsp::CompletionItem {
3045                        label: "second_method(…)".into(),
3046                        detail: Some("fn(&mut self, C) -> D<E>".into()),
3047                        text_edit: Some(lsp::CompletionTextEdit::Edit(lsp::TextEdit {
3048                            new_text: "second_method()".to_string(),
3049                            range: lsp::Range::new(
3050                                lsp::Position::new(0, 14),
3051                                lsp::Position::new(0, 14),
3052                            ),
3053                        })),
3054                        insert_text_format: Some(lsp::InsertTextFormat::SNIPPET),
3055                        ..Default::default()
3056                    },
3057                ])))
3058            })
3059            .next()
3060            .await
3061            .unwrap();
3062        cx_a.foreground().finish_waiting();
3063
3064        // Open the buffer on the host.
3065        let buffer_a = project_a
3066            .update(cx_a, |p, cx| p.open_buffer((worktree_id, "main.rs"), cx))
3067            .await
3068            .unwrap();
3069        buffer_a
3070            .condition(&cx_a, |buffer, _| buffer.text() == "fn main() { a. }")
3071            .await;
3072
3073        // Confirm a completion on the guest.
3074        editor_b
3075            .condition(&cx_b, |editor, _| editor.context_menu_visible())
3076            .await;
3077        editor_b.update(cx_b, |editor, cx| {
3078            editor.confirm_completion(&ConfirmCompletion { item_ix: Some(0) }, cx);
3079            assert_eq!(editor.text(cx), "fn main() { a.first_method() }");
3080        });
3081
3082        // Return a resolved completion from the host's language server.
3083        // The resolved completion has an additional text edit.
3084        fake_language_server.handle_request::<lsp::request::ResolveCompletionItem, _, _>(
3085            |params, _| async move {
3086                assert_eq!(params.label, "first_method(…)");
3087                Ok(lsp::CompletionItem {
3088                    label: "first_method(…)".into(),
3089                    detail: Some("fn(&mut self, B) -> C".into()),
3090                    text_edit: Some(lsp::CompletionTextEdit::Edit(lsp::TextEdit {
3091                        new_text: "first_method($1)".to_string(),
3092                        range: lsp::Range::new(
3093                            lsp::Position::new(0, 14),
3094                            lsp::Position::new(0, 14),
3095                        ),
3096                    })),
3097                    additional_text_edits: Some(vec![lsp::TextEdit {
3098                        new_text: "use d::SomeTrait;\n".to_string(),
3099                        range: lsp::Range::new(lsp::Position::new(0, 0), lsp::Position::new(0, 0)),
3100                    }]),
3101                    insert_text_format: Some(lsp::InsertTextFormat::SNIPPET),
3102                    ..Default::default()
3103                })
3104            },
3105        );
3106
3107        // The additional edit is applied.
3108        buffer_a
3109            .condition(&cx_a, |buffer, _| {
3110                buffer.text() == "use d::SomeTrait;\nfn main() { a.first_method() }"
3111            })
3112            .await;
3113        buffer_b
3114            .condition(&cx_b, |buffer, _| {
3115                buffer.text() == "use d::SomeTrait;\nfn main() { a.first_method() }"
3116            })
3117            .await;
3118    }
3119
3120    #[gpui::test(iterations = 10)]
3121    async fn test_reloading_buffer_manually(cx_a: &mut TestAppContext, cx_b: &mut TestAppContext) {
3122        cx_a.foreground().forbid_parking();
3123        let lang_registry = Arc::new(LanguageRegistry::test());
3124        let fs = FakeFs::new(cx_a.background());
3125
3126        // Connect to a server as 2 clients.
3127        let mut server = TestServer::start(cx_a.foreground(), cx_a.background()).await;
3128        let client_a = server.create_client(cx_a, "user_a").await;
3129        let client_b = server.create_client(cx_b, "user_b").await;
3130        server
3131            .make_contacts(vec![(&client_a, cx_a), (&client_b, cx_b)])
3132            .await;
3133
3134        // Share a project as client A
3135        fs.insert_tree(
3136            "/a",
3137            json!({
3138                "a.rs": "let one = 1;",
3139            }),
3140        )
3141        .await;
3142        let project_a = cx_a.update(|cx| {
3143            Project::local(
3144                client_a.clone(),
3145                client_a.user_store.clone(),
3146                lang_registry.clone(),
3147                fs.clone(),
3148                cx,
3149            )
3150        });
3151        let (worktree_a, _) = project_a
3152            .update(cx_a, |p, cx| {
3153                p.find_or_create_local_worktree("/a", true, cx)
3154            })
3155            .await
3156            .unwrap();
3157        worktree_a
3158            .read_with(cx_a, |tree, _| tree.as_local().unwrap().scan_complete())
3159            .await;
3160        let project_id = project_a.update(cx_a, |p, _| p.next_remote_id()).await;
3161        let worktree_id = worktree_a.read_with(cx_a, |tree, _| tree.id());
3162        project_a.update(cx_a, |p, cx| p.share(cx)).await.unwrap();
3163        let buffer_a = project_a
3164            .update(cx_a, |p, cx| p.open_buffer((worktree_id, "a.rs"), cx))
3165            .await
3166            .unwrap();
3167
3168        // Join the worktree as client B.
3169        let project_b = Project::remote(
3170            project_id,
3171            client_b.clone(),
3172            client_b.user_store.clone(),
3173            lang_registry.clone(),
3174            fs.clone(),
3175            &mut cx_b.to_async(),
3176        )
3177        .await
3178        .unwrap();
3179
3180        let buffer_b = cx_b
3181            .background()
3182            .spawn(project_b.update(cx_b, |p, cx| p.open_buffer((worktree_id, "a.rs"), cx)))
3183            .await
3184            .unwrap();
3185        buffer_b.update(cx_b, |buffer, cx| {
3186            buffer.edit([(4..7, "six")], cx);
3187            buffer.edit([(10..11, "6")], cx);
3188            assert_eq!(buffer.text(), "let six = 6;");
3189            assert!(buffer.is_dirty());
3190            assert!(!buffer.has_conflict());
3191        });
3192        buffer_a
3193            .condition(cx_a, |buffer, _| buffer.text() == "let six = 6;")
3194            .await;
3195
3196        fs.save(Path::new("/a/a.rs"), &Rope::from("let seven = 7;"))
3197            .await
3198            .unwrap();
3199        buffer_a
3200            .condition(cx_a, |buffer, _| buffer.has_conflict())
3201            .await;
3202        buffer_b
3203            .condition(cx_b, |buffer, _| buffer.has_conflict())
3204            .await;
3205
3206        project_b
3207            .update(cx_b, |project, cx| {
3208                project.reload_buffers(HashSet::from_iter([buffer_b.clone()]), true, cx)
3209            })
3210            .await
3211            .unwrap();
3212        buffer_a.read_with(cx_a, |buffer, _| {
3213            assert_eq!(buffer.text(), "let seven = 7;");
3214            assert!(!buffer.is_dirty());
3215            assert!(!buffer.has_conflict());
3216        });
3217        buffer_b.read_with(cx_b, |buffer, _| {
3218            assert_eq!(buffer.text(), "let seven = 7;");
3219            assert!(!buffer.is_dirty());
3220            assert!(!buffer.has_conflict());
3221        });
3222
3223        buffer_a.update(cx_a, |buffer, cx| {
3224            // Undoing on the host is a no-op when the reload was initiated by the guest.
3225            buffer.undo(cx);
3226            assert_eq!(buffer.text(), "let seven = 7;");
3227            assert!(!buffer.is_dirty());
3228            assert!(!buffer.has_conflict());
3229        });
3230        buffer_b.update(cx_b, |buffer, cx| {
3231            // Undoing on the guest rolls back the buffer to before it was reloaded but the conflict gets cleared.
3232            buffer.undo(cx);
3233            assert_eq!(buffer.text(), "let six = 6;");
3234            assert!(buffer.is_dirty());
3235            assert!(!buffer.has_conflict());
3236        });
3237    }
3238
3239    #[gpui::test(iterations = 10)]
3240    async fn test_formatting_buffer(cx_a: &mut TestAppContext, cx_b: &mut TestAppContext) {
3241        cx_a.foreground().forbid_parking();
3242        let lang_registry = Arc::new(LanguageRegistry::test());
3243        let fs = FakeFs::new(cx_a.background());
3244
3245        // Set up a fake language server.
3246        let mut language = Language::new(
3247            LanguageConfig {
3248                name: "Rust".into(),
3249                path_suffixes: vec!["rs".to_string()],
3250                ..Default::default()
3251            },
3252            Some(tree_sitter_rust::language()),
3253        );
3254        let mut fake_language_servers = language.set_fake_lsp_adapter(Default::default());
3255        lang_registry.add(Arc::new(language));
3256
3257        // Connect to a server as 2 clients.
3258        let mut server = TestServer::start(cx_a.foreground(), cx_a.background()).await;
3259        let client_a = server.create_client(cx_a, "user_a").await;
3260        let client_b = server.create_client(cx_b, "user_b").await;
3261        server
3262            .make_contacts(vec![(&client_a, cx_a), (&client_b, cx_b)])
3263            .await;
3264
3265        // Share a project as client A
3266        fs.insert_tree(
3267            "/a",
3268            json!({
3269                "a.rs": "let one = two",
3270            }),
3271        )
3272        .await;
3273        let project_a = cx_a.update(|cx| {
3274            Project::local(
3275                client_a.clone(),
3276                client_a.user_store.clone(),
3277                lang_registry.clone(),
3278                fs.clone(),
3279                cx,
3280            )
3281        });
3282        let (worktree_a, _) = project_a
3283            .update(cx_a, |p, cx| {
3284                p.find_or_create_local_worktree("/a", true, cx)
3285            })
3286            .await
3287            .unwrap();
3288        worktree_a
3289            .read_with(cx_a, |tree, _| tree.as_local().unwrap().scan_complete())
3290            .await;
3291        let project_id = project_a.update(cx_a, |p, _| p.next_remote_id()).await;
3292        let worktree_id = worktree_a.read_with(cx_a, |tree, _| tree.id());
3293        project_a.update(cx_a, |p, cx| p.share(cx)).await.unwrap();
3294
3295        // Join the worktree as client B.
3296        let project_b = Project::remote(
3297            project_id,
3298            client_b.clone(),
3299            client_b.user_store.clone(),
3300            lang_registry.clone(),
3301            fs.clone(),
3302            &mut cx_b.to_async(),
3303        )
3304        .await
3305        .unwrap();
3306
3307        let buffer_b = cx_b
3308            .background()
3309            .spawn(project_b.update(cx_b, |p, cx| p.open_buffer((worktree_id, "a.rs"), cx)))
3310            .await
3311            .unwrap();
3312
3313        let fake_language_server = fake_language_servers.next().await.unwrap();
3314        fake_language_server.handle_request::<lsp::request::Formatting, _, _>(|_, _| async move {
3315            Ok(Some(vec![
3316                lsp::TextEdit {
3317                    range: lsp::Range::new(lsp::Position::new(0, 4), lsp::Position::new(0, 4)),
3318                    new_text: "h".to_string(),
3319                },
3320                lsp::TextEdit {
3321                    range: lsp::Range::new(lsp::Position::new(0, 7), lsp::Position::new(0, 7)),
3322                    new_text: "y".to_string(),
3323                },
3324            ]))
3325        });
3326
3327        project_b
3328            .update(cx_b, |project, cx| {
3329                project.format(HashSet::from_iter([buffer_b.clone()]), true, cx)
3330            })
3331            .await
3332            .unwrap();
3333        assert_eq!(
3334            buffer_b.read_with(cx_b, |buffer, _| buffer.text()),
3335            "let honey = two"
3336        );
3337    }
3338
3339    #[gpui::test(iterations = 10)]
3340    async fn test_definition(cx_a: &mut TestAppContext, cx_b: &mut TestAppContext) {
3341        cx_a.foreground().forbid_parking();
3342        let lang_registry = Arc::new(LanguageRegistry::test());
3343        let fs = FakeFs::new(cx_a.background());
3344        fs.insert_tree(
3345            "/root-1",
3346            json!({
3347                "a.rs": "const ONE: usize = b::TWO + b::THREE;",
3348            }),
3349        )
3350        .await;
3351        fs.insert_tree(
3352            "/root-2",
3353            json!({
3354                "b.rs": "const TWO: usize = 2;\nconst THREE: usize = 3;",
3355            }),
3356        )
3357        .await;
3358
3359        // Set up a fake language server.
3360        let mut language = Language::new(
3361            LanguageConfig {
3362                name: "Rust".into(),
3363                path_suffixes: vec!["rs".to_string()],
3364                ..Default::default()
3365            },
3366            Some(tree_sitter_rust::language()),
3367        );
3368        let mut fake_language_servers = language.set_fake_lsp_adapter(Default::default());
3369        lang_registry.add(Arc::new(language));
3370
3371        // Connect to a server as 2 clients.
3372        let mut server = TestServer::start(cx_a.foreground(), cx_a.background()).await;
3373        let client_a = server.create_client(cx_a, "user_a").await;
3374        let client_b = server.create_client(cx_b, "user_b").await;
3375        server
3376            .make_contacts(vec![(&client_a, cx_a), (&client_b, cx_b)])
3377            .await;
3378
3379        // Share a project as client A
3380        let project_a = cx_a.update(|cx| {
3381            Project::local(
3382                client_a.clone(),
3383                client_a.user_store.clone(),
3384                lang_registry.clone(),
3385                fs.clone(),
3386                cx,
3387            )
3388        });
3389        let (worktree_a, _) = project_a
3390            .update(cx_a, |p, cx| {
3391                p.find_or_create_local_worktree("/root-1", true, cx)
3392            })
3393            .await
3394            .unwrap();
3395        worktree_a
3396            .read_with(cx_a, |tree, _| tree.as_local().unwrap().scan_complete())
3397            .await;
3398        let project_id = project_a.update(cx_a, |p, _| p.next_remote_id()).await;
3399        let worktree_id = worktree_a.read_with(cx_a, |tree, _| tree.id());
3400        project_a.update(cx_a, |p, cx| p.share(cx)).await.unwrap();
3401
3402        // Join the worktree as client B.
3403        let project_b = Project::remote(
3404            project_id,
3405            client_b.clone(),
3406            client_b.user_store.clone(),
3407            lang_registry.clone(),
3408            fs.clone(),
3409            &mut cx_b.to_async(),
3410        )
3411        .await
3412        .unwrap();
3413
3414        // Open the file on client B.
3415        let buffer_b = cx_b
3416            .background()
3417            .spawn(project_b.update(cx_b, |p, cx| p.open_buffer((worktree_id, "a.rs"), cx)))
3418            .await
3419            .unwrap();
3420
3421        // Request the definition of a symbol as the guest.
3422        let fake_language_server = fake_language_servers.next().await.unwrap();
3423        fake_language_server.handle_request::<lsp::request::GotoDefinition, _, _>(
3424            |_, _| async move {
3425                Ok(Some(lsp::GotoDefinitionResponse::Scalar(
3426                    lsp::Location::new(
3427                        lsp::Url::from_file_path("/root-2/b.rs").unwrap(),
3428                        lsp::Range::new(lsp::Position::new(0, 6), lsp::Position::new(0, 9)),
3429                    ),
3430                )))
3431            },
3432        );
3433
3434        let definitions_1 = project_b
3435            .update(cx_b, |p, cx| p.definition(&buffer_b, 23, cx))
3436            .await
3437            .unwrap();
3438        cx_b.read(|cx| {
3439            assert_eq!(definitions_1.len(), 1);
3440            assert_eq!(project_b.read(cx).worktrees(cx).count(), 2);
3441            let target_buffer = definitions_1[0].buffer.read(cx);
3442            assert_eq!(
3443                target_buffer.text(),
3444                "const TWO: usize = 2;\nconst THREE: usize = 3;"
3445            );
3446            assert_eq!(
3447                definitions_1[0].range.to_point(target_buffer),
3448                Point::new(0, 6)..Point::new(0, 9)
3449            );
3450        });
3451
3452        // Try getting more definitions for the same buffer, ensuring the buffer gets reused from
3453        // the previous call to `definition`.
3454        fake_language_server.handle_request::<lsp::request::GotoDefinition, _, _>(
3455            |_, _| async move {
3456                Ok(Some(lsp::GotoDefinitionResponse::Scalar(
3457                    lsp::Location::new(
3458                        lsp::Url::from_file_path("/root-2/b.rs").unwrap(),
3459                        lsp::Range::new(lsp::Position::new(1, 6), lsp::Position::new(1, 11)),
3460                    ),
3461                )))
3462            },
3463        );
3464
3465        let definitions_2 = project_b
3466            .update(cx_b, |p, cx| p.definition(&buffer_b, 33, cx))
3467            .await
3468            .unwrap();
3469        cx_b.read(|cx| {
3470            assert_eq!(definitions_2.len(), 1);
3471            assert_eq!(project_b.read(cx).worktrees(cx).count(), 2);
3472            let target_buffer = definitions_2[0].buffer.read(cx);
3473            assert_eq!(
3474                target_buffer.text(),
3475                "const TWO: usize = 2;\nconst THREE: usize = 3;"
3476            );
3477            assert_eq!(
3478                definitions_2[0].range.to_point(target_buffer),
3479                Point::new(1, 6)..Point::new(1, 11)
3480            );
3481        });
3482        assert_eq!(definitions_1[0].buffer, definitions_2[0].buffer);
3483    }
3484
3485    #[gpui::test(iterations = 10)]
3486    async fn test_references(cx_a: &mut TestAppContext, cx_b: &mut TestAppContext) {
3487        cx_a.foreground().forbid_parking();
3488        let lang_registry = Arc::new(LanguageRegistry::test());
3489        let fs = FakeFs::new(cx_a.background());
3490        fs.insert_tree(
3491            "/root-1",
3492            json!({
3493                "one.rs": "const ONE: usize = 1;",
3494                "two.rs": "const TWO: usize = one::ONE + one::ONE;",
3495            }),
3496        )
3497        .await;
3498        fs.insert_tree(
3499            "/root-2",
3500            json!({
3501                "three.rs": "const THREE: usize = two::TWO + one::ONE;",
3502            }),
3503        )
3504        .await;
3505
3506        // Set up a fake language server.
3507        let mut language = Language::new(
3508            LanguageConfig {
3509                name: "Rust".into(),
3510                path_suffixes: vec!["rs".to_string()],
3511                ..Default::default()
3512            },
3513            Some(tree_sitter_rust::language()),
3514        );
3515        let mut fake_language_servers = language.set_fake_lsp_adapter(Default::default());
3516        lang_registry.add(Arc::new(language));
3517
3518        // Connect to a server as 2 clients.
3519        let mut server = TestServer::start(cx_a.foreground(), cx_a.background()).await;
3520        let client_a = server.create_client(cx_a, "user_a").await;
3521        let client_b = server.create_client(cx_b, "user_b").await;
3522        server
3523            .make_contacts(vec![(&client_a, cx_a), (&client_b, cx_b)])
3524            .await;
3525
3526        // Share a project as client A
3527        let project_a = cx_a.update(|cx| {
3528            Project::local(
3529                client_a.clone(),
3530                client_a.user_store.clone(),
3531                lang_registry.clone(),
3532                fs.clone(),
3533                cx,
3534            )
3535        });
3536        let (worktree_a, _) = project_a
3537            .update(cx_a, |p, cx| {
3538                p.find_or_create_local_worktree("/root-1", true, cx)
3539            })
3540            .await
3541            .unwrap();
3542        worktree_a
3543            .read_with(cx_a, |tree, _| tree.as_local().unwrap().scan_complete())
3544            .await;
3545        let project_id = project_a.update(cx_a, |p, _| p.next_remote_id()).await;
3546        let worktree_id = worktree_a.read_with(cx_a, |tree, _| tree.id());
3547        project_a.update(cx_a, |p, cx| p.share(cx)).await.unwrap();
3548
3549        // Join the worktree as client B.
3550        let project_b = Project::remote(
3551            project_id,
3552            client_b.clone(),
3553            client_b.user_store.clone(),
3554            lang_registry.clone(),
3555            fs.clone(),
3556            &mut cx_b.to_async(),
3557        )
3558        .await
3559        .unwrap();
3560
3561        // Open the file on client B.
3562        let buffer_b = cx_b
3563            .background()
3564            .spawn(project_b.update(cx_b, |p, cx| p.open_buffer((worktree_id, "one.rs"), cx)))
3565            .await
3566            .unwrap();
3567
3568        // Request references to a symbol as the guest.
3569        let fake_language_server = fake_language_servers.next().await.unwrap();
3570        fake_language_server.handle_request::<lsp::request::References, _, _>(
3571            |params, _| async move {
3572                assert_eq!(
3573                    params.text_document_position.text_document.uri.as_str(),
3574                    "file:///root-1/one.rs"
3575                );
3576                Ok(Some(vec![
3577                    lsp::Location {
3578                        uri: lsp::Url::from_file_path("/root-1/two.rs").unwrap(),
3579                        range: lsp::Range::new(
3580                            lsp::Position::new(0, 24),
3581                            lsp::Position::new(0, 27),
3582                        ),
3583                    },
3584                    lsp::Location {
3585                        uri: lsp::Url::from_file_path("/root-1/two.rs").unwrap(),
3586                        range: lsp::Range::new(
3587                            lsp::Position::new(0, 35),
3588                            lsp::Position::new(0, 38),
3589                        ),
3590                    },
3591                    lsp::Location {
3592                        uri: lsp::Url::from_file_path("/root-2/three.rs").unwrap(),
3593                        range: lsp::Range::new(
3594                            lsp::Position::new(0, 37),
3595                            lsp::Position::new(0, 40),
3596                        ),
3597                    },
3598                ]))
3599            },
3600        );
3601
3602        let references = project_b
3603            .update(cx_b, |p, cx| p.references(&buffer_b, 7, cx))
3604            .await
3605            .unwrap();
3606        cx_b.read(|cx| {
3607            assert_eq!(references.len(), 3);
3608            assert_eq!(project_b.read(cx).worktrees(cx).count(), 2);
3609
3610            let two_buffer = references[0].buffer.read(cx);
3611            let three_buffer = references[2].buffer.read(cx);
3612            assert_eq!(
3613                two_buffer.file().unwrap().path().as_ref(),
3614                Path::new("two.rs")
3615            );
3616            assert_eq!(references[1].buffer, references[0].buffer);
3617            assert_eq!(
3618                three_buffer.file().unwrap().full_path(cx),
3619                Path::new("three.rs")
3620            );
3621
3622            assert_eq!(references[0].range.to_offset(&two_buffer), 24..27);
3623            assert_eq!(references[1].range.to_offset(&two_buffer), 35..38);
3624            assert_eq!(references[2].range.to_offset(&three_buffer), 37..40);
3625        });
3626    }
3627
3628    #[gpui::test(iterations = 10)]
3629    async fn test_project_search(cx_a: &mut TestAppContext, cx_b: &mut TestAppContext) {
3630        cx_a.foreground().forbid_parking();
3631        let lang_registry = Arc::new(LanguageRegistry::test());
3632        let fs = FakeFs::new(cx_a.background());
3633        fs.insert_tree(
3634            "/root-1",
3635            json!({
3636                "a": "hello world",
3637                "b": "goodnight moon",
3638                "c": "a world of goo",
3639                "d": "world champion of clown world",
3640            }),
3641        )
3642        .await;
3643        fs.insert_tree(
3644            "/root-2",
3645            json!({
3646                "e": "disney world is fun",
3647            }),
3648        )
3649        .await;
3650
3651        // Connect to a server as 2 clients.
3652        let mut server = TestServer::start(cx_a.foreground(), cx_a.background()).await;
3653        let client_a = server.create_client(cx_a, "user_a").await;
3654        let client_b = server.create_client(cx_b, "user_b").await;
3655        server
3656            .make_contacts(vec![(&client_a, cx_a), (&client_b, cx_b)])
3657            .await;
3658
3659        // Share a project as client A
3660        let project_a = cx_a.update(|cx| {
3661            Project::local(
3662                client_a.clone(),
3663                client_a.user_store.clone(),
3664                lang_registry.clone(),
3665                fs.clone(),
3666                cx,
3667            )
3668        });
3669        let project_id = project_a.update(cx_a, |p, _| p.next_remote_id()).await;
3670
3671        let (worktree_1, _) = project_a
3672            .update(cx_a, |p, cx| {
3673                p.find_or_create_local_worktree("/root-1", true, cx)
3674            })
3675            .await
3676            .unwrap();
3677        worktree_1
3678            .read_with(cx_a, |tree, _| tree.as_local().unwrap().scan_complete())
3679            .await;
3680        let (worktree_2, _) = project_a
3681            .update(cx_a, |p, cx| {
3682                p.find_or_create_local_worktree("/root-2", true, cx)
3683            })
3684            .await
3685            .unwrap();
3686        worktree_2
3687            .read_with(cx_a, |tree, _| tree.as_local().unwrap().scan_complete())
3688            .await;
3689
3690        project_a.update(cx_a, |p, cx| p.share(cx)).await.unwrap();
3691
3692        // Join the worktree as client B.
3693        let project_b = Project::remote(
3694            project_id,
3695            client_b.clone(),
3696            client_b.user_store.clone(),
3697            lang_registry.clone(),
3698            fs.clone(),
3699            &mut cx_b.to_async(),
3700        )
3701        .await
3702        .unwrap();
3703
3704        let results = project_b
3705            .update(cx_b, |project, cx| {
3706                project.search(SearchQuery::text("world", false, false), cx)
3707            })
3708            .await
3709            .unwrap();
3710
3711        let mut ranges_by_path = results
3712            .into_iter()
3713            .map(|(buffer, ranges)| {
3714                buffer.read_with(cx_b, |buffer, cx| {
3715                    let path = buffer.file().unwrap().full_path(cx);
3716                    let offset_ranges = ranges
3717                        .into_iter()
3718                        .map(|range| range.to_offset(buffer))
3719                        .collect::<Vec<_>>();
3720                    (path, offset_ranges)
3721                })
3722            })
3723            .collect::<Vec<_>>();
3724        ranges_by_path.sort_by_key(|(path, _)| path.clone());
3725
3726        assert_eq!(
3727            ranges_by_path,
3728            &[
3729                (PathBuf::from("root-1/a"), vec![6..11]),
3730                (PathBuf::from("root-1/c"), vec![2..7]),
3731                (PathBuf::from("root-1/d"), vec![0..5, 24..29]),
3732                (PathBuf::from("root-2/e"), vec![7..12]),
3733            ]
3734        );
3735    }
3736
3737    #[gpui::test(iterations = 10)]
3738    async fn test_document_highlights(cx_a: &mut TestAppContext, cx_b: &mut TestAppContext) {
3739        cx_a.foreground().forbid_parking();
3740        let lang_registry = Arc::new(LanguageRegistry::test());
3741        let fs = FakeFs::new(cx_a.background());
3742        fs.insert_tree(
3743            "/root-1",
3744            json!({
3745                "main.rs": "fn double(number: i32) -> i32 { number + number }",
3746            }),
3747        )
3748        .await;
3749
3750        // Set up a fake language server.
3751        let mut language = Language::new(
3752            LanguageConfig {
3753                name: "Rust".into(),
3754                path_suffixes: vec!["rs".to_string()],
3755                ..Default::default()
3756            },
3757            Some(tree_sitter_rust::language()),
3758        );
3759        let mut fake_language_servers = language.set_fake_lsp_adapter(Default::default());
3760        lang_registry.add(Arc::new(language));
3761
3762        // Connect to a server as 2 clients.
3763        let mut server = TestServer::start(cx_a.foreground(), cx_a.background()).await;
3764        let client_a = server.create_client(cx_a, "user_a").await;
3765        let client_b = server.create_client(cx_b, "user_b").await;
3766        server
3767            .make_contacts(vec![(&client_a, cx_a), (&client_b, cx_b)])
3768            .await;
3769
3770        // Share a project as client A
3771        let project_a = cx_a.update(|cx| {
3772            Project::local(
3773                client_a.clone(),
3774                client_a.user_store.clone(),
3775                lang_registry.clone(),
3776                fs.clone(),
3777                cx,
3778            )
3779        });
3780        let (worktree_a, _) = project_a
3781            .update(cx_a, |p, cx| {
3782                p.find_or_create_local_worktree("/root-1", true, cx)
3783            })
3784            .await
3785            .unwrap();
3786        worktree_a
3787            .read_with(cx_a, |tree, _| tree.as_local().unwrap().scan_complete())
3788            .await;
3789        let project_id = project_a.update(cx_a, |p, _| p.next_remote_id()).await;
3790        let worktree_id = worktree_a.read_with(cx_a, |tree, _| tree.id());
3791        project_a.update(cx_a, |p, cx| p.share(cx)).await.unwrap();
3792
3793        // Join the worktree as client B.
3794        let project_b = Project::remote(
3795            project_id,
3796            client_b.clone(),
3797            client_b.user_store.clone(),
3798            lang_registry.clone(),
3799            fs.clone(),
3800            &mut cx_b.to_async(),
3801        )
3802        .await
3803        .unwrap();
3804
3805        // Open the file on client B.
3806        let buffer_b = cx_b
3807            .background()
3808            .spawn(project_b.update(cx_b, |p, cx| p.open_buffer((worktree_id, "main.rs"), cx)))
3809            .await
3810            .unwrap();
3811
3812        // Request document highlights as the guest.
3813        let fake_language_server = fake_language_servers.next().await.unwrap();
3814        fake_language_server.handle_request::<lsp::request::DocumentHighlightRequest, _, _>(
3815            |params, _| async move {
3816                assert_eq!(
3817                    params
3818                        .text_document_position_params
3819                        .text_document
3820                        .uri
3821                        .as_str(),
3822                    "file:///root-1/main.rs"
3823                );
3824                assert_eq!(
3825                    params.text_document_position_params.position,
3826                    lsp::Position::new(0, 34)
3827                );
3828                Ok(Some(vec![
3829                    lsp::DocumentHighlight {
3830                        kind: Some(lsp::DocumentHighlightKind::WRITE),
3831                        range: lsp::Range::new(
3832                            lsp::Position::new(0, 10),
3833                            lsp::Position::new(0, 16),
3834                        ),
3835                    },
3836                    lsp::DocumentHighlight {
3837                        kind: Some(lsp::DocumentHighlightKind::READ),
3838                        range: lsp::Range::new(
3839                            lsp::Position::new(0, 32),
3840                            lsp::Position::new(0, 38),
3841                        ),
3842                    },
3843                    lsp::DocumentHighlight {
3844                        kind: Some(lsp::DocumentHighlightKind::READ),
3845                        range: lsp::Range::new(
3846                            lsp::Position::new(0, 41),
3847                            lsp::Position::new(0, 47),
3848                        ),
3849                    },
3850                ]))
3851            },
3852        );
3853
3854        let highlights = project_b
3855            .update(cx_b, |p, cx| p.document_highlights(&buffer_b, 34, cx))
3856            .await
3857            .unwrap();
3858        buffer_b.read_with(cx_b, |buffer, _| {
3859            let snapshot = buffer.snapshot();
3860
3861            let highlights = highlights
3862                .into_iter()
3863                .map(|highlight| (highlight.kind, highlight.range.to_offset(&snapshot)))
3864                .collect::<Vec<_>>();
3865            assert_eq!(
3866                highlights,
3867                &[
3868                    (lsp::DocumentHighlightKind::WRITE, 10..16),
3869                    (lsp::DocumentHighlightKind::READ, 32..38),
3870                    (lsp::DocumentHighlightKind::READ, 41..47)
3871                ]
3872            )
3873        });
3874    }
3875
3876    #[gpui::test(iterations = 10)]
3877    async fn test_project_symbols(cx_a: &mut TestAppContext, cx_b: &mut TestAppContext) {
3878        cx_a.foreground().forbid_parking();
3879        let lang_registry = Arc::new(LanguageRegistry::test());
3880        let fs = FakeFs::new(cx_a.background());
3881        fs.insert_tree(
3882            "/code",
3883            json!({
3884                "crate-1": {
3885                    "one.rs": "const ONE: usize = 1;",
3886                },
3887                "crate-2": {
3888                    "two.rs": "const TWO: usize = 2; const THREE: usize = 3;",
3889                },
3890                "private": {
3891                    "passwords.txt": "the-password",
3892                }
3893            }),
3894        )
3895        .await;
3896
3897        // Set up a fake language server.
3898        let mut language = Language::new(
3899            LanguageConfig {
3900                name: "Rust".into(),
3901                path_suffixes: vec!["rs".to_string()],
3902                ..Default::default()
3903            },
3904            Some(tree_sitter_rust::language()),
3905        );
3906        let mut fake_language_servers = language.set_fake_lsp_adapter(Default::default());
3907        lang_registry.add(Arc::new(language));
3908
3909        // Connect to a server as 2 clients.
3910        let mut server = TestServer::start(cx_a.foreground(), cx_a.background()).await;
3911        let client_a = server.create_client(cx_a, "user_a").await;
3912        let client_b = server.create_client(cx_b, "user_b").await;
3913        server
3914            .make_contacts(vec![(&client_a, cx_a), (&client_b, cx_b)])
3915            .await;
3916
3917        // Share a project as client A
3918        let project_a = cx_a.update(|cx| {
3919            Project::local(
3920                client_a.clone(),
3921                client_a.user_store.clone(),
3922                lang_registry.clone(),
3923                fs.clone(),
3924                cx,
3925            )
3926        });
3927        let (worktree_a, _) = project_a
3928            .update(cx_a, |p, cx| {
3929                p.find_or_create_local_worktree("/code/crate-1", true, cx)
3930            })
3931            .await
3932            .unwrap();
3933        worktree_a
3934            .read_with(cx_a, |tree, _| tree.as_local().unwrap().scan_complete())
3935            .await;
3936        let project_id = project_a.update(cx_a, |p, _| p.next_remote_id()).await;
3937        let worktree_id = worktree_a.read_with(cx_a, |tree, _| tree.id());
3938        project_a.update(cx_a, |p, cx| p.share(cx)).await.unwrap();
3939
3940        // Join the worktree as client B.
3941        let project_b = Project::remote(
3942            project_id,
3943            client_b.clone(),
3944            client_b.user_store.clone(),
3945            lang_registry.clone(),
3946            fs.clone(),
3947            &mut cx_b.to_async(),
3948        )
3949        .await
3950        .unwrap();
3951
3952        // Cause the language server to start.
3953        let _buffer = cx_b
3954            .background()
3955            .spawn(project_b.update(cx_b, |p, cx| p.open_buffer((worktree_id, "one.rs"), cx)))
3956            .await
3957            .unwrap();
3958
3959        let fake_language_server = fake_language_servers.next().await.unwrap();
3960        fake_language_server.handle_request::<lsp::request::WorkspaceSymbol, _, _>(
3961            |_, _| async move {
3962                #[allow(deprecated)]
3963                Ok(Some(vec![lsp::SymbolInformation {
3964                    name: "TWO".into(),
3965                    location: lsp::Location {
3966                        uri: lsp::Url::from_file_path("/code/crate-2/two.rs").unwrap(),
3967                        range: lsp::Range::new(lsp::Position::new(0, 6), lsp::Position::new(0, 9)),
3968                    },
3969                    kind: lsp::SymbolKind::CONSTANT,
3970                    tags: None,
3971                    container_name: None,
3972                    deprecated: None,
3973                }]))
3974            },
3975        );
3976
3977        // Request the definition of a symbol as the guest.
3978        let symbols = project_b
3979            .update(cx_b, |p, cx| p.symbols("two", cx))
3980            .await
3981            .unwrap();
3982        assert_eq!(symbols.len(), 1);
3983        assert_eq!(symbols[0].name, "TWO");
3984
3985        // Open one of the returned symbols.
3986        let buffer_b_2 = project_b
3987            .update(cx_b, |project, cx| {
3988                project.open_buffer_for_symbol(&symbols[0], cx)
3989            })
3990            .await
3991            .unwrap();
3992        buffer_b_2.read_with(cx_b, |buffer, _| {
3993            assert_eq!(
3994                buffer.file().unwrap().path().as_ref(),
3995                Path::new("../crate-2/two.rs")
3996            );
3997        });
3998
3999        // Attempt to craft a symbol and violate host's privacy by opening an arbitrary file.
4000        let mut fake_symbol = symbols[0].clone();
4001        fake_symbol.path = Path::new("/code/secrets").into();
4002        let error = project_b
4003            .update(cx_b, |project, cx| {
4004                project.open_buffer_for_symbol(&fake_symbol, cx)
4005            })
4006            .await
4007            .unwrap_err();
4008        assert!(error.to_string().contains("invalid symbol signature"));
4009    }
4010
4011    #[gpui::test(iterations = 10)]
4012    async fn test_open_buffer_while_getting_definition_pointing_to_it(
4013        cx_a: &mut TestAppContext,
4014        cx_b: &mut TestAppContext,
4015        mut rng: StdRng,
4016    ) {
4017        cx_a.foreground().forbid_parking();
4018        let lang_registry = Arc::new(LanguageRegistry::test());
4019        let fs = FakeFs::new(cx_a.background());
4020        fs.insert_tree(
4021            "/root",
4022            json!({
4023                "a.rs": "const ONE: usize = b::TWO;",
4024                "b.rs": "const TWO: usize = 2",
4025            }),
4026        )
4027        .await;
4028
4029        // Set up a fake language server.
4030        let mut language = Language::new(
4031            LanguageConfig {
4032                name: "Rust".into(),
4033                path_suffixes: vec!["rs".to_string()],
4034                ..Default::default()
4035            },
4036            Some(tree_sitter_rust::language()),
4037        );
4038        let mut fake_language_servers = language.set_fake_lsp_adapter(Default::default());
4039        lang_registry.add(Arc::new(language));
4040
4041        // Connect to a server as 2 clients.
4042        let mut server = TestServer::start(cx_a.foreground(), cx_a.background()).await;
4043        let client_a = server.create_client(cx_a, "user_a").await;
4044        let client_b = server.create_client(cx_b, "user_b").await;
4045        server
4046            .make_contacts(vec![(&client_a, cx_a), (&client_b, cx_b)])
4047            .await;
4048
4049        // Share a project as client A
4050        let project_a = cx_a.update(|cx| {
4051            Project::local(
4052                client_a.clone(),
4053                client_a.user_store.clone(),
4054                lang_registry.clone(),
4055                fs.clone(),
4056                cx,
4057            )
4058        });
4059
4060        let (worktree_a, _) = project_a
4061            .update(cx_a, |p, cx| {
4062                p.find_or_create_local_worktree("/root", true, cx)
4063            })
4064            .await
4065            .unwrap();
4066        worktree_a
4067            .read_with(cx_a, |tree, _| tree.as_local().unwrap().scan_complete())
4068            .await;
4069        let project_id = project_a.update(cx_a, |p, _| p.next_remote_id()).await;
4070        let worktree_id = worktree_a.read_with(cx_a, |tree, _| tree.id());
4071        project_a.update(cx_a, |p, cx| p.share(cx)).await.unwrap();
4072
4073        // Join the worktree as client B.
4074        let project_b = Project::remote(
4075            project_id,
4076            client_b.clone(),
4077            client_b.user_store.clone(),
4078            lang_registry.clone(),
4079            fs.clone(),
4080            &mut cx_b.to_async(),
4081        )
4082        .await
4083        .unwrap();
4084
4085        let buffer_b1 = cx_b
4086            .background()
4087            .spawn(project_b.update(cx_b, |p, cx| p.open_buffer((worktree_id, "a.rs"), cx)))
4088            .await
4089            .unwrap();
4090
4091        let fake_language_server = fake_language_servers.next().await.unwrap();
4092        fake_language_server.handle_request::<lsp::request::GotoDefinition, _, _>(
4093            |_, _| async move {
4094                Ok(Some(lsp::GotoDefinitionResponse::Scalar(
4095                    lsp::Location::new(
4096                        lsp::Url::from_file_path("/root/b.rs").unwrap(),
4097                        lsp::Range::new(lsp::Position::new(0, 6), lsp::Position::new(0, 9)),
4098                    ),
4099                )))
4100            },
4101        );
4102
4103        let definitions;
4104        let buffer_b2;
4105        if rng.gen() {
4106            definitions = project_b.update(cx_b, |p, cx| p.definition(&buffer_b1, 23, cx));
4107            buffer_b2 = project_b.update(cx_b, |p, cx| p.open_buffer((worktree_id, "b.rs"), cx));
4108        } else {
4109            buffer_b2 = project_b.update(cx_b, |p, cx| p.open_buffer((worktree_id, "b.rs"), cx));
4110            definitions = project_b.update(cx_b, |p, cx| p.definition(&buffer_b1, 23, cx));
4111        }
4112
4113        let buffer_b2 = buffer_b2.await.unwrap();
4114        let definitions = definitions.await.unwrap();
4115        assert_eq!(definitions.len(), 1);
4116        assert_eq!(definitions[0].buffer, buffer_b2);
4117    }
4118
4119    #[gpui::test(iterations = 10)]
4120    async fn test_collaborating_with_code_actions(
4121        cx_a: &mut TestAppContext,
4122        cx_b: &mut TestAppContext,
4123    ) {
4124        cx_a.foreground().forbid_parking();
4125        let lang_registry = Arc::new(LanguageRegistry::test());
4126        let fs = FakeFs::new(cx_a.background());
4127        cx_b.update(|cx| editor::init(cx));
4128
4129        // Set up a fake language server.
4130        let mut language = Language::new(
4131            LanguageConfig {
4132                name: "Rust".into(),
4133                path_suffixes: vec!["rs".to_string()],
4134                ..Default::default()
4135            },
4136            Some(tree_sitter_rust::language()),
4137        );
4138        let mut fake_language_servers = language.set_fake_lsp_adapter(Default::default());
4139        lang_registry.add(Arc::new(language));
4140
4141        // Connect to a server as 2 clients.
4142        let mut server = TestServer::start(cx_a.foreground(), cx_a.background()).await;
4143        let client_a = server.create_client(cx_a, "user_a").await;
4144        let client_b = server.create_client(cx_b, "user_b").await;
4145        server
4146            .make_contacts(vec![(&client_a, cx_a), (&client_b, cx_b)])
4147            .await;
4148
4149        // Share a project as client A
4150        fs.insert_tree(
4151            "/a",
4152            json!({
4153                "main.rs": "mod other;\nfn main() { let foo = other::foo(); }",
4154                "other.rs": "pub fn foo() -> usize { 4 }",
4155            }),
4156        )
4157        .await;
4158        let project_a = cx_a.update(|cx| {
4159            Project::local(
4160                client_a.clone(),
4161                client_a.user_store.clone(),
4162                lang_registry.clone(),
4163                fs.clone(),
4164                cx,
4165            )
4166        });
4167        let (worktree_a, _) = project_a
4168            .update(cx_a, |p, cx| {
4169                p.find_or_create_local_worktree("/a", true, cx)
4170            })
4171            .await
4172            .unwrap();
4173        worktree_a
4174            .read_with(cx_a, |tree, _| tree.as_local().unwrap().scan_complete())
4175            .await;
4176        let project_id = project_a.update(cx_a, |p, _| p.next_remote_id()).await;
4177        let worktree_id = worktree_a.read_with(cx_a, |tree, _| tree.id());
4178        project_a.update(cx_a, |p, cx| p.share(cx)).await.unwrap();
4179
4180        // Join the worktree as client B.
4181        let project_b = Project::remote(
4182            project_id,
4183            client_b.clone(),
4184            client_b.user_store.clone(),
4185            lang_registry.clone(),
4186            fs.clone(),
4187            &mut cx_b.to_async(),
4188        )
4189        .await
4190        .unwrap();
4191        let mut params = cx_b.update(WorkspaceParams::test);
4192        params.languages = lang_registry.clone();
4193        params.client = client_b.client.clone();
4194        params.user_store = client_b.user_store.clone();
4195        params.project = project_b;
4196
4197        let (_window_b, workspace_b) = cx_b.add_window(|cx| Workspace::new(&params, cx));
4198        let editor_b = workspace_b
4199            .update(cx_b, |workspace, cx| {
4200                workspace.open_path((worktree_id, "main.rs"), true, cx)
4201            })
4202            .await
4203            .unwrap()
4204            .downcast::<Editor>()
4205            .unwrap();
4206
4207        let mut fake_language_server = fake_language_servers.next().await.unwrap();
4208        fake_language_server
4209            .handle_request::<lsp::request::CodeActionRequest, _, _>(|params, _| async move {
4210                assert_eq!(
4211                    params.text_document.uri,
4212                    lsp::Url::from_file_path("/a/main.rs").unwrap(),
4213                );
4214                assert_eq!(params.range.start, lsp::Position::new(0, 0));
4215                assert_eq!(params.range.end, lsp::Position::new(0, 0));
4216                Ok(None)
4217            })
4218            .next()
4219            .await;
4220
4221        // Move cursor to a location that contains code actions.
4222        editor_b.update(cx_b, |editor, cx| {
4223            editor.select_ranges([Point::new(1, 31)..Point::new(1, 31)], None, cx);
4224            cx.focus(&editor_b);
4225        });
4226
4227        fake_language_server
4228            .handle_request::<lsp::request::CodeActionRequest, _, _>(|params, _| async move {
4229                assert_eq!(
4230                    params.text_document.uri,
4231                    lsp::Url::from_file_path("/a/main.rs").unwrap(),
4232                );
4233                assert_eq!(params.range.start, lsp::Position::new(1, 31));
4234                assert_eq!(params.range.end, lsp::Position::new(1, 31));
4235
4236                Ok(Some(vec![lsp::CodeActionOrCommand::CodeAction(
4237                    lsp::CodeAction {
4238                        title: "Inline into all callers".to_string(),
4239                        edit: Some(lsp::WorkspaceEdit {
4240                            changes: Some(
4241                                [
4242                                    (
4243                                        lsp::Url::from_file_path("/a/main.rs").unwrap(),
4244                                        vec![lsp::TextEdit::new(
4245                                            lsp::Range::new(
4246                                                lsp::Position::new(1, 22),
4247                                                lsp::Position::new(1, 34),
4248                                            ),
4249                                            "4".to_string(),
4250                                        )],
4251                                    ),
4252                                    (
4253                                        lsp::Url::from_file_path("/a/other.rs").unwrap(),
4254                                        vec![lsp::TextEdit::new(
4255                                            lsp::Range::new(
4256                                                lsp::Position::new(0, 0),
4257                                                lsp::Position::new(0, 27),
4258                                            ),
4259                                            "".to_string(),
4260                                        )],
4261                                    ),
4262                                ]
4263                                .into_iter()
4264                                .collect(),
4265                            ),
4266                            ..Default::default()
4267                        }),
4268                        data: Some(json!({
4269                            "codeActionParams": {
4270                                "range": {
4271                                    "start": {"line": 1, "column": 31},
4272                                    "end": {"line": 1, "column": 31},
4273                                }
4274                            }
4275                        })),
4276                        ..Default::default()
4277                    },
4278                )]))
4279            })
4280            .next()
4281            .await;
4282
4283        // Toggle code actions and wait for them to display.
4284        editor_b.update(cx_b, |editor, cx| {
4285            editor.toggle_code_actions(
4286                &ToggleCodeActions {
4287                    deployed_from_indicator: false,
4288                },
4289                cx,
4290            );
4291        });
4292        editor_b
4293            .condition(&cx_b, |editor, _| editor.context_menu_visible())
4294            .await;
4295
4296        fake_language_server.remove_request_handler::<lsp::request::CodeActionRequest>();
4297
4298        // Confirming the code action will trigger a resolve request.
4299        let confirm_action = workspace_b
4300            .update(cx_b, |workspace, cx| {
4301                Editor::confirm_code_action(workspace, &ConfirmCodeAction { item_ix: Some(0) }, cx)
4302            })
4303            .unwrap();
4304        fake_language_server.handle_request::<lsp::request::CodeActionResolveRequest, _, _>(
4305            |_, _| async move {
4306                Ok(lsp::CodeAction {
4307                    title: "Inline into all callers".to_string(),
4308                    edit: Some(lsp::WorkspaceEdit {
4309                        changes: Some(
4310                            [
4311                                (
4312                                    lsp::Url::from_file_path("/a/main.rs").unwrap(),
4313                                    vec![lsp::TextEdit::new(
4314                                        lsp::Range::new(
4315                                            lsp::Position::new(1, 22),
4316                                            lsp::Position::new(1, 34),
4317                                        ),
4318                                        "4".to_string(),
4319                                    )],
4320                                ),
4321                                (
4322                                    lsp::Url::from_file_path("/a/other.rs").unwrap(),
4323                                    vec![lsp::TextEdit::new(
4324                                        lsp::Range::new(
4325                                            lsp::Position::new(0, 0),
4326                                            lsp::Position::new(0, 27),
4327                                        ),
4328                                        "".to_string(),
4329                                    )],
4330                                ),
4331                            ]
4332                            .into_iter()
4333                            .collect(),
4334                        ),
4335                        ..Default::default()
4336                    }),
4337                    ..Default::default()
4338                })
4339            },
4340        );
4341
4342        // After the action is confirmed, an editor containing both modified files is opened.
4343        confirm_action.await.unwrap();
4344        let code_action_editor = workspace_b.read_with(cx_b, |workspace, cx| {
4345            workspace
4346                .active_item(cx)
4347                .unwrap()
4348                .downcast::<Editor>()
4349                .unwrap()
4350        });
4351        code_action_editor.update(cx_b, |editor, cx| {
4352            assert_eq!(editor.text(cx), "mod other;\nfn main() { let foo = 4; }\n");
4353            editor.undo(&Undo, cx);
4354            assert_eq!(
4355                editor.text(cx),
4356                "mod other;\nfn main() { let foo = other::foo(); }\npub fn foo() -> usize { 4 }"
4357            );
4358            editor.redo(&Redo, cx);
4359            assert_eq!(editor.text(cx), "mod other;\nfn main() { let foo = 4; }\n");
4360        });
4361    }
4362
    // A guest (client B) renames a symbol in a project shared by client A. The test
    // drives the prepare-rename and rename requests against a fake language server,
    // then checks the text of the editor that becomes active after the rename, that
    // the rename can be undone/redone there, and how undo/redo behave in the editor
    // the rename was started from.
    #[gpui::test(iterations = 10)]
    async fn test_collaborating_with_renames(cx_a: &mut TestAppContext, cx_b: &mut TestAppContext) {
        cx_a.foreground().forbid_parking();
        let lang_registry = Arc::new(LanguageRegistry::test());
        let fs = FakeFs::new(cx_a.background());
        // Client B is the one that opens a workspace and editor below.
        cx_b.update(|cx| editor::init(cx));

        // Set up a fake language server.
        let mut language = Language::new(
            LanguageConfig {
                name: "Rust".into(),
                path_suffixes: vec!["rs".to_string()],
                ..Default::default()
            },
            Some(tree_sitter_rust::language()),
        );
        // The fake server advertises rename support with `prepare_provider` enabled.
        let mut fake_language_servers = language.set_fake_lsp_adapter(FakeLspAdapter {
            capabilities: lsp::ServerCapabilities {
                rename_provider: Some(lsp::OneOf::Right(lsp::RenameOptions {
                    prepare_provider: Some(true),
                    work_done_progress_options: Default::default(),
                })),
                ..Default::default()
            },
            ..Default::default()
        });
        lang_registry.add(Arc::new(language));

        // Connect to a server as 2 clients.
        let mut server = TestServer::start(cx_a.foreground(), cx_a.background()).await;
        let client_a = server.create_client(cx_a, "user_a").await;
        let client_b = server.create_client(cx_b, "user_b").await;
        server
            .make_contacts(vec![(&client_a, cx_a), (&client_b, cx_b)])
            .await;

        // Share a project as client A
        fs.insert_tree(
            "/dir",
            json!({
                "one.rs": "const ONE: usize = 1;",
                "two.rs": "const TWO: usize = one::ONE + one::ONE;"
            }),
        )
        .await;
        let project_a = cx_a.update(|cx| {
            Project::local(
                client_a.clone(),
                client_a.user_store.clone(),
                lang_registry.clone(),
                fs.clone(),
                cx,
            )
        });
        let (worktree_a, _) = project_a
            .update(cx_a, |p, cx| {
                p.find_or_create_local_worktree("/dir", true, cx)
            })
            .await
            .unwrap();
        worktree_a
            .read_with(cx_a, |tree, _| tree.as_local().unwrap().scan_complete())
            .await;
        let project_id = project_a.update(cx_a, |p, _| p.next_remote_id()).await;
        let worktree_id = worktree_a.read_with(cx_a, |tree, _| tree.id());
        project_a.update(cx_a, |p, cx| p.share(cx)).await.unwrap();

        // Join the worktree as client B.
        let project_b = Project::remote(
            project_id,
            client_b.clone(),
            client_b.user_store.clone(),
            lang_registry.clone(),
            fs.clone(),
            &mut cx_b.to_async(),
        )
        .await
        .unwrap();
        // Build a workspace for client B around the remote project and open `one.rs`.
        let mut params = cx_b.update(WorkspaceParams::test);
        params.languages = lang_registry.clone();
        params.client = client_b.client.clone();
        params.user_store = client_b.user_store.clone();
        params.project = project_b;

        let (_window_b, workspace_b) = cx_b.add_window(|cx| Workspace::new(&params, cx));
        let editor_b = workspace_b
            .update(cx_b, |workspace, cx| {
                workspace.open_path((worktree_id, "one.rs"), true, cx)
            })
            .await
            .unwrap()
            .downcast::<Editor>()
            .unwrap();
        let fake_language_server = fake_language_servers.next().await.unwrap();

        // Move cursor to a location that can be renamed.
        // Offset 7 lies inside the identifier `ONE`, which occupies offsets 6..9 of
        // `const ONE: usize = 1;`. The rename is started here but only awaited after the
        // fake server has answered the prepare-rename request below.
        let prepare_rename = editor_b.update(cx_b, |editor, cx| {
            editor.select_ranges([7..7], None, cx);
            editor.rename(&Rename, cx).unwrap()
        });

        // Answer the prepare-rename request with the range of `ONE`.
        // NOTE(review): `.next().await.unwrap()` presumably waits until the handler has
        // served one request — confirm against `handle_request`'s return type.
        fake_language_server
            .handle_request::<lsp::request::PrepareRenameRequest, _, _>(|params, _| async move {
                assert_eq!(params.text_document.uri.as_str(), "file:///dir/one.rs");
                assert_eq!(params.position, lsp::Position::new(0, 7));
                Ok(Some(lsp::PrepareRenameResponse::Range(lsp::Range::new(
                    lsp::Position::new(0, 6),
                    lsp::Position::new(0, 9),
                ))))
            })
            .next()
            .await
            .unwrap();
        prepare_rename.await.unwrap();
        editor_b.update(cx_b, |editor, cx| {
            // The pending rename must cover exactly the range the server reported.
            let rename = editor.pending_rename().unwrap();
            let buffer = editor.buffer().read(cx).snapshot(cx);
            assert_eq!(
                rename.range.start.to_offset(&buffer)..rename.range.end.to_offset(&buffer),
                6..9
            );
            // Replace the first three characters of the rename editor's buffer with the
            // new name.
            rename.editor.update(cx, |rename_editor, cx| {
                rename_editor.buffer().update(cx, |rename_buffer, cx| {
                    rename_buffer.edit([(0..3, "THREE")], cx);
                });
            });
        });

        // Confirm the rename; as above, the returned future is awaited only after the
        // fake server has answered the rename request.
        let confirm_rename = workspace_b.update(cx_b, |workspace, cx| {
            Editor::confirm_rename(workspace, &ConfirmRename, cx).unwrap()
        });
        // The rename request is expected at the start of the prepared range, (0, 6), with
        // the new name `THREE`. The response edits the definition in `one.rs` and both
        // uses in `two.rs` (columns 24..27 and 35..38 of its single line).
        fake_language_server
            .handle_request::<lsp::request::Rename, _, _>(|params, _| async move {
                assert_eq!(
                    params.text_document_position.text_document.uri.as_str(),
                    "file:///dir/one.rs"
                );
                assert_eq!(
                    params.text_document_position.position,
                    lsp::Position::new(0, 6)
                );
                assert_eq!(params.new_name, "THREE");
                Ok(Some(lsp::WorkspaceEdit {
                    changes: Some(
                        [
                            (
                                lsp::Url::from_file_path("/dir/one.rs").unwrap(),
                                vec![lsp::TextEdit::new(
                                    lsp::Range::new(
                                        lsp::Position::new(0, 6),
                                        lsp::Position::new(0, 9),
                                    ),
                                    "THREE".to_string(),
                                )],
                            ),
                            (
                                lsp::Url::from_file_path("/dir/two.rs").unwrap(),
                                vec![
                                    lsp::TextEdit::new(
                                        lsp::Range::new(
                                            lsp::Position::new(0, 24),
                                            lsp::Position::new(0, 27),
                                        ),
                                        "THREE".to_string(),
                                    ),
                                    lsp::TextEdit::new(
                                        lsp::Range::new(
                                            lsp::Position::new(0, 35),
                                            lsp::Position::new(0, 38),
                                        ),
                                        "THREE".to_string(),
                                    ),
                                ],
                            ),
                        ]
                        .into_iter()
                        .collect(),
                    ),
                    ..Default::default()
                }))
            })
            .next()
            .await
            .unwrap();
        confirm_rename.await.unwrap();

        // The workspace's active item is now an editor whose text holds the renamed
        // contents of both files; the whole rename can be undone and redone from it.
        let rename_editor = workspace_b.read_with(cx_b, |workspace, cx| {
            workspace
                .active_item(cx)
                .unwrap()
                .downcast::<Editor>()
                .unwrap()
        });
        rename_editor.update(cx_b, |editor, cx| {
            assert_eq!(
                editor.text(cx),
                "const THREE: usize = 1;\nconst TWO: usize = one::THREE + one::THREE;"
            );
            editor.undo(&Undo, cx);
            assert_eq!(
                editor.text(cx),
                "const ONE: usize = 1;\nconst TWO: usize = one::ONE + one::ONE;"
            );
            editor.redo(&Redo, cx);
            assert_eq!(
                editor.text(cx),
                "const THREE: usize = 1;\nconst TWO: usize = one::THREE + one::THREE;"
            );
        });

        // Ensure temporary rename edits cannot be undone/redone.
        // In the original `one.rs` editor a second undo leaves the text unchanged, and a
        // single redo restores the renamed text.
        editor_b.update(cx_b, |editor, cx| {
            editor.undo(&Undo, cx);
            assert_eq!(editor.text(cx), "const ONE: usize = 1;");
            editor.undo(&Undo, cx);
            assert_eq!(editor.text(cx), "const ONE: usize = 1;");
            editor.redo(&Redo, cx);
            assert_eq!(editor.text(cx), "const THREE: usize = 1;");
        })
    }
4583
4584    #[gpui::test(iterations = 10)]
4585    async fn test_basic_chat(cx_a: &mut TestAppContext, cx_b: &mut TestAppContext) {
4586        cx_a.foreground().forbid_parking();
4587
4588        // Connect to a server as 2 clients.
4589        let mut server = TestServer::start(cx_a.foreground(), cx_a.background()).await;
4590        let client_a = server.create_client(cx_a, "user_a").await;
4591        let client_b = server.create_client(cx_b, "user_b").await;
4592
4593        // Create an org that includes these 2 users.
4594        let db = &server.app_state.db;
4595        let org_id = db.create_org("Test Org", "test-org").await.unwrap();
4596        db.add_org_member(org_id, client_a.current_user_id(&cx_a), false)
4597            .await
4598            .unwrap();
4599        db.add_org_member(org_id, client_b.current_user_id(&cx_b), false)
4600            .await
4601            .unwrap();
4602
4603        // Create a channel that includes all the users.
4604        let channel_id = db.create_org_channel(org_id, "test-channel").await.unwrap();
4605        db.add_channel_member(channel_id, client_a.current_user_id(&cx_a), false)
4606            .await
4607            .unwrap();
4608        db.add_channel_member(channel_id, client_b.current_user_id(&cx_b), false)
4609            .await
4610            .unwrap();
4611        db.create_channel_message(
4612            channel_id,
4613            client_b.current_user_id(&cx_b),
4614            "hello A, it's B.",
4615            OffsetDateTime::now_utc(),
4616            1,
4617        )
4618        .await
4619        .unwrap();
4620
4621        let channels_a = cx_a
4622            .add_model(|cx| ChannelList::new(client_a.user_store.clone(), client_a.clone(), cx));
4623        channels_a
4624            .condition(cx_a, |list, _| list.available_channels().is_some())
4625            .await;
4626        channels_a.read_with(cx_a, |list, _| {
4627            assert_eq!(
4628                list.available_channels().unwrap(),
4629                &[ChannelDetails {
4630                    id: channel_id.to_proto(),
4631                    name: "test-channel".to_string()
4632                }]
4633            )
4634        });
4635        let channel_a = channels_a.update(cx_a, |this, cx| {
4636            this.get_channel(channel_id.to_proto(), cx).unwrap()
4637        });
4638        channel_a.read_with(cx_a, |channel, _| assert!(channel.messages().is_empty()));
4639        channel_a
4640            .condition(&cx_a, |channel, _| {
4641                channel_messages(channel)
4642                    == [("user_b".to_string(), "hello A, it's B.".to_string(), false)]
4643            })
4644            .await;
4645
4646        let channels_b = cx_b
4647            .add_model(|cx| ChannelList::new(client_b.user_store.clone(), client_b.clone(), cx));
4648        channels_b
4649            .condition(cx_b, |list, _| list.available_channels().is_some())
4650            .await;
4651        channels_b.read_with(cx_b, |list, _| {
4652            assert_eq!(
4653                list.available_channels().unwrap(),
4654                &[ChannelDetails {
4655                    id: channel_id.to_proto(),
4656                    name: "test-channel".to_string()
4657                }]
4658            )
4659        });
4660
4661        let channel_b = channels_b.update(cx_b, |this, cx| {
4662            this.get_channel(channel_id.to_proto(), cx).unwrap()
4663        });
4664        channel_b.read_with(cx_b, |channel, _| assert!(channel.messages().is_empty()));
4665        channel_b
4666            .condition(&cx_b, |channel, _| {
4667                channel_messages(channel)
4668                    == [("user_b".to_string(), "hello A, it's B.".to_string(), false)]
4669            })
4670            .await;
4671
4672        channel_a
4673            .update(cx_a, |channel, cx| {
4674                channel
4675                    .send_message("oh, hi B.".to_string(), cx)
4676                    .unwrap()
4677                    .detach();
4678                let task = channel.send_message("sup".to_string(), cx).unwrap();
4679                assert_eq!(
4680                    channel_messages(channel),
4681                    &[
4682                        ("user_b".to_string(), "hello A, it's B.".to_string(), false),
4683                        ("user_a".to_string(), "oh, hi B.".to_string(), true),
4684                        ("user_a".to_string(), "sup".to_string(), true)
4685                    ]
4686                );
4687                task
4688            })
4689            .await
4690            .unwrap();
4691
4692        channel_b
4693            .condition(&cx_b, |channel, _| {
4694                channel_messages(channel)
4695                    == [
4696                        ("user_b".to_string(), "hello A, it's B.".to_string(), false),
4697                        ("user_a".to_string(), "oh, hi B.".to_string(), false),
4698                        ("user_a".to_string(), "sup".to_string(), false),
4699                    ]
4700            })
4701            .await;
4702
4703        assert_eq!(
4704            server
4705                .state()
4706                .await
4707                .channel(channel_id)
4708                .unwrap()
4709                .connection_ids
4710                .len(),
4711            2
4712        );
4713        cx_b.update(|_| drop(channel_b));
4714        server
4715            .condition(|state| state.channel(channel_id).unwrap().connection_ids.len() == 1)
4716            .await;
4717
4718        cx_a.update(|_| drop(channel_a));
4719        server
4720            .condition(|state| state.channel(channel_id).is_none())
4721            .await;
4722    }
4723
4724    #[gpui::test(iterations = 10)]
4725    async fn test_chat_message_validation(cx_a: &mut TestAppContext) {
4726        cx_a.foreground().forbid_parking();
4727
4728        let mut server = TestServer::start(cx_a.foreground(), cx_a.background()).await;
4729        let client_a = server.create_client(cx_a, "user_a").await;
4730
4731        let db = &server.app_state.db;
4732        let org_id = db.create_org("Test Org", "test-org").await.unwrap();
4733        let channel_id = db.create_org_channel(org_id, "test-channel").await.unwrap();
4734        db.add_org_member(org_id, client_a.current_user_id(&cx_a), false)
4735            .await
4736            .unwrap();
4737        db.add_channel_member(channel_id, client_a.current_user_id(&cx_a), false)
4738            .await
4739            .unwrap();
4740
4741        let channels_a = cx_a
4742            .add_model(|cx| ChannelList::new(client_a.user_store.clone(), client_a.clone(), cx));
4743        channels_a
4744            .condition(cx_a, |list, _| list.available_channels().is_some())
4745            .await;
4746        let channel_a = channels_a.update(cx_a, |this, cx| {
4747            this.get_channel(channel_id.to_proto(), cx).unwrap()
4748        });
4749
4750        // Messages aren't allowed to be too long.
4751        channel_a
4752            .update(cx_a, |channel, cx| {
4753                let long_body = "this is long.\n".repeat(1024);
4754                channel.send_message(long_body, cx).unwrap()
4755            })
4756            .await
4757            .unwrap_err();
4758
4759        // Messages aren't allowed to be blank.
4760        channel_a.update(cx_a, |channel, cx| {
4761            channel.send_message(String::new(), cx).unwrap_err()
4762        });
4763
4764        // Leading and trailing whitespace are trimmed.
4765        channel_a
4766            .update(cx_a, |channel, cx| {
4767                channel
4768                    .send_message("\n surrounded by whitespace  \n".to_string(), cx)
4769                    .unwrap()
4770            })
4771            .await
4772            .unwrap();
4773        assert_eq!(
4774            db.get_channel_messages(channel_id, 10, None)
4775                .await
4776                .unwrap()
4777                .iter()
4778                .map(|m| &m.body)
4779                .collect::<Vec<_>>(),
4780            &["surrounded by whitespace"]
4781        );
4782    }
4783
4784    #[gpui::test(iterations = 10)]
        // Exercises chat across a disconnect/reconnect of client B: B's cached channel list and
        // messages stay readable while it is offline, a message B sends while offline fails, and
        // once connections are allowed again both clients converge on the same message list.
        //
        // NOTE(review): the boolean in each `channel_messages` tuple presumably marks a message
        // that is still pending (not yet acknowledged) -- confirm against the helper.
4785    async fn test_chat_reconnection(cx_a: &mut TestAppContext, cx_b: &mut TestAppContext) {
4786        cx_a.foreground().forbid_parking();
4787
4788        // Connect to a server as 2 clients.
4789        let mut server = TestServer::start(cx_a.foreground(), cx_a.background()).await;
4790        let client_a = server.create_client(cx_a, "user_a").await;
4791        let client_b = server.create_client(cx_b, "user_b").await;
            // Watch B's connection status so the disconnect further down can be observed.
4792        let mut status_b = client_b.status();
4793
4794        // Create an org that includes these 2 users.
4795        let db = &server.app_state.db;
4796        let org_id = db.create_org("Test Org", "test-org").await.unwrap();
4797        db.add_org_member(org_id, client_a.current_user_id(&cx_a), false)
4798            .await
4799            .unwrap();
4800        db.add_org_member(org_id, client_b.current_user_id(&cx_b), false)
4801            .await
4802            .unwrap();
4803
4804        // Create a channel that includes all the users.
4805        let channel_id = db.create_org_channel(org_id, "test-channel").await.unwrap();
4806        db.add_channel_member(channel_id, client_a.current_user_id(&cx_a), false)
4807            .await
4808            .unwrap();
4809        db.add_channel_member(channel_id, client_b.current_user_id(&cx_b), false)
4810            .await
4811            .unwrap();
            // Seed the channel with one message authored by B, written straight to the database.
            // NOTE(review): the trailing `2` is presumably a nonce -- confirm against the db API.
4812        db.create_channel_message(
4813            channel_id,
4814            client_b.current_user_id(&cx_b),
4815            "hello A, it's B.",
4816            OffsetDateTime::now_utc(),
4817            2,
4818        )
4819        .await
4820        .unwrap();
4821
            // Client A: wait for the channel list, then open the channel.
4822        let channels_a = cx_a
4823            .add_model(|cx| ChannelList::new(client_a.user_store.clone(), client_a.clone(), cx));
4824        channels_a
4825            .condition(cx_a, |list, _| list.available_channels().is_some())
4826            .await;
4827
4828        channels_a.read_with(cx_a, |list, _| {
4829            assert_eq!(
4830                list.available_channels().unwrap(),
4831                &[ChannelDetails {
4832                    id: channel_id.to_proto(),
4833                    name: "test-channel".to_string()
4834                }]
4835            )
4836        });
4837        let channel_a = channels_a.update(cx_a, |this, cx| {
4838            this.get_channel(channel_id.to_proto(), cx).unwrap()
4839        });
            // The freshly opened channel has no messages yet; wait until the seeded one shows up.
4840        channel_a.read_with(cx_a, |channel, _| assert!(channel.messages().is_empty()));
4841        channel_a
4842            .condition(&cx_a, |channel, _| {
4843                channel_messages(channel)
4844                    == [("user_b".to_string(), "hello A, it's B.".to_string(), false)]
4845            })
4846            .await;
4847
            // Client B: same steps as client A above.
4848        let channels_b = cx_b
4849            .add_model(|cx| ChannelList::new(client_b.user_store.clone(), client_b.clone(), cx));
4850        channels_b
4851            .condition(cx_b, |list, _| list.available_channels().is_some())
4852            .await;
4853        channels_b.read_with(cx_b, |list, _| {
4854            assert_eq!(
4855                list.available_channels().unwrap(),
4856                &[ChannelDetails {
4857                    id: channel_id.to_proto(),
4858                    name: "test-channel".to_string()
4859                }]
4860            )
4861        });
4862
4863        let channel_b = channels_b.update(cx_b, |this, cx| {
4864            this.get_channel(channel_id.to_proto(), cx).unwrap()
4865        });
4866        channel_b.read_with(cx_b, |channel, _| assert!(channel.messages().is_empty()));
4867        channel_b
4868            .condition(&cx_b, |channel, _| {
4869                channel_messages(channel)
4870                    == [("user_b".to_string(), "hello A, it's B.".to_string(), false)]
4871            })
4872            .await;
4873
4874        // Disconnect client B, ensuring we can still access its cached channel data.
4875        server.forbid_connections();
4876        server.disconnect_client(client_b.current_user_id(&cx_b));
            // Advance the fake clock by the receive timeout (presumably so B notices the dropped
            // connection), then drain status updates until B reports a reconnection error, which
            // is expected because new connections are forbidden above.
4877        cx_b.foreground().advance_clock(rpc::RECEIVE_TIMEOUT);
4878        while !matches!(
4879            status_b.next().await,
4880            Some(client::Status::ReconnectionError { .. })
4881        ) {}
4882
4883        channels_b.read_with(cx_b, |channels, _| {
4884            assert_eq!(
4885                channels.available_channels().unwrap(),
4886                [ChannelDetails {
4887                    id: channel_id.to_proto(),
4888                    name: "test-channel".to_string()
4889                }]
4890            )
4891        });
4892        channel_b.read_with(cx_b, |channel, _| {
4893            assert_eq!(
4894                channel_messages(channel),
4895                [("user_b".to_string(), "hello A, it's B.".to_string(), false)]
4896            )
4897        });
4898
4899        // Send a message from client B while it is disconnected.
            // The message is listed locally right away (flag `true`), but the send task itself
            // is expected to fail, hence `unwrap_err` on the awaited result.
4900        channel_b
4901            .update(cx_b, |channel, cx| {
4902                let task = channel
4903                    .send_message("can you see this?".to_string(), cx)
4904                    .unwrap();
4905                assert_eq!(
4906                    channel_messages(channel),
4907                    &[
4908                        ("user_b".to_string(), "hello A, it's B.".to_string(), false),
4909                        ("user_b".to_string(), "can you see this?".to_string(), true)
4910                    ]
4911                );
4912                task
4913            })
4914            .await
4915            .unwrap_err();
4916
4917        // Send a message from client A while B is disconnected.
            // Two messages are sent back to back: the first task is detached, only the second is
            // awaited. Both are listed locally (flag `true`) before either task completes.
4918        channel_a
4919            .update(cx_a, |channel, cx| {
4920                channel
4921                    .send_message("oh, hi B.".to_string(), cx)
4922                    .unwrap()
4923                    .detach();
4924                let task = channel.send_message("sup".to_string(), cx).unwrap();
4925                assert_eq!(
4926                    channel_messages(channel),
4927                    &[
4928                        ("user_b".to_string(), "hello A, it's B.".to_string(), false),
4929                        ("user_a".to_string(), "oh, hi B.".to_string(), true),
4930                        ("user_a".to_string(), "sup".to_string(), true)
4931                    ]
4932                );
4933                task
4934            })
4935            .await
4936            .unwrap();
4937
4938        // Give client B a chance to reconnect.
4939        server.allow_connections();
4940        cx_b.foreground().advance_clock(Duration::from_secs(10));
4941
4942        // Verify that B sees the new messages upon reconnection, as well as the message client B
4943        // sent while offline.
4944        channel_b
4945            .condition(&cx_b, |channel, _| {
4946                channel_messages(channel)
4947                    == [
4948                        ("user_b".to_string(), "hello A, it's B.".to_string(), false),
4949                        ("user_a".to_string(), "oh, hi B.".to_string(), false),
4950                        ("user_a".to_string(), "sup".to_string(), false),
4951                        ("user_b".to_string(), "can you see this?".to_string(), false),
4952                    ]
4953            })
4954            .await;
4955
4956        // Ensure client A and B can communicate normally after reconnection.
4957        channel_a
4958            .update(cx_a, |channel, cx| {
4959                channel.send_message("you online?".to_string(), cx).unwrap()
4960            })
4961            .await
4962            .unwrap();
4963        channel_b
4964            .condition(&cx_b, |channel, _| {
4965                channel_messages(channel)
4966                    == [
4967                        ("user_b".to_string(), "hello A, it's B.".to_string(), false),
4968                        ("user_a".to_string(), "oh, hi B.".to_string(), false),
4969                        ("user_a".to_string(), "sup".to_string(), false),
4970                        ("user_b".to_string(), "can you see this?".to_string(), false),
4971                        ("user_a".to_string(), "you online?".to_string(), false),
4972                    ]
4973            })
4974            .await;
4975
            // And in the other direction: B replies and A eventually sees the full history.
4976        channel_b
4977            .update(cx_b, |channel, cx| {
4978                channel.send_message("yep".to_string(), cx).unwrap()
4979            })
4980            .await
4981            .unwrap();
4982        channel_a
4983            .condition(&cx_a, |channel, _| {
4984                channel_messages(channel)
4985                    == [
4986                        ("user_b".to_string(), "hello A, it's B.".to_string(), false),
4987                        ("user_a".to_string(), "oh, hi B.".to_string(), false),
4988                        ("user_a".to_string(), "sup".to_string(), false),
4989                        ("user_b".to_string(), "can you see this?".to_string(), false),
4990                        ("user_a".to_string(), "you online?".to_string(), false),
4991                        ("user_b".to_string(), "yep".to_string(), false),
4992                    ]
4993            })
4994            .await;
4995    }
4996
4997    #[gpui::test(iterations = 10)]
        // Checks that every client's contact list (as summarized by the local `contacts` helper
        // at the bottom of this function) tracks each contact's online flag, projects, shared
        // flag and guests while client A creates, shares and drops a project, client B joins it,
        // and client C disconnects and reconnects.
4998    async fn test_contacts(
4999        deterministic: Arc<Deterministic>,
5000        cx_a: &mut TestAppContext,
5001        cx_b: &mut TestAppContext,
5002        cx_c: &mut TestAppContext,
5003    ) {
5004        cx_a.foreground().forbid_parking();
5005        let lang_registry = Arc::new(LanguageRegistry::test());
5006        let fs = FakeFs::new(cx_a.background());
5007
5008        // Connect to a server as 3 clients.
5009        let mut server = TestServer::start(cx_a.foreground(), cx_a.background()).await;
5010        let client_a = server.create_client(cx_a, "user_a").await;
5011        let client_b = server.create_client(cx_b, "user_b").await;
5012        let client_c = server.create_client(cx_c, "user_c").await;
5013        server
5014            .make_contacts(vec![
5015                (&client_a, cx_a),
5016                (&client_b, cx_b),
5017                (&client_c, cx_c),
5018            ])
5019            .await;
5020
            // Initially everyone is online and nobody has any projects.
5021        deterministic.run_until_parked();
5022        for (client, cx) in [(&client_a, &cx_a), (&client_b, &cx_b), (&client_c, &cx_c)] {
5023            client.user_store.read_with(*cx, |store, _| {
5024                assert_eq!(
5025                    contacts(store),
5026                    [
5027                        ("user_a", true, vec![]),
5028                        ("user_b", true, vec![]),
5029                        ("user_c", true, vec![])
5030                    ]
5031                )
5032            });
5033        }
5034
5035        // Create a local project with a worktree as client A (it is not shared yet).
5036        fs.create_dir(Path::new("/a")).await.unwrap();
5037
5038        let project_a = cx_a.update(|cx| {
5039            Project::local(
5040                client_a.clone(),
5041                client_a.user_store.clone(),
5042                lang_registry.clone(),
5043                fs.clone(),
5044                cx,
5045            )
5046        });
5047        let (worktree_a, _) = project_a
5048            .update(cx_a, |p, cx| {
5049                p.find_or_create_local_worktree("/a", true, cx)
5050            })
5051            .await
5052            .unwrap();
5053        worktree_a
5054            .read_with(cx_a, |tree, _| tree.as_local().unwrap().scan_complete())
5055            .await;
5056
            // The project "a" is now listed under user_a for everyone, with `is_shared == false`.
5057        deterministic.run_until_parked();
5058        for (client, cx) in [(&client_a, &cx_a), (&client_b, &cx_b), (&client_c, &cx_c)] {
5059            client.user_store.read_with(*cx, |store, _| {
5060                assert_eq!(
5061                    contacts(store),
5062                    [
5063                        ("user_a", true, vec![("a", false, vec![])]),
5064                        ("user_b", true, vec![]),
5065                        ("user_c", true, vec![])
5066                    ]
5067                )
5068            });
5069        }
5070
            // Share the project; grab its remote id first so client B can join it below.
5071        let project_id = project_a
5072            .update(cx_a, |project, _| project.next_remote_id())
5073            .await;
5074        project_a
5075            .update(cx_a, |project, cx| project.share(cx))
5076            .await
5077            .unwrap();
5078
            // Everyone now sees the project with `is_shared == true` and no guests.
5079        deterministic.run_until_parked();
5080        for (client, cx) in [(&client_a, &cx_a), (&client_b, &cx_b), (&client_c, &cx_c)] {
5081            client.user_store.read_with(*cx, |store, _| {
5082                assert_eq!(
5083                    contacts(store),
5084                    [
5085                        ("user_a", true, vec![("a", true, vec![])]),
5086                        ("user_b", true, vec![]),
5087                        ("user_c", true, vec![])
5088                    ]
5089                )
5090            });
5091        }
5092
            // Client B opens the shared project remotely. The handle is bound to `_project_b` so
            // it lives until the end of the test.
5093        let _project_b = Project::remote(
5094            project_id,
5095            client_b.clone(),
5096            client_b.user_store.clone(),
5097            lang_registry.clone(),
5098            fs.clone(),
5099            &mut cx_b.to_async(),
5100        )
5101        .await
5102        .unwrap();
5103
            // user_b now appears as a guest of project "a" in everyone's contact list.
5104        deterministic.run_until_parked();
5105        for (client, cx) in [(&client_a, &cx_a), (&client_b, &cx_b), (&client_c, &cx_c)] {
5106            client.user_store.read_with(*cx, |store, _| {
5107                assert_eq!(
5108                    contacts(store),
5109                    [
5110                        ("user_a", true, vec![("a", true, vec!["user_b"])]),
5111                        ("user_b", true, vec![]),
5112                        ("user_c", true, vec![])
5113                    ]
5114                )
5115            });
5116        }
5117
            // Wait until A's project lists B as a collaborator before dropping the project.
5118        project_a
5119            .condition(&cx_a, |project, _| {
5120                project.collaborators().contains_key(&client_b.peer_id)
5121            })
5122            .await;
5123
            // Dropping A's project removes it from every contact list.
5124        cx_a.update(move |_| drop(project_a));
5125        deterministic.run_until_parked();
5126        for (client, cx) in [(&client_a, &cx_a), (&client_b, &cx_b), (&client_c, &cx_c)] {
5127            client.user_store.read_with(*cx, |store, _| {
5128                assert_eq!(
5129                    contacts(store),
5130                    [
5131                        ("user_a", true, vec![]),
5132                        ("user_b", true, vec![]),
5133                        ("user_c", true, vec![])
5134                    ]
5135                )
5136            });
5137        }
5138
            // Disconnect client C and forbid reconnection. After the fake clock advances by the
            // receive timeout, A and B see user_c as offline and C's own contact list is empty.
5139        server.disconnect_client(client_c.current_user_id(cx_c));
5140        server.forbid_connections();
5141        deterministic.advance_clock(rpc::RECEIVE_TIMEOUT);
5142        for (client, cx) in [(&client_a, &cx_a), (&client_b, &cx_b)] {
5143            client.user_store.read_with(*cx, |store, _| {
5144                assert_eq!(
5145                    contacts(store),
5146                    [
5147                        ("user_a", true, vec![]),
5148                        ("user_b", true, vec![]),
5149                        ("user_c", false, vec![])
5150                    ]
5151                )
5152            });
5153        }
5154        client_c
5155            .user_store
5156            .read_with(cx_c, |store, _| assert_eq!(contacts(store), []));
5157
            // Let client C reconnect; all three users are reported online again everywhere.
5158        server.allow_connections();
5159        client_c
5160            .authenticate_and_connect(false, &cx_c.to_async())
5161            .await
5162            .unwrap();
5163
5164        deterministic.run_until_parked();
5165        for (client, cx) in [(&client_a, &cx_a), (&client_b, &cx_b), (&client_c, &cx_c)] {
5166            client.user_store.read_with(*cx, |store, _| {
5167                assert_eq!(
5168                    contacts(store),
5169                    [
5170                        ("user_a", true, vec![]),
5171                        ("user_b", true, vec![]),
5172                        ("user_c", true, vec![])
5173                    ]
5174                )
5175            });
5176        }
5177
            // Summarizes the store's contacts as `(github login, online, projects)`, where each
            // project is `(first worktree root name, is_shared, guest github logins)`.
            // Indexes `worktree_root_names[0]`, so it panics if a project has no root names.
5178        fn contacts(user_store: &UserStore) -> Vec<(&str, bool, Vec<(&str, bool, Vec<&str>)>)> {
5179            user_store
5180                .contacts()
5181                .iter()
5182                .map(|contact| {
5183                    let worktrees = contact
5184                        .projects
5185                        .iter()
5186                        .map(|p| {
5187                            (
5188                                p.worktree_root_names[0].as_str(),
5189                                p.is_shared,
5190                                p.guests.iter().map(|p| p.github_login.as_str()).collect(),
5191                            )
5192                        })
5193                        .collect();
5194                    (
5195                        contact.user.github_login.as_str(),
5196                        contact.online,
5197                        worktrees,
5198                    )
5199                })
5200                .collect()
5201        }
5202    }
5203
5204    #[gpui::test(iterations = 10)]
        // Walks through the contact-request lifecycle between three users, each connected from
        // two clients: A and C request B as a contact, then B accepts A and rejects C. After each
        // step the summarized contact state is checked on both clients of the affected users, and
        // again on the primary clients (a, b, c) after a disconnect/reconnect.
5205    async fn test_contact_requests(
5206        executor: Arc<Deterministic>,
5207        cx_a: &mut TestAppContext,
5208        cx_a2: &mut TestAppContext,
5209        cx_b: &mut TestAppContext,
5210        cx_b2: &mut TestAppContext,
5211        cx_c: &mut TestAppContext,
5212        cx_c2: &mut TestAppContext,
5213    ) {
5214        cx_a.foreground().forbid_parking();
5215
5216        // Connect to a server as 3 users, each with 2 clients.
5217        let mut server = TestServer::start(cx_a.foreground(), cx_a.background()).await;
5218        let client_a = server.create_client(cx_a, "user_a").await;
5219        let client_a2 = server.create_client(cx_a2, "user_a").await;
5220        let client_b = server.create_client(cx_b, "user_b").await;
5221        let client_b2 = server.create_client(cx_b2, "user_b").await;
5222        let client_c = server.create_client(cx_c, "user_c").await;
5223        let client_c2 = server.create_client(cx_c2, "user_c").await;
5224
            // Both clients created with the same name must resolve to the same user id.
5225        assert_eq!(client_a.user_id().unwrap(), client_a2.user_id().unwrap());
5226        assert_eq!(client_b.user_id().unwrap(), client_b2.user_id().unwrap());
5227        assert_eq!(client_c.user_id().unwrap(), client_c2.user_id().unwrap());
5228
5229        // User A and User C request that user B become their contact.
5230        client_a
5231            .user_store
5232            .update(cx_a, |store, cx| {
5233                store.request_contact(client_b.user_id().unwrap(), cx)
5234            })
5235            .await
5236            .unwrap();
5237        client_c
5238            .user_store
5239            .update(cx_c, |store, cx| {
5240                store.request_contact(client_b.user_id().unwrap(), cx)
5241            })
5242            .await
5243            .unwrap();
5244        executor.run_until_parked();
5245
5246        // All users see the pending request appear in all their clients.
5247        assert_eq!(
5248            client_a.summarize_contacts(&cx_a).outgoing_requests,
5249            &["user_b"]
5250        );
5251        assert_eq!(
5252            client_a2.summarize_contacts(&cx_a2).outgoing_requests,
5253            &["user_b"]
5254        );
5255        assert_eq!(
5256            client_b.summarize_contacts(&cx_b).incoming_requests,
5257            &["user_a", "user_c"]
5258        );
5259        assert_eq!(
5260            client_b2.summarize_contacts(&cx_b2).incoming_requests,
5261            &["user_a", "user_c"]
5262        );
5263        assert_eq!(
5264            client_c.summarize_contacts(&cx_c).outgoing_requests,
5265            &["user_b"]
5266        );
5267        assert_eq!(
5268            client_c2.summarize_contacts(&cx_c2).outgoing_requests,
5269            &["user_b"]
5270        );
5271
5272        // Contact requests are present upon connecting (tested here via disconnect/reconnect)
5273        disconnect_and_reconnect(&client_a, cx_a).await;
5274        disconnect_and_reconnect(&client_b, cx_b).await;
5275        disconnect_and_reconnect(&client_c, cx_c).await;
5276        executor.run_until_parked();
5277        assert_eq!(
5278            client_a.summarize_contacts(&cx_a).outgoing_requests,
5279            &["user_b"]
5280        );
5281        assert_eq!(
5282            client_b.summarize_contacts(&cx_b).incoming_requests,
5283            &["user_a", "user_c"]
5284        );
5285        assert_eq!(
5286            client_c.summarize_contacts(&cx_c).outgoing_requests,
5287            &["user_b"]
5288        );
5289
5290        // User B accepts the request from user A.
5291        client_b
5292            .user_store
5293            .update(cx_b, |store, cx| {
5294                store.respond_to_contact_request(client_a.user_id().unwrap(), true, cx)
5295            })
5296            .await
5297            .unwrap();
5298
5299        executor.run_until_parked();
5300
5301        // User B sees user A as their contact now in all clients, and the incoming request from them is removed.
            // Note that each user's own login is part of their `current` list.
5302        let contacts_b = client_b.summarize_contacts(&cx_b);
5303        assert_eq!(contacts_b.current, &["user_a", "user_b"]);
5304        assert_eq!(contacts_b.incoming_requests, &["user_c"]);
5305        let contacts_b2 = client_b2.summarize_contacts(&cx_b2);
5306        assert_eq!(contacts_b2.current, &["user_a", "user_b"]);
5307        assert_eq!(contacts_b2.incoming_requests, &["user_c"]);
5308
5309        // User A sees user B as their contact now in all clients, and the outgoing request to them is removed.
5310        let contacts_a = client_a.summarize_contacts(&cx_a);
5311        assert_eq!(contacts_a.current, &["user_a", "user_b"]);
5312        assert!(contacts_a.outgoing_requests.is_empty());
5313        let contacts_a2 = client_a2.summarize_contacts(&cx_a2);
5314        assert_eq!(contacts_a2.current, &["user_a", "user_b"]);
5315        assert!(contacts_a2.outgoing_requests.is_empty());
5316
5317        // Contacts are present upon connecting (tested here via disconnect/reconnect)
5318        disconnect_and_reconnect(&client_a, cx_a).await;
5319        disconnect_and_reconnect(&client_b, cx_b).await;
5320        disconnect_and_reconnect(&client_c, cx_c).await;
5321        executor.run_until_parked();
5322        assert_eq!(
5323            client_a.summarize_contacts(&cx_a).current,
5324            &["user_a", "user_b"]
5325        );
5326        assert_eq!(
5327            client_b.summarize_contacts(&cx_b).current,
5328            &["user_a", "user_b"]
5329        );
5330        assert_eq!(
5331            client_b.summarize_contacts(&cx_b).incoming_requests,
5332            &["user_c"]
5333        );
5334        assert_eq!(client_c.summarize_contacts(&cx_c).current, &["user_c"]);
5335        assert_eq!(
5336            client_c.summarize_contacts(&cx_c).outgoing_requests,
5337            &["user_b"]
5338        );
5339
5340        // User B rejects the request from user C.
5341        client_b
5342            .user_store
5343            .update(cx_b, |store, cx| {
5344                store.respond_to_contact_request(client_c.user_id().unwrap(), false, cx)
5345            })
5346            .await
5347            .unwrap();
5348
5349        executor.run_until_parked();
5350
5351        // User B doesn't see user C as their contact, and the incoming request from them is removed.
5352        let contacts_b = client_b.summarize_contacts(&cx_b);
5353        assert_eq!(contacts_b.current, &["user_a", "user_b"]);
5354        assert!(contacts_b.incoming_requests.is_empty());
5355        let contacts_b2 = client_b2.summarize_contacts(&cx_b2);
5356        assert_eq!(contacts_b2.current, &["user_a", "user_b"]);
5357        assert!(contacts_b2.incoming_requests.is_empty());
5358
5359        // User C doesn't see user B as their contact, and the outgoing request to them is removed.
5360        let contacts_c = client_c.summarize_contacts(&cx_c);
5361        assert_eq!(contacts_c.current, &["user_c"]);
5362        assert!(contacts_c.outgoing_requests.is_empty());
5363        let contacts_c2 = client_c2.summarize_contacts(&cx_c2);
5364        assert_eq!(contacts_c2.current, &["user_c"]);
5365        assert!(contacts_c2.outgoing_requests.is_empty());
5366
5367        // Incoming/outgoing requests are not present upon connecting (tested here via disconnect/reconnect)
5368        disconnect_and_reconnect(&client_a, cx_a).await;
5369        disconnect_and_reconnect(&client_b, cx_b).await;
5370        disconnect_and_reconnect(&client_c, cx_c).await;
5371        executor.run_until_parked();
5372        assert_eq!(
5373            client_a.summarize_contacts(&cx_a).current,
5374            &["user_a", "user_b"]
5375        );
5376        assert_eq!(
5377            client_b.summarize_contacts(&cx_b).current,
5378            &["user_a", "user_b"]
5379        );
5380        assert!(client_b
5381            .summarize_contacts(&cx_b)
5382            .incoming_requests
5383            .is_empty());
5384        assert_eq!(client_c.summarize_contacts(&cx_c).current, &["user_c"]);
5385        assert!(client_c
5386            .summarize_contacts(&cx_c)
5387            .outgoing_requests
5388            .is_empty());
5389
            // Disconnects `client`, calls `clear_contacts` on it, then authenticates and connects
            // again. Panics (via `unwrap`) if disconnecting or reconnecting fails.
            // NOTE(review): clearing presumably guarantees that any contact state observed
            // afterwards was delivered by the new connection -- confirm in `TestClient`.
5390        async fn disconnect_and_reconnect(client: &TestClient, cx: &mut TestAppContext) {
5391            client.disconnect(&cx.to_async()).unwrap();
5392            client.clear_contacts(cx).await;
5393            client
5394                .authenticate_and_connect(false, &cx.to_async())
5395                .await
5396                .unwrap();
5397        }
5398    }
5399
5400    #[gpui::test(iterations = 10)]
5401    async fn test_following(cx_a: &mut TestAppContext, cx_b: &mut TestAppContext) {
5402        cx_a.foreground().forbid_parking();
5403        let fs = FakeFs::new(cx_a.background());
5404
5405        // 2 clients connect to a server.
5406        let mut server = TestServer::start(cx_a.foreground(), cx_a.background()).await;
5407        let mut client_a = server.create_client(cx_a, "user_a").await;
5408        let mut client_b = server.create_client(cx_b, "user_b").await;
5409        server
5410            .make_contacts(vec![(&client_a, cx_a), (&client_b, cx_b)])
5411            .await;
5412        cx_a.update(editor::init);
5413        cx_b.update(editor::init);
5414
5415        // Client A shares a project.
5416        fs.insert_tree(
5417            "/a",
5418            json!({
5419                "1.txt": "one",
5420                "2.txt": "two",
5421                "3.txt": "three",
5422            }),
5423        )
5424        .await;
5425        let (project_a, worktree_id) = client_a.build_local_project(fs.clone(), "/a", cx_a).await;
5426        project_a
5427            .update(cx_a, |project, cx| project.share(cx))
5428            .await
5429            .unwrap();
5430
5431        // Client B joins the project.
5432        let project_b = client_b
5433            .build_remote_project(
5434                project_a
5435                    .read_with(cx_a, |project, _| project.remote_id())
5436                    .unwrap(),
5437                cx_b,
5438            )
5439            .await;
5440
5441        // Client A opens some editors.
5442        let workspace_a = client_a.build_workspace(&project_a, cx_a);
5443        let pane_a = workspace_a.read_with(cx_a, |workspace, _| workspace.active_pane().clone());
5444        let editor_a1 = workspace_a
5445            .update(cx_a, |workspace, cx| {
5446                workspace.open_path((worktree_id, "1.txt"), true, cx)
5447            })
5448            .await
5449            .unwrap()
5450            .downcast::<Editor>()
5451            .unwrap();
5452        let editor_a2 = workspace_a
5453            .update(cx_a, |workspace, cx| {
5454                workspace.open_path((worktree_id, "2.txt"), true, cx)
5455            })
5456            .await
5457            .unwrap()
5458            .downcast::<Editor>()
5459            .unwrap();
5460
5461        // Client B opens an editor.
5462        let workspace_b = client_b.build_workspace(&project_b, cx_b);
5463        let editor_b1 = workspace_b
5464            .update(cx_b, |workspace, cx| {
5465                workspace.open_path((worktree_id, "1.txt"), true, cx)
5466            })
5467            .await
5468            .unwrap()
5469            .downcast::<Editor>()
5470            .unwrap();
5471
5472        let client_a_id = project_b.read_with(cx_b, |project, _| {
5473            project.collaborators().values().next().unwrap().peer_id
5474        });
5475        let client_b_id = project_a.read_with(cx_a, |project, _| {
5476            project.collaborators().values().next().unwrap().peer_id
5477        });
5478
5479        // When client B starts following client A, all visible view states are replicated to client B.
5480        editor_a1.update(cx_a, |editor, cx| editor.select_ranges([0..1], None, cx));
5481        editor_a2.update(cx_a, |editor, cx| editor.select_ranges([2..3], None, cx));
5482        workspace_b
5483            .update(cx_b, |workspace, cx| {
5484                workspace
5485                    .toggle_follow(&ToggleFollow(client_a_id), cx)
5486                    .unwrap()
5487            })
5488            .await
5489            .unwrap();
5490        let editor_b2 = workspace_b.read_with(cx_b, |workspace, cx| {
5491            workspace
5492                .active_item(cx)
5493                .unwrap()
5494                .downcast::<Editor>()
5495                .unwrap()
5496        });
5497        assert!(cx_b.read(|cx| editor_b2.is_focused(cx)));
5498        assert_eq!(
5499            editor_b2.read_with(cx_b, |editor, cx| editor.project_path(cx)),
5500            Some((worktree_id, "2.txt").into())
5501        );
5502        assert_eq!(
5503            editor_b2.read_with(cx_b, |editor, cx| editor.selected_ranges(cx)),
5504            vec![2..3]
5505        );
5506        assert_eq!(
5507            editor_b1.read_with(cx_b, |editor, cx| editor.selected_ranges(cx)),
5508            vec![0..1]
5509        );
5510
5511        // When client A activates a different editor, client B does so as well.
5512        workspace_a.update(cx_a, |workspace, cx| {
5513            workspace.activate_item(&editor_a1, cx)
5514        });
5515        workspace_b
5516            .condition(cx_b, |workspace, cx| {
5517                workspace.active_item(cx).unwrap().id() == editor_b1.id()
5518            })
5519            .await;
5520
5521        // When client A navigates back and forth, client B does so as well.
5522        workspace_a
5523            .update(cx_a, |workspace, cx| {
5524                workspace::Pane::go_back(workspace, None, cx)
5525            })
5526            .await;
5527        workspace_b
5528            .condition(cx_b, |workspace, cx| {
5529                workspace.active_item(cx).unwrap().id() == editor_b2.id()
5530            })
5531            .await;
5532
5533        workspace_a
5534            .update(cx_a, |workspace, cx| {
5535                workspace::Pane::go_forward(workspace, None, cx)
5536            })
5537            .await;
5538        workspace_b
5539            .condition(cx_b, |workspace, cx| {
5540                workspace.active_item(cx).unwrap().id() == editor_b1.id()
5541            })
5542            .await;
5543
5544        // Changes to client A's editor are reflected on client B.
5545        editor_a1.update(cx_a, |editor, cx| {
5546            editor.select_ranges([1..1, 2..2], None, cx);
5547        });
5548        editor_b1
5549            .condition(cx_b, |editor, cx| {
5550                editor.selected_ranges(cx) == vec![1..1, 2..2]
5551            })
5552            .await;
5553
5554        editor_a1.update(cx_a, |editor, cx| editor.set_text("TWO", cx));
5555        editor_b1
5556            .condition(cx_b, |editor, cx| editor.text(cx) == "TWO")
5557            .await;
5558
5559        editor_a1.update(cx_a, |editor, cx| {
5560            editor.select_ranges([3..3], None, cx);
5561            editor.set_scroll_position(vec2f(0., 100.), cx);
5562        });
5563        editor_b1
5564            .condition(cx_b, |editor, cx| editor.selected_ranges(cx) == vec![3..3])
5565            .await;
5566
5567        // After unfollowing, client B stops receiving updates from client A.
5568        workspace_b.update(cx_b, |workspace, cx| {
5569            workspace.unfollow(&workspace.active_pane().clone(), cx)
5570        });
5571        workspace_a.update(cx_a, |workspace, cx| {
5572            workspace.activate_item(&editor_a2, cx)
5573        });
5574        cx_a.foreground().run_until_parked();
5575        assert_eq!(
5576            workspace_b.read_with(cx_b, |workspace, cx| workspace
5577                .active_item(cx)
5578                .unwrap()
5579                .id()),
5580            editor_b1.id()
5581        );
5582
5583        // Client A starts following client B.
5584        workspace_a
5585            .update(cx_a, |workspace, cx| {
5586                workspace
5587                    .toggle_follow(&ToggleFollow(client_b_id), cx)
5588                    .unwrap()
5589            })
5590            .await
5591            .unwrap();
5592        assert_eq!(
5593            workspace_a.read_with(cx_a, |workspace, _| workspace.leader_for_pane(&pane_a)),
5594            Some(client_b_id)
5595        );
5596        assert_eq!(
5597            workspace_a.read_with(cx_a, |workspace, cx| workspace
5598                .active_item(cx)
5599                .unwrap()
5600                .id()),
5601            editor_a1.id()
5602        );
5603
5604        // Following interrupts when client B disconnects.
5605        client_b.disconnect(&cx_b.to_async()).unwrap();
5606        cx_a.foreground().run_until_parked();
5607        assert_eq!(
5608            workspace_a.read_with(cx_a, |workspace, _| workspace.leader_for_pane(&pane_a)),
5609            None
5610        );
5611    }
5612
5613    #[gpui::test(iterations = 10)]
5614    async fn test_peers_following_each_other(cx_a: &mut TestAppContext, cx_b: &mut TestAppContext) {
5615        cx_a.foreground().forbid_parking();
5616        let fs = FakeFs::new(cx_a.background());
5617
5618        // 2 clients connect to a server.
5619        let mut server = TestServer::start(cx_a.foreground(), cx_a.background()).await;
5620        let mut client_a = server.create_client(cx_a, "user_a").await;
5621        let mut client_b = server.create_client(cx_b, "user_b").await;
5622        server
5623            .make_contacts(vec![(&client_a, cx_a), (&client_b, cx_b)])
5624            .await;
5625        cx_a.update(editor::init);
5626        cx_b.update(editor::init);
5627
5628        // Client A shares a project.
5629        fs.insert_tree(
5630            "/a",
5631            json!({
5632                "1.txt": "one",
5633                "2.txt": "two",
5634                "3.txt": "three",
5635                "4.txt": "four",
5636            }),
5637        )
5638        .await;
5639        let (project_a, worktree_id) = client_a.build_local_project(fs.clone(), "/a", cx_a).await;
5640        project_a
5641            .update(cx_a, |project, cx| project.share(cx))
5642            .await
5643            .unwrap();
5644
5645        // Client B joins the project.
5646        let project_b = client_b
5647            .build_remote_project(
5648                project_a
5649                    .read_with(cx_a, |project, _| project.remote_id())
5650                    .unwrap(),
5651                cx_b,
5652            )
5653            .await;
5654
5655        // Client A opens some editors.
5656        let workspace_a = client_a.build_workspace(&project_a, cx_a);
5657        let pane_a1 = workspace_a.read_with(cx_a, |workspace, _| workspace.active_pane().clone());
5658        let _editor_a1 = workspace_a
5659            .update(cx_a, |workspace, cx| {
5660                workspace.open_path((worktree_id, "1.txt"), true, cx)
5661            })
5662            .await
5663            .unwrap()
5664            .downcast::<Editor>()
5665            .unwrap();
5666
5667        // Client B opens an editor.
5668        let workspace_b = client_b.build_workspace(&project_b, cx_b);
5669        let pane_b1 = workspace_b.read_with(cx_b, |workspace, _| workspace.active_pane().clone());
5670        let _editor_b1 = workspace_b
5671            .update(cx_b, |workspace, cx| {
5672                workspace.open_path((worktree_id, "2.txt"), true, cx)
5673            })
5674            .await
5675            .unwrap()
5676            .downcast::<Editor>()
5677            .unwrap();
5678
5679        // Clients A and B follow each other in split panes
5680        workspace_a
5681            .update(cx_a, |workspace, cx| {
5682                workspace.split_pane(workspace.active_pane().clone(), SplitDirection::Right, cx);
5683                assert_ne!(*workspace.active_pane(), pane_a1);
5684                let leader_id = *project_a.read(cx).collaborators().keys().next().unwrap();
5685                workspace
5686                    .toggle_follow(&workspace::ToggleFollow(leader_id), cx)
5687                    .unwrap()
5688            })
5689            .await
5690            .unwrap();
5691        workspace_b
5692            .update(cx_b, |workspace, cx| {
5693                workspace.split_pane(workspace.active_pane().clone(), SplitDirection::Right, cx);
5694                assert_ne!(*workspace.active_pane(), pane_b1);
5695                let leader_id = *project_b.read(cx).collaborators().keys().next().unwrap();
5696                workspace
5697                    .toggle_follow(&workspace::ToggleFollow(leader_id), cx)
5698                    .unwrap()
5699            })
5700            .await
5701            .unwrap();
5702
5703        workspace_a
5704            .update(cx_a, |workspace, cx| {
5705                workspace.activate_next_pane(cx);
5706                assert_eq!(*workspace.active_pane(), pane_a1);
5707                workspace.open_path((worktree_id, "3.txt"), true, cx)
5708            })
5709            .await
5710            .unwrap();
5711        workspace_b
5712            .update(cx_b, |workspace, cx| {
5713                workspace.activate_next_pane(cx);
5714                assert_eq!(*workspace.active_pane(), pane_b1);
5715                workspace.open_path((worktree_id, "4.txt"), true, cx)
5716            })
5717            .await
5718            .unwrap();
5719        cx_a.foreground().run_until_parked();
5720
5721        // Ensure leader updates don't change the active pane of followers
5722        workspace_a.read_with(cx_a, |workspace, _| {
5723            assert_eq!(*workspace.active_pane(), pane_a1);
5724        });
5725        workspace_b.read_with(cx_b, |workspace, _| {
5726            assert_eq!(*workspace.active_pane(), pane_b1);
5727        });
5728
5729        // Ensure peers following each other doesn't cause an infinite loop.
5730        assert_eq!(
5731            workspace_a.read_with(cx_a, |workspace, cx| workspace
5732                .active_item(cx)
5733                .unwrap()
5734                .project_path(cx)),
5735            Some((worktree_id, "3.txt").into())
5736        );
5737        workspace_a.update(cx_a, |workspace, cx| {
5738            assert_eq!(
5739                workspace.active_item(cx).unwrap().project_path(cx),
5740                Some((worktree_id, "3.txt").into())
5741            );
5742            workspace.activate_next_pane(cx);
5743            assert_eq!(
5744                workspace.active_item(cx).unwrap().project_path(cx),
5745                Some((worktree_id, "4.txt").into())
5746            );
5747        });
5748        workspace_b.update(cx_b, |workspace, cx| {
5749            assert_eq!(
5750                workspace.active_item(cx).unwrap().project_path(cx),
5751                Some((worktree_id, "4.txt").into())
5752            );
5753            workspace.activate_next_pane(cx);
5754            assert_eq!(
5755                workspace.active_item(cx).unwrap().project_path(cx),
5756                Some((worktree_id, "3.txt").into())
5757            );
5758        });
5759    }
5760
5761    #[gpui::test(iterations = 10)]
5762    async fn test_auto_unfollowing(cx_a: &mut TestAppContext, cx_b: &mut TestAppContext) {
5763        cx_a.foreground().forbid_parking();
5764        let fs = FakeFs::new(cx_a.background());
5765
5766        // 2 clients connect to a server.
5767        let mut server = TestServer::start(cx_a.foreground(), cx_a.background()).await;
5768        let mut client_a = server.create_client(cx_a, "user_a").await;
5769        let mut client_b = server.create_client(cx_b, "user_b").await;
5770        server
5771            .make_contacts(vec![(&client_a, cx_a), (&client_b, cx_b)])
5772            .await;
5773        cx_a.update(editor::init);
5774        cx_b.update(editor::init);
5775
5776        // Client A shares a project.
5777        fs.insert_tree(
5778            "/a",
5779            json!({
5780                "1.txt": "one",
5781                "2.txt": "two",
5782                "3.txt": "three",
5783            }),
5784        )
5785        .await;
5786        let (project_a, worktree_id) = client_a.build_local_project(fs.clone(), "/a", cx_a).await;
5787        project_a
5788            .update(cx_a, |project, cx| project.share(cx))
5789            .await
5790            .unwrap();
5791
5792        // Client B joins the project.
5793        let project_b = client_b
5794            .build_remote_project(
5795                project_a
5796                    .read_with(cx_a, |project, _| project.remote_id())
5797                    .unwrap(),
5798                cx_b,
5799            )
5800            .await;
5801
5802        // Client A opens some editors.
5803        let workspace_a = client_a.build_workspace(&project_a, cx_a);
5804        let _editor_a1 = workspace_a
5805            .update(cx_a, |workspace, cx| {
5806                workspace.open_path((worktree_id, "1.txt"), true, cx)
5807            })
5808            .await
5809            .unwrap()
5810            .downcast::<Editor>()
5811            .unwrap();
5812
5813        // Client B starts following client A.
5814        let workspace_b = client_b.build_workspace(&project_b, cx_b);
5815        let pane_b = workspace_b.read_with(cx_b, |workspace, _| workspace.active_pane().clone());
5816        let leader_id = project_b.read_with(cx_b, |project, _| {
5817            project.collaborators().values().next().unwrap().peer_id
5818        });
5819        workspace_b
5820            .update(cx_b, |workspace, cx| {
5821                workspace
5822                    .toggle_follow(&ToggleFollow(leader_id), cx)
5823                    .unwrap()
5824            })
5825            .await
5826            .unwrap();
5827        assert_eq!(
5828            workspace_b.read_with(cx_b, |workspace, _| workspace.leader_for_pane(&pane_b)),
5829            Some(leader_id)
5830        );
5831        let editor_b2 = workspace_b.read_with(cx_b, |workspace, cx| {
5832            workspace
5833                .active_item(cx)
5834                .unwrap()
5835                .downcast::<Editor>()
5836                .unwrap()
5837        });
5838
5839        // When client B moves, it automatically stops following client A.
5840        editor_b2.update(cx_b, |editor, cx| editor.move_right(&editor::MoveRight, cx));
5841        assert_eq!(
5842            workspace_b.read_with(cx_b, |workspace, _| workspace.leader_for_pane(&pane_b)),
5843            None
5844        );
5845
5846        workspace_b
5847            .update(cx_b, |workspace, cx| {
5848                workspace
5849                    .toggle_follow(&ToggleFollow(leader_id), cx)
5850                    .unwrap()
5851            })
5852            .await
5853            .unwrap();
5854        assert_eq!(
5855            workspace_b.read_with(cx_b, |workspace, _| workspace.leader_for_pane(&pane_b)),
5856            Some(leader_id)
5857        );
5858
5859        // When client B edits, it automatically stops following client A.
5860        editor_b2.update(cx_b, |editor, cx| editor.insert("X", cx));
5861        assert_eq!(
5862            workspace_b.read_with(cx_b, |workspace, _| workspace.leader_for_pane(&pane_b)),
5863            None
5864        );
5865
5866        workspace_b
5867            .update(cx_b, |workspace, cx| {
5868                workspace
5869                    .toggle_follow(&ToggleFollow(leader_id), cx)
5870                    .unwrap()
5871            })
5872            .await
5873            .unwrap();
5874        assert_eq!(
5875            workspace_b.read_with(cx_b, |workspace, _| workspace.leader_for_pane(&pane_b)),
5876            Some(leader_id)
5877        );
5878
5879        // When client B scrolls, it automatically stops following client A.
5880        editor_b2.update(cx_b, |editor, cx| {
5881            editor.set_scroll_position(vec2f(0., 3.), cx)
5882        });
5883        assert_eq!(
5884            workspace_b.read_with(cx_b, |workspace, _| workspace.leader_for_pane(&pane_b)),
5885            None
5886        );
5887
5888        workspace_b
5889            .update(cx_b, |workspace, cx| {
5890                workspace
5891                    .toggle_follow(&ToggleFollow(leader_id), cx)
5892                    .unwrap()
5893            })
5894            .await
5895            .unwrap();
5896        assert_eq!(
5897            workspace_b.read_with(cx_b, |workspace, _| workspace.leader_for_pane(&pane_b)),
5898            Some(leader_id)
5899        );
5900
5901        // When client B activates a different pane, it continues following client A in the original pane.
5902        workspace_b.update(cx_b, |workspace, cx| {
5903            workspace.split_pane(pane_b.clone(), SplitDirection::Right, cx)
5904        });
5905        assert_eq!(
5906            workspace_b.read_with(cx_b, |workspace, _| workspace.leader_for_pane(&pane_b)),
5907            Some(leader_id)
5908        );
5909
5910        workspace_b.update(cx_b, |workspace, cx| workspace.activate_next_pane(cx));
5911        assert_eq!(
5912            workspace_b.read_with(cx_b, |workspace, _| workspace.leader_for_pane(&pane_b)),
5913            Some(leader_id)
5914        );
5915
5916        // When client B activates a different item in the original pane, it automatically stops following client A.
5917        workspace_b
5918            .update(cx_b, |workspace, cx| {
5919                workspace.open_path((worktree_id, "2.txt"), true, cx)
5920            })
5921            .await
5922            .unwrap();
5923        assert_eq!(
5924            workspace_b.read_with(cx_b, |workspace, _| workspace.leader_for_pane(&pane_b)),
5925            None
5926        );
5927    }
5928
5929    #[gpui::test(iterations = 100)]
5930    async fn test_random_collaboration(
5931        cx: &mut TestAppContext,
5932        deterministic: Arc<Deterministic>,
5933        rng: StdRng,
5934    ) {
5935        cx.foreground().forbid_parking();
5936        let max_peers = env::var("MAX_PEERS")
5937            .map(|i| i.parse().expect("invalid `MAX_PEERS` variable"))
5938            .unwrap_or(5);
5939        assert!(max_peers <= 5);
5940
5941        let max_operations = env::var("OPERATIONS")
5942            .map(|i| i.parse().expect("invalid `OPERATIONS` variable"))
5943            .unwrap_or(10);
5944
5945        let rng = Arc::new(Mutex::new(rng));
5946
5947        let guest_lang_registry = Arc::new(LanguageRegistry::test());
5948        let host_language_registry = Arc::new(LanguageRegistry::test());
5949
5950        let fs = FakeFs::new(cx.background());
5951        fs.insert_tree("/_collab", json!({"init": ""})).await;
5952
5953        let mut server = TestServer::start(cx.foreground(), cx.background()).await;
5954        let db = server.app_state.db.clone();
5955        let host_user_id = db.create_user("host", false).await.unwrap();
5956        for username in ["guest-1", "guest-2", "guest-3", "guest-4"] {
5957            let guest_user_id = db.create_user(username, false).await.unwrap();
5958            server
5959                .app_state
5960                .db
5961                .send_contact_request(guest_user_id, host_user_id)
5962                .await
5963                .unwrap();
5964            server
5965                .app_state
5966                .db
5967                .respond_to_contact_request(host_user_id, guest_user_id, true)
5968                .await
5969                .unwrap();
5970        }
5971
5972        let mut clients = Vec::new();
5973        let mut user_ids = Vec::new();
5974        let mut op_start_signals = Vec::new();
5975
5976        let mut next_entity_id = 100000;
5977        let mut host_cx = TestAppContext::new(
5978            cx.foreground_platform(),
5979            cx.platform(),
5980            deterministic.build_foreground(next_entity_id),
5981            deterministic.build_background(),
5982            cx.font_cache(),
5983            cx.leak_detector(),
5984            next_entity_id,
5985        );
5986        let host = server.create_client(&mut host_cx, "host").await;
5987        let host_project = host_cx.update(|cx| {
5988            Project::local(
5989                host.client.clone(),
5990                host.user_store.clone(),
5991                host_language_registry.clone(),
5992                fs.clone(),
5993                cx,
5994            )
5995        });
5996        let host_project_id = host_project
5997            .update(&mut host_cx, |p, _| p.next_remote_id())
5998            .await;
5999
6000        let (collab_worktree, _) = host_project
6001            .update(&mut host_cx, |project, cx| {
6002                project.find_or_create_local_worktree("/_collab", true, cx)
6003            })
6004            .await
6005            .unwrap();
6006        collab_worktree
6007            .read_with(&host_cx, |tree, _| tree.as_local().unwrap().scan_complete())
6008            .await;
6009        host_project
6010            .update(&mut host_cx, |project, cx| project.share(cx))
6011            .await
6012            .unwrap();
6013
6014        // Set up fake language servers.
6015        let mut language = Language::new(
6016            LanguageConfig {
6017                name: "Rust".into(),
6018                path_suffixes: vec!["rs".to_string()],
6019                ..Default::default()
6020            },
6021            None,
6022        );
6023        let _fake_servers = language.set_fake_lsp_adapter(FakeLspAdapter {
6024            name: "the-fake-language-server",
6025            capabilities: lsp::LanguageServer::full_capabilities(),
6026            initializer: Some(Box::new({
6027                let rng = rng.clone();
6028                let fs = fs.clone();
6029                let project = host_project.downgrade();
6030                move |fake_server: &mut FakeLanguageServer| {
6031                    fake_server.handle_request::<lsp::request::Completion, _, _>(
6032                        |_, _| async move {
6033                            Ok(Some(lsp::CompletionResponse::Array(vec![
6034                                lsp::CompletionItem {
6035                                    text_edit: Some(lsp::CompletionTextEdit::Edit(lsp::TextEdit {
6036                                        range: lsp::Range::new(
6037                                            lsp::Position::new(0, 0),
6038                                            lsp::Position::new(0, 0),
6039                                        ),
6040                                        new_text: "the-new-text".to_string(),
6041                                    })),
6042                                    ..Default::default()
6043                                },
6044                            ])))
6045                        },
6046                    );
6047
6048                    fake_server.handle_request::<lsp::request::CodeActionRequest, _, _>(
6049                        |_, _| async move {
6050                            Ok(Some(vec![lsp::CodeActionOrCommand::CodeAction(
6051                                lsp::CodeAction {
6052                                    title: "the-code-action".to_string(),
6053                                    ..Default::default()
6054                                },
6055                            )]))
6056                        },
6057                    );
6058
6059                    fake_server.handle_request::<lsp::request::PrepareRenameRequest, _, _>(
6060                        |params, _| async move {
6061                            Ok(Some(lsp::PrepareRenameResponse::Range(lsp::Range::new(
6062                                params.position,
6063                                params.position,
6064                            ))))
6065                        },
6066                    );
6067
6068                    fake_server.handle_request::<lsp::request::GotoDefinition, _, _>({
6069                        let fs = fs.clone();
6070                        let rng = rng.clone();
6071                        move |_, _| {
6072                            let fs = fs.clone();
6073                            let rng = rng.clone();
6074                            async move {
6075                                let files = fs.files().await;
6076                                let mut rng = rng.lock();
6077                                let count = rng.gen_range::<usize, _>(1..3);
6078                                let files = (0..count)
6079                                    .map(|_| files.choose(&mut *rng).unwrap())
6080                                    .collect::<Vec<_>>();
6081                                log::info!("LSP: Returning definitions in files {:?}", &files);
6082                                Ok(Some(lsp::GotoDefinitionResponse::Array(
6083                                    files
6084                                        .into_iter()
6085                                        .map(|file| lsp::Location {
6086                                            uri: lsp::Url::from_file_path(file).unwrap(),
6087                                            range: Default::default(),
6088                                        })
6089                                        .collect(),
6090                                )))
6091                            }
6092                        }
6093                    });
6094
6095                    fake_server.handle_request::<lsp::request::DocumentHighlightRequest, _, _>({
6096                        let rng = rng.clone();
6097                        let project = project.clone();
6098                        move |params, mut cx| {
6099                            let highlights = if let Some(project) = project.upgrade(&cx) {
6100                                project.update(&mut cx, |project, cx| {
6101                                    let path = params
6102                                        .text_document_position_params
6103                                        .text_document
6104                                        .uri
6105                                        .to_file_path()
6106                                        .unwrap();
6107                                    let (worktree, relative_path) =
6108                                        project.find_local_worktree(&path, cx)?;
6109                                    let project_path =
6110                                        ProjectPath::from((worktree.read(cx).id(), relative_path));
6111                                    let buffer =
6112                                        project.get_open_buffer(&project_path, cx)?.read(cx);
6113
6114                                    let mut highlights = Vec::new();
6115                                    let highlight_count = rng.lock().gen_range(1..=5);
6116                                    let mut prev_end = 0;
6117                                    for _ in 0..highlight_count {
6118                                        let range =
6119                                            buffer.random_byte_range(prev_end, &mut *rng.lock());
6120
6121                                        highlights.push(lsp::DocumentHighlight {
6122                                            range: range_to_lsp(range.to_point_utf16(buffer)),
6123                                            kind: Some(lsp::DocumentHighlightKind::READ),
6124                                        });
6125                                        prev_end = range.end;
6126                                    }
6127                                    Some(highlights)
6128                                })
6129                            } else {
6130                                None
6131                            };
6132                            async move { Ok(highlights) }
6133                        }
6134                    });
6135                }
6136            })),
6137            ..Default::default()
6138        });
6139        host_language_registry.add(Arc::new(language));
6140
6141        let op_start_signal = futures::channel::mpsc::unbounded();
6142        user_ids.push(host.current_user_id(&host_cx));
6143        op_start_signals.push(op_start_signal.0);
6144        clients.push(host_cx.foreground().spawn(host.simulate_host(
6145            host_project,
6146            op_start_signal.1,
6147            rng.clone(),
6148            host_cx,
6149        )));
6150
6151        let disconnect_host_at = if rng.lock().gen_bool(0.2) {
6152            rng.lock().gen_range(0..max_operations)
6153        } else {
6154            max_operations
6155        };
6156        let mut available_guests = vec![
6157            "guest-1".to_string(),
6158            "guest-2".to_string(),
6159            "guest-3".to_string(),
6160            "guest-4".to_string(),
6161        ];
6162        let mut operations = 0;
6163        while operations < max_operations {
6164            if operations == disconnect_host_at {
6165                server.disconnect_client(user_ids[0]);
6166                cx.foreground().advance_clock(RECEIVE_TIMEOUT);
6167                drop(op_start_signals);
6168                let mut clients = futures::future::join_all(clients).await;
6169                cx.foreground().run_until_parked();
6170
6171                let (host, mut host_cx, host_err) = clients.remove(0);
6172                if let Some(host_err) = host_err {
6173                    log::error!("host error - {}", host_err);
6174                }
6175                host.project
6176                    .as_ref()
6177                    .unwrap()
6178                    .read_with(&host_cx, |project, _| assert!(!project.is_shared()));
6179                for (guest, mut guest_cx, guest_err) in clients {
6180                    if let Some(guest_err) = guest_err {
6181                        log::error!("{} error - {}", guest.username, guest_err);
6182                    }
6183                    // TODO
6184                    // let contacts = server
6185                    //     .store
6186                    //     .read()
6187                    //     .await
6188                    //     .contacts_for_user(guest.current_user_id(&guest_cx));
6189                    // assert!(!contacts
6190                    //     .iter()
6191                    //     .flat_map(|contact| &contact.projects)
6192                    //     .any(|project| project.id == host_project_id));
6193                    guest
6194                        .project
6195                        .as_ref()
6196                        .unwrap()
6197                        .read_with(&guest_cx, |project, _| assert!(project.is_read_only()));
6198                    guest_cx.update(|_| drop(guest));
6199                }
6200                host_cx.update(|_| drop(host));
6201
6202                return;
6203            }
6204
6205            let distribution = rng.lock().gen_range(0..100);
6206            match distribution {
6207                0..=19 if !available_guests.is_empty() => {
6208                    let guest_ix = rng.lock().gen_range(0..available_guests.len());
6209                    let guest_username = available_guests.remove(guest_ix);
6210                    log::info!("Adding new connection for {}", guest_username);
6211                    next_entity_id += 100000;
6212                    let mut guest_cx = TestAppContext::new(
6213                        cx.foreground_platform(),
6214                        cx.platform(),
6215                        deterministic.build_foreground(next_entity_id),
6216                        deterministic.build_background(),
6217                        cx.font_cache(),
6218                        cx.leak_detector(),
6219                        next_entity_id,
6220                    );
6221                    let guest = server.create_client(&mut guest_cx, &guest_username).await;
6222                    let guest_project = Project::remote(
6223                        host_project_id,
6224                        guest.client.clone(),
6225                        guest.user_store.clone(),
6226                        guest_lang_registry.clone(),
6227                        FakeFs::new(cx.background()),
6228                        &mut guest_cx.to_async(),
6229                    )
6230                    .await
6231                    .unwrap();
6232                    let op_start_signal = futures::channel::mpsc::unbounded();
6233                    user_ids.push(guest.current_user_id(&guest_cx));
6234                    op_start_signals.push(op_start_signal.0);
6235                    clients.push(guest_cx.foreground().spawn(guest.simulate_guest(
6236                        guest_username.clone(),
6237                        guest_project,
6238                        op_start_signal.1,
6239                        rng.clone(),
6240                        guest_cx,
6241                    )));
6242
6243                    log::info!("Added connection for {}", guest_username);
6244                    operations += 1;
6245                }
6246                20..=29 if clients.len() > 1 => {
6247                    let guest_ix = rng.lock().gen_range(1..clients.len());
6248                    log::info!("Removing guest {}", user_ids[guest_ix]);
6249                    let removed_guest_id = user_ids.remove(guest_ix);
6250                    let guest = clients.remove(guest_ix);
6251                    op_start_signals.remove(guest_ix);
6252                    server.disconnect_client(removed_guest_id);
6253                    cx.foreground().advance_clock(RECEIVE_TIMEOUT);
6254                    let (guest, mut guest_cx, guest_err) = guest.await;
6255                    if let Some(guest_err) = guest_err {
6256                        log::error!("{} error - {}", guest.username, guest_err);
6257                    }
6258                    guest
6259                        .project
6260                        .as_ref()
6261                        .unwrap()
6262                        .read_with(&guest_cx, |project, _| assert!(project.is_read_only()));
6263                    // TODO
6264                    // for user_id in &user_ids {
6265                    //     for contact in server.store.read().await.contacts_for_user(*user_id) {
6266                    //         assert_ne!(
6267                    //             contact.user_id, removed_guest_id.0 as u64,
6268                    //             "removed guest is still a contact of another peer"
6269                    //         );
6270                    //         for project in contact.projects {
6271                    //             for project_guest_id in project.guests {
6272                    //                 assert_ne!(
6273                    //                     project_guest_id, removed_guest_id.0 as u64,
6274                    //                     "removed guest appears as still participating on a project"
6275                    //                 );
6276                    //             }
6277                    //         }
6278                    //     }
6279                    // }
6280
6281                    log::info!("{} removed", guest.username);
6282                    available_guests.push(guest.username.clone());
6283                    guest_cx.update(|_| drop(guest));
6284
6285                    operations += 1;
6286                }
6287                _ => {
6288                    while operations < max_operations && rng.lock().gen_bool(0.7) {
6289                        op_start_signals
6290                            .choose(&mut *rng.lock())
6291                            .unwrap()
6292                            .unbounded_send(())
6293                            .unwrap();
6294                        operations += 1;
6295                    }
6296
6297                    if rng.lock().gen_bool(0.8) {
6298                        cx.foreground().run_until_parked();
6299                    }
6300                }
6301            }
6302        }
6303
6304        drop(op_start_signals);
6305        let mut clients = futures::future::join_all(clients).await;
6306        cx.foreground().run_until_parked();
6307
6308        let (host_client, mut host_cx, host_err) = clients.remove(0);
6309        if let Some(host_err) = host_err {
6310            panic!("host error - {}", host_err);
6311        }
6312        let host_project = host_client.project.as_ref().unwrap();
6313        let host_worktree_snapshots = host_project.read_with(&host_cx, |project, cx| {
6314            project
6315                .worktrees(cx)
6316                .map(|worktree| {
6317                    let snapshot = worktree.read(cx).snapshot();
6318                    (snapshot.id(), snapshot)
6319                })
6320                .collect::<BTreeMap<_, _>>()
6321        });
6322
6323        host_client
6324            .project
6325            .as_ref()
6326            .unwrap()
6327            .read_with(&host_cx, |project, cx| project.check_invariants(cx));
6328
6329        for (guest_client, mut guest_cx, guest_err) in clients.into_iter() {
6330            if let Some(guest_err) = guest_err {
6331                panic!("{} error - {}", guest_client.username, guest_err);
6332            }
6333            let worktree_snapshots =
6334                guest_client
6335                    .project
6336                    .as_ref()
6337                    .unwrap()
6338                    .read_with(&guest_cx, |project, cx| {
6339                        project
6340                            .worktrees(cx)
6341                            .map(|worktree| {
6342                                let worktree = worktree.read(cx);
6343                                (worktree.id(), worktree.snapshot())
6344                            })
6345                            .collect::<BTreeMap<_, _>>()
6346                    });
6347
6348            assert_eq!(
6349                worktree_snapshots.keys().collect::<Vec<_>>(),
6350                host_worktree_snapshots.keys().collect::<Vec<_>>(),
6351                "{} has different worktrees than the host",
6352                guest_client.username
6353            );
6354            for (id, host_snapshot) in &host_worktree_snapshots {
6355                let guest_snapshot = &worktree_snapshots[id];
6356                assert_eq!(
6357                    guest_snapshot.root_name(),
6358                    host_snapshot.root_name(),
6359                    "{} has different root name than the host for worktree {}",
6360                    guest_client.username,
6361                    id
6362                );
6363                assert_eq!(
6364                    guest_snapshot.entries(false).collect::<Vec<_>>(),
6365                    host_snapshot.entries(false).collect::<Vec<_>>(),
6366                    "{} has different snapshot than the host for worktree {}",
6367                    guest_client.username,
6368                    id
6369                );
6370                assert_eq!(guest_snapshot.scan_id(), host_snapshot.scan_id());
6371            }
6372
6373            guest_client
6374                .project
6375                .as_ref()
6376                .unwrap()
6377                .read_with(&guest_cx, |project, cx| project.check_invariants(cx));
6378
6379            for guest_buffer in &guest_client.buffers {
6380                let buffer_id = guest_buffer.read_with(&guest_cx, |buffer, _| buffer.remote_id());
6381                let host_buffer = host_project.read_with(&host_cx, |project, cx| {
6382                    project.buffer_for_id(buffer_id, cx).expect(&format!(
6383                        "host does not have buffer for guest:{}, peer:{}, id:{}",
6384                        guest_client.username, guest_client.peer_id, buffer_id
6385                    ))
6386                });
6387                let path = host_buffer
6388                    .read_with(&host_cx, |buffer, cx| buffer.file().unwrap().full_path(cx));
6389
6390                assert_eq!(
6391                    guest_buffer.read_with(&guest_cx, |buffer, _| buffer.deferred_ops_len()),
6392                    0,
6393                    "{}, buffer {}, path {:?} has deferred operations",
6394                    guest_client.username,
6395                    buffer_id,
6396                    path,
6397                );
6398                assert_eq!(
6399                    guest_buffer.read_with(&guest_cx, |buffer, _| buffer.text()),
6400                    host_buffer.read_with(&host_cx, |buffer, _| buffer.text()),
6401                    "{}, buffer {}, path {:?}, differs from the host's buffer",
6402                    guest_client.username,
6403                    buffer_id,
6404                    path
6405                );
6406            }
6407
6408            guest_cx.update(|_| drop(guest_client));
6409        }
6410
6411        host_cx.update(|_| drop(host_client));
6412    }
6413
    /// In-process collaboration server used by the tests in this module.
    ///
    /// Dereferences to the wrapped `Server` and hands out `TestClient`s via `create_client`.
    struct TestServer {
        /// Peer created with `Peer::new()` in `start`; reset when the test server is dropped.
        peer: Arc<Peer>,
        /// Application state (database handle and API token) that `server` was built from.
        app_state: Arc<AppState>,
        /// The server under test.
        server: Arc<Server>,
        /// Foreground executor; `condition` checks that parking is forbidden on it and
        /// brackets its waits with `start_waiting` / `finish_waiting`.
        foreground: Rc<executor::Foreground>,
        /// Receiving half of the channel whose sender is handed to `Server::new`;
        /// `condition` awaits it before re-checking its predicate.
        notifications: mpsc::UnboundedReceiver<()>,
        /// Per-user flags obtained from `Connection::in_memory`; `disconnect_client` sets a
        /// user's flag to `true` (presumably severing that connection — confirm in `rpc`).
        connection_killers: Arc<Mutex<HashMap<UserId, Arc<AtomicBool>>>>,
        /// While `true`, attempts to establish a new client connection fail.
        forbid_connections: Arc<AtomicBool>,
        /// Never read here; presumably held so the fake database outlives the server —
        /// TODO confirm.
        _test_db: TestDb,
    }
6424
6425    impl TestServer {
6426        async fn start(
6427            foreground: Rc<executor::Foreground>,
6428            background: Arc<executor::Background>,
6429        ) -> Self {
6430            let test_db = TestDb::fake(background);
6431            let app_state = Self::build_app_state(&test_db).await;
6432            let peer = Peer::new();
6433            let notifications = mpsc::unbounded();
6434            let server = Server::new(app_state.clone(), Some(notifications.0));
6435            Self {
6436                peer,
6437                app_state,
6438                server,
6439                foreground,
6440                notifications: notifications.1,
6441                connection_killers: Default::default(),
6442                forbid_connections: Default::default(),
6443                _test_db: test_db,
6444            }
6445        }
6446
6447        async fn create_client(&mut self, cx: &mut TestAppContext, name: &str) -> TestClient {
6448            cx.update(|cx| {
6449                let settings = Settings::test(cx);
6450                cx.set_global(settings);
6451            });
6452
6453            let http = FakeHttpClient::with_404_response();
6454            let user_id =
6455                if let Ok(Some(user)) = self.app_state.db.get_user_by_github_login(name).await {
6456                    user.id
6457                } else {
6458                    self.app_state.db.create_user(name, false).await.unwrap()
6459                };
6460            let client_name = name.to_string();
6461            let mut client = Client::new(http.clone());
6462            let server = self.server.clone();
6463            let connection_killers = self.connection_killers.clone();
6464            let forbid_connections = self.forbid_connections.clone();
6465            let (connection_id_tx, mut connection_id_rx) = mpsc::channel(16);
6466
6467            Arc::get_mut(&mut client)
6468                .unwrap()
6469                .override_authenticate(move |cx| {
6470                    cx.spawn(|_| async move {
6471                        let access_token = "the-token".to_string();
6472                        Ok(Credentials {
6473                            user_id: user_id.0 as u64,
6474                            access_token,
6475                        })
6476                    })
6477                })
6478                .override_establish_connection(move |credentials, cx| {
6479                    assert_eq!(credentials.user_id, user_id.0 as u64);
6480                    assert_eq!(credentials.access_token, "the-token");
6481
6482                    let server = server.clone();
6483                    let connection_killers = connection_killers.clone();
6484                    let forbid_connections = forbid_connections.clone();
6485                    let client_name = client_name.clone();
6486                    let connection_id_tx = connection_id_tx.clone();
6487                    cx.spawn(move |cx| async move {
6488                        if forbid_connections.load(SeqCst) {
6489                            Err(EstablishConnectionError::other(anyhow!(
6490                                "server is forbidding connections"
6491                            )))
6492                        } else {
6493                            let (client_conn, server_conn, killed) =
6494                                Connection::in_memory(cx.background());
6495                            connection_killers.lock().insert(user_id, killed);
6496                            cx.background()
6497                                .spawn(server.handle_connection(
6498                                    server_conn,
6499                                    client_name,
6500                                    user_id,
6501                                    Some(connection_id_tx),
6502                                    cx.background(),
6503                                ))
6504                                .detach();
6505                            Ok(client_conn)
6506                        }
6507                    })
6508                });
6509
6510            client
6511                .authenticate_and_connect(false, &cx.to_async())
6512                .await
6513                .unwrap();
6514
6515            Channel::init(&client);
6516            Project::init(&client);
6517            cx.update(|cx| {
6518                workspace::init(&client, cx);
6519            });
6520
6521            let peer_id = PeerId(connection_id_rx.next().await.unwrap().0);
6522            let user_store = cx.add_model(|cx| UserStore::new(client.clone(), http, cx));
6523
6524            let client = TestClient {
6525                client,
6526                peer_id,
6527                username: name.to_string(),
6528                user_store,
6529                language_registry: Arc::new(LanguageRegistry::test()),
6530                project: Default::default(),
6531                buffers: Default::default(),
6532            };
6533            client.wait_for_current_user(cx).await;
6534            client
6535        }
6536
6537        fn disconnect_client(&self, user_id: UserId) {
6538            self.connection_killers
6539                .lock()
6540                .remove(&user_id)
6541                .unwrap()
6542                .store(true, SeqCst);
6543        }
6544
6545        fn forbid_connections(&self) {
6546            self.forbid_connections.store(true, SeqCst);
6547        }
6548
6549        fn allow_connections(&self) {
6550            self.forbid_connections.store(false, SeqCst);
6551        }
6552
6553        async fn make_contacts(&self, mut clients: Vec<(&TestClient, &mut TestAppContext)>) {
6554            while let Some((client_a, cx_a)) = clients.pop() {
6555                for (client_b, cx_b) in &mut clients {
6556                    client_a
6557                        .user_store
6558                        .update(cx_a, |store, cx| {
6559                            store.request_contact(client_b.user_id().unwrap(), cx)
6560                        })
6561                        .await
6562                        .unwrap();
6563                    cx_a.foreground().run_until_parked();
6564                    client_b
6565                        .user_store
6566                        .update(*cx_b, |store, cx| {
6567                            store.respond_to_contact_request(client_a.user_id().unwrap(), true, cx)
6568                        })
6569                        .await
6570                        .unwrap();
6571                }
6572            }
6573        }
6574
6575        async fn build_app_state(test_db: &TestDb) -> Arc<AppState> {
6576            Arc::new(AppState {
6577                db: test_db.db().clone(),
6578                api_token: Default::default(),
6579            })
6580        }
6581
6582        async fn state<'a>(&'a self) -> RwLockReadGuard<'a, Store> {
6583            self.server.store.read().await
6584        }
6585
6586        async fn condition<F>(&mut self, mut predicate: F)
6587        where
6588            F: FnMut(&Store) -> bool,
6589        {
6590            assert!(
6591                self.foreground.parking_forbidden(),
6592                "you must call forbid_parking to use server conditions so we don't block indefinitely"
6593            );
6594            while !(predicate)(&*self.server.store.read().await) {
6595                self.foreground.start_waiting();
6596                self.notifications.next().await;
6597                self.foreground.finish_waiting();
6598            }
6599        }
6600    }
6601
6602    impl Deref for TestServer {
6603        type Target = Server;
6604
6605        fn deref(&self) -> &Self::Target {
6606            &self.server
6607        }
6608    }
6609
6610    impl Drop for TestServer {
6611        fn drop(&mut self) {
6612            self.peer.reset();
6613        }
6614    }
6615
    /// A client connected to a `TestServer`, plus the per-client state the tests track.
    ///
    /// Dereferences to the wrapped `Arc<Client>`.
    struct TestClient {
        /// The underlying client.
        client: Arc<Client>,
        /// Name the client was created with; used in log and assertion messages.
        username: String,
        /// Built from the connection id reported by the server in `create_client`.
        pub peer_id: PeerId,
        /// User store model created for this client.
        pub user_store: ModelHandle<UserStore>,
        /// Language registry passed to the projects and workspaces this client builds.
        language_registry: Arc<LanguageRegistry>,
        /// Most recently built or simulated project, if any.
        project: Option<ModelHandle<Project>>,
        /// Buffers currently held open by the simulation routines.
        buffers: HashSet<ModelHandle<language::Buffer>>,
    }
6625
6626    impl Deref for TestClient {
6627        type Target = Arc<Client>;
6628
6629        fn deref(&self) -> &Self::Target {
6630            &self.client
6631        }
6632    }
6633
    /// GitHub logins describing a client's contact state, as produced by
    /// `TestClient::summarize_contacts`.
    struct ContactsSummary {
        /// Logins of the client's current contacts.
        pub current: Vec<String>,
        /// Logins of users listed in the store's outgoing contact requests.
        pub outgoing_requests: Vec<String>,
        /// Logins of users listed in the store's incoming contact requests.
        pub incoming_requests: Vec<String>,
    }
6639
6640    impl TestClient {
6641        pub fn current_user_id(&self, cx: &TestAppContext) -> UserId {
6642            UserId::from_proto(
6643                self.user_store
6644                    .read_with(cx, |user_store, _| user_store.current_user().unwrap().id),
6645            )
6646        }
6647
6648        async fn wait_for_current_user(&self, cx: &TestAppContext) {
6649            let mut authed_user = self
6650                .user_store
6651                .read_with(cx, |user_store, _| user_store.watch_current_user());
6652            while authed_user.next().await.unwrap().is_none() {}
6653        }
6654
6655        async fn clear_contacts(&self, cx: &mut TestAppContext) {
6656            self.user_store
6657                .update(cx, |store, _| store.clear_contacts())
6658                .await;
6659        }
6660
6661        fn summarize_contacts(&self, cx: &TestAppContext) -> ContactsSummary {
6662            self.user_store.read_with(cx, |store, _| ContactsSummary {
6663                current: store
6664                    .contacts()
6665                    .iter()
6666                    .map(|contact| contact.user.github_login.clone())
6667                    .collect(),
6668                outgoing_requests: store
6669                    .outgoing_contact_requests()
6670                    .iter()
6671                    .map(|user| user.github_login.clone())
6672                    .collect(),
6673                incoming_requests: store
6674                    .incoming_contact_requests()
6675                    .iter()
6676                    .map(|user| user.github_login.clone())
6677                    .collect(),
6678            })
6679        }
6680
6681        async fn build_local_project(
6682            &mut self,
6683            fs: Arc<FakeFs>,
6684            root_path: impl AsRef<Path>,
6685            cx: &mut TestAppContext,
6686        ) -> (ModelHandle<Project>, WorktreeId) {
6687            let project = cx.update(|cx| {
6688                Project::local(
6689                    self.client.clone(),
6690                    self.user_store.clone(),
6691                    self.language_registry.clone(),
6692                    fs,
6693                    cx,
6694                )
6695            });
6696            self.project = Some(project.clone());
6697            let (worktree, _) = project
6698                .update(cx, |p, cx| {
6699                    p.find_or_create_local_worktree(root_path, true, cx)
6700                })
6701                .await
6702                .unwrap();
6703            worktree
6704                .read_with(cx, |tree, _| tree.as_local().unwrap().scan_complete())
6705                .await;
6706            project
6707                .update(cx, |project, _| project.next_remote_id())
6708                .await;
6709            (project, worktree.read_with(cx, |tree, _| tree.id()))
6710        }
6711
6712        async fn build_remote_project(
6713            &mut self,
6714            project_id: u64,
6715            cx: &mut TestAppContext,
6716        ) -> ModelHandle<Project> {
6717            let project = Project::remote(
6718                project_id,
6719                self.client.clone(),
6720                self.user_store.clone(),
6721                self.language_registry.clone(),
6722                FakeFs::new(cx.background()),
6723                &mut cx.to_async(),
6724            )
6725            .await
6726            .unwrap();
6727            self.project = Some(project.clone());
6728            project
6729        }
6730
6731        fn build_workspace(
6732            &self,
6733            project: &ModelHandle<Project>,
6734            cx: &mut TestAppContext,
6735        ) -> ViewHandle<Workspace> {
6736            let (window_id, _) = cx.add_window(|_| EmptyView);
6737            cx.add_view(window_id, |cx| {
6738                let fs = project.read(cx).fs().clone();
6739                Workspace::new(
6740                    &WorkspaceParams {
6741                        fs,
6742                        project: project.clone(),
6743                        user_store: self.user_store.clone(),
6744                        languages: self.language_registry.clone(),
6745                        themes: ThemeRegistry::new((), cx.font_cache().clone()),
6746                        channel_list: cx.add_model(|cx| {
6747                            ChannelList::new(self.user_store.clone(), self.client.clone(), cx)
6748                        }),
6749                        client: self.client.clone(),
6750                    },
6751                    cx,
6752                )
6753            })
6754        }
6755
6756        async fn simulate_host(
6757            mut self,
6758            project: ModelHandle<Project>,
6759            op_start_signal: futures::channel::mpsc::UnboundedReceiver<()>,
6760            rng: Arc<Mutex<StdRng>>,
6761            mut cx: TestAppContext,
6762        ) -> (Self, TestAppContext, Option<anyhow::Error>) {
6763            async fn simulate_host_internal(
6764                client: &mut TestClient,
6765                project: ModelHandle<Project>,
6766                mut op_start_signal: futures::channel::mpsc::UnboundedReceiver<()>,
6767                rng: Arc<Mutex<StdRng>>,
6768                cx: &mut TestAppContext,
6769            ) -> anyhow::Result<()> {
6770                let fs = project.read_with(cx, |project, _| project.fs().clone());
6771
6772                while op_start_signal.next().await.is_some() {
6773                    let distribution = rng.lock().gen_range::<usize, _>(0..100);
6774                    let files = fs.as_fake().files().await;
6775                    match distribution {
6776                        0..=20 if !files.is_empty() => {
6777                            let path = files.choose(&mut *rng.lock()).unwrap();
6778                            let mut path = path.as_path();
6779                            while let Some(parent_path) = path.parent() {
6780                                path = parent_path;
6781                                if rng.lock().gen() {
6782                                    break;
6783                                }
6784                            }
6785
6786                            log::info!("Host: find/create local worktree {:?}", path);
6787                            let find_or_create_worktree = project.update(cx, |project, cx| {
6788                                project.find_or_create_local_worktree(path, true, cx)
6789                            });
6790                            if rng.lock().gen() {
6791                                cx.background().spawn(find_or_create_worktree).detach();
6792                            } else {
6793                                find_or_create_worktree.await?;
6794                            }
6795                        }
6796                        10..=80 if !files.is_empty() => {
6797                            let buffer = if client.buffers.is_empty() || rng.lock().gen() {
6798                                let file = files.choose(&mut *rng.lock()).unwrap();
6799                                let (worktree, path) = project
6800                                    .update(cx, |project, cx| {
6801                                        project.find_or_create_local_worktree(
6802                                            file.clone(),
6803                                            true,
6804                                            cx,
6805                                        )
6806                                    })
6807                                    .await?;
6808                                let project_path =
6809                                    worktree.read_with(cx, |worktree, _| (worktree.id(), path));
6810                                log::info!(
6811                                    "Host: opening path {:?}, worktree {}, relative_path {:?}",
6812                                    file,
6813                                    project_path.0,
6814                                    project_path.1
6815                                );
6816                                let buffer = project
6817                                    .update(cx, |project, cx| project.open_buffer(project_path, cx))
6818                                    .await
6819                                    .unwrap();
6820                                client.buffers.insert(buffer.clone());
6821                                buffer
6822                            } else {
6823                                client
6824                                    .buffers
6825                                    .iter()
6826                                    .choose(&mut *rng.lock())
6827                                    .unwrap()
6828                                    .clone()
6829                            };
6830
6831                            if rng.lock().gen_bool(0.1) {
6832                                cx.update(|cx| {
6833                                    log::info!(
6834                                        "Host: dropping buffer {:?}",
6835                                        buffer.read(cx).file().unwrap().full_path(cx)
6836                                    );
6837                                    client.buffers.remove(&buffer);
6838                                    drop(buffer);
6839                                });
6840                            } else {
6841                                buffer.update(cx, |buffer, cx| {
6842                                    log::info!(
6843                                        "Host: updating buffer {:?} ({})",
6844                                        buffer.file().unwrap().full_path(cx),
6845                                        buffer.remote_id()
6846                                    );
6847
6848                                    if rng.lock().gen_bool(0.7) {
6849                                        buffer.randomly_edit(&mut *rng.lock(), 5, cx);
6850                                    } else {
6851                                        buffer.randomly_undo_redo(&mut *rng.lock(), cx);
6852                                    }
6853                                });
6854                            }
6855                        }
6856                        _ => loop {
6857                            let path_component_count = rng.lock().gen_range::<usize, _>(1..=5);
6858                            let mut path = PathBuf::new();
6859                            path.push("/");
6860                            for _ in 0..path_component_count {
6861                                let letter = rng.lock().gen_range(b'a'..=b'z');
6862                                path.push(std::str::from_utf8(&[letter]).unwrap());
6863                            }
6864                            path.set_extension("rs");
6865                            let parent_path = path.parent().unwrap();
6866
6867                            log::info!("Host: creating file {:?}", path,);
6868
6869                            if fs.create_dir(&parent_path).await.is_ok()
6870                                && fs.create_file(&path, Default::default()).await.is_ok()
6871                            {
6872                                break;
6873                            } else {
6874                                log::info!("Host: cannot create file");
6875                            }
6876                        },
6877                    }
6878
6879                    cx.background().simulate_random_delay().await;
6880                }
6881
6882                Ok(())
6883            }
6884
6885            let result =
6886                simulate_host_internal(&mut self, project.clone(), op_start_signal, rng, &mut cx)
6887                    .await;
6888            log::info!("Host done");
6889            self.project = Some(project);
6890            (self, cx, result.err())
6891        }
6892
6893        pub async fn simulate_guest(
6894            mut self,
6895            guest_username: String,
6896            project: ModelHandle<Project>,
6897            op_start_signal: futures::channel::mpsc::UnboundedReceiver<()>,
6898            rng: Arc<Mutex<StdRng>>,
6899            mut cx: TestAppContext,
6900        ) -> (Self, TestAppContext, Option<anyhow::Error>) {
6901            async fn simulate_guest_internal(
6902                client: &mut TestClient,
6903                guest_username: &str,
6904                project: ModelHandle<Project>,
6905                mut op_start_signal: futures::channel::mpsc::UnboundedReceiver<()>,
6906                rng: Arc<Mutex<StdRng>>,
6907                cx: &mut TestAppContext,
6908            ) -> anyhow::Result<()> {
6909                while op_start_signal.next().await.is_some() {
6910                    let buffer = if client.buffers.is_empty() || rng.lock().gen() {
6911                        let worktree = if let Some(worktree) =
6912                            project.read_with(cx, |project, cx| {
6913                                project
6914                                    .worktrees(&cx)
6915                                    .filter(|worktree| {
6916                                        let worktree = worktree.read(cx);
6917                                        worktree.is_visible()
6918                                            && worktree.entries(false).any(|e| e.is_file())
6919                                    })
6920                                    .choose(&mut *rng.lock())
6921                            }) {
6922                            worktree
6923                        } else {
6924                            cx.background().simulate_random_delay().await;
6925                            continue;
6926                        };
6927
6928                        let (worktree_root_name, project_path) =
6929                            worktree.read_with(cx, |worktree, _| {
6930                                let entry = worktree
6931                                    .entries(false)
6932                                    .filter(|e| e.is_file())
6933                                    .choose(&mut *rng.lock())
6934                                    .unwrap();
6935                                (
6936                                    worktree.root_name().to_string(),
6937                                    (worktree.id(), entry.path.clone()),
6938                                )
6939                            });
6940                        log::info!(
6941                            "{}: opening path {:?} in worktree {} ({})",
6942                            guest_username,
6943                            project_path.1,
6944                            project_path.0,
6945                            worktree_root_name,
6946                        );
6947                        let buffer = project
6948                            .update(cx, |project, cx| {
6949                                project.open_buffer(project_path.clone(), cx)
6950                            })
6951                            .await?;
6952                        log::info!(
6953                            "{}: opened path {:?} in worktree {} ({}) with buffer id {}",
6954                            guest_username,
6955                            project_path.1,
6956                            project_path.0,
6957                            worktree_root_name,
6958                            buffer.read_with(cx, |buffer, _| buffer.remote_id())
6959                        );
6960                        client.buffers.insert(buffer.clone());
6961                        buffer
6962                    } else {
6963                        client
6964                            .buffers
6965                            .iter()
6966                            .choose(&mut *rng.lock())
6967                            .unwrap()
6968                            .clone()
6969                    };
6970
6971                    let choice = rng.lock().gen_range(0..100);
6972                    match choice {
6973                        0..=9 => {
6974                            cx.update(|cx| {
6975                                log::info!(
6976                                    "{}: dropping buffer {:?}",
6977                                    guest_username,
6978                                    buffer.read(cx).file().unwrap().full_path(cx)
6979                                );
6980                                client.buffers.remove(&buffer);
6981                                drop(buffer);
6982                            });
6983                        }
6984                        10..=19 => {
6985                            let completions = project.update(cx, |project, cx| {
6986                                log::info!(
6987                                    "{}: requesting completions for buffer {} ({:?})",
6988                                    guest_username,
6989                                    buffer.read(cx).remote_id(),
6990                                    buffer.read(cx).file().unwrap().full_path(cx)
6991                                );
6992                                let offset = rng.lock().gen_range(0..=buffer.read(cx).len());
6993                                project.completions(&buffer, offset, cx)
6994                            });
6995                            let completions = cx.background().spawn(async move {
6996                                completions
6997                                    .await
6998                                    .map_err(|err| anyhow!("completions request failed: {:?}", err))
6999                            });
7000                            if rng.lock().gen_bool(0.3) {
7001                                log::info!("{}: detaching completions request", guest_username);
7002                                cx.update(|cx| completions.detach_and_log_err(cx));
7003                            } else {
7004                                completions.await?;
7005                            }
7006                        }
7007                        20..=29 => {
7008                            let code_actions = project.update(cx, |project, cx| {
7009                                log::info!(
7010                                    "{}: requesting code actions for buffer {} ({:?})",
7011                                    guest_username,
7012                                    buffer.read(cx).remote_id(),
7013                                    buffer.read(cx).file().unwrap().full_path(cx)
7014                                );
7015                                let range = buffer.read(cx).random_byte_range(0, &mut *rng.lock());
7016                                project.code_actions(&buffer, range, cx)
7017                            });
7018                            let code_actions = cx.background().spawn(async move {
7019                                code_actions.await.map_err(|err| {
7020                                    anyhow!("code actions request failed: {:?}", err)
7021                                })
7022                            });
7023                            if rng.lock().gen_bool(0.3) {
7024                                log::info!("{}: detaching code actions request", guest_username);
7025                                cx.update(|cx| code_actions.detach_and_log_err(cx));
7026                            } else {
7027                                code_actions.await?;
7028                            }
7029                        }
7030                        30..=39 if buffer.read_with(cx, |buffer, _| buffer.is_dirty()) => {
7031                            let (requested_version, save) = buffer.update(cx, |buffer, cx| {
7032                                log::info!(
7033                                    "{}: saving buffer {} ({:?})",
7034                                    guest_username,
7035                                    buffer.remote_id(),
7036                                    buffer.file().unwrap().full_path(cx)
7037                                );
7038                                (buffer.version(), buffer.save(cx))
7039                            });
7040                            let save = cx.background().spawn(async move {
7041                                let (saved_version, _) = save
7042                                    .await
7043                                    .map_err(|err| anyhow!("save request failed: {:?}", err))?;
7044                                assert!(saved_version.observed_all(&requested_version));
7045                                Ok::<_, anyhow::Error>(())
7046                            });
7047                            if rng.lock().gen_bool(0.3) {
7048                                log::info!("{}: detaching save request", guest_username);
7049                                cx.update(|cx| save.detach_and_log_err(cx));
7050                            } else {
7051                                save.await?;
7052                            }
7053                        }
7054                        40..=44 => {
7055                            let prepare_rename = project.update(cx, |project, cx| {
7056                                log::info!(
7057                                    "{}: preparing rename for buffer {} ({:?})",
7058                                    guest_username,
7059                                    buffer.read(cx).remote_id(),
7060                                    buffer.read(cx).file().unwrap().full_path(cx)
7061                                );
7062                                let offset = rng.lock().gen_range(0..=buffer.read(cx).len());
7063                                project.prepare_rename(buffer, offset, cx)
7064                            });
7065                            let prepare_rename = cx.background().spawn(async move {
7066                                prepare_rename.await.map_err(|err| {
7067                                    anyhow!("prepare rename request failed: {:?}", err)
7068                                })
7069                            });
7070                            if rng.lock().gen_bool(0.3) {
7071                                log::info!("{}: detaching prepare rename request", guest_username);
7072                                cx.update(|cx| prepare_rename.detach_and_log_err(cx));
7073                            } else {
7074                                prepare_rename.await?;
7075                            }
7076                        }
7077                        45..=49 => {
7078                            let definitions = project.update(cx, |project, cx| {
7079                                log::info!(
7080                                    "{}: requesting definitions for buffer {} ({:?})",
7081                                    guest_username,
7082                                    buffer.read(cx).remote_id(),
7083                                    buffer.read(cx).file().unwrap().full_path(cx)
7084                                );
7085                                let offset = rng.lock().gen_range(0..=buffer.read(cx).len());
7086                                project.definition(&buffer, offset, cx)
7087                            });
7088                            let definitions = cx.background().spawn(async move {
7089                                definitions
7090                                    .await
7091                                    .map_err(|err| anyhow!("definitions request failed: {:?}", err))
7092                            });
7093                            if rng.lock().gen_bool(0.3) {
7094                                log::info!("{}: detaching definitions request", guest_username);
7095                                cx.update(|cx| definitions.detach_and_log_err(cx));
7096                            } else {
7097                                client
7098                                    .buffers
7099                                    .extend(definitions.await?.into_iter().map(|loc| loc.buffer));
7100                            }
7101                        }
7102                        50..=54 => {
7103                            let highlights = project.update(cx, |project, cx| {
7104                                log::info!(
7105                                    "{}: requesting highlights for buffer {} ({:?})",
7106                                    guest_username,
7107                                    buffer.read(cx).remote_id(),
7108                                    buffer.read(cx).file().unwrap().full_path(cx)
7109                                );
7110                                let offset = rng.lock().gen_range(0..=buffer.read(cx).len());
7111                                project.document_highlights(&buffer, offset, cx)
7112                            });
7113                            let highlights = cx.background().spawn(async move {
7114                                highlights
7115                                    .await
7116                                    .map_err(|err| anyhow!("highlights request failed: {:?}", err))
7117                            });
7118                            if rng.lock().gen_bool(0.3) {
7119                                log::info!("{}: detaching highlights request", guest_username);
7120                                cx.update(|cx| highlights.detach_and_log_err(cx));
7121                            } else {
7122                                highlights.await?;
7123                            }
7124                        }
7125                        55..=59 => {
7126                            let search = project.update(cx, |project, cx| {
7127                                let query = rng.lock().gen_range('a'..='z');
7128                                log::info!("{}: project-wide search {:?}", guest_username, query);
7129                                project.search(SearchQuery::text(query, false, false), cx)
7130                            });
7131                            let search = cx.background().spawn(async move {
7132                                search
7133                                    .await
7134                                    .map_err(|err| anyhow!("search request failed: {:?}", err))
7135                            });
7136                            if rng.lock().gen_bool(0.3) {
7137                                log::info!("{}: detaching search request", guest_username);
7138                                cx.update(|cx| search.detach_and_log_err(cx));
7139                            } else {
7140                                client.buffers.extend(search.await?.into_keys());
7141                            }
7142                        }
7143                        60..=69 => {
7144                            let worktree = project
7145                                .read_with(cx, |project, cx| {
7146                                    project
7147                                        .worktrees(&cx)
7148                                        .filter(|worktree| {
7149                                            let worktree = worktree.read(cx);
7150                                            worktree.is_visible()
7151                                                && worktree.entries(false).any(|e| e.is_file())
7152                                                && worktree
7153                                                    .root_entry()
7154                                                    .map_or(false, |e| e.is_dir())
7155                                        })
7156                                        .choose(&mut *rng.lock())
7157                                })
7158                                .unwrap();
7159                            let (worktree_id, worktree_root_name) = worktree
7160                                .read_with(cx, |worktree, _| {
7161                                    (worktree.id(), worktree.root_name().to_string())
7162                                });
7163
7164                            let mut new_name = String::new();
7165                            for _ in 0..10 {
7166                                let letter = rng.lock().gen_range('a'..='z');
7167                                new_name.push(letter);
7168                            }
7169                            let mut new_path = PathBuf::new();
7170                            new_path.push(new_name);
7171                            new_path.set_extension("rs");
7172                            log::info!(
7173                                "{}: creating {:?} in worktree {} ({})",
7174                                guest_username,
7175                                new_path,
7176                                worktree_id,
7177                                worktree_root_name,
7178                            );
7179                            project
7180                                .update(cx, |project, cx| {
7181                                    project.create_entry((worktree_id, new_path), false, cx)
7182                                })
7183                                .unwrap()
7184                                .await?;
7185                        }
7186                        _ => {
7187                            buffer.update(cx, |buffer, cx| {
7188                                log::info!(
7189                                    "{}: updating buffer {} ({:?})",
7190                                    guest_username,
7191                                    buffer.remote_id(),
7192                                    buffer.file().unwrap().full_path(cx)
7193                                );
7194                                if rng.lock().gen_bool(0.7) {
7195                                    buffer.randomly_edit(&mut *rng.lock(), 5, cx);
7196                                } else {
7197                                    buffer.randomly_undo_redo(&mut *rng.lock(), cx);
7198                                }
7199                            });
7200                        }
7201                    }
7202                    cx.background().simulate_random_delay().await;
7203                }
7204                Ok(())
7205            }
7206
7207            let result = simulate_guest_internal(
7208                &mut self,
7209                &guest_username,
7210                project.clone(),
7211                op_start_signal,
7212                rng,
7213                &mut cx,
7214            )
7215            .await;
7216            log::info!("{}: done", guest_username);
7217
7218            self.project = Some(project);
7219            (self, cx, result.err())
7220        }
7221    }
7222
7223    impl Drop for TestClient {
7224        fn drop(&mut self) {
7225            self.client.tear_down();
7226        }
7227    }
7228
7229    impl Executor for Arc<gpui::executor::Background> {
7230        type Sleep = gpui::executor::Timer;
7231
7232        fn spawn_detached<F: 'static + Send + Future<Output = ()>>(&self, future: F) {
7233            self.spawn(future).detach();
7234        }
7235
7236        fn sleep(&self, duration: Duration) -> Self::Sleep {
7237            self.as_ref().timer(duration)
7238        }
7239    }
7240
7241    fn channel_messages(channel: &Channel) -> Vec<(String, String, bool)> {
7242        channel
7243            .messages()
7244            .cursor::<()>()
7245            .map(|m| {
7246                (
7247                    m.sender.github_login.clone(),
7248                    m.body.clone(),
7249                    m.is_pending(),
7250                )
7251            })
7252            .collect()
7253    }
7254
    /// Field-less view type; its `gpui::View::render` implementation produces
    /// gpui's `Empty` element.
    struct EmptyView;
7256
    impl gpui::Entity for EmptyView {
        // The associated event type is the unit type.
        // NOTE(review): presumably this means the view carries no event payload — confirm.
        type Event = ();
    }
7260
7261    impl gpui::View for EmptyView {
7262        fn ui_name() -> &'static str {
7263            "empty view"
7264        }
7265
7266        fn render(&mut self, _: &mut gpui::RenderContext<Self>) -> gpui::ElementBox {
7267            gpui::Element::boxed(gpui::elements::Empty)
7268        }
7269    }
7270}