rpc.rs

   1mod store;
   2
   3use crate::{
   4    auth,
   5    db::{self, ChannelId, MessageId, UserId},
   6    AppState, Result,
   7};
   8use anyhow::anyhow;
   9use async_tungstenite::tungstenite::{
  10    protocol::CloseFrame as TungsteniteCloseFrame, Message as TungsteniteMessage,
  11};
  12use axum::{
  13    body::Body,
  14    extract::{
  15        ws::{CloseFrame as AxumCloseFrame, Message as AxumMessage},
  16        ConnectInfo, WebSocketUpgrade,
  17    },
  18    headers::{Header, HeaderName},
  19    http::StatusCode,
  20    middleware,
  21    response::IntoResponse,
  22    routing::get,
  23    Extension, Router, TypedHeader,
  24};
  25use collections::HashMap;
  26use futures::{channel::mpsc, future::BoxFuture, FutureExt, SinkExt, StreamExt, TryStreamExt};
  27use lazy_static::lazy_static;
  28use rpc::{
  29    proto::{self, AnyTypedEnvelope, EntityMessage, EnvelopedMessage, RequestMessage},
  30    Connection, ConnectionId, Peer, Receipt, TypedEnvelope,
  31};
  32use std::{
  33    any::TypeId,
  34    future::Future,
  35    marker::PhantomData,
  36    net::SocketAddr,
  37    ops::{Deref, DerefMut},
  38    rc::Rc,
  39    sync::{
  40        atomic::{AtomicBool, Ordering::SeqCst},
  41        Arc,
  42    },
  43    time::Duration,
  44};
  45use store::{Store, Worktree};
  46use time::OffsetDateTime;
  47use tokio::{
  48    sync::{RwLock, RwLockReadGuard, RwLockWriteGuard},
  49    time::Sleep,
  50};
  51use tower::ServiceBuilder;
  52use tracing::{info_span, Instrument};
  53
/// Type-erased message handler as stored in `Server::handlers`.
///
/// It receives the server and a boxed envelope of any message type and returns a boxed
/// future resolving to `()`; errors are handled (logged) inside the wrapper built by
/// `Server::add_message_handler` rather than surfaced through this type.
type MessageHandler =
    Box<dyn Send + Sync + Fn(Arc<Server>, Box<dyn AnyTypedEnvelope>) -> BoxFuture<'static, ()>>;
  56
/// Handle given to request handlers for replying to a request of type `R`.
struct Response<R> {
    // Server whose peer is used to deliver the reply.
    server: Arc<Server>,
    // Receipt of the request being answered.
    receipt: Receipt<R>,
    // Set to `true` by `send`; the wrapper in `Server::add_request_handler` reads it to
    // detect handlers that returned without replying.
    responded: Arc<AtomicBool>,
}
  62
  63impl<R: RequestMessage> Response<R> {
  64    fn send(self, payload: R::Response) -> Result<()> {
  65        self.responded.store(true, SeqCst);
  66        self.server.peer.respond(self.receipt, payload)?;
  67        Ok(())
  68    }
  69}
  70
/// RPC server state shared (behind an `Arc`) by every connection handler.
pub struct Server {
    // Peer used to send, forward and respond to messages on individual connections.
    peer: Arc<Peer>,
    // In-memory connection/project state, guarded by an async read-write lock.
    store: RwLock<Store>,
    // Application-wide state; this file uses it to reach the database (`app_state.db`).
    app_state: Arc<AppState>,
    // Registered handlers, keyed by the `TypeId` of the message type they accept.
    handlers: HashMap<TypeId, MessageHandler>,
    // Optional channel that receives a `()` after each incoming message has been handled.
    notifications: Option<mpsc::UnboundedSender<()>>,
}
  78
/// Abstraction over the task executor used while handling a connection.
pub trait Executor: Send + Clone {
    /// Future returned by [`Executor::sleep`].
    type Sleep: Send + Future;
    /// Runs `future` to completion without the caller awaiting it; used for messages
    /// flagged as background work.
    fn spawn_detached<F: 'static + Send + Future<Output = ()>>(&self, future: F);
    /// Returns a future that completes after `duration`; used to build the timers handed
    /// to the peer when a connection is added.
    fn sleep(&self, duration: Duration) -> Self::Sleep;
}
  84
/// Zero-sized, cloneable executor handle.
// NOTE(review): presumably the production `Executor` implementation; the impl is not
// visible in this part of the file — confirm.
#[derive(Clone)]
pub struct RealExecutor;
  87
// NOTE(review): presumably the page size used when fetching channel messages; the use
// site is not visible in this part of the file — confirm.
const MESSAGE_COUNT_PER_PAGE: usize = 100;
// NOTE(review): presumably the maximum accepted length of a channel message body; the use
// site (and whether this counts bytes or chars) is not visible here — confirm.
const MAX_MESSAGE_LEN: usize = 1024;
  90
/// Read guard over the `Store` that is deliberately not `Send`.
struct StoreReadGuard<'a> {
    // The underlying tokio read guard.
    guard: RwLockReadGuard<'a, Store>,
    // `Rc` is `!Send`, so this marker makes the whole guard `!Send`; a `Send` future
    // therefore cannot keep the guard alive across an `.await`.
    _not_send: PhantomData<Rc<()>>,
}
  95
/// Write guard over the `Store` that is deliberately not `Send`.
struct StoreWriteGuard<'a> {
    // The underlying tokio write guard.
    guard: RwLockWriteGuard<'a, Store>,
    // `Rc` is `!Send`, so this marker makes the whole guard `!Send`; a `Send` future
    // therefore cannot keep the guard alive across an `.await`.
    _not_send: PhantomData<Rc<()>>,
}
 100
 101impl Server {
 102    pub fn new(
 103        app_state: Arc<AppState>,
 104        notifications: Option<mpsc::UnboundedSender<()>>,
 105    ) -> Arc<Self> {
 106        let mut server = Self {
 107            peer: Peer::new(),
 108            app_state,
 109            store: Default::default(),
 110            handlers: Default::default(),
 111            notifications,
 112        };
 113
 114        server
 115            .add_request_handler(Server::ping)
 116            .add_request_handler(Server::register_project)
 117            .add_message_handler(Server::unregister_project)
 118            .add_request_handler(Server::share_project)
 119            .add_message_handler(Server::unshare_project)
 120            .add_request_handler(Server::join_project)
 121            .add_message_handler(Server::leave_project)
 122            .add_request_handler(Server::register_worktree)
 123            .add_message_handler(Server::unregister_worktree)
 124            .add_request_handler(Server::update_worktree)
 125            .add_message_handler(Server::start_language_server)
 126            .add_message_handler(Server::update_language_server)
 127            .add_message_handler(Server::update_diagnostic_summary)
 128            .add_request_handler(Server::forward_project_request::<proto::GetDefinition>)
 129            .add_request_handler(Server::forward_project_request::<proto::GetReferences>)
 130            .add_request_handler(Server::forward_project_request::<proto::SearchProject>)
 131            .add_request_handler(Server::forward_project_request::<proto::GetDocumentHighlights>)
 132            .add_request_handler(Server::forward_project_request::<proto::GetProjectSymbols>)
 133            .add_request_handler(Server::forward_project_request::<proto::OpenBufferForSymbol>)
 134            .add_request_handler(Server::forward_project_request::<proto::OpenBufferById>)
 135            .add_request_handler(Server::forward_project_request::<proto::OpenBufferByPath>)
 136            .add_request_handler(Server::forward_project_request::<proto::GetCompletions>)
 137            .add_request_handler(
 138                Server::forward_project_request::<proto::ApplyCompletionAdditionalEdits>,
 139            )
 140            .add_request_handler(Server::forward_project_request::<proto::GetCodeActions>)
 141            .add_request_handler(Server::forward_project_request::<proto::ApplyCodeAction>)
 142            .add_request_handler(Server::forward_project_request::<proto::PrepareRename>)
 143            .add_request_handler(Server::forward_project_request::<proto::PerformRename>)
 144            .add_request_handler(Server::forward_project_request::<proto::ReloadBuffers>)
 145            .add_request_handler(Server::forward_project_request::<proto::FormatBuffers>)
 146            .add_request_handler(Server::forward_project_request::<proto::CreateProjectEntry>)
 147            .add_request_handler(Server::forward_project_request::<proto::RenameProjectEntry>)
 148            .add_request_handler(Server::forward_project_request::<proto::DeleteProjectEntry>)
 149            .add_request_handler(Server::update_buffer)
 150            .add_message_handler(Server::update_buffer_file)
 151            .add_message_handler(Server::buffer_reloaded)
 152            .add_message_handler(Server::buffer_saved)
 153            .add_request_handler(Server::save_buffer)
 154            .add_request_handler(Server::get_channels)
 155            .add_request_handler(Server::get_users)
 156            .add_request_handler(Server::fuzzy_search_users)
 157            .add_request_handler(Server::request_contact)
 158            .add_request_handler(Server::remove_contact)
 159            .add_request_handler(Server::respond_to_contact_request)
 160            .add_request_handler(Server::join_channel)
 161            .add_message_handler(Server::leave_channel)
 162            .add_request_handler(Server::send_channel_message)
 163            .add_request_handler(Server::follow)
 164            .add_message_handler(Server::unfollow)
 165            .add_message_handler(Server::update_followers)
 166            .add_request_handler(Server::get_channel_messages);
 167
 168        Arc::new(server)
 169    }
 170
 171    fn add_message_handler<F, Fut, M>(&mut self, handler: F) -> &mut Self
 172    where
 173        F: 'static + Send + Sync + Fn(Arc<Self>, TypedEnvelope<M>) -> Fut,
 174        Fut: 'static + Send + Future<Output = Result<()>>,
 175        M: EnvelopedMessage,
 176    {
 177        let prev_handler = self.handlers.insert(
 178            TypeId::of::<M>(),
 179            Box::new(move |server, envelope| {
 180                let envelope = envelope.into_any().downcast::<TypedEnvelope<M>>().unwrap();
 181                let span = info_span!(
 182                    "handle message",
 183                    payload_type = envelope.payload_type_name(),
 184                    payload = format!("{:?}", envelope.payload).as_str(),
 185                );
 186                let future = (handler)(server, *envelope);
 187                async move {
 188                    if let Err(error) = future.await {
 189                        tracing::error!(%error, "error handling message");
 190                    }
 191                }
 192                .instrument(span)
 193                .boxed()
 194            }),
 195        );
 196        if prev_handler.is_some() {
 197            panic!("registered a handler for the same message twice");
 198        }
 199        self
 200    }
 201
 202    /// Handle a request while holding a lock to the store. This is useful when we're registering
 203    /// a connection but we want to respond on the connection before anybody else can send on it.
 204    fn add_request_handler<F, Fut, M>(&mut self, handler: F) -> &mut Self
 205    where
 206        F: 'static + Send + Sync + Fn(Arc<Self>, TypedEnvelope<M>, Response<M>) -> Fut,
 207        Fut: Send + Future<Output = Result<()>>,
 208        M: RequestMessage,
 209    {
 210        let handler = Arc::new(handler);
 211        self.add_message_handler(move |server, envelope| {
 212            let receipt = envelope.receipt();
 213            let handler = handler.clone();
 214            async move {
 215                let responded = Arc::new(AtomicBool::default());
 216                let response = Response {
 217                    server: server.clone(),
 218                    responded: responded.clone(),
 219                    receipt: envelope.receipt(),
 220                };
 221                match (handler)(server.clone(), envelope, response).await {
 222                    Ok(()) => {
 223                        if responded.load(std::sync::atomic::Ordering::SeqCst) {
 224                            Ok(())
 225                        } else {
 226                            Err(anyhow!("handler did not send a response"))?
 227                        }
 228                    }
 229                    Err(error) => {
 230                        server.peer.respond_with_error(
 231                            receipt,
 232                            proto::Error {
 233                                message: error.to_string(),
 234                            },
 235                        )?;
 236                        Err(error)
 237                    }
 238                }
 239            }
 240        })
 241    }
 242
 243    pub fn handle_connection<E: Executor>(
 244        self: &Arc<Self>,
 245        connection: Connection,
 246        address: String,
 247        user_id: UserId,
 248        mut send_connection_id: Option<mpsc::Sender<ConnectionId>>,
 249        executor: E,
 250    ) -> impl Future<Output = Result<()>> {
 251        let mut this = self.clone();
 252        let span = info_span!("handle connection", %user_id, %address);
 253        async move {
 254            let (connection_id, handle_io, mut incoming_rx) = this
 255                .peer
 256                .add_connection(connection, {
 257                    let executor = executor.clone();
 258                    move |duration| {
 259                        let timer = executor.sleep(duration);
 260                        async move {
 261                            timer.await;
 262                        }
 263                    }
 264                })
 265                .await;
 266
 267            tracing::info!(%user_id, %connection_id, %address, "connection opened");
 268
 269            if let Some(send_connection_id) = send_connection_id.as_mut() {
 270                let _ = send_connection_id.send(connection_id).await;
 271            }
 272
 273            let contacts = this.app_state.db.get_contacts(user_id).await?;
 274
 275            {
 276                let mut store = this.store_mut().await;
 277                store.add_connection(connection_id, user_id);
 278                this.peer.send(connection_id, store.build_initial_contacts_update(contacts))?;
 279            }
 280            this.update_user_contacts(user_id).await?;
 281
 282            let handle_io = handle_io.fuse();
 283            futures::pin_mut!(handle_io);
 284            loop {
 285                let next_message = incoming_rx.next().fuse();
 286                futures::pin_mut!(next_message);
 287                futures::select_biased! {
 288                    result = handle_io => {
 289                        if let Err(error) = result {
 290                            tracing::error!(%error, "error handling I/O");
 291                        }
 292                        break;
 293                    }
 294                    message = next_message => {
 295                        if let Some(message) = message {
 296                            let type_name = message.payload_type_name();
 297                            let span = tracing::info_span!("receive message", %user_id, %connection_id, %address, type_name);
 298                            async {
 299                                if let Some(handler) = this.handlers.get(&message.payload_type_id()) {
 300                                    let notifications = this.notifications.clone();
 301                                    let is_background = message.is_background();
 302                                    let handle_message = (handler)(this.clone(), message);
 303                                    let handle_message = async move {
 304                                        handle_message.await;
 305                                        if let Some(mut notifications) = notifications {
 306                                            let _ = notifications.send(()).await;
 307                                        }
 308                                    };
 309                                    if is_background {
 310                                        executor.spawn_detached(handle_message);
 311                                    } else {
 312                                        handle_message.await;
 313                                    }
 314                                } else {
 315                                    tracing::error!("no message handler");
 316                                }
 317                            }.instrument(span).await;
 318                        } else {
 319                            tracing::info!(%user_id, %connection_id, %address, "connection closed");
 320                            break;
 321                        }
 322                    }
 323                }
 324            }
 325
 326            if let Err(error) = this.sign_out(connection_id).await {
 327                tracing::error!(%error, "error signing out");
 328            }
 329
 330            Ok(())
 331        }.instrument(span)
 332    }
 333
 334    async fn sign_out(self: &mut Arc<Self>, connection_id: ConnectionId) -> Result<()> {
 335        self.peer.disconnect(connection_id);
 336        let removed_connection = self.store_mut().await.remove_connection(connection_id)?;
 337
 338        for (project_id, project) in removed_connection.hosted_projects {
 339            if let Some(share) = project.share {
 340                broadcast(connection_id, share.guests.keys().copied(), |conn_id| {
 341                    self.peer
 342                        .send(conn_id, proto::UnshareProject { project_id })
 343                });
 344            }
 345        }
 346
 347        for (project_id, peer_ids) in removed_connection.guest_project_ids {
 348            broadcast(connection_id, peer_ids, |conn_id| {
 349                self.peer.send(
 350                    conn_id,
 351                    proto::RemoveProjectCollaborator {
 352                        project_id,
 353                        peer_id: connection_id.0,
 354                    },
 355                )
 356            });
 357        }
 358
 359        self.update_user_contacts(removed_connection.user_id)
 360            .await?;
 361
 362        Ok(())
 363    }
 364
 365    async fn ping(
 366        self: Arc<Server>,
 367        _: TypedEnvelope<proto::Ping>,
 368        response: Response<proto::Ping>,
 369    ) -> Result<()> {
 370        response.send(proto::Ack {})?;
 371        Ok(())
 372    }
 373
 374    async fn register_project(
 375        self: Arc<Server>,
 376        request: TypedEnvelope<proto::RegisterProject>,
 377        response: Response<proto::RegisterProject>,
 378    ) -> Result<()> {
 379        let user_id;
 380        let project_id;
 381        {
 382            let mut state = self.store_mut().await;
 383            user_id = state.user_id_for_connection(request.sender_id)?;
 384            project_id = state.register_project(request.sender_id, user_id);
 385        };
 386        self.update_user_contacts(user_id).await?;
 387        response.send(proto::RegisterProjectResponse { project_id })?;
 388        Ok(())
 389    }
 390
 391    async fn unregister_project(
 392        self: Arc<Server>,
 393        request: TypedEnvelope<proto::UnregisterProject>,
 394    ) -> Result<()> {
 395        let user_id = {
 396            let mut state = self.store_mut().await;
 397            state.unregister_project(request.payload.project_id, request.sender_id)?;
 398            state.user_id_for_connection(request.sender_id)?
 399        };
 400
 401        self.update_user_contacts(user_id).await?;
 402        Ok(())
 403    }
 404
 405    async fn share_project(
 406        self: Arc<Server>,
 407        request: TypedEnvelope<proto::ShareProject>,
 408        response: Response<proto::ShareProject>,
 409    ) -> Result<()> {
 410        let user_id = {
 411            let mut state = self.store_mut().await;
 412            state.share_project(request.payload.project_id, request.sender_id)?;
 413            state.user_id_for_connection(request.sender_id)?
 414        };
 415        self.update_user_contacts(user_id).await?;
 416        response.send(proto::Ack {})?;
 417        Ok(())
 418    }
 419
 420    async fn update_user_contacts(self: &Arc<Server>, user_id: UserId) -> Result<()> {
 421        let contacts = self.app_state.db.get_contacts(user_id).await?;
 422        let store = self.store().await;
 423        let updated_contact = store.contact_for_user(user_id, false);
 424        for contact in contacts {
 425            if let db::Contact::Accepted {
 426                user_id: contact_user_id,
 427                ..
 428            } = contact
 429            {
 430                for contact_conn_id in store.connection_ids_for_user(contact_user_id) {
 431                    self.peer
 432                        .send(
 433                            contact_conn_id,
 434                            proto::UpdateContacts {
 435                                contacts: vec![updated_contact.clone()],
 436                                remove_contacts: Default::default(),
 437                                incoming_requests: Default::default(),
 438                                remove_incoming_requests: Default::default(),
 439                                outgoing_requests: Default::default(),
 440                                remove_outgoing_requests: Default::default(),
 441                            },
 442                        )
 443                        .trace_err();
 444                }
 445            }
 446        }
 447        Ok(())
 448    }
 449
 450    async fn unshare_project(
 451        self: Arc<Server>,
 452        request: TypedEnvelope<proto::UnshareProject>,
 453    ) -> Result<()> {
 454        let project_id = request.payload.project_id;
 455        let project;
 456        {
 457            let mut state = self.store_mut().await;
 458            project = state.unshare_project(project_id, request.sender_id)?;
 459            broadcast(request.sender_id, project.connection_ids, |conn_id| {
 460                self.peer
 461                    .send(conn_id, proto::UnshareProject { project_id })
 462            });
 463        }
 464        self.update_user_contacts(project.host_user_id).await?;
 465        Ok(())
 466    }
 467
 468    async fn join_project(
 469        self: Arc<Server>,
 470        request: TypedEnvelope<proto::JoinProject>,
 471        response: Response<proto::JoinProject>,
 472    ) -> Result<()> {
 473        let project_id = request.payload.project_id;
 474        let host_user_id;
 475        let guest_user_id;
 476        {
 477            let state = self.store().await;
 478            host_user_id = state.project(project_id)?.host_user_id;
 479            guest_user_id = state.user_id_for_connection(request.sender_id)?;
 480        };
 481
 482        let has_contact = self
 483            .app_state
 484            .db
 485            .has_contact(guest_user_id, host_user_id)
 486            .await?;
 487        if !has_contact {
 488            return Err(anyhow!("no such project"))?;
 489        }
 490
 491        {
 492            let state = &mut *self.store_mut().await;
 493            let joined = state.join_project(request.sender_id, guest_user_id, project_id)?;
 494            let share = joined.project.share()?;
 495            let peer_count = share.guests.len();
 496            let mut collaborators = Vec::with_capacity(peer_count);
 497            collaborators.push(proto::Collaborator {
 498                peer_id: joined.project.host_connection_id.0,
 499                replica_id: 0,
 500                user_id: joined.project.host_user_id.to_proto(),
 501            });
 502            let worktrees = share
 503                .worktrees
 504                .iter()
 505                .filter_map(|(id, shared_worktree)| {
 506                    let worktree = joined.project.worktrees.get(&id)?;
 507                    Some(proto::Worktree {
 508                        id: *id,
 509                        root_name: worktree.root_name.clone(),
 510                        entries: shared_worktree.entries.values().cloned().collect(),
 511                        diagnostic_summaries: shared_worktree
 512                            .diagnostic_summaries
 513                            .values()
 514                            .cloned()
 515                            .collect(),
 516                        visible: worktree.visible,
 517                        scan_id: shared_worktree.scan_id,
 518                    })
 519                })
 520                .collect();
 521            for (peer_conn_id, (peer_replica_id, peer_user_id)) in &share.guests {
 522                if *peer_conn_id != request.sender_id {
 523                    collaborators.push(proto::Collaborator {
 524                        peer_id: peer_conn_id.0,
 525                        replica_id: *peer_replica_id as u32,
 526                        user_id: peer_user_id.to_proto(),
 527                    });
 528                }
 529            }
 530            broadcast(
 531                request.sender_id,
 532                joined.project.connection_ids(),
 533                |conn_id| {
 534                    self.peer.send(
 535                        conn_id,
 536                        proto::AddProjectCollaborator {
 537                            project_id,
 538                            collaborator: Some(proto::Collaborator {
 539                                peer_id: request.sender_id.0,
 540                                replica_id: joined.replica_id as u32,
 541                                user_id: guest_user_id.to_proto(),
 542                            }),
 543                        },
 544                    )
 545                },
 546            );
 547            response.send(proto::JoinProjectResponse {
 548                worktrees,
 549                replica_id: joined.replica_id as u32,
 550                collaborators,
 551                language_servers: joined.project.language_servers.clone(),
 552            })?;
 553        }
 554        self.update_user_contacts(host_user_id).await?;
 555        Ok(())
 556    }
 557
 558    async fn leave_project(
 559        self: Arc<Server>,
 560        request: TypedEnvelope<proto::LeaveProject>,
 561    ) -> Result<()> {
 562        let sender_id = request.sender_id;
 563        let project_id = request.payload.project_id;
 564        let project;
 565        {
 566            let mut state = self.store_mut().await;
 567            project = state.leave_project(sender_id, project_id)?;
 568            broadcast(sender_id, project.connection_ids, |conn_id| {
 569                self.peer.send(
 570                    conn_id,
 571                    proto::RemoveProjectCollaborator {
 572                        project_id,
 573                        peer_id: sender_id.0,
 574                    },
 575                )
 576            });
 577        }
 578        self.update_user_contacts(project.host_user_id).await?;
 579        Ok(())
 580    }
 581
 582    async fn register_worktree(
 583        self: Arc<Server>,
 584        request: TypedEnvelope<proto::RegisterWorktree>,
 585        response: Response<proto::RegisterWorktree>,
 586    ) -> Result<()> {
 587        let host_user_id;
 588        {
 589            let mut state = self.store_mut().await;
 590            host_user_id = state.user_id_for_connection(request.sender_id)?;
 591
 592            let guest_connection_ids = state
 593                .read_project(request.payload.project_id, request.sender_id)?
 594                .guest_connection_ids();
 595            state.register_worktree(
 596                request.payload.project_id,
 597                request.payload.worktree_id,
 598                request.sender_id,
 599                Worktree {
 600                    root_name: request.payload.root_name.clone(),
 601                    visible: request.payload.visible,
 602                },
 603            )?;
 604
 605            broadcast(request.sender_id, guest_connection_ids, |connection_id| {
 606                self.peer
 607                    .forward_send(request.sender_id, connection_id, request.payload.clone())
 608            });
 609        }
 610        self.update_user_contacts(host_user_id).await?;
 611        response.send(proto::Ack {})?;
 612        Ok(())
 613    }
 614
 615    async fn unregister_worktree(
 616        self: Arc<Server>,
 617        request: TypedEnvelope<proto::UnregisterWorktree>,
 618    ) -> Result<()> {
 619        let host_user_id;
 620        let project_id = request.payload.project_id;
 621        let worktree_id = request.payload.worktree_id;
 622        {
 623            let mut state = self.store_mut().await;
 624            let (_, guest_connection_ids) =
 625                state.unregister_worktree(project_id, worktree_id, request.sender_id)?;
 626            host_user_id = state.user_id_for_connection(request.sender_id)?;
 627            broadcast(request.sender_id, guest_connection_ids, |conn_id| {
 628                self.peer.send(
 629                    conn_id,
 630                    proto::UnregisterWorktree {
 631                        project_id,
 632                        worktree_id,
 633                    },
 634                )
 635            });
 636        }
 637        self.update_user_contacts(host_user_id).await?;
 638        Ok(())
 639    }
 640
 641    async fn update_worktree(
 642        self: Arc<Server>,
 643        request: TypedEnvelope<proto::UpdateWorktree>,
 644        response: Response<proto::UpdateWorktree>,
 645    ) -> Result<()> {
 646        let connection_ids = self.store_mut().await.update_worktree(
 647            request.sender_id,
 648            request.payload.project_id,
 649            request.payload.worktree_id,
 650            &request.payload.removed_entries,
 651            &request.payload.updated_entries,
 652            request.payload.scan_id,
 653        )?;
 654
 655        broadcast(request.sender_id, connection_ids, |connection_id| {
 656            self.peer
 657                .forward_send(request.sender_id, connection_id, request.payload.clone())
 658        });
 659        response.send(proto::Ack {})?;
 660        Ok(())
 661    }
 662
 663    async fn update_diagnostic_summary(
 664        self: Arc<Server>,
 665        request: TypedEnvelope<proto::UpdateDiagnosticSummary>,
 666    ) -> Result<()> {
 667        let summary = request
 668            .payload
 669            .summary
 670            .clone()
 671            .ok_or_else(|| anyhow!("invalid summary"))?;
 672        let receiver_ids = self.store_mut().await.update_diagnostic_summary(
 673            request.payload.project_id,
 674            request.payload.worktree_id,
 675            request.sender_id,
 676            summary,
 677        )?;
 678
 679        broadcast(request.sender_id, receiver_ids, |connection_id| {
 680            self.peer
 681                .forward_send(request.sender_id, connection_id, request.payload.clone())
 682        });
 683        Ok(())
 684    }
 685
 686    async fn start_language_server(
 687        self: Arc<Server>,
 688        request: TypedEnvelope<proto::StartLanguageServer>,
 689    ) -> Result<()> {
 690        let receiver_ids = self.store_mut().await.start_language_server(
 691            request.payload.project_id,
 692            request.sender_id,
 693            request
 694                .payload
 695                .server
 696                .clone()
 697                .ok_or_else(|| anyhow!("invalid language server"))?,
 698        )?;
 699        broadcast(request.sender_id, receiver_ids, |connection_id| {
 700            self.peer
 701                .forward_send(request.sender_id, connection_id, request.payload.clone())
 702        });
 703        Ok(())
 704    }
 705
 706    async fn update_language_server(
 707        self: Arc<Server>,
 708        request: TypedEnvelope<proto::UpdateLanguageServer>,
 709    ) -> Result<()> {
 710        let receiver_ids = self
 711            .store()
 712            .await
 713            .project_connection_ids(request.payload.project_id, request.sender_id)?;
 714        broadcast(request.sender_id, receiver_ids, |connection_id| {
 715            self.peer
 716                .forward_send(request.sender_id, connection_id, request.payload.clone())
 717        });
 718        Ok(())
 719    }
 720
 721    async fn forward_project_request<T>(
 722        self: Arc<Server>,
 723        request: TypedEnvelope<T>,
 724        response: Response<T>,
 725    ) -> Result<()>
 726    where
 727        T: EntityMessage + RequestMessage,
 728    {
 729        let host_connection_id = self
 730            .store()
 731            .await
 732            .read_project(request.payload.remote_entity_id(), request.sender_id)?
 733            .host_connection_id;
 734
 735        response.send(
 736            self.peer
 737                .forward_request(request.sender_id, host_connection_id, request.payload)
 738                .await?,
 739        )?;
 740        Ok(())
 741    }
 742
 743    async fn save_buffer(
 744        self: Arc<Server>,
 745        request: TypedEnvelope<proto::SaveBuffer>,
 746        response: Response<proto::SaveBuffer>,
 747    ) -> Result<()> {
 748        let host = self
 749            .store()
 750            .await
 751            .read_project(request.payload.project_id, request.sender_id)?
 752            .host_connection_id;
 753        let response_payload = self
 754            .peer
 755            .forward_request(request.sender_id, host, request.payload.clone())
 756            .await?;
 757
 758        let mut guests = self
 759            .store()
 760            .await
 761            .read_project(request.payload.project_id, request.sender_id)?
 762            .connection_ids();
 763        guests.retain(|guest_connection_id| *guest_connection_id != request.sender_id);
 764        broadcast(host, guests, |conn_id| {
 765            self.peer
 766                .forward_send(host, conn_id, response_payload.clone())
 767        });
 768        response.send(response_payload)?;
 769        Ok(())
 770    }
 771
 772    async fn update_buffer(
 773        self: Arc<Server>,
 774        request: TypedEnvelope<proto::UpdateBuffer>,
 775        response: Response<proto::UpdateBuffer>,
 776    ) -> Result<()> {
 777        let receiver_ids = self
 778            .store()
 779            .await
 780            .project_connection_ids(request.payload.project_id, request.sender_id)?;
 781        broadcast(request.sender_id, receiver_ids, |connection_id| {
 782            self.peer
 783                .forward_send(request.sender_id, connection_id, request.payload.clone())
 784        });
 785        response.send(proto::Ack {})?;
 786        Ok(())
 787    }
 788
 789    async fn update_buffer_file(
 790        self: Arc<Server>,
 791        request: TypedEnvelope<proto::UpdateBufferFile>,
 792    ) -> Result<()> {
 793        let receiver_ids = self
 794            .store()
 795            .await
 796            .project_connection_ids(request.payload.project_id, request.sender_id)?;
 797        broadcast(request.sender_id, receiver_ids, |connection_id| {
 798            self.peer
 799                .forward_send(request.sender_id, connection_id, request.payload.clone())
 800        });
 801        Ok(())
 802    }
 803
 804    async fn buffer_reloaded(
 805        self: Arc<Server>,
 806        request: TypedEnvelope<proto::BufferReloaded>,
 807    ) -> Result<()> {
 808        let receiver_ids = self
 809            .store()
 810            .await
 811            .project_connection_ids(request.payload.project_id, request.sender_id)?;
 812        broadcast(request.sender_id, receiver_ids, |connection_id| {
 813            self.peer
 814                .forward_send(request.sender_id, connection_id, request.payload.clone())
 815        });
 816        Ok(())
 817    }
 818
 819    async fn buffer_saved(
 820        self: Arc<Server>,
 821        request: TypedEnvelope<proto::BufferSaved>,
 822    ) -> Result<()> {
 823        let receiver_ids = self
 824            .store()
 825            .await
 826            .project_connection_ids(request.payload.project_id, request.sender_id)?;
 827        broadcast(request.sender_id, receiver_ids, |connection_id| {
 828            self.peer
 829                .forward_send(request.sender_id, connection_id, request.payload.clone())
 830        });
 831        Ok(())
 832    }
 833
 834    async fn follow(
 835        self: Arc<Self>,
 836        request: TypedEnvelope<proto::Follow>,
 837        response: Response<proto::Follow>,
 838    ) -> Result<()> {
 839        let leader_id = ConnectionId(request.payload.leader_id);
 840        let follower_id = request.sender_id;
 841        if !self
 842            .store()
 843            .await
 844            .project_connection_ids(request.payload.project_id, follower_id)?
 845            .contains(&leader_id)
 846        {
 847            Err(anyhow!("no such peer"))?;
 848        }
 849        let mut response_payload = self
 850            .peer
 851            .forward_request(request.sender_id, leader_id, request.payload)
 852            .await?;
 853        response_payload
 854            .views
 855            .retain(|view| view.leader_id != Some(follower_id.0));
 856        response.send(response_payload)?;
 857        Ok(())
 858    }
 859
 860    async fn unfollow(self: Arc<Self>, request: TypedEnvelope<proto::Unfollow>) -> Result<()> {
 861        let leader_id = ConnectionId(request.payload.leader_id);
 862        if !self
 863            .store()
 864            .await
 865            .project_connection_ids(request.payload.project_id, request.sender_id)?
 866            .contains(&leader_id)
 867        {
 868            Err(anyhow!("no such peer"))?;
 869        }
 870        self.peer
 871            .forward_send(request.sender_id, leader_id, request.payload)?;
 872        Ok(())
 873    }
 874
 875    async fn update_followers(
 876        self: Arc<Self>,
 877        request: TypedEnvelope<proto::UpdateFollowers>,
 878    ) -> Result<()> {
 879        let connection_ids = self
 880            .store()
 881            .await
 882            .project_connection_ids(request.payload.project_id, request.sender_id)?;
 883        let leader_id = request
 884            .payload
 885            .variant
 886            .as_ref()
 887            .and_then(|variant| match variant {
 888                proto::update_followers::Variant::CreateView(payload) => payload.leader_id,
 889                proto::update_followers::Variant::UpdateView(payload) => payload.leader_id,
 890                proto::update_followers::Variant::UpdateActiveView(payload) => payload.leader_id,
 891            });
 892        for follower_id in &request.payload.follower_ids {
 893            let follower_id = ConnectionId(*follower_id);
 894            if connection_ids.contains(&follower_id) && Some(follower_id.0) != leader_id {
 895                self.peer
 896                    .forward_send(request.sender_id, follower_id, request.payload.clone())?;
 897            }
 898        }
 899        Ok(())
 900    }
 901
 902    async fn get_channels(
 903        self: Arc<Server>,
 904        request: TypedEnvelope<proto::GetChannels>,
 905        response: Response<proto::GetChannels>,
 906    ) -> Result<()> {
 907        let user_id = self
 908            .store()
 909            .await
 910            .user_id_for_connection(request.sender_id)?;
 911        let channels = self.app_state.db.get_accessible_channels(user_id).await?;
 912        response.send(proto::GetChannelsResponse {
 913            channels: channels
 914                .into_iter()
 915                .map(|chan| proto::Channel {
 916                    id: chan.id.to_proto(),
 917                    name: chan.name,
 918                })
 919                .collect(),
 920        })?;
 921        Ok(())
 922    }
 923
 924    async fn get_users(
 925        self: Arc<Server>,
 926        request: TypedEnvelope<proto::GetUsers>,
 927        response: Response<proto::GetUsers>,
 928    ) -> Result<()> {
 929        let user_ids = request
 930            .payload
 931            .user_ids
 932            .into_iter()
 933            .map(UserId::from_proto)
 934            .collect();
 935        let users = self
 936            .app_state
 937            .db
 938            .get_users_by_ids(user_ids)
 939            .await?
 940            .into_iter()
 941            .map(|user| proto::User {
 942                id: user.id.to_proto(),
 943                avatar_url: format!("https://github.com/{}.png?size=128", user.github_login),
 944                github_login: user.github_login,
 945            })
 946            .collect();
 947        response.send(proto::UsersResponse { users })?;
 948        Ok(())
 949    }
 950
 951    async fn fuzzy_search_users(
 952        self: Arc<Server>,
 953        request: TypedEnvelope<proto::FuzzySearchUsers>,
 954        response: Response<proto::FuzzySearchUsers>,
 955    ) -> Result<()> {
 956        let user_id = self
 957            .store()
 958            .await
 959            .user_id_for_connection(request.sender_id)?;
 960        let query = request.payload.query;
 961        let db = &self.app_state.db;
 962        let users = match query.len() {
 963            0 => vec![],
 964            1 | 2 => db
 965                .get_user_by_github_login(&query)
 966                .await?
 967                .into_iter()
 968                .collect(),
 969            _ => db.fuzzy_search_users(&query, 10).await?,
 970        };
 971        let users = users
 972            .into_iter()
 973            .filter(|user| user.id != user_id)
 974            .map(|user| proto::User {
 975                id: user.id.to_proto(),
 976                avatar_url: format!("https://github.com/{}.png?size=128", user.github_login),
 977                github_login: user.github_login,
 978            })
 979            .collect();
 980        response.send(proto::UsersResponse { users })?;
 981        Ok(())
 982    }
 983
 984    async fn request_contact(
 985        self: Arc<Server>,
 986        request: TypedEnvelope<proto::RequestContact>,
 987        response: Response<proto::RequestContact>,
 988    ) -> Result<()> {
 989        let requester_id = self
 990            .store()
 991            .await
 992            .user_id_for_connection(request.sender_id)?;
 993        let responder_id = UserId::from_proto(request.payload.responder_id);
 994        if requester_id == responder_id {
 995            return Err(anyhow!("cannot add yourself as a contact"))?;
 996        }
 997
 998        self.app_state
 999            .db
1000            .send_contact_request(requester_id, responder_id)
1001            .await?;
1002
1003        // Update outgoing contact requests of requester
1004        let mut update = proto::UpdateContacts::default();
1005        update.outgoing_requests.push(responder_id.to_proto());
1006        for connection_id in self.store().await.connection_ids_for_user(requester_id) {
1007            self.peer.send(connection_id, update.clone())?;
1008        }
1009
1010        // Update incoming contact requests of responder
1011        let mut update = proto::UpdateContacts::default();
1012        update
1013            .incoming_requests
1014            .push(proto::IncomingContactRequest {
1015                requester_id: requester_id.to_proto(),
1016                should_notify: true,
1017            });
1018        for connection_id in self.store().await.connection_ids_for_user(responder_id) {
1019            self.peer.send(connection_id, update.clone())?;
1020        }
1021
1022        response.send(proto::Ack {})?;
1023        Ok(())
1024    }
1025
1026    async fn respond_to_contact_request(
1027        self: Arc<Server>,
1028        request: TypedEnvelope<proto::RespondToContactRequest>,
1029        response: Response<proto::RespondToContactRequest>,
1030    ) -> Result<()> {
1031        let responder_id = self
1032            .store()
1033            .await
1034            .user_id_for_connection(request.sender_id)?;
1035        let requester_id = UserId::from_proto(request.payload.requester_id);
1036        if request.payload.response == proto::ContactRequestResponse::Dismiss as i32 {
1037            self.app_state
1038                .db
1039                .dismiss_contact_notification(responder_id, requester_id)
1040                .await?;
1041        } else {
1042            let accept = request.payload.response == proto::ContactRequestResponse::Accept as i32;
1043            self.app_state
1044                .db
1045                .respond_to_contact_request(responder_id, requester_id, accept)
1046                .await?;
1047
1048            let store = self.store().await;
1049            // Update responder with new contact
1050            let mut update = proto::UpdateContacts::default();
1051            if accept {
1052                update
1053                    .contacts
1054                    .push(store.contact_for_user(requester_id, false));
1055            }
1056            update
1057                .remove_incoming_requests
1058                .push(requester_id.to_proto());
1059            for connection_id in store.connection_ids_for_user(responder_id) {
1060                self.peer.send(connection_id, update.clone())?;
1061            }
1062
1063            // Update requester with new contact
1064            let mut update = proto::UpdateContacts::default();
1065            if accept {
1066                update
1067                    .contacts
1068                    .push(store.contact_for_user(responder_id, true));
1069            }
1070            update
1071                .remove_outgoing_requests
1072                .push(responder_id.to_proto());
1073            for connection_id in store.connection_ids_for_user(requester_id) {
1074                self.peer.send(connection_id, update.clone())?;
1075            }
1076        }
1077
1078        response.send(proto::Ack {})?;
1079        Ok(())
1080    }
1081
1082    async fn remove_contact(
1083        self: Arc<Server>,
1084        request: TypedEnvelope<proto::RemoveContact>,
1085        response: Response<proto::RemoveContact>,
1086    ) -> Result<()> {
1087        let requester_id = self
1088            .store()
1089            .await
1090            .user_id_for_connection(request.sender_id)?;
1091        let responder_id = UserId::from_proto(request.payload.user_id);
1092        self.app_state
1093            .db
1094            .remove_contact(requester_id, responder_id)
1095            .await?;
1096
1097        // Update outgoing contact requests of requester
1098        let mut update = proto::UpdateContacts::default();
1099        update
1100            .remove_outgoing_requests
1101            .push(responder_id.to_proto());
1102        for connection_id in self.store().await.connection_ids_for_user(requester_id) {
1103            self.peer.send(connection_id, update.clone())?;
1104        }
1105
1106        // Update incoming contact requests of responder
1107        let mut update = proto::UpdateContacts::default();
1108        update
1109            .remove_incoming_requests
1110            .push(requester_id.to_proto());
1111        for connection_id in self.store().await.connection_ids_for_user(responder_id) {
1112            self.peer.send(connection_id, update.clone())?;
1113        }
1114
1115        response.send(proto::Ack {})?;
1116        Ok(())
1117    }
1118
1119    // #[instrument(skip(self, state, user_ids))]
1120    // fn update_contacts_for_users<'a>(
1121    //     self: &Arc<Self>,
1122    //     state: &Store,
1123    //     user_ids: impl IntoIterator<Item = &'a UserId>,
1124    // ) {
1125    //     for user_id in user_ids {
1126    //         let contacts = state.contacts_for_user(*user_id);
1127    //         for connection_id in state.connection_ids_for_user(*user_id) {
1128    //             self.peer
1129    //                 .send(
1130    //                     connection_id,
1131    //                     proto::UpdateContacts {
1132    //                         contacts: contacts.clone(),
1133    //                         pending_requests_from_user_ids: Default::default(),
1134    //                         pending_requests_to_user_ids: Default::default(),
1135    //                     },
1136    //                 )
1137    //                 .trace_err();
1138    //         }
1139    //     }
1140    // }
1141
1142    async fn join_channel(
1143        self: Arc<Self>,
1144        request: TypedEnvelope<proto::JoinChannel>,
1145        response: Response<proto::JoinChannel>,
1146    ) -> Result<()> {
1147        let user_id = self
1148            .store()
1149            .await
1150            .user_id_for_connection(request.sender_id)?;
1151        let channel_id = ChannelId::from_proto(request.payload.channel_id);
1152        if !self
1153            .app_state
1154            .db
1155            .can_user_access_channel(user_id, channel_id)
1156            .await?
1157        {
1158            Err(anyhow!("access denied"))?;
1159        }
1160
1161        self.store_mut()
1162            .await
1163            .join_channel(request.sender_id, channel_id);
1164        let messages = self
1165            .app_state
1166            .db
1167            .get_channel_messages(channel_id, MESSAGE_COUNT_PER_PAGE, None)
1168            .await?
1169            .into_iter()
1170            .map(|msg| proto::ChannelMessage {
1171                id: msg.id.to_proto(),
1172                body: msg.body,
1173                timestamp: msg.sent_at.unix_timestamp() as u64,
1174                sender_id: msg.sender_id.to_proto(),
1175                nonce: Some(msg.nonce.as_u128().into()),
1176            })
1177            .collect::<Vec<_>>();
1178        response.send(proto::JoinChannelResponse {
1179            done: messages.len() < MESSAGE_COUNT_PER_PAGE,
1180            messages,
1181        })?;
1182        Ok(())
1183    }
1184
1185    async fn leave_channel(
1186        self: Arc<Self>,
1187        request: TypedEnvelope<proto::LeaveChannel>,
1188    ) -> Result<()> {
1189        let user_id = self
1190            .store()
1191            .await
1192            .user_id_for_connection(request.sender_id)?;
1193        let channel_id = ChannelId::from_proto(request.payload.channel_id);
1194        if !self
1195            .app_state
1196            .db
1197            .can_user_access_channel(user_id, channel_id)
1198            .await?
1199        {
1200            Err(anyhow!("access denied"))?;
1201        }
1202
1203        self.store_mut()
1204            .await
1205            .leave_channel(request.sender_id, channel_id);
1206
1207        Ok(())
1208    }
1209
1210    async fn send_channel_message(
1211        self: Arc<Self>,
1212        request: TypedEnvelope<proto::SendChannelMessage>,
1213        response: Response<proto::SendChannelMessage>,
1214    ) -> Result<()> {
1215        let channel_id = ChannelId::from_proto(request.payload.channel_id);
1216        let user_id;
1217        let connection_ids;
1218        {
1219            let state = self.store().await;
1220            user_id = state.user_id_for_connection(request.sender_id)?;
1221            connection_ids = state.channel_connection_ids(channel_id)?;
1222        }
1223
1224        // Validate the message body.
1225        let body = request.payload.body.trim().to_string();
1226        if body.len() > MAX_MESSAGE_LEN {
1227            return Err(anyhow!("message is too long"))?;
1228        }
1229        if body.is_empty() {
1230            return Err(anyhow!("message can't be blank"))?;
1231        }
1232
1233        let timestamp = OffsetDateTime::now_utc();
1234        let nonce = request
1235            .payload
1236            .nonce
1237            .ok_or_else(|| anyhow!("nonce can't be blank"))?;
1238
1239        let message_id = self
1240            .app_state
1241            .db
1242            .create_channel_message(channel_id, user_id, &body, timestamp, nonce.clone().into())
1243            .await?
1244            .to_proto();
1245        let message = proto::ChannelMessage {
1246            sender_id: user_id.to_proto(),
1247            id: message_id,
1248            body,
1249            timestamp: timestamp.unix_timestamp() as u64,
1250            nonce: Some(nonce),
1251        };
1252        broadcast(request.sender_id, connection_ids, |conn_id| {
1253            self.peer.send(
1254                conn_id,
1255                proto::ChannelMessageSent {
1256                    channel_id: channel_id.to_proto(),
1257                    message: Some(message.clone()),
1258                },
1259            )
1260        });
1261        response.send(proto::SendChannelMessageResponse {
1262            message: Some(message),
1263        })?;
1264        Ok(())
1265    }
1266
1267    async fn get_channel_messages(
1268        self: Arc<Self>,
1269        request: TypedEnvelope<proto::GetChannelMessages>,
1270        response: Response<proto::GetChannelMessages>,
1271    ) -> Result<()> {
1272        let user_id = self
1273            .store()
1274            .await
1275            .user_id_for_connection(request.sender_id)?;
1276        let channel_id = ChannelId::from_proto(request.payload.channel_id);
1277        if !self
1278            .app_state
1279            .db
1280            .can_user_access_channel(user_id, channel_id)
1281            .await?
1282        {
1283            Err(anyhow!("access denied"))?;
1284        }
1285
1286        let messages = self
1287            .app_state
1288            .db
1289            .get_channel_messages(
1290                channel_id,
1291                MESSAGE_COUNT_PER_PAGE,
1292                Some(MessageId::from_proto(request.payload.before_message_id)),
1293            )
1294            .await?
1295            .into_iter()
1296            .map(|msg| proto::ChannelMessage {
1297                id: msg.id.to_proto(),
1298                body: msg.body,
1299                timestamp: msg.sent_at.unix_timestamp() as u64,
1300                sender_id: msg.sender_id.to_proto(),
1301                nonce: Some(msg.nonce.as_u128().into()),
1302            })
1303            .collect::<Vec<_>>();
1304        response.send(proto::GetChannelMessagesResponse {
1305            done: messages.len() < MESSAGE_COUNT_PER_PAGE,
1306            messages,
1307        })?;
1308        Ok(())
1309    }
1310
1311    async fn store<'a>(self: &'a Arc<Self>) -> StoreReadGuard<'a> {
1312        #[cfg(test)]
1313        tokio::task::yield_now().await;
1314        let guard = self.store.read().await;
1315        #[cfg(test)]
1316        tokio::task::yield_now().await;
1317        StoreReadGuard {
1318            guard,
1319            _not_send: PhantomData,
1320        }
1321    }
1322
1323    async fn store_mut<'a>(self: &'a Arc<Self>) -> StoreWriteGuard<'a> {
1324        #[cfg(test)]
1325        tokio::task::yield_now().await;
1326        let guard = self.store.write().await;
1327        #[cfg(test)]
1328        tokio::task::yield_now().await;
1329        StoreWriteGuard {
1330            guard,
1331            _not_send: PhantomData,
1332        }
1333    }
1334}
1335
1336impl<'a> Deref for StoreReadGuard<'a> {
1337    type Target = Store;
1338
1339    fn deref(&self) -> &Self::Target {
1340        &*self.guard
1341    }
1342}
1343
1344impl<'a> Deref for StoreWriteGuard<'a> {
1345    type Target = Store;
1346
1347    fn deref(&self) -> &Self::Target {
1348        &*self.guard
1349    }
1350}
1351
1352impl<'a> DerefMut for StoreWriteGuard<'a> {
1353    fn deref_mut(&mut self) -> &mut Self::Target {
1354        &mut *self.guard
1355    }
1356}
1357
1358impl<'a> Drop for StoreWriteGuard<'a> {
1359    fn drop(&mut self) {
1360        #[cfg(test)]
1361        self.check_invariants();
1362
1363        let metrics = self.metrics();
1364        tracing::info!(
1365            connections = metrics.connections,
1366            registered_projects = metrics.registered_projects,
1367            shared_projects = metrics.shared_projects,
1368            "metrics"
1369        );
1370    }
1371}
1372
1373impl Executor for RealExecutor {
1374    type Sleep = Sleep;
1375
1376    fn spawn_detached<F: 'static + Send + Future<Output = ()>>(&self, future: F) {
1377        tokio::task::spawn(future);
1378    }
1379
1380    fn sleep(&self, duration: Duration) -> Self::Sleep {
1381        tokio::time::sleep(duration)
1382    }
1383}
1384
1385fn broadcast<F>(
1386    sender_id: ConnectionId,
1387    receiver_ids: impl IntoIterator<Item = ConnectionId>,
1388    mut f: F,
1389) where
1390    F: FnMut(ConnectionId) -> anyhow::Result<()>,
1391{
1392    for receiver_id in receiver_ids {
1393        if receiver_id != sender_id {
1394            f(receiver_id).trace_err();
1395        }
1396    }
1397}
1398
1399lazy_static! {
1400    static ref ZED_PROTOCOL_VERSION: HeaderName = HeaderName::from_static("x-zed-protocol-version");
1401}
1402
1403pub struct ProtocolVersion(u32);
1404
1405impl Header for ProtocolVersion {
1406    fn name() -> &'static HeaderName {
1407        &ZED_PROTOCOL_VERSION
1408    }
1409
1410    fn decode<'i, I>(values: &mut I) -> Result<Self, axum::headers::Error>
1411    where
1412        Self: Sized,
1413        I: Iterator<Item = &'i axum::http::HeaderValue>,
1414    {
1415        let version = values
1416            .next()
1417            .ok_or_else(|| axum::headers::Error::invalid())?
1418            .to_str()
1419            .map_err(|_| axum::headers::Error::invalid())?
1420            .parse()
1421            .map_err(|_| axum::headers::Error::invalid())?;
1422        Ok(Self(version))
1423    }
1424
1425    fn encode<E: Extend<axum::http::HeaderValue>>(&self, values: &mut E) {
1426        values.extend([self.0.to_string().parse().unwrap()]);
1427    }
1428}
1429
1430pub fn routes(app_state: Arc<AppState>) -> Router<Body> {
1431    let server = Server::new(app_state.clone(), None);
1432    Router::new()
1433        .route("/rpc", get(handle_websocket_request))
1434        .layer(
1435            ServiceBuilder::new()
1436                .layer(Extension(app_state))
1437                .layer(middleware::from_fn(auth::validate_header))
1438                .layer(Extension(server)),
1439        )
1440}
1441
1442pub async fn handle_websocket_request(
1443    TypedHeader(ProtocolVersion(protocol_version)): TypedHeader<ProtocolVersion>,
1444    ConnectInfo(socket_address): ConnectInfo<SocketAddr>,
1445    Extension(server): Extension<Arc<Server>>,
1446    Extension(user_id): Extension<UserId>,
1447    ws: WebSocketUpgrade,
1448) -> axum::response::Response {
1449    if protocol_version != rpc::PROTOCOL_VERSION {
1450        return (
1451            StatusCode::UPGRADE_REQUIRED,
1452            "client must be upgraded".to_string(),
1453        )
1454            .into_response();
1455    }
1456    let socket_address = socket_address.to_string();
1457    ws.on_upgrade(move |socket| {
1458        use util::ResultExt;
1459        let socket = socket
1460            .map_ok(to_tungstenite_message)
1461            .err_into()
1462            .with(|message| async move { Ok(to_axum_message(message)) });
1463        let connection = Connection::new(Box::pin(socket));
1464        async move {
1465            server
1466                .handle_connection(connection, socket_address, user_id, None, RealExecutor)
1467                .await
1468                .log_err();
1469        }
1470    })
1471}
1472
1473fn to_axum_message(message: TungsteniteMessage) -> AxumMessage {
1474    match message {
1475        TungsteniteMessage::Text(payload) => AxumMessage::Text(payload),
1476        TungsteniteMessage::Binary(payload) => AxumMessage::Binary(payload),
1477        TungsteniteMessage::Ping(payload) => AxumMessage::Ping(payload),
1478        TungsteniteMessage::Pong(payload) => AxumMessage::Pong(payload),
1479        TungsteniteMessage::Close(frame) => AxumMessage::Close(frame.map(|frame| AxumCloseFrame {
1480            code: frame.code.into(),
1481            reason: frame.reason,
1482        })),
1483    }
1484}
1485
1486fn to_tungstenite_message(message: AxumMessage) -> TungsteniteMessage {
1487    match message {
1488        AxumMessage::Text(payload) => TungsteniteMessage::Text(payload),
1489        AxumMessage::Binary(payload) => TungsteniteMessage::Binary(payload),
1490        AxumMessage::Ping(payload) => TungsteniteMessage::Ping(payload),
1491        AxumMessage::Pong(payload) => TungsteniteMessage::Pong(payload),
1492        AxumMessage::Close(frame) => {
1493            TungsteniteMessage::Close(frame.map(|frame| TungsteniteCloseFrame {
1494                code: frame.code.into(),
1495                reason: frame.reason,
1496            }))
1497        }
1498    }
1499}
1500
1501pub trait ResultExt {
1502    type Ok;
1503
1504    fn trace_err(self) -> Option<Self::Ok>;
1505}
1506
1507impl<T, E> ResultExt for Result<T, E>
1508where
1509    E: std::fmt::Debug,
1510{
1511    type Ok = T;
1512
1513    fn trace_err(self) -> Option<T> {
1514        match self {
1515            Ok(value) => Some(value),
1516            Err(error) => {
1517                tracing::error!("{:?}", error);
1518                None
1519            }
1520        }
1521    }
1522}
1523
1524#[cfg(test)]
1525mod tests {
1526    use super::*;
1527    use crate::{
1528        db::{tests::TestDb, UserId},
1529        AppState,
1530    };
1531    use ::rpc::Peer;
1532    use client::{
1533        self, test::FakeHttpClient, Channel, ChannelDetails, ChannelList, Client, Credentials,
1534        EstablishConnectionError, UserStore, RECEIVE_TIMEOUT,
1535    };
1536    use collections::{BTreeMap, HashSet};
1537    use editor::{
1538        self, ConfirmCodeAction, ConfirmCompletion, ConfirmRename, Editor, Input, Redo, Rename,
1539        ToOffset, ToggleCodeActions, Undo,
1540    };
1541    use gpui::{
1542        executor::{self, Deterministic},
1543        geometry::vector::vec2f,
1544        ModelHandle, TestAppContext, ViewHandle,
1545    };
1546    use language::{
1547        range_to_lsp, tree_sitter_rust, Diagnostic, DiagnosticEntry, FakeLspAdapter, Language,
1548        LanguageConfig, LanguageRegistry, OffsetRangeExt, Point, Rope,
1549    };
1550    use lsp::{self, FakeLanguageServer};
1551    use parking_lot::Mutex;
1552    use project::{
1553        fs::{FakeFs, Fs as _},
1554        search::SearchQuery,
1555        worktree::WorktreeHandle,
1556        DiagnosticSummary, Project, ProjectPath, WorktreeId,
1557    };
1558    use rand::prelude::*;
1559    use rpc::PeerId;
1560    use serde_json::json;
1561    use settings::Settings;
1562    use sqlx::types::time::OffsetDateTime;
1563    use std::{
1564        env,
1565        ops::Deref,
1566        path::{Path, PathBuf},
1567        rc::Rc,
1568        sync::{
1569            atomic::{AtomicBool, Ordering::SeqCst},
1570            Arc,
1571        },
1572        time::Duration,
1573    };
1574    use theme::ThemeRegistry;
1575    use workspace::{Item, SplitDirection, ToggleFollow, Workspace, WorkspaceParams};
1576
1577    #[cfg(test)]
1578    #[ctor::ctor]
1579    fn init_logger() {
1580        if std::env::var("RUST_LOG").is_ok() {
1581            env_logger::init();
1582        }
1583    }
1584
1585    #[gpui::test(iterations = 10)]
1586    async fn test_share_project(cx_a: &mut TestAppContext, cx_b: &mut TestAppContext) {
1587        let (window_b, _) = cx_b.add_window(|_| EmptyView);
1588        let lang_registry = Arc::new(LanguageRegistry::test());
1589        let fs = FakeFs::new(cx_a.background());
1590        cx_a.foreground().forbid_parking();
1591
1592        // Connect to a server as 2 clients.
1593        let mut server = TestServer::start(cx_a.foreground(), cx_a.background()).await;
1594        let client_a = server.create_client(cx_a, "user_a").await;
1595        let client_b = server.create_client(cx_b, "user_b").await;
1596        server
1597            .make_contacts(vec![(&client_a, cx_a), (&client_b, cx_b)])
1598            .await;
1599
1600        // Share a project as client A
1601        fs.insert_tree(
1602            "/a",
1603            json!({
1604                "a.txt": "a-contents",
1605                "b.txt": "b-contents",
1606            }),
1607        )
1608        .await;
1609        let project_a = cx_a.update(|cx| {
1610            Project::local(
1611                client_a.clone(),
1612                client_a.user_store.clone(),
1613                lang_registry.clone(),
1614                fs.clone(),
1615                cx,
1616            )
1617        });
1618        let (worktree_a, _) = project_a
1619            .update(cx_a, |p, cx| {
1620                p.find_or_create_local_worktree("/a", true, cx)
1621            })
1622            .await
1623            .unwrap();
1624        let worktree_id = worktree_a.read_with(cx_a, |tree, _| tree.id());
1625        worktree_a
1626            .read_with(cx_a, |tree, _| tree.as_local().unwrap().scan_complete())
1627            .await;
1628        let project_id = project_a.update(cx_a, |p, _| p.next_remote_id()).await;
1629        project_a.update(cx_a, |p, cx| p.share(cx)).await.unwrap();
1630
1631        // Join that project as client B
1632        let project_b = Project::remote(
1633            project_id,
1634            client_b.clone(),
1635            client_b.user_store.clone(),
1636            lang_registry.clone(),
1637            fs.clone(),
1638            &mut cx_b.to_async(),
1639        )
1640        .await
1641        .unwrap();
1642
1643        let replica_id_b = project_b.read_with(cx_b, |project, _| {
1644            assert_eq!(
1645                project
1646                    .collaborators()
1647                    .get(&client_a.peer_id)
1648                    .unwrap()
1649                    .user
1650                    .github_login,
1651                "user_a"
1652            );
1653            project.replica_id()
1654        });
1655        project_a
1656            .condition(&cx_a, |tree, _| {
1657                tree.collaborators()
1658                    .get(&client_b.peer_id)
1659                    .map_or(false, |collaborator| {
1660                        collaborator.replica_id == replica_id_b
1661                            && collaborator.user.github_login == "user_b"
1662                    })
1663            })
1664            .await;
1665
1666        // Open the same file as client B and client A.
1667        let buffer_b = project_b
1668            .update(cx_b, |p, cx| p.open_buffer((worktree_id, "b.txt"), cx))
1669            .await
1670            .unwrap();
1671        buffer_b.read_with(cx_b, |buf, _| assert_eq!(buf.text(), "b-contents"));
1672        project_a.read_with(cx_a, |project, cx| {
1673            assert!(project.has_open_buffer((worktree_id, "b.txt"), cx))
1674        });
1675        let buffer_a = project_a
1676            .update(cx_a, |p, cx| p.open_buffer((worktree_id, "b.txt"), cx))
1677            .await
1678            .unwrap();
1679
1680        let editor_b = cx_b.add_view(window_b, |cx| Editor::for_buffer(buffer_b, None, cx));
1681
1682        // TODO
1683        // // Create a selection set as client B and see that selection set as client A.
1684        // buffer_a
1685        //     .condition(&cx_a, |buffer, _| buffer.selection_sets().count() == 1)
1686        //     .await;
1687
1688        // Edit the buffer as client B and see that edit as client A.
1689        editor_b.update(cx_b, |editor, cx| {
1690            editor.handle_input(&Input("ok, ".into()), cx)
1691        });
1692        buffer_a
1693            .condition(&cx_a, |buffer, _| buffer.text() == "ok, b-contents")
1694            .await;
1695
1696        // TODO
1697        // // Remove the selection set as client B, see those selections disappear as client A.
1698        cx_b.update(move |_| drop(editor_b));
1699        // buffer_a
1700        //     .condition(&cx_a, |buffer, _| buffer.selection_sets().count() == 0)
1701        //     .await;
1702
1703        // Dropping the client B's project removes client B from client A's collaborators.
1704        cx_b.update(move |_| drop(project_b));
1705        project_a
1706            .condition(&cx_a, |project, _| project.collaborators().is_empty())
1707            .await;
1708    }
1709
1710    #[gpui::test(iterations = 10)]
1711    async fn test_unshare_project(cx_a: &mut TestAppContext, cx_b: &mut TestAppContext) {
1712        let lang_registry = Arc::new(LanguageRegistry::test());
1713        let fs = FakeFs::new(cx_a.background());
1714        cx_a.foreground().forbid_parking();
1715
1716        // Connect to a server as 2 clients.
1717        let mut server = TestServer::start(cx_a.foreground(), cx_a.background()).await;
1718        let client_a = server.create_client(cx_a, "user_a").await;
1719        let client_b = server.create_client(cx_b, "user_b").await;
1720        server
1721            .make_contacts(vec![(&client_a, cx_a), (&client_b, cx_b)])
1722            .await;
1723
1724        // Share a project as client A
1725        fs.insert_tree(
1726            "/a",
1727            json!({
1728                "a.txt": "a-contents",
1729                "b.txt": "b-contents",
1730            }),
1731        )
1732        .await;
1733        let project_a = cx_a.update(|cx| {
1734            Project::local(
1735                client_a.clone(),
1736                client_a.user_store.clone(),
1737                lang_registry.clone(),
1738                fs.clone(),
1739                cx,
1740            )
1741        });
1742        let (worktree_a, _) = project_a
1743            .update(cx_a, |p, cx| {
1744                p.find_or_create_local_worktree("/a", true, cx)
1745            })
1746            .await
1747            .unwrap();
1748        worktree_a
1749            .read_with(cx_a, |tree, _| tree.as_local().unwrap().scan_complete())
1750            .await;
1751        let project_id = project_a.update(cx_a, |p, _| p.next_remote_id()).await;
1752        let worktree_id = worktree_a.read_with(cx_a, |tree, _| tree.id());
1753        project_a.update(cx_a, |p, cx| p.share(cx)).await.unwrap();
1754        assert!(worktree_a.read_with(cx_a, |tree, _| tree.as_local().unwrap().is_shared()));
1755
1756        // Join that project as client B
1757        let project_b = Project::remote(
1758            project_id,
1759            client_b.clone(),
1760            client_b.user_store.clone(),
1761            lang_registry.clone(),
1762            fs.clone(),
1763            &mut cx_b.to_async(),
1764        )
1765        .await
1766        .unwrap();
1767        project_b
1768            .update(cx_b, |p, cx| p.open_buffer((worktree_id, "a.txt"), cx))
1769            .await
1770            .unwrap();
1771
1772        // Unshare the project as client A
1773        project_a.update(cx_a, |project, cx| project.unshare(cx));
1774        project_b
1775            .condition(cx_b, |project, _| project.is_read_only())
1776            .await;
1777        assert!(worktree_a.read_with(cx_a, |tree, _| !tree.as_local().unwrap().is_shared()));
1778        cx_b.update(|_| {
1779            drop(project_b);
1780        });
1781
1782        // Share the project again and ensure guests can still join.
1783        project_a
1784            .update(cx_a, |project, cx| project.share(cx))
1785            .await
1786            .unwrap();
1787        assert!(worktree_a.read_with(cx_a, |tree, _| tree.as_local().unwrap().is_shared()));
1788
1789        let project_b2 = Project::remote(
1790            project_id,
1791            client_b.clone(),
1792            client_b.user_store.clone(),
1793            lang_registry.clone(),
1794            fs.clone(),
1795            &mut cx_b.to_async(),
1796        )
1797        .await
1798        .unwrap();
1799        project_b2
1800            .update(cx_b, |p, cx| p.open_buffer((worktree_id, "a.txt"), cx))
1801            .await
1802            .unwrap();
1803    }
1804
1805    #[gpui::test(iterations = 10)]
1806    async fn test_host_disconnect(cx_a: &mut TestAppContext, cx_b: &mut TestAppContext) {
1807        let lang_registry = Arc::new(LanguageRegistry::test());
1808        let fs = FakeFs::new(cx_a.background());
1809        cx_a.foreground().forbid_parking();
1810
1811        // Connect to a server as 2 clients.
1812        let mut server = TestServer::start(cx_a.foreground(), cx_a.background()).await;
1813        let client_a = server.create_client(cx_a, "user_a").await;
1814        let client_b = server.create_client(cx_b, "user_b").await;
1815        server
1816            .make_contacts(vec![(&client_a, cx_a), (&client_b, cx_b)])
1817            .await;
1818
1819        // Share a project as client A
1820        fs.insert_tree(
1821            "/a",
1822            json!({
1823                "a.txt": "a-contents",
1824                "b.txt": "b-contents",
1825            }),
1826        )
1827        .await;
1828        let project_a = cx_a.update(|cx| {
1829            Project::local(
1830                client_a.clone(),
1831                client_a.user_store.clone(),
1832                lang_registry.clone(),
1833                fs.clone(),
1834                cx,
1835            )
1836        });
1837        let (worktree_a, _) = project_a
1838            .update(cx_a, |p, cx| {
1839                p.find_or_create_local_worktree("/a", true, cx)
1840            })
1841            .await
1842            .unwrap();
1843        worktree_a
1844            .read_with(cx_a, |tree, _| tree.as_local().unwrap().scan_complete())
1845            .await;
1846        let project_id = project_a.update(cx_a, |p, _| p.next_remote_id()).await;
1847        let worktree_id = worktree_a.read_with(cx_a, |tree, _| tree.id());
1848        project_a.update(cx_a, |p, cx| p.share(cx)).await.unwrap();
1849        assert!(worktree_a.read_with(cx_a, |tree, _| tree.as_local().unwrap().is_shared()));
1850
1851        // Join that project as client B
1852        let project_b = Project::remote(
1853            project_id,
1854            client_b.clone(),
1855            client_b.user_store.clone(),
1856            lang_registry.clone(),
1857            fs.clone(),
1858            &mut cx_b.to_async(),
1859        )
1860        .await
1861        .unwrap();
1862        project_b
1863            .update(cx_b, |p, cx| p.open_buffer((worktree_id, "a.txt"), cx))
1864            .await
1865            .unwrap();
1866
1867        // Drop client A's connection. Collaborators should disappear and the project should not be shown as shared.
1868        server.disconnect_client(client_a.current_user_id(cx_a));
1869        cx_a.foreground().advance_clock(rpc::RECEIVE_TIMEOUT);
1870        project_a
1871            .condition(cx_a, |project, _| project.collaborators().is_empty())
1872            .await;
1873        project_a.read_with(cx_a, |project, _| assert!(!project.is_shared()));
1874        project_b
1875            .condition(cx_b, |project, _| project.is_read_only())
1876            .await;
1877        assert!(worktree_a.read_with(cx_a, |tree, _| !tree.as_local().unwrap().is_shared()));
1878        cx_b.update(|_| {
1879            drop(project_b);
1880        });
1881
1882        // Await reconnection
1883        let project_id = project_a.update(cx_a, |p, _| p.next_remote_id()).await;
1884
1885        // Share the project again and ensure guests can still join.
1886        project_a
1887            .update(cx_a, |project, cx| project.share(cx))
1888            .await
1889            .unwrap();
1890        assert!(worktree_a.read_with(cx_a, |tree, _| tree.as_local().unwrap().is_shared()));
1891
1892        let project_b2 = Project::remote(
1893            project_id,
1894            client_b.clone(),
1895            client_b.user_store.clone(),
1896            lang_registry.clone(),
1897            fs.clone(),
1898            &mut cx_b.to_async(),
1899        )
1900        .await
1901        .unwrap();
1902        project_b2
1903            .update(cx_b, |p, cx| p.open_buffer((worktree_id, "a.txt"), cx))
1904            .await
1905            .unwrap();
1906    }
1907
1908    #[gpui::test(iterations = 10)]
1909    async fn test_propagate_saves_and_fs_changes(
1910        cx_a: &mut TestAppContext,
1911        cx_b: &mut TestAppContext,
1912        cx_c: &mut TestAppContext,
1913    ) {
1914        let lang_registry = Arc::new(LanguageRegistry::test());
1915        let fs = FakeFs::new(cx_a.background());
1916        cx_a.foreground().forbid_parking();
1917
1918        // Connect to a server as 3 clients.
1919        let mut server = TestServer::start(cx_a.foreground(), cx_a.background()).await;
1920        let client_a = server.create_client(cx_a, "user_a").await;
1921        let client_b = server.create_client(cx_b, "user_b").await;
1922        let client_c = server.create_client(cx_c, "user_c").await;
1923        server
1924            .make_contacts(vec![
1925                (&client_a, cx_a),
1926                (&client_b, cx_b),
1927                (&client_c, cx_c),
1928            ])
1929            .await;
1930
1931        // Share a worktree as client A.
1932        fs.insert_tree(
1933            "/a",
1934            json!({
1935                "file1": "",
1936                "file2": ""
1937            }),
1938        )
1939        .await;
1940        let project_a = cx_a.update(|cx| {
1941            Project::local(
1942                client_a.clone(),
1943                client_a.user_store.clone(),
1944                lang_registry.clone(),
1945                fs.clone(),
1946                cx,
1947            )
1948        });
1949        let (worktree_a, _) = project_a
1950            .update(cx_a, |p, cx| {
1951                p.find_or_create_local_worktree("/a", true, cx)
1952            })
1953            .await
1954            .unwrap();
1955        worktree_a
1956            .read_with(cx_a, |tree, _| tree.as_local().unwrap().scan_complete())
1957            .await;
1958        let project_id = project_a.update(cx_a, |p, _| p.next_remote_id()).await;
1959        let worktree_id = worktree_a.read_with(cx_a, |tree, _| tree.id());
1960        project_a.update(cx_a, |p, cx| p.share(cx)).await.unwrap();
1961
1962        // Join that worktree as clients B and C.
1963        let project_b = Project::remote(
1964            project_id,
1965            client_b.clone(),
1966            client_b.user_store.clone(),
1967            lang_registry.clone(),
1968            fs.clone(),
1969            &mut cx_b.to_async(),
1970        )
1971        .await
1972        .unwrap();
1973        let project_c = Project::remote(
1974            project_id,
1975            client_c.clone(),
1976            client_c.user_store.clone(),
1977            lang_registry.clone(),
1978            fs.clone(),
1979            &mut cx_c.to_async(),
1980        )
1981        .await
1982        .unwrap();
1983        let worktree_b = project_b.read_with(cx_b, |p, cx| p.worktrees(cx).next().unwrap());
1984        let worktree_c = project_c.read_with(cx_c, |p, cx| p.worktrees(cx).next().unwrap());
1985
1986        // Open and edit a buffer as both guests B and C.
1987        let buffer_b = project_b
1988            .update(cx_b, |p, cx| p.open_buffer((worktree_id, "file1"), cx))
1989            .await
1990            .unwrap();
1991        let buffer_c = project_c
1992            .update(cx_c, |p, cx| p.open_buffer((worktree_id, "file1"), cx))
1993            .await
1994            .unwrap();
1995        buffer_b.update(cx_b, |buf, cx| buf.edit([(0..0, "i-am-b, ")], cx));
1996        buffer_c.update(cx_c, |buf, cx| buf.edit([(0..0, "i-am-c, ")], cx));
1997
1998        // Open and edit that buffer as the host.
1999        let buffer_a = project_a
2000            .update(cx_a, |p, cx| p.open_buffer((worktree_id, "file1"), cx))
2001            .await
2002            .unwrap();
2003
2004        buffer_a
2005            .condition(cx_a, |buf, _| buf.text() == "i-am-c, i-am-b, ")
2006            .await;
2007        buffer_a.update(cx_a, |buf, cx| {
2008            buf.edit([(buf.len()..buf.len(), "i-am-a")], cx)
2009        });
2010
2011        // Wait for edits to propagate
2012        buffer_a
2013            .condition(cx_a, |buf, _| buf.text() == "i-am-c, i-am-b, i-am-a")
2014            .await;
2015        buffer_b
2016            .condition(cx_b, |buf, _| buf.text() == "i-am-c, i-am-b, i-am-a")
2017            .await;
2018        buffer_c
2019            .condition(cx_c, |buf, _| buf.text() == "i-am-c, i-am-b, i-am-a")
2020            .await;
2021
2022        // Edit the buffer as the host and concurrently save as guest B.
2023        let save_b = buffer_b.update(cx_b, |buf, cx| buf.save(cx));
2024        buffer_a.update(cx_a, |buf, cx| buf.edit([(0..0, "hi-a, ")], cx));
2025        save_b.await.unwrap();
2026        assert_eq!(
2027            fs.load("/a/file1".as_ref()).await.unwrap(),
2028            "hi-a, i-am-c, i-am-b, i-am-a"
2029        );
2030        buffer_a.read_with(cx_a, |buf, _| assert!(!buf.is_dirty()));
2031        buffer_b.read_with(cx_b, |buf, _| assert!(!buf.is_dirty()));
2032        buffer_c.condition(cx_c, |buf, _| !buf.is_dirty()).await;
2033
2034        worktree_a.flush_fs_events(cx_a).await;
2035
2036        // Make changes on host's file system, see those changes on guest worktrees.
2037        fs.rename(
2038            "/a/file1".as_ref(),
2039            "/a/file1-renamed".as_ref(),
2040            Default::default(),
2041        )
2042        .await
2043        .unwrap();
2044
2045        fs.rename("/a/file2".as_ref(), "/a/file3".as_ref(), Default::default())
2046            .await
2047            .unwrap();
2048        fs.insert_file(Path::new("/a/file4"), "4".into()).await;
2049
2050        worktree_a
2051            .condition(&cx_a, |tree, _| {
2052                tree.paths()
2053                    .map(|p| p.to_string_lossy())
2054                    .collect::<Vec<_>>()
2055                    == ["file1-renamed", "file3", "file4"]
2056            })
2057            .await;
2058        worktree_b
2059            .condition(&cx_b, |tree, _| {
2060                tree.paths()
2061                    .map(|p| p.to_string_lossy())
2062                    .collect::<Vec<_>>()
2063                    == ["file1-renamed", "file3", "file4"]
2064            })
2065            .await;
2066        worktree_c
2067            .condition(&cx_c, |tree, _| {
2068                tree.paths()
2069                    .map(|p| p.to_string_lossy())
2070                    .collect::<Vec<_>>()
2071                    == ["file1-renamed", "file3", "file4"]
2072            })
2073            .await;
2074
2075        // Ensure buffer files are updated as well.
2076        buffer_a
2077            .condition(&cx_a, |buf, _| {
2078                buf.file().unwrap().path().to_str() == Some("file1-renamed")
2079            })
2080            .await;
2081        buffer_b
2082            .condition(&cx_b, |buf, _| {
2083                buf.file().unwrap().path().to_str() == Some("file1-renamed")
2084            })
2085            .await;
2086        buffer_c
2087            .condition(&cx_c, |buf, _| {
2088                buf.file().unwrap().path().to_str() == Some("file1-renamed")
2089            })
2090            .await;
2091    }
2092
2093    #[gpui::test(iterations = 10)]
2094    async fn test_fs_operations(
2095        executor: Arc<Deterministic>,
2096        cx_a: &mut TestAppContext,
2097        cx_b: &mut TestAppContext,
2098    ) {
2099        executor.forbid_parking();
2100        let fs = FakeFs::new(cx_a.background());
2101
2102        // Connect to a server as 2 clients.
2103        let mut server = TestServer::start(cx_a.foreground(), cx_a.background()).await;
2104        let mut client_a = server.create_client(cx_a, "user_a").await;
2105        let mut client_b = server.create_client(cx_b, "user_b").await;
2106        server
2107            .make_contacts(vec![(&client_a, cx_a), (&client_b, cx_b)])
2108            .await;
2109
2110        // Share a project as client A
2111        fs.insert_tree(
2112            "/dir",
2113            json!({
2114                "a.txt": "a-contents",
2115                "b.txt": "b-contents",
2116            }),
2117        )
2118        .await;
2119
2120        let (project_a, worktree_id) = client_a.build_local_project(fs, "/dir", cx_a).await;
2121        let project_id = project_a.read_with(cx_a, |project, _| project.remote_id().unwrap());
2122        project_a
2123            .update(cx_a, |project, cx| project.share(cx))
2124            .await
2125            .unwrap();
2126
2127        let project_b = client_b.build_remote_project(project_id, cx_b).await;
2128
2129        let worktree_a =
2130            project_a.read_with(cx_a, |project, cx| project.worktrees(cx).next().unwrap());
2131        let worktree_b =
2132            project_b.read_with(cx_b, |project, cx| project.worktrees(cx).next().unwrap());
2133
2134        let entry = project_b
2135            .update(cx_b, |project, cx| {
2136                project
2137                    .create_entry((worktree_id, "c.txt"), false, cx)
2138                    .unwrap()
2139            })
2140            .await
2141            .unwrap();
2142        worktree_a.read_with(cx_a, |worktree, _| {
2143            assert_eq!(
2144                worktree
2145                    .paths()
2146                    .map(|p| p.to_string_lossy())
2147                    .collect::<Vec<_>>(),
2148                ["a.txt", "b.txt", "c.txt"]
2149            );
2150        });
2151        worktree_b.read_with(cx_b, |worktree, _| {
2152            assert_eq!(
2153                worktree
2154                    .paths()
2155                    .map(|p| p.to_string_lossy())
2156                    .collect::<Vec<_>>(),
2157                ["a.txt", "b.txt", "c.txt"]
2158            );
2159        });
2160
2161        project_b
2162            .update(cx_b, |project, cx| {
2163                project.rename_entry(entry.id, Path::new("d.txt"), cx)
2164            })
2165            .unwrap()
2166            .await
2167            .unwrap();
2168        worktree_a.read_with(cx_a, |worktree, _| {
2169            assert_eq!(
2170                worktree
2171                    .paths()
2172                    .map(|p| p.to_string_lossy())
2173                    .collect::<Vec<_>>(),
2174                ["a.txt", "b.txt", "d.txt"]
2175            );
2176        });
2177        worktree_b.read_with(cx_b, |worktree, _| {
2178            assert_eq!(
2179                worktree
2180                    .paths()
2181                    .map(|p| p.to_string_lossy())
2182                    .collect::<Vec<_>>(),
2183                ["a.txt", "b.txt", "d.txt"]
2184            );
2185        });
2186
2187        let dir_entry = project_b
2188            .update(cx_b, |project, cx| {
2189                project
2190                    .create_entry((worktree_id, "DIR"), true, cx)
2191                    .unwrap()
2192            })
2193            .await
2194            .unwrap();
2195        worktree_a.read_with(cx_a, |worktree, _| {
2196            assert_eq!(
2197                worktree
2198                    .paths()
2199                    .map(|p| p.to_string_lossy())
2200                    .collect::<Vec<_>>(),
2201                ["DIR", "a.txt", "b.txt", "d.txt"]
2202            );
2203        });
2204        worktree_b.read_with(cx_b, |worktree, _| {
2205            assert_eq!(
2206                worktree
2207                    .paths()
2208                    .map(|p| p.to_string_lossy())
2209                    .collect::<Vec<_>>(),
2210                ["DIR", "a.txt", "b.txt", "d.txt"]
2211            );
2212        });
2213
2214        project_b
2215            .update(cx_b, |project, cx| {
2216                project.delete_entry(dir_entry.id, cx).unwrap()
2217            })
2218            .await
2219            .unwrap();
2220        worktree_a.read_with(cx_a, |worktree, _| {
2221            assert_eq!(
2222                worktree
2223                    .paths()
2224                    .map(|p| p.to_string_lossy())
2225                    .collect::<Vec<_>>(),
2226                ["a.txt", "b.txt", "d.txt"]
2227            );
2228        });
2229        worktree_b.read_with(cx_b, |worktree, _| {
2230            assert_eq!(
2231                worktree
2232                    .paths()
2233                    .map(|p| p.to_string_lossy())
2234                    .collect::<Vec<_>>(),
2235                ["a.txt", "b.txt", "d.txt"]
2236            );
2237        });
2238
2239        project_b
2240            .update(cx_b, |project, cx| {
2241                project.delete_entry(entry.id, cx).unwrap()
2242            })
2243            .await
2244            .unwrap();
2245        worktree_a.read_with(cx_a, |worktree, _| {
2246            assert_eq!(
2247                worktree
2248                    .paths()
2249                    .map(|p| p.to_string_lossy())
2250                    .collect::<Vec<_>>(),
2251                ["a.txt", "b.txt"]
2252            );
2253        });
2254        worktree_b.read_with(cx_b, |worktree, _| {
2255            assert_eq!(
2256                worktree
2257                    .paths()
2258                    .map(|p| p.to_string_lossy())
2259                    .collect::<Vec<_>>(),
2260                ["a.txt", "b.txt"]
2261            );
2262        });
2263    }
2264
2265    #[gpui::test(iterations = 10)]
2266    async fn test_buffer_conflict_after_save(cx_a: &mut TestAppContext, cx_b: &mut TestAppContext) {
2267        cx_a.foreground().forbid_parking();
2268        let lang_registry = Arc::new(LanguageRegistry::test());
2269        let fs = FakeFs::new(cx_a.background());
2270
2271        // Connect to a server as 2 clients.
2272        let mut server = TestServer::start(cx_a.foreground(), cx_a.background()).await;
2273        let client_a = server.create_client(cx_a, "user_a").await;
2274        let client_b = server.create_client(cx_b, "user_b").await;
2275        server
2276            .make_contacts(vec![(&client_a, cx_a), (&client_b, cx_b)])
2277            .await;
2278
2279        // Share a project as client A
2280        fs.insert_tree(
2281            "/dir",
2282            json!({
2283                "a.txt": "a-contents",
2284            }),
2285        )
2286        .await;
2287
2288        let project_a = cx_a.update(|cx| {
2289            Project::local(
2290                client_a.clone(),
2291                client_a.user_store.clone(),
2292                lang_registry.clone(),
2293                fs.clone(),
2294                cx,
2295            )
2296        });
2297        let (worktree_a, _) = project_a
2298            .update(cx_a, |p, cx| {
2299                p.find_or_create_local_worktree("/dir", true, cx)
2300            })
2301            .await
2302            .unwrap();
2303        worktree_a
2304            .read_with(cx_a, |tree, _| tree.as_local().unwrap().scan_complete())
2305            .await;
2306        let project_id = project_a.update(cx_a, |p, _| p.next_remote_id()).await;
2307        let worktree_id = worktree_a.read_with(cx_a, |tree, _| tree.id());
2308        project_a.update(cx_a, |p, cx| p.share(cx)).await.unwrap();
2309
2310        // Join that project as client B
2311        let project_b = Project::remote(
2312            project_id,
2313            client_b.clone(),
2314            client_b.user_store.clone(),
2315            lang_registry.clone(),
2316            fs.clone(),
2317            &mut cx_b.to_async(),
2318        )
2319        .await
2320        .unwrap();
2321
2322        // Open a buffer as client B
2323        let buffer_b = project_b
2324            .update(cx_b, |p, cx| p.open_buffer((worktree_id, "a.txt"), cx))
2325            .await
2326            .unwrap();
2327
2328        buffer_b.update(cx_b, |buf, cx| buf.edit([(0..0, "world ")], cx));
2329        buffer_b.read_with(cx_b, |buf, _| {
2330            assert!(buf.is_dirty());
2331            assert!(!buf.has_conflict());
2332        });
2333
2334        buffer_b.update(cx_b, |buf, cx| buf.save(cx)).await.unwrap();
2335        buffer_b
2336            .condition(&cx_b, |buffer_b, _| !buffer_b.is_dirty())
2337            .await;
2338        buffer_b.read_with(cx_b, |buf, _| {
2339            assert!(!buf.has_conflict());
2340        });
2341
2342        buffer_b.update(cx_b, |buf, cx| buf.edit([(0..0, "hello ")], cx));
2343        buffer_b.read_with(cx_b, |buf, _| {
2344            assert!(buf.is_dirty());
2345            assert!(!buf.has_conflict());
2346        });
2347    }
2348
2349    #[gpui::test(iterations = 10)]
2350    async fn test_buffer_reloading(cx_a: &mut TestAppContext, cx_b: &mut TestAppContext) {
2351        cx_a.foreground().forbid_parking();
2352        let lang_registry = Arc::new(LanguageRegistry::test());
2353        let fs = FakeFs::new(cx_a.background());
2354
2355        // Connect to a server as 2 clients.
2356        let mut server = TestServer::start(cx_a.foreground(), cx_a.background()).await;
2357        let client_a = server.create_client(cx_a, "user_a").await;
2358        let client_b = server.create_client(cx_b, "user_b").await;
2359        server
2360            .make_contacts(vec![(&client_a, cx_a), (&client_b, cx_b)])
2361            .await;
2362
2363        // Share a project as client A
2364        fs.insert_tree(
2365            "/dir",
2366            json!({
2367                "a.txt": "a-contents",
2368            }),
2369        )
2370        .await;
2371
2372        let project_a = cx_a.update(|cx| {
2373            Project::local(
2374                client_a.clone(),
2375                client_a.user_store.clone(),
2376                lang_registry.clone(),
2377                fs.clone(),
2378                cx,
2379            )
2380        });
2381        let (worktree_a, _) = project_a
2382            .update(cx_a, |p, cx| {
2383                p.find_or_create_local_worktree("/dir", true, cx)
2384            })
2385            .await
2386            .unwrap();
2387        worktree_a
2388            .read_with(cx_a, |tree, _| tree.as_local().unwrap().scan_complete())
2389            .await;
2390        let project_id = project_a.update(cx_a, |p, _| p.next_remote_id()).await;
2391        let worktree_id = worktree_a.read_with(cx_a, |tree, _| tree.id());
2392        project_a.update(cx_a, |p, cx| p.share(cx)).await.unwrap();
2393
2394        // Join that project as client B
2395        let project_b = Project::remote(
2396            project_id,
2397            client_b.clone(),
2398            client_b.user_store.clone(),
2399            lang_registry.clone(),
2400            fs.clone(),
2401            &mut cx_b.to_async(),
2402        )
2403        .await
2404        .unwrap();
2405        let _worktree_b = project_b.update(cx_b, |p, cx| p.worktrees(cx).next().unwrap());
2406
2407        // Open a buffer as client B
2408        let buffer_b = project_b
2409            .update(cx_b, |p, cx| p.open_buffer((worktree_id, "a.txt"), cx))
2410            .await
2411            .unwrap();
2412        buffer_b.read_with(cx_b, |buf, _| {
2413            assert!(!buf.is_dirty());
2414            assert!(!buf.has_conflict());
2415        });
2416
2417        fs.save(Path::new("/dir/a.txt"), &"new contents".into())
2418            .await
2419            .unwrap();
2420        buffer_b
2421            .condition(&cx_b, |buf, _| {
2422                buf.text() == "new contents" && !buf.is_dirty()
2423            })
2424            .await;
2425        buffer_b.read_with(cx_b, |buf, _| {
2426            assert!(!buf.has_conflict());
2427        });
2428    }
2429
2430    #[gpui::test(iterations = 10)]
2431    async fn test_editing_while_guest_opens_buffer(
2432        cx_a: &mut TestAppContext,
2433        cx_b: &mut TestAppContext,
2434    ) {
2435        cx_a.foreground().forbid_parking();
2436        let lang_registry = Arc::new(LanguageRegistry::test());
2437        let fs = FakeFs::new(cx_a.background());
2438
2439        // Connect to a server as 2 clients.
2440        let mut server = TestServer::start(cx_a.foreground(), cx_a.background()).await;
2441        let client_a = server.create_client(cx_a, "user_a").await;
2442        let client_b = server.create_client(cx_b, "user_b").await;
2443        server
2444            .make_contacts(vec![(&client_a, cx_a), (&client_b, cx_b)])
2445            .await;
2446
2447        // Share a project as client A
2448        fs.insert_tree(
2449            "/dir",
2450            json!({
2451                "a.txt": "a-contents",
2452            }),
2453        )
2454        .await;
2455        let project_a = cx_a.update(|cx| {
2456            Project::local(
2457                client_a.clone(),
2458                client_a.user_store.clone(),
2459                lang_registry.clone(),
2460                fs.clone(),
2461                cx,
2462            )
2463        });
2464        let (worktree_a, _) = project_a
2465            .update(cx_a, |p, cx| {
2466                p.find_or_create_local_worktree("/dir", true, cx)
2467            })
2468            .await
2469            .unwrap();
2470        worktree_a
2471            .read_with(cx_a, |tree, _| tree.as_local().unwrap().scan_complete())
2472            .await;
2473        let project_id = project_a.update(cx_a, |p, _| p.next_remote_id()).await;
2474        let worktree_id = worktree_a.read_with(cx_a, |tree, _| tree.id());
2475        project_a.update(cx_a, |p, cx| p.share(cx)).await.unwrap();
2476
2477        // Join that project as client B
2478        let project_b = Project::remote(
2479            project_id,
2480            client_b.clone(),
2481            client_b.user_store.clone(),
2482            lang_registry.clone(),
2483            fs.clone(),
2484            &mut cx_b.to_async(),
2485        )
2486        .await
2487        .unwrap();
2488
2489        // Open a buffer as client A
2490        let buffer_a = project_a
2491            .update(cx_a, |p, cx| p.open_buffer((worktree_id, "a.txt"), cx))
2492            .await
2493            .unwrap();
2494
2495        // Start opening the same buffer as client B
2496        let buffer_b = cx_b
2497            .background()
2498            .spawn(project_b.update(cx_b, |p, cx| p.open_buffer((worktree_id, "a.txt"), cx)));
2499
2500        // Edit the buffer as client A while client B is still opening it.
2501        cx_b.background().simulate_random_delay().await;
2502        buffer_a.update(cx_a, |buf, cx| buf.edit([(0..0, "X")], cx));
2503        cx_b.background().simulate_random_delay().await;
2504        buffer_a.update(cx_a, |buf, cx| buf.edit([(1..1, "Y")], cx));
2505
2506        let text = buffer_a.read_with(cx_a, |buf, _| buf.text());
2507        let buffer_b = buffer_b.await.unwrap();
2508        buffer_b.condition(&cx_b, |buf, _| buf.text() == text).await;
2509    }
2510
2511    #[gpui::test(iterations = 10)]
2512    async fn test_leaving_worktree_while_opening_buffer(
2513        cx_a: &mut TestAppContext,
2514        cx_b: &mut TestAppContext,
2515    ) {
2516        cx_a.foreground().forbid_parking();
2517        let lang_registry = Arc::new(LanguageRegistry::test());
2518        let fs = FakeFs::new(cx_a.background());
2519
2520        // Connect to a server as 2 clients.
2521        let mut server = TestServer::start(cx_a.foreground(), cx_a.background()).await;
2522        let client_a = server.create_client(cx_a, "user_a").await;
2523        let client_b = server.create_client(cx_b, "user_b").await;
2524        server
2525            .make_contacts(vec![(&client_a, cx_a), (&client_b, cx_b)])
2526            .await;
2527
2528        // Share a project as client A
2529        fs.insert_tree(
2530            "/dir",
2531            json!({
2532                "a.txt": "a-contents",
2533            }),
2534        )
2535        .await;
2536        let project_a = cx_a.update(|cx| {
2537            Project::local(
2538                client_a.clone(),
2539                client_a.user_store.clone(),
2540                lang_registry.clone(),
2541                fs.clone(),
2542                cx,
2543            )
2544        });
2545        let (worktree_a, _) = project_a
2546            .update(cx_a, |p, cx| {
2547                p.find_or_create_local_worktree("/dir", true, cx)
2548            })
2549            .await
2550            .unwrap();
2551        worktree_a
2552            .read_with(cx_a, |tree, _| tree.as_local().unwrap().scan_complete())
2553            .await;
2554        let project_id = project_a.update(cx_a, |p, _| p.next_remote_id()).await;
2555        let worktree_id = worktree_a.read_with(cx_a, |tree, _| tree.id());
2556        project_a.update(cx_a, |p, cx| p.share(cx)).await.unwrap();
2557
2558        // Join that project as client B
2559        let project_b = Project::remote(
2560            project_id,
2561            client_b.clone(),
2562            client_b.user_store.clone(),
2563            lang_registry.clone(),
2564            fs.clone(),
2565            &mut cx_b.to_async(),
2566        )
2567        .await
2568        .unwrap();
2569
2570        // See that a guest has joined as client A.
2571        project_a
2572            .condition(&cx_a, |p, _| p.collaborators().len() == 1)
2573            .await;
2574
2575        // Begin opening a buffer as client B, but leave the project before the open completes.
2576        let buffer_b = cx_b
2577            .background()
2578            .spawn(project_b.update(cx_b, |p, cx| p.open_buffer((worktree_id, "a.txt"), cx)));
2579        cx_b.update(|_| drop(project_b));
2580        drop(buffer_b);
2581
2582        // See that the guest has left.
2583        project_a
2584            .condition(&cx_a, |p, _| p.collaborators().len() == 0)
2585            .await;
2586    }
2587
2588    #[gpui::test(iterations = 10)]
2589    async fn test_leaving_project(cx_a: &mut TestAppContext, cx_b: &mut TestAppContext) {
2590        cx_a.foreground().forbid_parking();
2591        let lang_registry = Arc::new(LanguageRegistry::test());
2592        let fs = FakeFs::new(cx_a.background());
2593
2594        // Connect to a server as 2 clients.
2595        let mut server = TestServer::start(cx_a.foreground(), cx_a.background()).await;
2596        let client_a = server.create_client(cx_a, "user_a").await;
2597        let client_b = server.create_client(cx_b, "user_b").await;
2598        server
2599            .make_contacts(vec![(&client_a, cx_a), (&client_b, cx_b)])
2600            .await;
2601
2602        // Share a project as client A
2603        fs.insert_tree(
2604            "/a",
2605            json!({
2606                "a.txt": "a-contents",
2607                "b.txt": "b-contents",
2608            }),
2609        )
2610        .await;
2611        let project_a = cx_a.update(|cx| {
2612            Project::local(
2613                client_a.clone(),
2614                client_a.user_store.clone(),
2615                lang_registry.clone(),
2616                fs.clone(),
2617                cx,
2618            )
2619        });
2620        let (worktree_a, _) = project_a
2621            .update(cx_a, |p, cx| {
2622                p.find_or_create_local_worktree("/a", true, cx)
2623            })
2624            .await
2625            .unwrap();
2626        worktree_a
2627            .read_with(cx_a, |tree, _| tree.as_local().unwrap().scan_complete())
2628            .await;
2629        let project_id = project_a
2630            .update(cx_a, |project, _| project.next_remote_id())
2631            .await;
2632        project_a
2633            .update(cx_a, |project, cx| project.share(cx))
2634            .await
2635            .unwrap();
2636
2637        // Join that project as client B
2638        let _project_b = Project::remote(
2639            project_id,
2640            client_b.clone(),
2641            client_b.user_store.clone(),
2642            lang_registry.clone(),
2643            fs.clone(),
2644            &mut cx_b.to_async(),
2645        )
2646        .await
2647        .unwrap();
2648
2649        // Client A sees that a guest has joined.
2650        project_a
2651            .condition(cx_a, |p, _| p.collaborators().len() == 1)
2652            .await;
2653
2654        // Drop client B's connection and ensure client A observes client B leaving the project.
2655        client_b.disconnect(&cx_b.to_async()).unwrap();
2656        project_a
2657            .condition(cx_a, |p, _| p.collaborators().len() == 0)
2658            .await;
2659
2660        // Rejoin the project as client B
2661        let _project_b = Project::remote(
2662            project_id,
2663            client_b.clone(),
2664            client_b.user_store.clone(),
2665            lang_registry.clone(),
2666            fs.clone(),
2667            &mut cx_b.to_async(),
2668        )
2669        .await
2670        .unwrap();
2671
2672        // Client A sees that a guest has re-joined.
2673        project_a
2674            .condition(cx_a, |p, _| p.collaborators().len() == 1)
2675            .await;
2676
2677        // Simulate connection loss for client B and ensure client A observes client B leaving the project.
2678        client_b.wait_for_current_user(cx_b).await;
2679        server.disconnect_client(client_b.current_user_id(cx_b));
2680        cx_a.foreground().advance_clock(rpc::RECEIVE_TIMEOUT);
2681        project_a
2682            .condition(cx_a, |p, _| p.collaborators().len() == 0)
2683            .await;
2684    }
2685
2686    #[gpui::test(iterations = 10)]
2687    async fn test_collaborating_with_diagnostics(
2688        cx_a: &mut TestAppContext,
2689        cx_b: &mut TestAppContext,
2690    ) {
2691        cx_a.foreground().forbid_parking();
2692        let lang_registry = Arc::new(LanguageRegistry::test());
2693        let fs = FakeFs::new(cx_a.background());
2694
2695        // Set up a fake language server.
2696        let mut language = Language::new(
2697            LanguageConfig {
2698                name: "Rust".into(),
2699                path_suffixes: vec!["rs".to_string()],
2700                ..Default::default()
2701            },
2702            Some(tree_sitter_rust::language()),
2703        );
2704        let mut fake_language_servers = language.set_fake_lsp_adapter(Default::default());
2705        lang_registry.add(Arc::new(language));
2706
2707        // Connect to a server as 2 clients.
2708        let mut server = TestServer::start(cx_a.foreground(), cx_a.background()).await;
2709        let client_a = server.create_client(cx_a, "user_a").await;
2710        let client_b = server.create_client(cx_b, "user_b").await;
2711        server
2712            .make_contacts(vec![(&client_a, cx_a), (&client_b, cx_b)])
2713            .await;
2714
2715        // Share a project as client A
2716        fs.insert_tree(
2717            "/a",
2718            json!({
2719                "a.rs": "let one = two",
2720                "other.rs": "",
2721            }),
2722        )
2723        .await;
2724        let project_a = cx_a.update(|cx| {
2725            Project::local(
2726                client_a.clone(),
2727                client_a.user_store.clone(),
2728                lang_registry.clone(),
2729                fs.clone(),
2730                cx,
2731            )
2732        });
2733        let (worktree_a, _) = project_a
2734            .update(cx_a, |p, cx| {
2735                p.find_or_create_local_worktree("/a", true, cx)
2736            })
2737            .await
2738            .unwrap();
2739        worktree_a
2740            .read_with(cx_a, |tree, _| tree.as_local().unwrap().scan_complete())
2741            .await;
2742        let project_id = project_a.update(cx_a, |p, _| p.next_remote_id()).await;
2743        let worktree_id = worktree_a.read_with(cx_a, |tree, _| tree.id());
2744        project_a.update(cx_a, |p, cx| p.share(cx)).await.unwrap();
2745
2746        // Cause the language server to start.
2747        let _ = cx_a
2748            .background()
2749            .spawn(project_a.update(cx_a, |project, cx| {
2750                project.open_buffer(
2751                    ProjectPath {
2752                        worktree_id,
2753                        path: Path::new("other.rs").into(),
2754                    },
2755                    cx,
2756                )
2757            }))
2758            .await
2759            .unwrap();
2760
2761        // Simulate a language server reporting errors for a file.
2762        let mut fake_language_server = fake_language_servers.next().await.unwrap();
2763        fake_language_server
2764            .receive_notification::<lsp::notification::DidOpenTextDocument>()
2765            .await;
2766        fake_language_server.notify::<lsp::notification::PublishDiagnostics>(
2767            lsp::PublishDiagnosticsParams {
2768                uri: lsp::Url::from_file_path("/a/a.rs").unwrap(),
2769                version: None,
2770                diagnostics: vec![lsp::Diagnostic {
2771                    severity: Some(lsp::DiagnosticSeverity::ERROR),
2772                    range: lsp::Range::new(lsp::Position::new(0, 4), lsp::Position::new(0, 7)),
2773                    message: "message 1".to_string(),
2774                    ..Default::default()
2775                }],
2776            },
2777        );
2778
2779        // Wait for server to see the diagnostics update.
2780        server
2781            .condition(|store| {
2782                let worktree = store
2783                    .project(project_id)
2784                    .unwrap()
2785                    .share
2786                    .as_ref()
2787                    .unwrap()
2788                    .worktrees
2789                    .get(&worktree_id.to_proto())
2790                    .unwrap();
2791
2792                !worktree.diagnostic_summaries.is_empty()
2793            })
2794            .await;
2795
2796        // Join the worktree as client B.
2797        let project_b = Project::remote(
2798            project_id,
2799            client_b.clone(),
2800            client_b.user_store.clone(),
2801            lang_registry.clone(),
2802            fs.clone(),
2803            &mut cx_b.to_async(),
2804        )
2805        .await
2806        .unwrap();
2807
2808        project_b.read_with(cx_b, |project, cx| {
2809            assert_eq!(
2810                project.diagnostic_summaries(cx).collect::<Vec<_>>(),
2811                &[(
2812                    ProjectPath {
2813                        worktree_id,
2814                        path: Arc::from(Path::new("a.rs")),
2815                    },
2816                    DiagnosticSummary {
2817                        error_count: 1,
2818                        warning_count: 0,
2819                        ..Default::default()
2820                    },
2821                )]
2822            )
2823        });
2824
2825        // Simulate a language server reporting more errors for a file.
2826        fake_language_server.notify::<lsp::notification::PublishDiagnostics>(
2827            lsp::PublishDiagnosticsParams {
2828                uri: lsp::Url::from_file_path("/a/a.rs").unwrap(),
2829                version: None,
2830                diagnostics: vec![
2831                    lsp::Diagnostic {
2832                        severity: Some(lsp::DiagnosticSeverity::ERROR),
2833                        range: lsp::Range::new(lsp::Position::new(0, 4), lsp::Position::new(0, 7)),
2834                        message: "message 1".to_string(),
2835                        ..Default::default()
2836                    },
2837                    lsp::Diagnostic {
2838                        severity: Some(lsp::DiagnosticSeverity::WARNING),
2839                        range: lsp::Range::new(
2840                            lsp::Position::new(0, 10),
2841                            lsp::Position::new(0, 13),
2842                        ),
2843                        message: "message 2".to_string(),
2844                        ..Default::default()
2845                    },
2846                ],
2847            },
2848        );
2849
2850        // Client b gets the updated summaries
2851        project_b
2852            .condition(&cx_b, |project, cx| {
2853                project.diagnostic_summaries(cx).collect::<Vec<_>>()
2854                    == &[(
2855                        ProjectPath {
2856                            worktree_id,
2857                            path: Arc::from(Path::new("a.rs")),
2858                        },
2859                        DiagnosticSummary {
2860                            error_count: 1,
2861                            warning_count: 1,
2862                            ..Default::default()
2863                        },
2864                    )]
2865            })
2866            .await;
2867
2868        // Open the file with the errors on client B. They should be present.
2869        let buffer_b = cx_b
2870            .background()
2871            .spawn(project_b.update(cx_b, |p, cx| p.open_buffer((worktree_id, "a.rs"), cx)))
2872            .await
2873            .unwrap();
2874
2875        buffer_b.read_with(cx_b, |buffer, _| {
2876            assert_eq!(
2877                buffer
2878                    .snapshot()
2879                    .diagnostics_in_range::<_, Point>(0..buffer.len(), false)
2880                    .map(|entry| entry)
2881                    .collect::<Vec<_>>(),
2882                &[
2883                    DiagnosticEntry {
2884                        range: Point::new(0, 4)..Point::new(0, 7),
2885                        diagnostic: Diagnostic {
2886                            group_id: 0,
2887                            message: "message 1".to_string(),
2888                            severity: lsp::DiagnosticSeverity::ERROR,
2889                            is_primary: true,
2890                            ..Default::default()
2891                        }
2892                    },
2893                    DiagnosticEntry {
2894                        range: Point::new(0, 10)..Point::new(0, 13),
2895                        diagnostic: Diagnostic {
2896                            group_id: 1,
2897                            severity: lsp::DiagnosticSeverity::WARNING,
2898                            message: "message 2".to_string(),
2899                            is_primary: true,
2900                            ..Default::default()
2901                        }
2902                    }
2903                ]
2904            );
2905        });
2906
2907        // Simulate a language server reporting no errors for a file.
2908        fake_language_server.notify::<lsp::notification::PublishDiagnostics>(
2909            lsp::PublishDiagnosticsParams {
2910                uri: lsp::Url::from_file_path("/a/a.rs").unwrap(),
2911                version: None,
2912                diagnostics: vec![],
2913            },
2914        );
2915        project_a
2916            .condition(cx_a, |project, cx| {
2917                project.diagnostic_summaries(cx).collect::<Vec<_>>() == &[]
2918            })
2919            .await;
2920        project_b
2921            .condition(cx_b, |project, cx| {
2922                project.diagnostic_summaries(cx).collect::<Vec<_>>() == &[]
2923            })
2924            .await;
2925    }
2926
2927    #[gpui::test(iterations = 10)]
2928    async fn test_collaborating_with_completion(
2929        cx_a: &mut TestAppContext,
2930        cx_b: &mut TestAppContext,
2931    ) {
2932        cx_a.foreground().forbid_parking();
2933        let lang_registry = Arc::new(LanguageRegistry::test());
2934        let fs = FakeFs::new(cx_a.background());
2935
2936        // Set up a fake language server.
2937        let mut language = Language::new(
2938            LanguageConfig {
2939                name: "Rust".into(),
2940                path_suffixes: vec!["rs".to_string()],
2941                ..Default::default()
2942            },
2943            Some(tree_sitter_rust::language()),
2944        );
2945        let mut fake_language_servers = language.set_fake_lsp_adapter(FakeLspAdapter {
2946            capabilities: lsp::ServerCapabilities {
2947                completion_provider: Some(lsp::CompletionOptions {
2948                    trigger_characters: Some(vec![".".to_string()]),
2949                    ..Default::default()
2950                }),
2951                ..Default::default()
2952            },
2953            ..Default::default()
2954        });
2955        lang_registry.add(Arc::new(language));
2956
2957        // Connect to a server as 2 clients.
2958        let mut server = TestServer::start(cx_a.foreground(), cx_a.background()).await;
2959        let client_a = server.create_client(cx_a, "user_a").await;
2960        let client_b = server.create_client(cx_b, "user_b").await;
2961        server
2962            .make_contacts(vec![(&client_a, cx_a), (&client_b, cx_b)])
2963            .await;
2964
2965        // Share a project as client A
2966        fs.insert_tree(
2967            "/a",
2968            json!({
2969                "main.rs": "fn main() { a }",
2970                "other.rs": "",
2971            }),
2972        )
2973        .await;
2974        let project_a = cx_a.update(|cx| {
2975            Project::local(
2976                client_a.clone(),
2977                client_a.user_store.clone(),
2978                lang_registry.clone(),
2979                fs.clone(),
2980                cx,
2981            )
2982        });
2983        let (worktree_a, _) = project_a
2984            .update(cx_a, |p, cx| {
2985                p.find_or_create_local_worktree("/a", true, cx)
2986            })
2987            .await
2988            .unwrap();
2989        worktree_a
2990            .read_with(cx_a, |tree, _| tree.as_local().unwrap().scan_complete())
2991            .await;
2992        let project_id = project_a.update(cx_a, |p, _| p.next_remote_id()).await;
2993        let worktree_id = worktree_a.read_with(cx_a, |tree, _| tree.id());
2994        project_a.update(cx_a, |p, cx| p.share(cx)).await.unwrap();
2995
2996        // Join the worktree as client B.
2997        let project_b = Project::remote(
2998            project_id,
2999            client_b.clone(),
3000            client_b.user_store.clone(),
3001            lang_registry.clone(),
3002            fs.clone(),
3003            &mut cx_b.to_async(),
3004        )
3005        .await
3006        .unwrap();
3007
3008        // Open a file in an editor as the guest.
3009        let buffer_b = project_b
3010            .update(cx_b, |p, cx| p.open_buffer((worktree_id, "main.rs"), cx))
3011            .await
3012            .unwrap();
3013        let (window_b, _) = cx_b.add_window(|_| EmptyView);
3014        let editor_b = cx_b.add_view(window_b, |cx| {
3015            Editor::for_buffer(buffer_b.clone(), Some(project_b.clone()), cx)
3016        });
3017
3018        let fake_language_server = fake_language_servers.next().await.unwrap();
3019        buffer_b
3020            .condition(&cx_b, |buffer, _| !buffer.completion_triggers().is_empty())
3021            .await;
3022
3023        // Type a completion trigger character as the guest.
3024        editor_b.update(cx_b, |editor, cx| {
3025            editor.select_ranges([13..13], None, cx);
3026            editor.handle_input(&Input(".".into()), cx);
3027            cx.focus(&editor_b);
3028        });
3029
3030        // Receive a completion request as the host's language server.
3031        // Return some completions from the host's language server.
3032        cx_a.foreground().start_waiting();
3033        fake_language_server
3034            .handle_request::<lsp::request::Completion, _, _>(|params, _| async move {
3035                assert_eq!(
3036                    params.text_document_position.text_document.uri,
3037                    lsp::Url::from_file_path("/a/main.rs").unwrap(),
3038                );
3039                assert_eq!(
3040                    params.text_document_position.position,
3041                    lsp::Position::new(0, 14),
3042                );
3043
3044                Ok(Some(lsp::CompletionResponse::Array(vec![
3045                    lsp::CompletionItem {
3046                        label: "first_method(…)".into(),
3047                        detail: Some("fn(&mut self, B) -> C".into()),
3048                        text_edit: Some(lsp::CompletionTextEdit::Edit(lsp::TextEdit {
3049                            new_text: "first_method($1)".to_string(),
3050                            range: lsp::Range::new(
3051                                lsp::Position::new(0, 14),
3052                                lsp::Position::new(0, 14),
3053                            ),
3054                        })),
3055                        insert_text_format: Some(lsp::InsertTextFormat::SNIPPET),
3056                        ..Default::default()
3057                    },
3058                    lsp::CompletionItem {
3059                        label: "second_method(…)".into(),
3060                        detail: Some("fn(&mut self, C) -> D<E>".into()),
3061                        text_edit: Some(lsp::CompletionTextEdit::Edit(lsp::TextEdit {
3062                            new_text: "second_method()".to_string(),
3063                            range: lsp::Range::new(
3064                                lsp::Position::new(0, 14),
3065                                lsp::Position::new(0, 14),
3066                            ),
3067                        })),
3068                        insert_text_format: Some(lsp::InsertTextFormat::SNIPPET),
3069                        ..Default::default()
3070                    },
3071                ])))
3072            })
3073            .next()
3074            .await
3075            .unwrap();
3076        cx_a.foreground().finish_waiting();
3077
3078        // Open the buffer on the host.
3079        let buffer_a = project_a
3080            .update(cx_a, |p, cx| p.open_buffer((worktree_id, "main.rs"), cx))
3081            .await
3082            .unwrap();
3083        buffer_a
3084            .condition(&cx_a, |buffer, _| buffer.text() == "fn main() { a. }")
3085            .await;
3086
3087        // Confirm a completion on the guest.
3088        editor_b
3089            .condition(&cx_b, |editor, _| editor.context_menu_visible())
3090            .await;
3091        editor_b.update(cx_b, |editor, cx| {
3092            editor.confirm_completion(&ConfirmCompletion { item_ix: Some(0) }, cx);
3093            assert_eq!(editor.text(cx), "fn main() { a.first_method() }");
3094        });
3095
3096        // Return a resolved completion from the host's language server.
3097        // The resolved completion has an additional text edit.
3098        fake_language_server.handle_request::<lsp::request::ResolveCompletionItem, _, _>(
3099            |params, _| async move {
3100                assert_eq!(params.label, "first_method(…)");
3101                Ok(lsp::CompletionItem {
3102                    label: "first_method(…)".into(),
3103                    detail: Some("fn(&mut self, B) -> C".into()),
3104                    text_edit: Some(lsp::CompletionTextEdit::Edit(lsp::TextEdit {
3105                        new_text: "first_method($1)".to_string(),
3106                        range: lsp::Range::new(
3107                            lsp::Position::new(0, 14),
3108                            lsp::Position::new(0, 14),
3109                        ),
3110                    })),
3111                    additional_text_edits: Some(vec![lsp::TextEdit {
3112                        new_text: "use d::SomeTrait;\n".to_string(),
3113                        range: lsp::Range::new(lsp::Position::new(0, 0), lsp::Position::new(0, 0)),
3114                    }]),
3115                    insert_text_format: Some(lsp::InsertTextFormat::SNIPPET),
3116                    ..Default::default()
3117                })
3118            },
3119        );
3120
3121        // The additional edit is applied.
3122        buffer_a
3123            .condition(&cx_a, |buffer, _| {
3124                buffer.text() == "use d::SomeTrait;\nfn main() { a.first_method() }"
3125            })
3126            .await;
3127        buffer_b
3128            .condition(&cx_b, |buffer, _| {
3129                buffer.text() == "use d::SomeTrait;\nfn main() { a.first_method() }"
3130            })
3131            .await;
3132    }
3133
3134    #[gpui::test(iterations = 10)]
3135    async fn test_reloading_buffer_manually(cx_a: &mut TestAppContext, cx_b: &mut TestAppContext) {
3136        cx_a.foreground().forbid_parking();
3137        let lang_registry = Arc::new(LanguageRegistry::test());
3138        let fs = FakeFs::new(cx_a.background());
3139
3140        // Connect to a server as 2 clients.
3141        let mut server = TestServer::start(cx_a.foreground(), cx_a.background()).await;
3142        let client_a = server.create_client(cx_a, "user_a").await;
3143        let client_b = server.create_client(cx_b, "user_b").await;
3144        server
3145            .make_contacts(vec![(&client_a, cx_a), (&client_b, cx_b)])
3146            .await;
3147
3148        // Share a project as client A
3149        fs.insert_tree(
3150            "/a",
3151            json!({
3152                "a.rs": "let one = 1;",
3153            }),
3154        )
3155        .await;
3156        let project_a = cx_a.update(|cx| {
3157            Project::local(
3158                client_a.clone(),
3159                client_a.user_store.clone(),
3160                lang_registry.clone(),
3161                fs.clone(),
3162                cx,
3163            )
3164        });
3165        let (worktree_a, _) = project_a
3166            .update(cx_a, |p, cx| {
3167                p.find_or_create_local_worktree("/a", true, cx)
3168            })
3169            .await
3170            .unwrap();
3171        worktree_a
3172            .read_with(cx_a, |tree, _| tree.as_local().unwrap().scan_complete())
3173            .await;
3174        let project_id = project_a.update(cx_a, |p, _| p.next_remote_id()).await;
3175        let worktree_id = worktree_a.read_with(cx_a, |tree, _| tree.id());
3176        project_a.update(cx_a, |p, cx| p.share(cx)).await.unwrap();
3177        let buffer_a = project_a
3178            .update(cx_a, |p, cx| p.open_buffer((worktree_id, "a.rs"), cx))
3179            .await
3180            .unwrap();
3181
3182        // Join the worktree as client B.
3183        let project_b = Project::remote(
3184            project_id,
3185            client_b.clone(),
3186            client_b.user_store.clone(),
3187            lang_registry.clone(),
3188            fs.clone(),
3189            &mut cx_b.to_async(),
3190        )
3191        .await
3192        .unwrap();
3193
3194        let buffer_b = cx_b
3195            .background()
3196            .spawn(project_b.update(cx_b, |p, cx| p.open_buffer((worktree_id, "a.rs"), cx)))
3197            .await
3198            .unwrap();
3199        buffer_b.update(cx_b, |buffer, cx| {
3200            buffer.edit([(4..7, "six")], cx);
3201            buffer.edit([(10..11, "6")], cx);
3202            assert_eq!(buffer.text(), "let six = 6;");
3203            assert!(buffer.is_dirty());
3204            assert!(!buffer.has_conflict());
3205        });
3206        buffer_a
3207            .condition(cx_a, |buffer, _| buffer.text() == "let six = 6;")
3208            .await;
3209
3210        fs.save(Path::new("/a/a.rs"), &Rope::from("let seven = 7;"))
3211            .await
3212            .unwrap();
3213        buffer_a
3214            .condition(cx_a, |buffer, _| buffer.has_conflict())
3215            .await;
3216        buffer_b
3217            .condition(cx_b, |buffer, _| buffer.has_conflict())
3218            .await;
3219
3220        project_b
3221            .update(cx_b, |project, cx| {
3222                project.reload_buffers(HashSet::from_iter([buffer_b.clone()]), true, cx)
3223            })
3224            .await
3225            .unwrap();
3226        buffer_a.read_with(cx_a, |buffer, _| {
3227            assert_eq!(buffer.text(), "let seven = 7;");
3228            assert!(!buffer.is_dirty());
3229            assert!(!buffer.has_conflict());
3230        });
3231        buffer_b.read_with(cx_b, |buffer, _| {
3232            assert_eq!(buffer.text(), "let seven = 7;");
3233            assert!(!buffer.is_dirty());
3234            assert!(!buffer.has_conflict());
3235        });
3236
3237        buffer_a.update(cx_a, |buffer, cx| {
3238            // Undoing on the host is a no-op when the reload was initiated by the guest.
3239            buffer.undo(cx);
3240            assert_eq!(buffer.text(), "let seven = 7;");
3241            assert!(!buffer.is_dirty());
3242            assert!(!buffer.has_conflict());
3243        });
3244        buffer_b.update(cx_b, |buffer, cx| {
3245            // Undoing on the guest rolls back the buffer to before it was reloaded but the conflict gets cleared.
3246            buffer.undo(cx);
3247            assert_eq!(buffer.text(), "let six = 6;");
3248            assert!(buffer.is_dirty());
3249            assert!(!buffer.has_conflict());
3250        });
3251    }
3252
3253    #[gpui::test(iterations = 10)]
3254    async fn test_formatting_buffer(cx_a: &mut TestAppContext, cx_b: &mut TestAppContext) {
3255        cx_a.foreground().forbid_parking();
3256        let lang_registry = Arc::new(LanguageRegistry::test());
3257        let fs = FakeFs::new(cx_a.background());
3258
3259        // Set up a fake language server.
3260        let mut language = Language::new(
3261            LanguageConfig {
3262                name: "Rust".into(),
3263                path_suffixes: vec!["rs".to_string()],
3264                ..Default::default()
3265            },
3266            Some(tree_sitter_rust::language()),
3267        );
3268        let mut fake_language_servers = language.set_fake_lsp_adapter(Default::default());
3269        lang_registry.add(Arc::new(language));
3270
3271        // Connect to a server as 2 clients.
3272        let mut server = TestServer::start(cx_a.foreground(), cx_a.background()).await;
3273        let client_a = server.create_client(cx_a, "user_a").await;
3274        let client_b = server.create_client(cx_b, "user_b").await;
3275        server
3276            .make_contacts(vec![(&client_a, cx_a), (&client_b, cx_b)])
3277            .await;
3278
3279        // Share a project as client A
3280        fs.insert_tree(
3281            "/a",
3282            json!({
3283                "a.rs": "let one = two",
3284            }),
3285        )
3286        .await;
3287        let project_a = cx_a.update(|cx| {
3288            Project::local(
3289                client_a.clone(),
3290                client_a.user_store.clone(),
3291                lang_registry.clone(),
3292                fs.clone(),
3293                cx,
3294            )
3295        });
3296        let (worktree_a, _) = project_a
3297            .update(cx_a, |p, cx| {
3298                p.find_or_create_local_worktree("/a", true, cx)
3299            })
3300            .await
3301            .unwrap();
3302        worktree_a
3303            .read_with(cx_a, |tree, _| tree.as_local().unwrap().scan_complete())
3304            .await;
3305        let project_id = project_a.update(cx_a, |p, _| p.next_remote_id()).await;
3306        let worktree_id = worktree_a.read_with(cx_a, |tree, _| tree.id());
3307        project_a.update(cx_a, |p, cx| p.share(cx)).await.unwrap();
3308
3309        // Join the worktree as client B.
3310        let project_b = Project::remote(
3311            project_id,
3312            client_b.clone(),
3313            client_b.user_store.clone(),
3314            lang_registry.clone(),
3315            fs.clone(),
3316            &mut cx_b.to_async(),
3317        )
3318        .await
3319        .unwrap();
3320
3321        let buffer_b = cx_b
3322            .background()
3323            .spawn(project_b.update(cx_b, |p, cx| p.open_buffer((worktree_id, "a.rs"), cx)))
3324            .await
3325            .unwrap();
3326
3327        let fake_language_server = fake_language_servers.next().await.unwrap();
3328        fake_language_server.handle_request::<lsp::request::Formatting, _, _>(|_, _| async move {
3329            Ok(Some(vec![
3330                lsp::TextEdit {
3331                    range: lsp::Range::new(lsp::Position::new(0, 4), lsp::Position::new(0, 4)),
3332                    new_text: "h".to_string(),
3333                },
3334                lsp::TextEdit {
3335                    range: lsp::Range::new(lsp::Position::new(0, 7), lsp::Position::new(0, 7)),
3336                    new_text: "y".to_string(),
3337                },
3338            ]))
3339        });
3340
3341        project_b
3342            .update(cx_b, |project, cx| {
3343                project.format(HashSet::from_iter([buffer_b.clone()]), true, cx)
3344            })
3345            .await
3346            .unwrap();
3347        assert_eq!(
3348            buffer_b.read_with(cx_b, |buffer, _| buffer.text()),
3349            "let honey = two"
3350        );
3351    }
3352
3353    #[gpui::test(iterations = 10)]
3354    async fn test_definition(cx_a: &mut TestAppContext, cx_b: &mut TestAppContext) {
3355        cx_a.foreground().forbid_parking();
3356        let lang_registry = Arc::new(LanguageRegistry::test());
3357        let fs = FakeFs::new(cx_a.background());
3358        fs.insert_tree(
3359            "/root-1",
3360            json!({
3361                "a.rs": "const ONE: usize = b::TWO + b::THREE;",
3362            }),
3363        )
3364        .await;
3365        fs.insert_tree(
3366            "/root-2",
3367            json!({
3368                "b.rs": "const TWO: usize = 2;\nconst THREE: usize = 3;",
3369            }),
3370        )
3371        .await;
3372
3373        // Set up a fake language server.
3374        let mut language = Language::new(
3375            LanguageConfig {
3376                name: "Rust".into(),
3377                path_suffixes: vec!["rs".to_string()],
3378                ..Default::default()
3379            },
3380            Some(tree_sitter_rust::language()),
3381        );
3382        let mut fake_language_servers = language.set_fake_lsp_adapter(Default::default());
3383        lang_registry.add(Arc::new(language));
3384
3385        // Connect to a server as 2 clients.
3386        let mut server = TestServer::start(cx_a.foreground(), cx_a.background()).await;
3387        let client_a = server.create_client(cx_a, "user_a").await;
3388        let client_b = server.create_client(cx_b, "user_b").await;
3389        server
3390            .make_contacts(vec![(&client_a, cx_a), (&client_b, cx_b)])
3391            .await;
3392
3393        // Share a project as client A
3394        let project_a = cx_a.update(|cx| {
3395            Project::local(
3396                client_a.clone(),
3397                client_a.user_store.clone(),
3398                lang_registry.clone(),
3399                fs.clone(),
3400                cx,
3401            )
3402        });
3403        let (worktree_a, _) = project_a
3404            .update(cx_a, |p, cx| {
3405                p.find_or_create_local_worktree("/root-1", true, cx)
3406            })
3407            .await
3408            .unwrap();
3409        worktree_a
3410            .read_with(cx_a, |tree, _| tree.as_local().unwrap().scan_complete())
3411            .await;
3412        let project_id = project_a.update(cx_a, |p, _| p.next_remote_id()).await;
3413        let worktree_id = worktree_a.read_with(cx_a, |tree, _| tree.id());
3414        project_a.update(cx_a, |p, cx| p.share(cx)).await.unwrap();
3415
3416        // Join the worktree as client B.
3417        let project_b = Project::remote(
3418            project_id,
3419            client_b.clone(),
3420            client_b.user_store.clone(),
3421            lang_registry.clone(),
3422            fs.clone(),
3423            &mut cx_b.to_async(),
3424        )
3425        .await
3426        .unwrap();
3427
3428        // Open the file on client B.
3429        let buffer_b = cx_b
3430            .background()
3431            .spawn(project_b.update(cx_b, |p, cx| p.open_buffer((worktree_id, "a.rs"), cx)))
3432            .await
3433            .unwrap();
3434
3435        // Request the definition of a symbol as the guest.
3436        let fake_language_server = fake_language_servers.next().await.unwrap();
3437        fake_language_server.handle_request::<lsp::request::GotoDefinition, _, _>(
3438            |_, _| async move {
3439                Ok(Some(lsp::GotoDefinitionResponse::Scalar(
3440                    lsp::Location::new(
3441                        lsp::Url::from_file_path("/root-2/b.rs").unwrap(),
3442                        lsp::Range::new(lsp::Position::new(0, 6), lsp::Position::new(0, 9)),
3443                    ),
3444                )))
3445            },
3446        );
3447
3448        let definitions_1 = project_b
3449            .update(cx_b, |p, cx| p.definition(&buffer_b, 23, cx))
3450            .await
3451            .unwrap();
3452        cx_b.read(|cx| {
3453            assert_eq!(definitions_1.len(), 1);
3454            assert_eq!(project_b.read(cx).worktrees(cx).count(), 2);
3455            let target_buffer = definitions_1[0].buffer.read(cx);
3456            assert_eq!(
3457                target_buffer.text(),
3458                "const TWO: usize = 2;\nconst THREE: usize = 3;"
3459            );
3460            assert_eq!(
3461                definitions_1[0].range.to_point(target_buffer),
3462                Point::new(0, 6)..Point::new(0, 9)
3463            );
3464        });
3465
3466        // Try getting more definitions for the same buffer, ensuring the buffer gets reused from
3467        // the previous call to `definition`.
3468        fake_language_server.handle_request::<lsp::request::GotoDefinition, _, _>(
3469            |_, _| async move {
3470                Ok(Some(lsp::GotoDefinitionResponse::Scalar(
3471                    lsp::Location::new(
3472                        lsp::Url::from_file_path("/root-2/b.rs").unwrap(),
3473                        lsp::Range::new(lsp::Position::new(1, 6), lsp::Position::new(1, 11)),
3474                    ),
3475                )))
3476            },
3477        );
3478
3479        let definitions_2 = project_b
3480            .update(cx_b, |p, cx| p.definition(&buffer_b, 33, cx))
3481            .await
3482            .unwrap();
3483        cx_b.read(|cx| {
3484            assert_eq!(definitions_2.len(), 1);
3485            assert_eq!(project_b.read(cx).worktrees(cx).count(), 2);
3486            let target_buffer = definitions_2[0].buffer.read(cx);
3487            assert_eq!(
3488                target_buffer.text(),
3489                "const TWO: usize = 2;\nconst THREE: usize = 3;"
3490            );
3491            assert_eq!(
3492                definitions_2[0].range.to_point(target_buffer),
3493                Point::new(1, 6)..Point::new(1, 11)
3494            );
3495        });
3496        assert_eq!(definitions_1[0].buffer, definitions_2[0].buffer);
3497    }
3498
3499    #[gpui::test(iterations = 10)]
3500    async fn test_references(cx_a: &mut TestAppContext, cx_b: &mut TestAppContext) {
3501        cx_a.foreground().forbid_parking();
3502        let lang_registry = Arc::new(LanguageRegistry::test());
3503        let fs = FakeFs::new(cx_a.background());
3504        fs.insert_tree(
3505            "/root-1",
3506            json!({
3507                "one.rs": "const ONE: usize = 1;",
3508                "two.rs": "const TWO: usize = one::ONE + one::ONE;",
3509            }),
3510        )
3511        .await;
3512        fs.insert_tree(
3513            "/root-2",
3514            json!({
3515                "three.rs": "const THREE: usize = two::TWO + one::ONE;",
3516            }),
3517        )
3518        .await;
3519
3520        // Set up a fake language server.
3521        let mut language = Language::new(
3522            LanguageConfig {
3523                name: "Rust".into(),
3524                path_suffixes: vec!["rs".to_string()],
3525                ..Default::default()
3526            },
3527            Some(tree_sitter_rust::language()),
3528        );
3529        let mut fake_language_servers = language.set_fake_lsp_adapter(Default::default());
3530        lang_registry.add(Arc::new(language));
3531
3532        // Connect to a server as 2 clients.
3533        let mut server = TestServer::start(cx_a.foreground(), cx_a.background()).await;
3534        let client_a = server.create_client(cx_a, "user_a").await;
3535        let client_b = server.create_client(cx_b, "user_b").await;
3536        server
3537            .make_contacts(vec![(&client_a, cx_a), (&client_b, cx_b)])
3538            .await;
3539
3540        // Share a project as client A
3541        let project_a = cx_a.update(|cx| {
3542            Project::local(
3543                client_a.clone(),
3544                client_a.user_store.clone(),
3545                lang_registry.clone(),
3546                fs.clone(),
3547                cx,
3548            )
3549        });
3550        let (worktree_a, _) = project_a
3551            .update(cx_a, |p, cx| {
3552                p.find_or_create_local_worktree("/root-1", true, cx)
3553            })
3554            .await
3555            .unwrap();
3556        worktree_a
3557            .read_with(cx_a, |tree, _| tree.as_local().unwrap().scan_complete())
3558            .await;
3559        let project_id = project_a.update(cx_a, |p, _| p.next_remote_id()).await;
3560        let worktree_id = worktree_a.read_with(cx_a, |tree, _| tree.id());
3561        project_a.update(cx_a, |p, cx| p.share(cx)).await.unwrap();
3562
3563        // Join the worktree as client B.
3564        let project_b = Project::remote(
3565            project_id,
3566            client_b.clone(),
3567            client_b.user_store.clone(),
3568            lang_registry.clone(),
3569            fs.clone(),
3570            &mut cx_b.to_async(),
3571        )
3572        .await
3573        .unwrap();
3574
3575        // Open the file on client B.
3576        let buffer_b = cx_b
3577            .background()
3578            .spawn(project_b.update(cx_b, |p, cx| p.open_buffer((worktree_id, "one.rs"), cx)))
3579            .await
3580            .unwrap();
3581
3582        // Request references to a symbol as the guest.
3583        let fake_language_server = fake_language_servers.next().await.unwrap();
3584        fake_language_server.handle_request::<lsp::request::References, _, _>(
3585            |params, _| async move {
3586                assert_eq!(
3587                    params.text_document_position.text_document.uri.as_str(),
3588                    "file:///root-1/one.rs"
3589                );
3590                Ok(Some(vec![
3591                    lsp::Location {
3592                        uri: lsp::Url::from_file_path("/root-1/two.rs").unwrap(),
3593                        range: lsp::Range::new(
3594                            lsp::Position::new(0, 24),
3595                            lsp::Position::new(0, 27),
3596                        ),
3597                    },
3598                    lsp::Location {
3599                        uri: lsp::Url::from_file_path("/root-1/two.rs").unwrap(),
3600                        range: lsp::Range::new(
3601                            lsp::Position::new(0, 35),
3602                            lsp::Position::new(0, 38),
3603                        ),
3604                    },
3605                    lsp::Location {
3606                        uri: lsp::Url::from_file_path("/root-2/three.rs").unwrap(),
3607                        range: lsp::Range::new(
3608                            lsp::Position::new(0, 37),
3609                            lsp::Position::new(0, 40),
3610                        ),
3611                    },
3612                ]))
3613            },
3614        );
3615
3616        let references = project_b
3617            .update(cx_b, |p, cx| p.references(&buffer_b, 7, cx))
3618            .await
3619            .unwrap();
3620        cx_b.read(|cx| {
3621            assert_eq!(references.len(), 3);
3622            assert_eq!(project_b.read(cx).worktrees(cx).count(), 2);
3623
3624            let two_buffer = references[0].buffer.read(cx);
3625            let three_buffer = references[2].buffer.read(cx);
3626            assert_eq!(
3627                two_buffer.file().unwrap().path().as_ref(),
3628                Path::new("two.rs")
3629            );
3630            assert_eq!(references[1].buffer, references[0].buffer);
3631            assert_eq!(
3632                three_buffer.file().unwrap().full_path(cx),
3633                Path::new("three.rs")
3634            );
3635
3636            assert_eq!(references[0].range.to_offset(&two_buffer), 24..27);
3637            assert_eq!(references[1].range.to_offset(&two_buffer), 35..38);
3638            assert_eq!(references[2].range.to_offset(&three_buffer), 37..40);
3639        });
3640    }
3641
3642    #[gpui::test(iterations = 10)]
3643    async fn test_project_search(cx_a: &mut TestAppContext, cx_b: &mut TestAppContext) {
3644        cx_a.foreground().forbid_parking();
3645        let lang_registry = Arc::new(LanguageRegistry::test());
3646        let fs = FakeFs::new(cx_a.background());
3647        fs.insert_tree(
3648            "/root-1",
3649            json!({
3650                "a": "hello world",
3651                "b": "goodnight moon",
3652                "c": "a world of goo",
3653                "d": "world champion of clown world",
3654            }),
3655        )
3656        .await;
3657        fs.insert_tree(
3658            "/root-2",
3659            json!({
3660                "e": "disney world is fun",
3661            }),
3662        )
3663        .await;
3664
3665        // Connect to a server as 2 clients.
3666        let mut server = TestServer::start(cx_a.foreground(), cx_a.background()).await;
3667        let client_a = server.create_client(cx_a, "user_a").await;
3668        let client_b = server.create_client(cx_b, "user_b").await;
3669        server
3670            .make_contacts(vec![(&client_a, cx_a), (&client_b, cx_b)])
3671            .await;
3672
3673        // Share a project as client A
3674        let project_a = cx_a.update(|cx| {
3675            Project::local(
3676                client_a.clone(),
3677                client_a.user_store.clone(),
3678                lang_registry.clone(),
3679                fs.clone(),
3680                cx,
3681            )
3682        });
3683        let project_id = project_a.update(cx_a, |p, _| p.next_remote_id()).await;
3684
3685        let (worktree_1, _) = project_a
3686            .update(cx_a, |p, cx| {
3687                p.find_or_create_local_worktree("/root-1", true, cx)
3688            })
3689            .await
3690            .unwrap();
3691        worktree_1
3692            .read_with(cx_a, |tree, _| tree.as_local().unwrap().scan_complete())
3693            .await;
3694        let (worktree_2, _) = project_a
3695            .update(cx_a, |p, cx| {
3696                p.find_or_create_local_worktree("/root-2", true, cx)
3697            })
3698            .await
3699            .unwrap();
3700        worktree_2
3701            .read_with(cx_a, |tree, _| tree.as_local().unwrap().scan_complete())
3702            .await;
3703
3704        project_a.update(cx_a, |p, cx| p.share(cx)).await.unwrap();
3705
3706        // Join the worktree as client B.
3707        let project_b = Project::remote(
3708            project_id,
3709            client_b.clone(),
3710            client_b.user_store.clone(),
3711            lang_registry.clone(),
3712            fs.clone(),
3713            &mut cx_b.to_async(),
3714        )
3715        .await
3716        .unwrap();
3717
3718        let results = project_b
3719            .update(cx_b, |project, cx| {
3720                project.search(SearchQuery::text("world", false, false), cx)
3721            })
3722            .await
3723            .unwrap();
3724
3725        let mut ranges_by_path = results
3726            .into_iter()
3727            .map(|(buffer, ranges)| {
3728                buffer.read_with(cx_b, |buffer, cx| {
3729                    let path = buffer.file().unwrap().full_path(cx);
3730                    let offset_ranges = ranges
3731                        .into_iter()
3732                        .map(|range| range.to_offset(buffer))
3733                        .collect::<Vec<_>>();
3734                    (path, offset_ranges)
3735                })
3736            })
3737            .collect::<Vec<_>>();
3738        ranges_by_path.sort_by_key(|(path, _)| path.clone());
3739
3740        assert_eq!(
3741            ranges_by_path,
3742            &[
3743                (PathBuf::from("root-1/a"), vec![6..11]),
3744                (PathBuf::from("root-1/c"), vec![2..7]),
3745                (PathBuf::from("root-1/d"), vec![0..5, 24..29]),
3746                (PathBuf::from("root-2/e"), vec![7..12]),
3747            ]
3748        );
3749    }
3750
3751    #[gpui::test(iterations = 10)]
3752    async fn test_document_highlights(cx_a: &mut TestAppContext, cx_b: &mut TestAppContext) {
3753        cx_a.foreground().forbid_parking();
3754        let lang_registry = Arc::new(LanguageRegistry::test());
3755        let fs = FakeFs::new(cx_a.background());
3756        fs.insert_tree(
3757            "/root-1",
3758            json!({
3759                "main.rs": "fn double(number: i32) -> i32 { number + number }",
3760            }),
3761        )
3762        .await;
3763
3764        // Set up a fake language server.
3765        let mut language = Language::new(
3766            LanguageConfig {
3767                name: "Rust".into(),
3768                path_suffixes: vec!["rs".to_string()],
3769                ..Default::default()
3770            },
3771            Some(tree_sitter_rust::language()),
3772        );
3773        let mut fake_language_servers = language.set_fake_lsp_adapter(Default::default());
3774        lang_registry.add(Arc::new(language));
3775
3776        // Connect to a server as 2 clients.
3777        let mut server = TestServer::start(cx_a.foreground(), cx_a.background()).await;
3778        let client_a = server.create_client(cx_a, "user_a").await;
3779        let client_b = server.create_client(cx_b, "user_b").await;
3780        server
3781            .make_contacts(vec![(&client_a, cx_a), (&client_b, cx_b)])
3782            .await;
3783
3784        // Share a project as client A
3785        let project_a = cx_a.update(|cx| {
3786            Project::local(
3787                client_a.clone(),
3788                client_a.user_store.clone(),
3789                lang_registry.clone(),
3790                fs.clone(),
3791                cx,
3792            )
3793        });
3794        let (worktree_a, _) = project_a
3795            .update(cx_a, |p, cx| {
3796                p.find_or_create_local_worktree("/root-1", true, cx)
3797            })
3798            .await
3799            .unwrap();
3800        worktree_a
3801            .read_with(cx_a, |tree, _| tree.as_local().unwrap().scan_complete())
3802            .await;
3803        let project_id = project_a.update(cx_a, |p, _| p.next_remote_id()).await;
3804        let worktree_id = worktree_a.read_with(cx_a, |tree, _| tree.id());
3805        project_a.update(cx_a, |p, cx| p.share(cx)).await.unwrap();
3806
3807        // Join the worktree as client B.
3808        let project_b = Project::remote(
3809            project_id,
3810            client_b.clone(),
3811            client_b.user_store.clone(),
3812            lang_registry.clone(),
3813            fs.clone(),
3814            &mut cx_b.to_async(),
3815        )
3816        .await
3817        .unwrap();
3818
3819        // Open the file on client B.
3820        let buffer_b = cx_b
3821            .background()
3822            .spawn(project_b.update(cx_b, |p, cx| p.open_buffer((worktree_id, "main.rs"), cx)))
3823            .await
3824            .unwrap();
3825
3826        // Request document highlights as the guest.
3827        let fake_language_server = fake_language_servers.next().await.unwrap();
3828        fake_language_server.handle_request::<lsp::request::DocumentHighlightRequest, _, _>(
3829            |params, _| async move {
3830                assert_eq!(
3831                    params
3832                        .text_document_position_params
3833                        .text_document
3834                        .uri
3835                        .as_str(),
3836                    "file:///root-1/main.rs"
3837                );
3838                assert_eq!(
3839                    params.text_document_position_params.position,
3840                    lsp::Position::new(0, 34)
3841                );
3842                Ok(Some(vec![
3843                    lsp::DocumentHighlight {
3844                        kind: Some(lsp::DocumentHighlightKind::WRITE),
3845                        range: lsp::Range::new(
3846                            lsp::Position::new(0, 10),
3847                            lsp::Position::new(0, 16),
3848                        ),
3849                    },
3850                    lsp::DocumentHighlight {
3851                        kind: Some(lsp::DocumentHighlightKind::READ),
3852                        range: lsp::Range::new(
3853                            lsp::Position::new(0, 32),
3854                            lsp::Position::new(0, 38),
3855                        ),
3856                    },
3857                    lsp::DocumentHighlight {
3858                        kind: Some(lsp::DocumentHighlightKind::READ),
3859                        range: lsp::Range::new(
3860                            lsp::Position::new(0, 41),
3861                            lsp::Position::new(0, 47),
3862                        ),
3863                    },
3864                ]))
3865            },
3866        );
3867
3868        let highlights = project_b
3869            .update(cx_b, |p, cx| p.document_highlights(&buffer_b, 34, cx))
3870            .await
3871            .unwrap();
3872        buffer_b.read_with(cx_b, |buffer, _| {
3873            let snapshot = buffer.snapshot();
3874
3875            let highlights = highlights
3876                .into_iter()
3877                .map(|highlight| (highlight.kind, highlight.range.to_offset(&snapshot)))
3878                .collect::<Vec<_>>();
3879            assert_eq!(
3880                highlights,
3881                &[
3882                    (lsp::DocumentHighlightKind::WRITE, 10..16),
3883                    (lsp::DocumentHighlightKind::READ, 32..38),
3884                    (lsp::DocumentHighlightKind::READ, 41..47)
3885                ]
3886            )
3887        });
3888    }
3889
3890    #[gpui::test(iterations = 10)]
3891    async fn test_project_symbols(cx_a: &mut TestAppContext, cx_b: &mut TestAppContext) {
3892        cx_a.foreground().forbid_parking();
3893        let lang_registry = Arc::new(LanguageRegistry::test());
3894        let fs = FakeFs::new(cx_a.background());
3895        fs.insert_tree(
3896            "/code",
3897            json!({
3898                "crate-1": {
3899                    "one.rs": "const ONE: usize = 1;",
3900                },
3901                "crate-2": {
3902                    "two.rs": "const TWO: usize = 2; const THREE: usize = 3;",
3903                },
3904                "private": {
3905                    "passwords.txt": "the-password",
3906                }
3907            }),
3908        )
3909        .await;
3910
3911        // Set up a fake language server.
3912        let mut language = Language::new(
3913            LanguageConfig {
3914                name: "Rust".into(),
3915                path_suffixes: vec!["rs".to_string()],
3916                ..Default::default()
3917            },
3918            Some(tree_sitter_rust::language()),
3919        );
3920        let mut fake_language_servers = language.set_fake_lsp_adapter(Default::default());
3921        lang_registry.add(Arc::new(language));
3922
3923        // Connect to a server as 2 clients.
3924        let mut server = TestServer::start(cx_a.foreground(), cx_a.background()).await;
3925        let client_a = server.create_client(cx_a, "user_a").await;
3926        let client_b = server.create_client(cx_b, "user_b").await;
3927        server
3928            .make_contacts(vec![(&client_a, cx_a), (&client_b, cx_b)])
3929            .await;
3930
3931        // Share a project as client A
3932        let project_a = cx_a.update(|cx| {
3933            Project::local(
3934                client_a.clone(),
3935                client_a.user_store.clone(),
3936                lang_registry.clone(),
3937                fs.clone(),
3938                cx,
3939            )
3940        });
3941        let (worktree_a, _) = project_a
3942            .update(cx_a, |p, cx| {
3943                p.find_or_create_local_worktree("/code/crate-1", true, cx)
3944            })
3945            .await
3946            .unwrap();
3947        worktree_a
3948            .read_with(cx_a, |tree, _| tree.as_local().unwrap().scan_complete())
3949            .await;
3950        let project_id = project_a.update(cx_a, |p, _| p.next_remote_id()).await;
3951        let worktree_id = worktree_a.read_with(cx_a, |tree, _| tree.id());
3952        project_a.update(cx_a, |p, cx| p.share(cx)).await.unwrap();
3953
3954        // Join the worktree as client B.
3955        let project_b = Project::remote(
3956            project_id,
3957            client_b.clone(),
3958            client_b.user_store.clone(),
3959            lang_registry.clone(),
3960            fs.clone(),
3961            &mut cx_b.to_async(),
3962        )
3963        .await
3964        .unwrap();
3965
3966        // Cause the language server to start.
3967        let _buffer = cx_b
3968            .background()
3969            .spawn(project_b.update(cx_b, |p, cx| p.open_buffer((worktree_id, "one.rs"), cx)))
3970            .await
3971            .unwrap();
3972
3973        let fake_language_server = fake_language_servers.next().await.unwrap();
3974        fake_language_server.handle_request::<lsp::request::WorkspaceSymbol, _, _>(
3975            |_, _| async move {
3976                #[allow(deprecated)]
3977                Ok(Some(vec![lsp::SymbolInformation {
3978                    name: "TWO".into(),
3979                    location: lsp::Location {
3980                        uri: lsp::Url::from_file_path("/code/crate-2/two.rs").unwrap(),
3981                        range: lsp::Range::new(lsp::Position::new(0, 6), lsp::Position::new(0, 9)),
3982                    },
3983                    kind: lsp::SymbolKind::CONSTANT,
3984                    tags: None,
3985                    container_name: None,
3986                    deprecated: None,
3987                }]))
3988            },
3989        );
3990
3991        // Request the definition of a symbol as the guest.
3992        let symbols = project_b
3993            .update(cx_b, |p, cx| p.symbols("two", cx))
3994            .await
3995            .unwrap();
3996        assert_eq!(symbols.len(), 1);
3997        assert_eq!(symbols[0].name, "TWO");
3998
3999        // Open one of the returned symbols.
4000        let buffer_b_2 = project_b
4001            .update(cx_b, |project, cx| {
4002                project.open_buffer_for_symbol(&symbols[0], cx)
4003            })
4004            .await
4005            .unwrap();
4006        buffer_b_2.read_with(cx_b, |buffer, _| {
4007            assert_eq!(
4008                buffer.file().unwrap().path().as_ref(),
4009                Path::new("../crate-2/two.rs")
4010            );
4011        });
4012
4013        // Attempt to craft a symbol and violate host's privacy by opening an arbitrary file.
4014        let mut fake_symbol = symbols[0].clone();
4015        fake_symbol.path = Path::new("/code/secrets").into();
4016        let error = project_b
4017            .update(cx_b, |project, cx| {
4018                project.open_buffer_for_symbol(&fake_symbol, cx)
4019            })
4020            .await
4021            .unwrap_err();
4022        assert!(error.to_string().contains("invalid symbol signature"));
4023    }
4024
4025    #[gpui::test(iterations = 10)]
4026    async fn test_open_buffer_while_getting_definition_pointing_to_it(
4027        cx_a: &mut TestAppContext,
4028        cx_b: &mut TestAppContext,
4029        mut rng: StdRng,
4030    ) {
4031        cx_a.foreground().forbid_parking();
4032        let lang_registry = Arc::new(LanguageRegistry::test());
4033        let fs = FakeFs::new(cx_a.background());
4034        fs.insert_tree(
4035            "/root",
4036            json!({
4037                "a.rs": "const ONE: usize = b::TWO;",
4038                "b.rs": "const TWO: usize = 2",
4039            }),
4040        )
4041        .await;
4042
4043        // Set up a fake language server.
4044        let mut language = Language::new(
4045            LanguageConfig {
4046                name: "Rust".into(),
4047                path_suffixes: vec!["rs".to_string()],
4048                ..Default::default()
4049            },
4050            Some(tree_sitter_rust::language()),
4051        );
4052        let mut fake_language_servers = language.set_fake_lsp_adapter(Default::default());
4053        lang_registry.add(Arc::new(language));
4054
4055        // Connect to a server as 2 clients.
4056        let mut server = TestServer::start(cx_a.foreground(), cx_a.background()).await;
4057        let client_a = server.create_client(cx_a, "user_a").await;
4058        let client_b = server.create_client(cx_b, "user_b").await;
4059        server
4060            .make_contacts(vec![(&client_a, cx_a), (&client_b, cx_b)])
4061            .await;
4062
4063        // Share a project as client A
4064        let project_a = cx_a.update(|cx| {
4065            Project::local(
4066                client_a.clone(),
4067                client_a.user_store.clone(),
4068                lang_registry.clone(),
4069                fs.clone(),
4070                cx,
4071            )
4072        });
4073
4074        let (worktree_a, _) = project_a
4075            .update(cx_a, |p, cx| {
4076                p.find_or_create_local_worktree("/root", true, cx)
4077            })
4078            .await
4079            .unwrap();
4080        worktree_a
4081            .read_with(cx_a, |tree, _| tree.as_local().unwrap().scan_complete())
4082            .await;
4083        let project_id = project_a.update(cx_a, |p, _| p.next_remote_id()).await;
4084        let worktree_id = worktree_a.read_with(cx_a, |tree, _| tree.id());
4085        project_a.update(cx_a, |p, cx| p.share(cx)).await.unwrap();
4086
4087        // Join the worktree as client B.
4088        let project_b = Project::remote(
4089            project_id,
4090            client_b.clone(),
4091            client_b.user_store.clone(),
4092            lang_registry.clone(),
4093            fs.clone(),
4094            &mut cx_b.to_async(),
4095        )
4096        .await
4097        .unwrap();
4098
4099        let buffer_b1 = cx_b
4100            .background()
4101            .spawn(project_b.update(cx_b, |p, cx| p.open_buffer((worktree_id, "a.rs"), cx)))
4102            .await
4103            .unwrap();
4104
4105        let fake_language_server = fake_language_servers.next().await.unwrap();
4106        fake_language_server.handle_request::<lsp::request::GotoDefinition, _, _>(
4107            |_, _| async move {
4108                Ok(Some(lsp::GotoDefinitionResponse::Scalar(
4109                    lsp::Location::new(
4110                        lsp::Url::from_file_path("/root/b.rs").unwrap(),
4111                        lsp::Range::new(lsp::Position::new(0, 6), lsp::Position::new(0, 9)),
4112                    ),
4113                )))
4114            },
4115        );
4116
4117        let definitions;
4118        let buffer_b2;
4119        if rng.gen() {
4120            definitions = project_b.update(cx_b, |p, cx| p.definition(&buffer_b1, 23, cx));
4121            buffer_b2 = project_b.update(cx_b, |p, cx| p.open_buffer((worktree_id, "b.rs"), cx));
4122        } else {
4123            buffer_b2 = project_b.update(cx_b, |p, cx| p.open_buffer((worktree_id, "b.rs"), cx));
4124            definitions = project_b.update(cx_b, |p, cx| p.definition(&buffer_b1, 23, cx));
4125        }
4126
4127        let buffer_b2 = buffer_b2.await.unwrap();
4128        let definitions = definitions.await.unwrap();
4129        assert_eq!(definitions.len(), 1);
4130        assert_eq!(definitions[0].buffer, buffer_b2);
4131    }
4132
4133    #[gpui::test(iterations = 10)]
4134    async fn test_collaborating_with_code_actions(
4135        cx_a: &mut TestAppContext,
4136        cx_b: &mut TestAppContext,
4137    ) {
4138        cx_a.foreground().forbid_parking();
4139        let lang_registry = Arc::new(LanguageRegistry::test());
4140        let fs = FakeFs::new(cx_a.background());
4141        cx_b.update(|cx| editor::init(cx));
4142
4143        // Set up a fake language server.
4144        let mut language = Language::new(
4145            LanguageConfig {
4146                name: "Rust".into(),
4147                path_suffixes: vec!["rs".to_string()],
4148                ..Default::default()
4149            },
4150            Some(tree_sitter_rust::language()),
4151        );
4152        let mut fake_language_servers = language.set_fake_lsp_adapter(Default::default());
4153        lang_registry.add(Arc::new(language));
4154
4155        // Connect to a server as 2 clients.
4156        let mut server = TestServer::start(cx_a.foreground(), cx_a.background()).await;
4157        let client_a = server.create_client(cx_a, "user_a").await;
4158        let client_b = server.create_client(cx_b, "user_b").await;
4159        server
4160            .make_contacts(vec![(&client_a, cx_a), (&client_b, cx_b)])
4161            .await;
4162
4163        // Share a project as client A
4164        fs.insert_tree(
4165            "/a",
4166            json!({
4167                "main.rs": "mod other;\nfn main() { let foo = other::foo(); }",
4168                "other.rs": "pub fn foo() -> usize { 4 }",
4169            }),
4170        )
4171        .await;
4172        let project_a = cx_a.update(|cx| {
4173            Project::local(
4174                client_a.clone(),
4175                client_a.user_store.clone(),
4176                lang_registry.clone(),
4177                fs.clone(),
4178                cx,
4179            )
4180        });
4181        let (worktree_a, _) = project_a
4182            .update(cx_a, |p, cx| {
4183                p.find_or_create_local_worktree("/a", true, cx)
4184            })
4185            .await
4186            .unwrap();
4187        worktree_a
4188            .read_with(cx_a, |tree, _| tree.as_local().unwrap().scan_complete())
4189            .await;
4190        let project_id = project_a.update(cx_a, |p, _| p.next_remote_id()).await;
4191        let worktree_id = worktree_a.read_with(cx_a, |tree, _| tree.id());
4192        project_a.update(cx_a, |p, cx| p.share(cx)).await.unwrap();
4193
4194        // Join the worktree as client B.
4195        let project_b = Project::remote(
4196            project_id,
4197            client_b.clone(),
4198            client_b.user_store.clone(),
4199            lang_registry.clone(),
4200            fs.clone(),
4201            &mut cx_b.to_async(),
4202        )
4203        .await
4204        .unwrap();
4205        let mut params = cx_b.update(WorkspaceParams::test);
4206        params.languages = lang_registry.clone();
4207        params.client = client_b.client.clone();
4208        params.user_store = client_b.user_store.clone();
4209        params.project = project_b;
4210
4211        let (_window_b, workspace_b) = cx_b.add_window(|cx| Workspace::new(&params, cx));
4212        let editor_b = workspace_b
4213            .update(cx_b, |workspace, cx| {
4214                workspace.open_path((worktree_id, "main.rs"), true, cx)
4215            })
4216            .await
4217            .unwrap()
4218            .downcast::<Editor>()
4219            .unwrap();
4220
4221        let mut fake_language_server = fake_language_servers.next().await.unwrap();
4222        fake_language_server
4223            .handle_request::<lsp::request::CodeActionRequest, _, _>(|params, _| async move {
4224                assert_eq!(
4225                    params.text_document.uri,
4226                    lsp::Url::from_file_path("/a/main.rs").unwrap(),
4227                );
4228                assert_eq!(params.range.start, lsp::Position::new(0, 0));
4229                assert_eq!(params.range.end, lsp::Position::new(0, 0));
4230                Ok(None)
4231            })
4232            .next()
4233            .await;
4234
4235        // Move cursor to a location that contains code actions.
4236        editor_b.update(cx_b, |editor, cx| {
4237            editor.select_ranges([Point::new(1, 31)..Point::new(1, 31)], None, cx);
4238            cx.focus(&editor_b);
4239        });
4240
4241        fake_language_server
4242            .handle_request::<lsp::request::CodeActionRequest, _, _>(|params, _| async move {
4243                assert_eq!(
4244                    params.text_document.uri,
4245                    lsp::Url::from_file_path("/a/main.rs").unwrap(),
4246                );
4247                assert_eq!(params.range.start, lsp::Position::new(1, 31));
4248                assert_eq!(params.range.end, lsp::Position::new(1, 31));
4249
4250                Ok(Some(vec![lsp::CodeActionOrCommand::CodeAction(
4251                    lsp::CodeAction {
4252                        title: "Inline into all callers".to_string(),
4253                        edit: Some(lsp::WorkspaceEdit {
4254                            changes: Some(
4255                                [
4256                                    (
4257                                        lsp::Url::from_file_path("/a/main.rs").unwrap(),
4258                                        vec![lsp::TextEdit::new(
4259                                            lsp::Range::new(
4260                                                lsp::Position::new(1, 22),
4261                                                lsp::Position::new(1, 34),
4262                                            ),
4263                                            "4".to_string(),
4264                                        )],
4265                                    ),
4266                                    (
4267                                        lsp::Url::from_file_path("/a/other.rs").unwrap(),
4268                                        vec![lsp::TextEdit::new(
4269                                            lsp::Range::new(
4270                                                lsp::Position::new(0, 0),
4271                                                lsp::Position::new(0, 27),
4272                                            ),
4273                                            "".to_string(),
4274                                        )],
4275                                    ),
4276                                ]
4277                                .into_iter()
4278                                .collect(),
4279                            ),
4280                            ..Default::default()
4281                        }),
4282                        data: Some(json!({
4283                            "codeActionParams": {
4284                                "range": {
4285                                    "start": {"line": 1, "column": 31},
4286                                    "end": {"line": 1, "column": 31},
4287                                }
4288                            }
4289                        })),
4290                        ..Default::default()
4291                    },
4292                )]))
4293            })
4294            .next()
4295            .await;
4296
4297        // Toggle code actions and wait for them to display.
4298        editor_b.update(cx_b, |editor, cx| {
4299            editor.toggle_code_actions(
4300                &ToggleCodeActions {
4301                    deployed_from_indicator: false,
4302                },
4303                cx,
4304            );
4305        });
4306        editor_b
4307            .condition(&cx_b, |editor, _| editor.context_menu_visible())
4308            .await;
4309
4310        fake_language_server.remove_request_handler::<lsp::request::CodeActionRequest>();
4311
4312        // Confirming the code action will trigger a resolve request.
4313        let confirm_action = workspace_b
4314            .update(cx_b, |workspace, cx| {
4315                Editor::confirm_code_action(workspace, &ConfirmCodeAction { item_ix: Some(0) }, cx)
4316            })
4317            .unwrap();
4318        fake_language_server.handle_request::<lsp::request::CodeActionResolveRequest, _, _>(
4319            |_, _| async move {
4320                Ok(lsp::CodeAction {
4321                    title: "Inline into all callers".to_string(),
4322                    edit: Some(lsp::WorkspaceEdit {
4323                        changes: Some(
4324                            [
4325                                (
4326                                    lsp::Url::from_file_path("/a/main.rs").unwrap(),
4327                                    vec![lsp::TextEdit::new(
4328                                        lsp::Range::new(
4329                                            lsp::Position::new(1, 22),
4330                                            lsp::Position::new(1, 34),
4331                                        ),
4332                                        "4".to_string(),
4333                                    )],
4334                                ),
4335                                (
4336                                    lsp::Url::from_file_path("/a/other.rs").unwrap(),
4337                                    vec![lsp::TextEdit::new(
4338                                        lsp::Range::new(
4339                                            lsp::Position::new(0, 0),
4340                                            lsp::Position::new(0, 27),
4341                                        ),
4342                                        "".to_string(),
4343                                    )],
4344                                ),
4345                            ]
4346                            .into_iter()
4347                            .collect(),
4348                        ),
4349                        ..Default::default()
4350                    }),
4351                    ..Default::default()
4352                })
4353            },
4354        );
4355
4356        // After the action is confirmed, an editor containing both modified files is opened.
4357        confirm_action.await.unwrap();
4358        let code_action_editor = workspace_b.read_with(cx_b, |workspace, cx| {
4359            workspace
4360                .active_item(cx)
4361                .unwrap()
4362                .downcast::<Editor>()
4363                .unwrap()
4364        });
4365        code_action_editor.update(cx_b, |editor, cx| {
4366            assert_eq!(editor.text(cx), "mod other;\nfn main() { let foo = 4; }\n");
4367            editor.undo(&Undo, cx);
4368            assert_eq!(
4369                editor.text(cx),
4370                "mod other;\nfn main() { let foo = other::foo(); }\npub fn foo() -> usize { 4 }"
4371            );
4372            editor.redo(&Redo, cx);
4373            assert_eq!(editor.text(cx), "mod other;\nfn main() { let foo = 4; }\n");
4374        });
4375    }
4376
4377    #[gpui::test(iterations = 10)]
4378    async fn test_collaborating_with_renames(cx_a: &mut TestAppContext, cx_b: &mut TestAppContext) {
4379        cx_a.foreground().forbid_parking();
4380        let lang_registry = Arc::new(LanguageRegistry::test());
4381        let fs = FakeFs::new(cx_a.background());
4382        cx_b.update(|cx| editor::init(cx));
4383
4384        // Set up a fake language server.
4385        let mut language = Language::new(
4386            LanguageConfig {
4387                name: "Rust".into(),
4388                path_suffixes: vec!["rs".to_string()],
4389                ..Default::default()
4390            },
4391            Some(tree_sitter_rust::language()),
4392        );
4393        let mut fake_language_servers = language.set_fake_lsp_adapter(FakeLspAdapter {
4394            capabilities: lsp::ServerCapabilities {
4395                rename_provider: Some(lsp::OneOf::Right(lsp::RenameOptions {
4396                    prepare_provider: Some(true),
4397                    work_done_progress_options: Default::default(),
4398                })),
4399                ..Default::default()
4400            },
4401            ..Default::default()
4402        });
4403        lang_registry.add(Arc::new(language));
4404
4405        // Connect to a server as 2 clients.
4406        let mut server = TestServer::start(cx_a.foreground(), cx_a.background()).await;
4407        let client_a = server.create_client(cx_a, "user_a").await;
4408        let client_b = server.create_client(cx_b, "user_b").await;
4409        server
4410            .make_contacts(vec![(&client_a, cx_a), (&client_b, cx_b)])
4411            .await;
4412
4413        // Share a project as client A
4414        fs.insert_tree(
4415            "/dir",
4416            json!({
4417                "one.rs": "const ONE: usize = 1;",
4418                "two.rs": "const TWO: usize = one::ONE + one::ONE;"
4419            }),
4420        )
4421        .await;
4422        let project_a = cx_a.update(|cx| {
4423            Project::local(
4424                client_a.clone(),
4425                client_a.user_store.clone(),
4426                lang_registry.clone(),
4427                fs.clone(),
4428                cx,
4429            )
4430        });
4431        let (worktree_a, _) = project_a
4432            .update(cx_a, |p, cx| {
4433                p.find_or_create_local_worktree("/dir", true, cx)
4434            })
4435            .await
4436            .unwrap();
4437        worktree_a
4438            .read_with(cx_a, |tree, _| tree.as_local().unwrap().scan_complete())
4439            .await;
4440        let project_id = project_a.update(cx_a, |p, _| p.next_remote_id()).await;
4441        let worktree_id = worktree_a.read_with(cx_a, |tree, _| tree.id());
4442        project_a.update(cx_a, |p, cx| p.share(cx)).await.unwrap();
4443
4444        // Join the worktree as client B.
4445        let project_b = Project::remote(
4446            project_id,
4447            client_b.clone(),
4448            client_b.user_store.clone(),
4449            lang_registry.clone(),
4450            fs.clone(),
4451            &mut cx_b.to_async(),
4452        )
4453        .await
4454        .unwrap();
4455        let mut params = cx_b.update(WorkspaceParams::test);
4456        params.languages = lang_registry.clone();
4457        params.client = client_b.client.clone();
4458        params.user_store = client_b.user_store.clone();
4459        params.project = project_b;
4460
4461        let (_window_b, workspace_b) = cx_b.add_window(|cx| Workspace::new(&params, cx));
4462        let editor_b = workspace_b
4463            .update(cx_b, |workspace, cx| {
4464                workspace.open_path((worktree_id, "one.rs"), true, cx)
4465            })
4466            .await
4467            .unwrap()
4468            .downcast::<Editor>()
4469            .unwrap();
4470        let fake_language_server = fake_language_servers.next().await.unwrap();
4471
4472        // Move cursor to a location that can be renamed.
4473        let prepare_rename = editor_b.update(cx_b, |editor, cx| {
4474            editor.select_ranges([7..7], None, cx);
4475            editor.rename(&Rename, cx).unwrap()
4476        });
4477
4478        fake_language_server
4479            .handle_request::<lsp::request::PrepareRenameRequest, _, _>(|params, _| async move {
4480                assert_eq!(params.text_document.uri.as_str(), "file:///dir/one.rs");
4481                assert_eq!(params.position, lsp::Position::new(0, 7));
4482                Ok(Some(lsp::PrepareRenameResponse::Range(lsp::Range::new(
4483                    lsp::Position::new(0, 6),
4484                    lsp::Position::new(0, 9),
4485                ))))
4486            })
4487            .next()
4488            .await
4489            .unwrap();
4490        prepare_rename.await.unwrap();
4491        editor_b.update(cx_b, |editor, cx| {
4492            let rename = editor.pending_rename().unwrap();
4493            let buffer = editor.buffer().read(cx).snapshot(cx);
4494            assert_eq!(
4495                rename.range.start.to_offset(&buffer)..rename.range.end.to_offset(&buffer),
4496                6..9
4497            );
4498            rename.editor.update(cx, |rename_editor, cx| {
4499                rename_editor.buffer().update(cx, |rename_buffer, cx| {
4500                    rename_buffer.edit([(0..3, "THREE")], cx);
4501                });
4502            });
4503        });
4504
4505        let confirm_rename = workspace_b.update(cx_b, |workspace, cx| {
4506            Editor::confirm_rename(workspace, &ConfirmRename, cx).unwrap()
4507        });
4508        fake_language_server
4509            .handle_request::<lsp::request::Rename, _, _>(|params, _| async move {
4510                assert_eq!(
4511                    params.text_document_position.text_document.uri.as_str(),
4512                    "file:///dir/one.rs"
4513                );
4514                assert_eq!(
4515                    params.text_document_position.position,
4516                    lsp::Position::new(0, 6)
4517                );
4518                assert_eq!(params.new_name, "THREE");
4519                Ok(Some(lsp::WorkspaceEdit {
4520                    changes: Some(
4521                        [
4522                            (
4523                                lsp::Url::from_file_path("/dir/one.rs").unwrap(),
4524                                vec![lsp::TextEdit::new(
4525                                    lsp::Range::new(
4526                                        lsp::Position::new(0, 6),
4527                                        lsp::Position::new(0, 9),
4528                                    ),
4529                                    "THREE".to_string(),
4530                                )],
4531                            ),
4532                            (
4533                                lsp::Url::from_file_path("/dir/two.rs").unwrap(),
4534                                vec![
4535                                    lsp::TextEdit::new(
4536                                        lsp::Range::new(
4537                                            lsp::Position::new(0, 24),
4538                                            lsp::Position::new(0, 27),
4539                                        ),
4540                                        "THREE".to_string(),
4541                                    ),
4542                                    lsp::TextEdit::new(
4543                                        lsp::Range::new(
4544                                            lsp::Position::new(0, 35),
4545                                            lsp::Position::new(0, 38),
4546                                        ),
4547                                        "THREE".to_string(),
4548                                    ),
4549                                ],
4550                            ),
4551                        ]
4552                        .into_iter()
4553                        .collect(),
4554                    ),
4555                    ..Default::default()
4556                }))
4557            })
4558            .next()
4559            .await
4560            .unwrap();
4561        confirm_rename.await.unwrap();
4562
4563        let rename_editor = workspace_b.read_with(cx_b, |workspace, cx| {
4564            workspace
4565                .active_item(cx)
4566                .unwrap()
4567                .downcast::<Editor>()
4568                .unwrap()
4569        });
4570        rename_editor.update(cx_b, |editor, cx| {
4571            assert_eq!(
4572                editor.text(cx),
4573                "const THREE: usize = 1;\nconst TWO: usize = one::THREE + one::THREE;"
4574            );
4575            editor.undo(&Undo, cx);
4576            assert_eq!(
4577                editor.text(cx),
4578                "const ONE: usize = 1;\nconst TWO: usize = one::ONE + one::ONE;"
4579            );
4580            editor.redo(&Redo, cx);
4581            assert_eq!(
4582                editor.text(cx),
4583                "const THREE: usize = 1;\nconst TWO: usize = one::THREE + one::THREE;"
4584            );
4585        });
4586
4587        // Ensure temporary rename edits cannot be undone/redone.
4588        editor_b.update(cx_b, |editor, cx| {
4589            editor.undo(&Undo, cx);
4590            assert_eq!(editor.text(cx), "const ONE: usize = 1;");
4591            editor.undo(&Undo, cx);
4592            assert_eq!(editor.text(cx), "const ONE: usize = 1;");
4593            editor.redo(&Redo, cx);
4594            assert_eq!(editor.text(cx), "const THREE: usize = 1;");
4595        })
4596    }
4597
4598    #[gpui::test(iterations = 10)]
4599    async fn test_basic_chat(cx_a: &mut TestAppContext, cx_b: &mut TestAppContext) {
4600        cx_a.foreground().forbid_parking();
4601
4602        // Connect to a server as 2 clients.
4603        let mut server = TestServer::start(cx_a.foreground(), cx_a.background()).await;
4604        let client_a = server.create_client(cx_a, "user_a").await;
4605        let client_b = server.create_client(cx_b, "user_b").await;
4606
4607        // Create an org that includes these 2 users.
4608        let db = &server.app_state.db;
4609        let org_id = db.create_org("Test Org", "test-org").await.unwrap();
4610        db.add_org_member(org_id, client_a.current_user_id(&cx_a), false)
4611            .await
4612            .unwrap();
4613        db.add_org_member(org_id, client_b.current_user_id(&cx_b), false)
4614            .await
4615            .unwrap();
4616
4617        // Create a channel that includes all the users.
4618        let channel_id = db.create_org_channel(org_id, "test-channel").await.unwrap();
4619        db.add_channel_member(channel_id, client_a.current_user_id(&cx_a), false)
4620            .await
4621            .unwrap();
4622        db.add_channel_member(channel_id, client_b.current_user_id(&cx_b), false)
4623            .await
4624            .unwrap();
4625        db.create_channel_message(
4626            channel_id,
4627            client_b.current_user_id(&cx_b),
4628            "hello A, it's B.",
4629            OffsetDateTime::now_utc(),
4630            1,
4631        )
4632        .await
4633        .unwrap();
4634
4635        let channels_a = cx_a
4636            .add_model(|cx| ChannelList::new(client_a.user_store.clone(), client_a.clone(), cx));
4637        channels_a
4638            .condition(cx_a, |list, _| list.available_channels().is_some())
4639            .await;
4640        channels_a.read_with(cx_a, |list, _| {
4641            assert_eq!(
4642                list.available_channels().unwrap(),
4643                &[ChannelDetails {
4644                    id: channel_id.to_proto(),
4645                    name: "test-channel".to_string()
4646                }]
4647            )
4648        });
4649        let channel_a = channels_a.update(cx_a, |this, cx| {
4650            this.get_channel(channel_id.to_proto(), cx).unwrap()
4651        });
4652        channel_a.read_with(cx_a, |channel, _| assert!(channel.messages().is_empty()));
4653        channel_a
4654            .condition(&cx_a, |channel, _| {
4655                channel_messages(channel)
4656                    == [("user_b".to_string(), "hello A, it's B.".to_string(), false)]
4657            })
4658            .await;
4659
4660        let channels_b = cx_b
4661            .add_model(|cx| ChannelList::new(client_b.user_store.clone(), client_b.clone(), cx));
4662        channels_b
4663            .condition(cx_b, |list, _| list.available_channels().is_some())
4664            .await;
4665        channels_b.read_with(cx_b, |list, _| {
4666            assert_eq!(
4667                list.available_channels().unwrap(),
4668                &[ChannelDetails {
4669                    id: channel_id.to_proto(),
4670                    name: "test-channel".to_string()
4671                }]
4672            )
4673        });
4674
4675        let channel_b = channels_b.update(cx_b, |this, cx| {
4676            this.get_channel(channel_id.to_proto(), cx).unwrap()
4677        });
4678        channel_b.read_with(cx_b, |channel, _| assert!(channel.messages().is_empty()));
4679        channel_b
4680            .condition(&cx_b, |channel, _| {
4681                channel_messages(channel)
4682                    == [("user_b".to_string(), "hello A, it's B.".to_string(), false)]
4683            })
4684            .await;
4685
4686        channel_a
4687            .update(cx_a, |channel, cx| {
4688                channel
4689                    .send_message("oh, hi B.".to_string(), cx)
4690                    .unwrap()
4691                    .detach();
4692                let task = channel.send_message("sup".to_string(), cx).unwrap();
4693                assert_eq!(
4694                    channel_messages(channel),
4695                    &[
4696                        ("user_b".to_string(), "hello A, it's B.".to_string(), false),
4697                        ("user_a".to_string(), "oh, hi B.".to_string(), true),
4698                        ("user_a".to_string(), "sup".to_string(), true)
4699                    ]
4700                );
4701                task
4702            })
4703            .await
4704            .unwrap();
4705
4706        channel_b
4707            .condition(&cx_b, |channel, _| {
4708                channel_messages(channel)
4709                    == [
4710                        ("user_b".to_string(), "hello A, it's B.".to_string(), false),
4711                        ("user_a".to_string(), "oh, hi B.".to_string(), false),
4712                        ("user_a".to_string(), "sup".to_string(), false),
4713                    ]
4714            })
4715            .await;
4716
4717        assert_eq!(
4718            server
4719                .state()
4720                .await
4721                .channel(channel_id)
4722                .unwrap()
4723                .connection_ids
4724                .len(),
4725            2
4726        );
4727        cx_b.update(|_| drop(channel_b));
4728        server
4729            .condition(|state| state.channel(channel_id).unwrap().connection_ids.len() == 1)
4730            .await;
4731
4732        cx_a.update(|_| drop(channel_a));
4733        server
4734            .condition(|state| state.channel(channel_id).is_none())
4735            .await;
4736    }
4737
4738    #[gpui::test(iterations = 10)]
4739    async fn test_chat_message_validation(cx_a: &mut TestAppContext) {
4740        cx_a.foreground().forbid_parking();
4741
4742        let mut server = TestServer::start(cx_a.foreground(), cx_a.background()).await;
4743        let client_a = server.create_client(cx_a, "user_a").await;
4744
4745        let db = &server.app_state.db;
4746        let org_id = db.create_org("Test Org", "test-org").await.unwrap();
4747        let channel_id = db.create_org_channel(org_id, "test-channel").await.unwrap();
4748        db.add_org_member(org_id, client_a.current_user_id(&cx_a), false)
4749            .await
4750            .unwrap();
4751        db.add_channel_member(channel_id, client_a.current_user_id(&cx_a), false)
4752            .await
4753            .unwrap();
4754
4755        let channels_a = cx_a
4756            .add_model(|cx| ChannelList::new(client_a.user_store.clone(), client_a.clone(), cx));
4757        channels_a
4758            .condition(cx_a, |list, _| list.available_channels().is_some())
4759            .await;
4760        let channel_a = channels_a.update(cx_a, |this, cx| {
4761            this.get_channel(channel_id.to_proto(), cx).unwrap()
4762        });
4763
4764        // Messages aren't allowed to be too long.
4765        channel_a
4766            .update(cx_a, |channel, cx| {
4767                let long_body = "this is long.\n".repeat(1024);
4768                channel.send_message(long_body, cx).unwrap()
4769            })
4770            .await
4771            .unwrap_err();
4772
4773        // Messages aren't allowed to be blank.
4774        channel_a.update(cx_a, |channel, cx| {
4775            channel.send_message(String::new(), cx).unwrap_err()
4776        });
4777
4778        // Leading and trailing whitespace are trimmed.
4779        channel_a
4780            .update(cx_a, |channel, cx| {
4781                channel
4782                    .send_message("\n surrounded by whitespace  \n".to_string(), cx)
4783                    .unwrap()
4784            })
4785            .await
4786            .unwrap();
4787        assert_eq!(
4788            db.get_channel_messages(channel_id, 10, None)
4789                .await
4790                .unwrap()
4791                .iter()
4792                .map(|m| &m.body)
4793                .collect::<Vec<_>>(),
4794            &["surrounded by whitespace"]
4795        );
4796    }
4797
4798    #[gpui::test(iterations = 10)]
4799    async fn test_chat_reconnection(cx_a: &mut TestAppContext, cx_b: &mut TestAppContext) {
4800        cx_a.foreground().forbid_parking();
4801
4802        // Connect to a server as 2 clients.
4803        let mut server = TestServer::start(cx_a.foreground(), cx_a.background()).await;
4804        let client_a = server.create_client(cx_a, "user_a").await;
4805        let client_b = server.create_client(cx_b, "user_b").await;
4806        let mut status_b = client_b.status();
4807
4808        // Create an org that includes these 2 users.
4809        let db = &server.app_state.db;
4810        let org_id = db.create_org("Test Org", "test-org").await.unwrap();
4811        db.add_org_member(org_id, client_a.current_user_id(&cx_a), false)
4812            .await
4813            .unwrap();
4814        db.add_org_member(org_id, client_b.current_user_id(&cx_b), false)
4815            .await
4816            .unwrap();
4817
4818        // Create a channel that includes all the users.
4819        let channel_id = db.create_org_channel(org_id, "test-channel").await.unwrap();
4820        db.add_channel_member(channel_id, client_a.current_user_id(&cx_a), false)
4821            .await
4822            .unwrap();
4823        db.add_channel_member(channel_id, client_b.current_user_id(&cx_b), false)
4824            .await
4825            .unwrap();
4826        db.create_channel_message(
4827            channel_id,
4828            client_b.current_user_id(&cx_b),
4829            "hello A, it's B.",
4830            OffsetDateTime::now_utc(),
4831            2,
4832        )
4833        .await
4834        .unwrap();
4835
4836        let channels_a = cx_a
4837            .add_model(|cx| ChannelList::new(client_a.user_store.clone(), client_a.clone(), cx));
4838        channels_a
4839            .condition(cx_a, |list, _| list.available_channels().is_some())
4840            .await;
4841
4842        channels_a.read_with(cx_a, |list, _| {
4843            assert_eq!(
4844                list.available_channels().unwrap(),
4845                &[ChannelDetails {
4846                    id: channel_id.to_proto(),
4847                    name: "test-channel".to_string()
4848                }]
4849            )
4850        });
4851        let channel_a = channels_a.update(cx_a, |this, cx| {
4852            this.get_channel(channel_id.to_proto(), cx).unwrap()
4853        });
4854        channel_a.read_with(cx_a, |channel, _| assert!(channel.messages().is_empty()));
4855        channel_a
4856            .condition(&cx_a, |channel, _| {
4857                channel_messages(channel)
4858                    == [("user_b".to_string(), "hello A, it's B.".to_string(), false)]
4859            })
4860            .await;
4861
4862        let channels_b = cx_b
4863            .add_model(|cx| ChannelList::new(client_b.user_store.clone(), client_b.clone(), cx));
4864        channels_b
4865            .condition(cx_b, |list, _| list.available_channels().is_some())
4866            .await;
4867        channels_b.read_with(cx_b, |list, _| {
4868            assert_eq!(
4869                list.available_channels().unwrap(),
4870                &[ChannelDetails {
4871                    id: channel_id.to_proto(),
4872                    name: "test-channel".to_string()
4873                }]
4874            )
4875        });
4876
4877        let channel_b = channels_b.update(cx_b, |this, cx| {
4878            this.get_channel(channel_id.to_proto(), cx).unwrap()
4879        });
4880        channel_b.read_with(cx_b, |channel, _| assert!(channel.messages().is_empty()));
4881        channel_b
4882            .condition(&cx_b, |channel, _| {
4883                channel_messages(channel)
4884                    == [("user_b".to_string(), "hello A, it's B.".to_string(), false)]
4885            })
4886            .await;
4887
4888        // Disconnect client B, ensuring we can still access its cached channel data.
4889        server.forbid_connections();
4890        server.disconnect_client(client_b.current_user_id(&cx_b));
4891        cx_b.foreground().advance_clock(rpc::RECEIVE_TIMEOUT);
4892        while !matches!(
4893            status_b.next().await,
4894            Some(client::Status::ReconnectionError { .. })
4895        ) {}
4896
4897        channels_b.read_with(cx_b, |channels, _| {
4898            assert_eq!(
4899                channels.available_channels().unwrap(),
4900                [ChannelDetails {
4901                    id: channel_id.to_proto(),
4902                    name: "test-channel".to_string()
4903                }]
4904            )
4905        });
4906        channel_b.read_with(cx_b, |channel, _| {
4907            assert_eq!(
4908                channel_messages(channel),
4909                [("user_b".to_string(), "hello A, it's B.".to_string(), false)]
4910            )
4911        });
4912
4913        // Send a message from client B while it is disconnected.
4914        channel_b
4915            .update(cx_b, |channel, cx| {
4916                let task = channel
4917                    .send_message("can you see this?".to_string(), cx)
4918                    .unwrap();
4919                assert_eq!(
4920                    channel_messages(channel),
4921                    &[
4922                        ("user_b".to_string(), "hello A, it's B.".to_string(), false),
4923                        ("user_b".to_string(), "can you see this?".to_string(), true)
4924                    ]
4925                );
4926                task
4927            })
4928            .await
4929            .unwrap_err();
4930
4931        // Send a message from client A while B is disconnected.
4932        channel_a
4933            .update(cx_a, |channel, cx| {
4934                channel
4935                    .send_message("oh, hi B.".to_string(), cx)
4936                    .unwrap()
4937                    .detach();
4938                let task = channel.send_message("sup".to_string(), cx).unwrap();
4939                assert_eq!(
4940                    channel_messages(channel),
4941                    &[
4942                        ("user_b".to_string(), "hello A, it's B.".to_string(), false),
4943                        ("user_a".to_string(), "oh, hi B.".to_string(), true),
4944                        ("user_a".to_string(), "sup".to_string(), true)
4945                    ]
4946                );
4947                task
4948            })
4949            .await
4950            .unwrap();
4951
4952        // Give client B a chance to reconnect.
4953        server.allow_connections();
4954        cx_b.foreground().advance_clock(Duration::from_secs(10));
4955
4956        // Verify that B sees the new messages upon reconnection, as well as the message client B
4957        // sent while offline.
4958        channel_b
4959            .condition(&cx_b, |channel, _| {
4960                channel_messages(channel)
4961                    == [
4962                        ("user_b".to_string(), "hello A, it's B.".to_string(), false),
4963                        ("user_a".to_string(), "oh, hi B.".to_string(), false),
4964                        ("user_a".to_string(), "sup".to_string(), false),
4965                        ("user_b".to_string(), "can you see this?".to_string(), false),
4966                    ]
4967            })
4968            .await;
4969
4970        // Ensure client A and B can communicate normally after reconnection.
4971        channel_a
4972            .update(cx_a, |channel, cx| {
4973                channel.send_message("you online?".to_string(), cx).unwrap()
4974            })
4975            .await
4976            .unwrap();
4977        channel_b
4978            .condition(&cx_b, |channel, _| {
4979                channel_messages(channel)
4980                    == [
4981                        ("user_b".to_string(), "hello A, it's B.".to_string(), false),
4982                        ("user_a".to_string(), "oh, hi B.".to_string(), false),
4983                        ("user_a".to_string(), "sup".to_string(), false),
4984                        ("user_b".to_string(), "can you see this?".to_string(), false),
4985                        ("user_a".to_string(), "you online?".to_string(), false),
4986                    ]
4987            })
4988            .await;
4989
4990        channel_b
4991            .update(cx_b, |channel, cx| {
4992                channel.send_message("yep".to_string(), cx).unwrap()
4993            })
4994            .await
4995            .unwrap();
4996        channel_a
4997            .condition(&cx_a, |channel, _| {
4998                channel_messages(channel)
4999                    == [
5000                        ("user_b".to_string(), "hello A, it's B.".to_string(), false),
5001                        ("user_a".to_string(), "oh, hi B.".to_string(), false),
5002                        ("user_a".to_string(), "sup".to_string(), false),
5003                        ("user_b".to_string(), "can you see this?".to_string(), false),
5004                        ("user_a".to_string(), "you online?".to_string(), false),
5005                        ("user_b".to_string(), "yep".to_string(), false),
5006                    ]
5007            })
5008            .await;
5009    }
5010
5011    #[gpui::test(iterations = 10)]
5012    async fn test_contacts(
5013        deterministic: Arc<Deterministic>,
5014        cx_a: &mut TestAppContext,
5015        cx_b: &mut TestAppContext,
5016        cx_c: &mut TestAppContext,
5017    ) {
5018        cx_a.foreground().forbid_parking();
5019
5020        // Connect to a server as 3 clients.
5021        let mut server = TestServer::start(cx_a.foreground(), cx_a.background()).await;
5022        let mut client_a = server.create_client(cx_a, "user_a").await;
5023        let mut client_b = server.create_client(cx_b, "user_b").await;
5024        let client_c = server.create_client(cx_c, "user_c").await;
5025        server
5026            .make_contacts(vec![
5027                (&client_a, cx_a),
5028                (&client_b, cx_b),
5029                (&client_c, cx_c),
5030            ])
5031            .await;
5032
5033        deterministic.run_until_parked();
5034        for (client, cx) in [(&client_a, &cx_a), (&client_b, &cx_b), (&client_c, &cx_c)] {
5035            client.user_store.read_with(*cx, |store, _| {
5036                assert_eq!(
5037                    contacts(store),
5038                    [
5039                        ("user_a", true, vec![]),
5040                        ("user_b", true, vec![]),
5041                        ("user_c", true, vec![])
5042                    ]
5043                )
5044            });
5045        }
5046
5047        // Share a project as client A.
5048        let fs = FakeFs::new(cx_a.background());
5049        fs.create_dir(Path::new("/a")).await.unwrap();
5050        let (project_a, _) = client_a.build_local_project(fs, "/a", cx_a).await;
5051
5052        deterministic.run_until_parked();
5053        for (client, cx) in [(&client_a, &cx_a), (&client_b, &cx_b), (&client_c, &cx_c)] {
5054            client.user_store.read_with(*cx, |store, _| {
5055                assert_eq!(
5056                    contacts(store),
5057                    [
5058                        ("user_a", true, vec![("a", false, vec![])]),
5059                        ("user_b", true, vec![]),
5060                        ("user_c", true, vec![])
5061                    ]
5062                )
5063            });
5064        }
5065
5066        let project_id = project_a
5067            .update(cx_a, |project, _| project.next_remote_id())
5068            .await;
5069        project_a
5070            .update(cx_a, |project, cx| project.share(cx))
5071            .await
5072            .unwrap();
5073
5074        deterministic.run_until_parked();
5075        for (client, cx) in [(&client_a, &cx_a), (&client_b, &cx_b), (&client_c, &cx_c)] {
5076            client.user_store.read_with(*cx, |store, _| {
5077                assert_eq!(
5078                    contacts(store),
5079                    [
5080                        ("user_a", true, vec![("a", true, vec![])]),
5081                        ("user_b", true, vec![]),
5082                        ("user_c", true, vec![])
5083                    ]
5084                )
5085            });
5086        }
5087
5088        let _project_b = client_b.build_remote_project(project_id, cx_b).await;
5089
5090        deterministic.run_until_parked();
5091        for (client, cx) in [(&client_a, &cx_a), (&client_b, &cx_b), (&client_c, &cx_c)] {
5092            client.user_store.read_with(*cx, |store, _| {
5093                assert_eq!(
5094                    contacts(store),
5095                    [
5096                        ("user_a", true, vec![("a", true, vec!["user_b"])]),
5097                        ("user_b", true, vec![]),
5098                        ("user_c", true, vec![])
5099                    ]
5100                )
5101            });
5102        }
5103
5104        // Add a local project as client B
5105        let fs = FakeFs::new(cx_b.background());
5106        fs.create_dir(Path::new("/b")).await.unwrap();
5107        let (_project_b, _) = client_b.build_local_project(fs, "/b", cx_a).await;
5108
5109        deterministic.run_until_parked();
5110        for (client, cx) in [(&client_a, &cx_a), (&client_b, &cx_b), (&client_c, &cx_c)] {
5111            client.user_store.read_with(*cx, |store, _| {
5112                assert_eq!(
5113                    contacts(store),
5114                    [
5115                        ("user_a", true, vec![("a", true, vec!["user_b"])]),
5116                        ("user_b", true, vec![("b", false, vec![])]),
5117                        ("user_c", true, vec![])
5118                    ]
5119                )
5120            });
5121        }
5122
5123        project_a
5124            .condition(&cx_a, |project, _| {
5125                project.collaborators().contains_key(&client_b.peer_id)
5126            })
5127            .await;
5128
5129        client_a.project.take();
5130        cx_a.update(move |_| drop(project_a));
5131        deterministic.run_until_parked();
5132        for (client, cx) in [(&client_a, &cx_a), (&client_b, &cx_b), (&client_c, &cx_c)] {
5133            client.user_store.read_with(*cx, |store, _| {
5134                assert_eq!(
5135                    contacts(store),
5136                    [
5137                        ("user_a", true, vec![]),
5138                        ("user_b", true, vec![("b", false, vec![])]),
5139                        ("user_c", true, vec![])
5140                    ]
5141                )
5142            });
5143        }
5144
5145        server.disconnect_client(client_c.current_user_id(cx_c));
5146        server.forbid_connections();
5147        deterministic.advance_clock(rpc::RECEIVE_TIMEOUT);
5148        for (client, cx) in [(&client_a, &cx_a), (&client_b, &cx_b)] {
5149            client.user_store.read_with(*cx, |store, _| {
5150                assert_eq!(
5151                    contacts(store),
5152                    [
5153                        ("user_a", true, vec![]),
5154                        ("user_b", true, vec![("b", false, vec![])]),
5155                        ("user_c", false, vec![])
5156                    ]
5157                )
5158            });
5159        }
5160        client_c
5161            .user_store
5162            .read_with(cx_c, |store, _| assert_eq!(contacts(store), []));
5163
5164        server.allow_connections();
5165        client_c
5166            .authenticate_and_connect(false, &cx_c.to_async())
5167            .await
5168            .unwrap();
5169
5170        deterministic.run_until_parked();
5171        for (client, cx) in [(&client_a, &cx_a), (&client_b, &cx_b), (&client_c, &cx_c)] {
5172            client.user_store.read_with(*cx, |store, _| {
5173                assert_eq!(
5174                    contacts(store),
5175                    [
5176                        ("user_a", true, vec![]),
5177                        ("user_b", true, vec![("b", false, vec![])]),
5178                        ("user_c", true, vec![])
5179                    ]
5180                )
5181            });
5182        }
5183
5184        fn contacts(user_store: &UserStore) -> Vec<(&str, bool, Vec<(&str, bool, Vec<&str>)>)> {
5185            user_store
5186                .contacts()
5187                .iter()
5188                .map(|contact| {
5189                    let projects = contact
5190                        .projects
5191                        .iter()
5192                        .map(|p| {
5193                            (
5194                                p.worktree_root_names[0].as_str(),
5195                                p.is_shared,
5196                                p.guests.iter().map(|p| p.github_login.as_str()).collect(),
5197                            )
5198                        })
5199                        .collect();
5200                    (contact.user.github_login.as_str(), contact.online, projects)
5201                })
5202                .collect()
5203        }
5204    }
5205
5206    #[gpui::test(iterations = 10)]
5207    async fn test_contact_requests(
5208        executor: Arc<Deterministic>,
5209        cx_a: &mut TestAppContext,
5210        cx_a2: &mut TestAppContext,
5211        cx_b: &mut TestAppContext,
5212        cx_b2: &mut TestAppContext,
5213        cx_c: &mut TestAppContext,
5214        cx_c2: &mut TestAppContext,
5215    ) {
5216        cx_a.foreground().forbid_parking();
5217
5218        // Connect to a server as 3 clients.
5219        let mut server = TestServer::start(cx_a.foreground(), cx_a.background()).await;
5220        let client_a = server.create_client(cx_a, "user_a").await;
5221        let client_a2 = server.create_client(cx_a2, "user_a").await;
5222        let client_b = server.create_client(cx_b, "user_b").await;
5223        let client_b2 = server.create_client(cx_b2, "user_b").await;
5224        let client_c = server.create_client(cx_c, "user_c").await;
5225        let client_c2 = server.create_client(cx_c2, "user_c").await;
5226
5227        assert_eq!(client_a.user_id().unwrap(), client_a2.user_id().unwrap());
5228        assert_eq!(client_b.user_id().unwrap(), client_b2.user_id().unwrap());
5229        assert_eq!(client_c.user_id().unwrap(), client_c2.user_id().unwrap());
5230
5231        // User A and User C request that user B become their contact.
5232        client_a
5233            .user_store
5234            .update(cx_a, |store, cx| {
5235                store.request_contact(client_b.user_id().unwrap(), cx)
5236            })
5237            .await
5238            .unwrap();
5239        client_c
5240            .user_store
5241            .update(cx_c, |store, cx| {
5242                store.request_contact(client_b.user_id().unwrap(), cx)
5243            })
5244            .await
5245            .unwrap();
5246        executor.run_until_parked();
5247
5248        // All users see the pending request appear in all their clients.
5249        assert_eq!(
5250            client_a.summarize_contacts(&cx_a).outgoing_requests,
5251            &["user_b"]
5252        );
5253        assert_eq!(
5254            client_a2.summarize_contacts(&cx_a2).outgoing_requests,
5255            &["user_b"]
5256        );
5257        assert_eq!(
5258            client_b.summarize_contacts(&cx_b).incoming_requests,
5259            &["user_a", "user_c"]
5260        );
5261        assert_eq!(
5262            client_b2.summarize_contacts(&cx_b2).incoming_requests,
5263            &["user_a", "user_c"]
5264        );
5265        assert_eq!(
5266            client_c.summarize_contacts(&cx_c).outgoing_requests,
5267            &["user_b"]
5268        );
5269        assert_eq!(
5270            client_c2.summarize_contacts(&cx_c2).outgoing_requests,
5271            &["user_b"]
5272        );
5273
5274        // Contact requests are present upon connecting (tested here via disconnect/reconnect)
5275        disconnect_and_reconnect(&client_a, cx_a).await;
5276        disconnect_and_reconnect(&client_b, cx_b).await;
5277        disconnect_and_reconnect(&client_c, cx_c).await;
5278        executor.run_until_parked();
5279        assert_eq!(
5280            client_a.summarize_contacts(&cx_a).outgoing_requests,
5281            &["user_b"]
5282        );
5283        assert_eq!(
5284            client_b.summarize_contacts(&cx_b).incoming_requests,
5285            &["user_a", "user_c"]
5286        );
5287        assert_eq!(
5288            client_c.summarize_contacts(&cx_c).outgoing_requests,
5289            &["user_b"]
5290        );
5291
5292        // User B accepts the request from user A.
5293        client_b
5294            .user_store
5295            .update(cx_b, |store, cx| {
5296                store.respond_to_contact_request(client_a.user_id().unwrap(), true, cx)
5297            })
5298            .await
5299            .unwrap();
5300
5301        executor.run_until_parked();
5302
5303        // User B sees user A as their contact now in all client, and the incoming request from them is removed.
5304        let contacts_b = client_b.summarize_contacts(&cx_b);
5305        assert_eq!(contacts_b.current, &["user_a", "user_b"]);
5306        assert_eq!(contacts_b.incoming_requests, &["user_c"]);
5307        let contacts_b2 = client_b2.summarize_contacts(&cx_b2);
5308        assert_eq!(contacts_b2.current, &["user_a", "user_b"]);
5309        assert_eq!(contacts_b2.incoming_requests, &["user_c"]);
5310
5311        // User A sees user B as their contact now in all clients, and the outgoing request to them is removed.
5312        let contacts_a = client_a.summarize_contacts(&cx_a);
5313        assert_eq!(contacts_a.current, &["user_a", "user_b"]);
5314        assert!(contacts_a.outgoing_requests.is_empty());
5315        let contacts_a2 = client_a2.summarize_contacts(&cx_a2);
5316        assert_eq!(contacts_a2.current, &["user_a", "user_b"]);
5317        assert!(contacts_a2.outgoing_requests.is_empty());
5318
5319        // Contacts are present upon connecting (tested here via disconnect/reconnect)
5320        disconnect_and_reconnect(&client_a, cx_a).await;
5321        disconnect_and_reconnect(&client_b, cx_b).await;
5322        disconnect_and_reconnect(&client_c, cx_c).await;
5323        executor.run_until_parked();
5324        assert_eq!(
5325            client_a.summarize_contacts(&cx_a).current,
5326            &["user_a", "user_b"]
5327        );
5328        assert_eq!(
5329            client_b.summarize_contacts(&cx_b).current,
5330            &["user_a", "user_b"]
5331        );
5332        assert_eq!(
5333            client_b.summarize_contacts(&cx_b).incoming_requests,
5334            &["user_c"]
5335        );
5336        assert_eq!(client_c.summarize_contacts(&cx_c).current, &["user_c"]);
5337        assert_eq!(
5338            client_c.summarize_contacts(&cx_c).outgoing_requests,
5339            &["user_b"]
5340        );
5341
5342        // User B rejects the request from user C.
5343        client_b
5344            .user_store
5345            .update(cx_b, |store, cx| {
5346                store.respond_to_contact_request(client_c.user_id().unwrap(), false, cx)
5347            })
5348            .await
5349            .unwrap();
5350
5351        executor.run_until_parked();
5352
5353        // User B doesn't see user C as their contact, and the incoming request from them is removed.
5354        let contacts_b = client_b.summarize_contacts(&cx_b);
5355        assert_eq!(contacts_b.current, &["user_a", "user_b"]);
5356        assert!(contacts_b.incoming_requests.is_empty());
5357        let contacts_b2 = client_b2.summarize_contacts(&cx_b2);
5358        assert_eq!(contacts_b2.current, &["user_a", "user_b"]);
5359        assert!(contacts_b2.incoming_requests.is_empty());
5360
5361        // User C doesn't see user B as their contact, and the outgoing request to them is removed.
5362        let contacts_c = client_c.summarize_contacts(&cx_c);
5363        assert_eq!(contacts_c.current, &["user_c"]);
5364        assert!(contacts_c.outgoing_requests.is_empty());
5365        let contacts_c2 = client_c2.summarize_contacts(&cx_c2);
5366        assert_eq!(contacts_c2.current, &["user_c"]);
5367        assert!(contacts_c2.outgoing_requests.is_empty());
5368
5369        // Incoming/outgoing requests are not present upon connecting (tested here via disconnect/reconnect)
5370        disconnect_and_reconnect(&client_a, cx_a).await;
5371        disconnect_and_reconnect(&client_b, cx_b).await;
5372        disconnect_and_reconnect(&client_c, cx_c).await;
5373        executor.run_until_parked();
5374        assert_eq!(
5375            client_a.summarize_contacts(&cx_a).current,
5376            &["user_a", "user_b"]
5377        );
5378        assert_eq!(
5379            client_b.summarize_contacts(&cx_b).current,
5380            &["user_a", "user_b"]
5381        );
5382        assert!(client_b
5383            .summarize_contacts(&cx_b)
5384            .incoming_requests
5385            .is_empty());
5386        assert_eq!(client_c.summarize_contacts(&cx_c).current, &["user_c"]);
5387        assert!(client_c
5388            .summarize_contacts(&cx_c)
5389            .outgoing_requests
5390            .is_empty());
5391
5392        async fn disconnect_and_reconnect(client: &TestClient, cx: &mut TestAppContext) {
5393            client.disconnect(&cx.to_async()).unwrap();
5394            client.clear_contacts(cx).await;
5395            client
5396                .authenticate_and_connect(false, &cx.to_async())
5397                .await
5398                .unwrap();
5399        }
5400    }
5401
5402    #[gpui::test(iterations = 10)]
5403    async fn test_following(cx_a: &mut TestAppContext, cx_b: &mut TestAppContext) {
5404        cx_a.foreground().forbid_parking();
5405        let fs = FakeFs::new(cx_a.background());
5406
5407        // 2 clients connect to a server.
5408        let mut server = TestServer::start(cx_a.foreground(), cx_a.background()).await;
5409        let mut client_a = server.create_client(cx_a, "user_a").await;
5410        let mut client_b = server.create_client(cx_b, "user_b").await;
5411        server
5412            .make_contacts(vec![(&client_a, cx_a), (&client_b, cx_b)])
5413            .await;
5414        cx_a.update(editor::init);
5415        cx_b.update(editor::init);
5416
5417        // Client A shares a project.
5418        fs.insert_tree(
5419            "/a",
5420            json!({
5421                "1.txt": "one",
5422                "2.txt": "two",
5423                "3.txt": "three",
5424            }),
5425        )
5426        .await;
5427        let (project_a, worktree_id) = client_a.build_local_project(fs.clone(), "/a", cx_a).await;
5428        project_a
5429            .update(cx_a, |project, cx| project.share(cx))
5430            .await
5431            .unwrap();
5432
5433        // Client B joins the project.
5434        let project_b = client_b
5435            .build_remote_project(
5436                project_a
5437                    .read_with(cx_a, |project, _| project.remote_id())
5438                    .unwrap(),
5439                cx_b,
5440            )
5441            .await;
5442
5443        // Client A opens some editors.
5444        let workspace_a = client_a.build_workspace(&project_a, cx_a);
5445        let pane_a = workspace_a.read_with(cx_a, |workspace, _| workspace.active_pane().clone());
5446        let editor_a1 = workspace_a
5447            .update(cx_a, |workspace, cx| {
5448                workspace.open_path((worktree_id, "1.txt"), true, cx)
5449            })
5450            .await
5451            .unwrap()
5452            .downcast::<Editor>()
5453            .unwrap();
5454        let editor_a2 = workspace_a
5455            .update(cx_a, |workspace, cx| {
5456                workspace.open_path((worktree_id, "2.txt"), true, cx)
5457            })
5458            .await
5459            .unwrap()
5460            .downcast::<Editor>()
5461            .unwrap();
5462
5463        // Client B opens an editor.
5464        let workspace_b = client_b.build_workspace(&project_b, cx_b);
5465        let editor_b1 = workspace_b
5466            .update(cx_b, |workspace, cx| {
5467                workspace.open_path((worktree_id, "1.txt"), true, cx)
5468            })
5469            .await
5470            .unwrap()
5471            .downcast::<Editor>()
5472            .unwrap();
5473
5474        let client_a_id = project_b.read_with(cx_b, |project, _| {
5475            project.collaborators().values().next().unwrap().peer_id
5476        });
5477        let client_b_id = project_a.read_with(cx_a, |project, _| {
5478            project.collaborators().values().next().unwrap().peer_id
5479        });
5480
5481        // When client B starts following client A, all visible view states are replicated to client B.
5482        editor_a1.update(cx_a, |editor, cx| editor.select_ranges([0..1], None, cx));
5483        editor_a2.update(cx_a, |editor, cx| editor.select_ranges([2..3], None, cx));
5484        workspace_b
5485            .update(cx_b, |workspace, cx| {
5486                workspace
5487                    .toggle_follow(&ToggleFollow(client_a_id), cx)
5488                    .unwrap()
5489            })
5490            .await
5491            .unwrap();
5492        let editor_b2 = workspace_b.read_with(cx_b, |workspace, cx| {
5493            workspace
5494                .active_item(cx)
5495                .unwrap()
5496                .downcast::<Editor>()
5497                .unwrap()
5498        });
5499        assert!(cx_b.read(|cx| editor_b2.is_focused(cx)));
5500        assert_eq!(
5501            editor_b2.read_with(cx_b, |editor, cx| editor.project_path(cx)),
5502            Some((worktree_id, "2.txt").into())
5503        );
5504        assert_eq!(
5505            editor_b2.read_with(cx_b, |editor, cx| editor.selected_ranges(cx)),
5506            vec![2..3]
5507        );
5508        assert_eq!(
5509            editor_b1.read_with(cx_b, |editor, cx| editor.selected_ranges(cx)),
5510            vec![0..1]
5511        );
5512
5513        // When client A activates a different editor, client B does so as well.
5514        workspace_a.update(cx_a, |workspace, cx| {
5515            workspace.activate_item(&editor_a1, cx)
5516        });
5517        workspace_b
5518            .condition(cx_b, |workspace, cx| {
5519                workspace.active_item(cx).unwrap().id() == editor_b1.id()
5520            })
5521            .await;
5522
5523        // When client A navigates back and forth, client B does so as well.
5524        workspace_a
5525            .update(cx_a, |workspace, cx| {
5526                workspace::Pane::go_back(workspace, None, cx)
5527            })
5528            .await;
5529        workspace_b
5530            .condition(cx_b, |workspace, cx| {
5531                workspace.active_item(cx).unwrap().id() == editor_b2.id()
5532            })
5533            .await;
5534
5535        workspace_a
5536            .update(cx_a, |workspace, cx| {
5537                workspace::Pane::go_forward(workspace, None, cx)
5538            })
5539            .await;
5540        workspace_b
5541            .condition(cx_b, |workspace, cx| {
5542                workspace.active_item(cx).unwrap().id() == editor_b1.id()
5543            })
5544            .await;
5545
5546        // Changes to client A's editor are reflected on client B.
5547        editor_a1.update(cx_a, |editor, cx| {
5548            editor.select_ranges([1..1, 2..2], None, cx);
5549        });
5550        editor_b1
5551            .condition(cx_b, |editor, cx| {
5552                editor.selected_ranges(cx) == vec![1..1, 2..2]
5553            })
5554            .await;
5555
5556        editor_a1.update(cx_a, |editor, cx| editor.set_text("TWO", cx));
5557        editor_b1
5558            .condition(cx_b, |editor, cx| editor.text(cx) == "TWO")
5559            .await;
5560
5561        editor_a1.update(cx_a, |editor, cx| {
5562            editor.select_ranges([3..3], None, cx);
5563            editor.set_scroll_position(vec2f(0., 100.), cx);
5564        });
5565        editor_b1
5566            .condition(cx_b, |editor, cx| editor.selected_ranges(cx) == vec![3..3])
5567            .await;
5568
5569        // After unfollowing, client B stops receiving updates from client A.
5570        workspace_b.update(cx_b, |workspace, cx| {
5571            workspace.unfollow(&workspace.active_pane().clone(), cx)
5572        });
5573        workspace_a.update(cx_a, |workspace, cx| {
5574            workspace.activate_item(&editor_a2, cx)
5575        });
5576        cx_a.foreground().run_until_parked();
5577        assert_eq!(
5578            workspace_b.read_with(cx_b, |workspace, cx| workspace
5579                .active_item(cx)
5580                .unwrap()
5581                .id()),
5582            editor_b1.id()
5583        );
5584
5585        // Client A starts following client B.
5586        workspace_a
5587            .update(cx_a, |workspace, cx| {
5588                workspace
5589                    .toggle_follow(&ToggleFollow(client_b_id), cx)
5590                    .unwrap()
5591            })
5592            .await
5593            .unwrap();
5594        assert_eq!(
5595            workspace_a.read_with(cx_a, |workspace, _| workspace.leader_for_pane(&pane_a)),
5596            Some(client_b_id)
5597        );
5598        assert_eq!(
5599            workspace_a.read_with(cx_a, |workspace, cx| workspace
5600                .active_item(cx)
5601                .unwrap()
5602                .id()),
5603            editor_a1.id()
5604        );
5605
5606        // Following interrupts when client B disconnects.
5607        client_b.disconnect(&cx_b.to_async()).unwrap();
5608        cx_a.foreground().run_until_parked();
5609        assert_eq!(
5610            workspace_a.read_with(cx_a, |workspace, _| workspace.leader_for_pane(&pane_a)),
5611            None
5612        );
5613    }
5614
5615    #[gpui::test(iterations = 10)]
5616    async fn test_peers_following_each_other(cx_a: &mut TestAppContext, cx_b: &mut TestAppContext) {
5617        cx_a.foreground().forbid_parking();
5618        let fs = FakeFs::new(cx_a.background());
5619
5620        // 2 clients connect to a server.
5621        let mut server = TestServer::start(cx_a.foreground(), cx_a.background()).await;
5622        let mut client_a = server.create_client(cx_a, "user_a").await;
5623        let mut client_b = server.create_client(cx_b, "user_b").await;
5624        server
5625            .make_contacts(vec![(&client_a, cx_a), (&client_b, cx_b)])
5626            .await;
5627        cx_a.update(editor::init);
5628        cx_b.update(editor::init);
5629
5630        // Client A shares a project.
5631        fs.insert_tree(
5632            "/a",
5633            json!({
5634                "1.txt": "one",
5635                "2.txt": "two",
5636                "3.txt": "three",
5637                "4.txt": "four",
5638            }),
5639        )
5640        .await;
5641        let (project_a, worktree_id) = client_a.build_local_project(fs.clone(), "/a", cx_a).await;
5642        project_a
5643            .update(cx_a, |project, cx| project.share(cx))
5644            .await
5645            .unwrap();
5646
5647        // Client B joins the project.
5648        let project_b = client_b
5649            .build_remote_project(
5650                project_a
5651                    .read_with(cx_a, |project, _| project.remote_id())
5652                    .unwrap(),
5653                cx_b,
5654            )
5655            .await;
5656
5657        // Client A opens some editors.
5658        let workspace_a = client_a.build_workspace(&project_a, cx_a);
5659        let pane_a1 = workspace_a.read_with(cx_a, |workspace, _| workspace.active_pane().clone());
5660        let _editor_a1 = workspace_a
5661            .update(cx_a, |workspace, cx| {
5662                workspace.open_path((worktree_id, "1.txt"), true, cx)
5663            })
5664            .await
5665            .unwrap()
5666            .downcast::<Editor>()
5667            .unwrap();
5668
5669        // Client B opens an editor.
5670        let workspace_b = client_b.build_workspace(&project_b, cx_b);
5671        let pane_b1 = workspace_b.read_with(cx_b, |workspace, _| workspace.active_pane().clone());
5672        let _editor_b1 = workspace_b
5673            .update(cx_b, |workspace, cx| {
5674                workspace.open_path((worktree_id, "2.txt"), true, cx)
5675            })
5676            .await
5677            .unwrap()
5678            .downcast::<Editor>()
5679            .unwrap();
5680
5681        // Clients A and B follow each other in split panes
5682        workspace_a
5683            .update(cx_a, |workspace, cx| {
5684                workspace.split_pane(workspace.active_pane().clone(), SplitDirection::Right, cx);
5685                assert_ne!(*workspace.active_pane(), pane_a1);
5686                let leader_id = *project_a.read(cx).collaborators().keys().next().unwrap();
5687                workspace
5688                    .toggle_follow(&workspace::ToggleFollow(leader_id), cx)
5689                    .unwrap()
5690            })
5691            .await
5692            .unwrap();
5693        workspace_b
5694            .update(cx_b, |workspace, cx| {
5695                workspace.split_pane(workspace.active_pane().clone(), SplitDirection::Right, cx);
5696                assert_ne!(*workspace.active_pane(), pane_b1);
5697                let leader_id = *project_b.read(cx).collaborators().keys().next().unwrap();
5698                workspace
5699                    .toggle_follow(&workspace::ToggleFollow(leader_id), cx)
5700                    .unwrap()
5701            })
5702            .await
5703            .unwrap();
5704
5705        workspace_a
5706            .update(cx_a, |workspace, cx| {
5707                workspace.activate_next_pane(cx);
5708                assert_eq!(*workspace.active_pane(), pane_a1);
5709                workspace.open_path((worktree_id, "3.txt"), true, cx)
5710            })
5711            .await
5712            .unwrap();
5713        workspace_b
5714            .update(cx_b, |workspace, cx| {
5715                workspace.activate_next_pane(cx);
5716                assert_eq!(*workspace.active_pane(), pane_b1);
5717                workspace.open_path((worktree_id, "4.txt"), true, cx)
5718            })
5719            .await
5720            .unwrap();
5721        cx_a.foreground().run_until_parked();
5722
5723        // Ensure leader updates don't change the active pane of followers
5724        workspace_a.read_with(cx_a, |workspace, _| {
5725            assert_eq!(*workspace.active_pane(), pane_a1);
5726        });
5727        workspace_b.read_with(cx_b, |workspace, _| {
5728            assert_eq!(*workspace.active_pane(), pane_b1);
5729        });
5730
5731        // Ensure peers following each other doesn't cause an infinite loop.
5732        assert_eq!(
5733            workspace_a.read_with(cx_a, |workspace, cx| workspace
5734                .active_item(cx)
5735                .unwrap()
5736                .project_path(cx)),
5737            Some((worktree_id, "3.txt").into())
5738        );
5739        workspace_a.update(cx_a, |workspace, cx| {
5740            assert_eq!(
5741                workspace.active_item(cx).unwrap().project_path(cx),
5742                Some((worktree_id, "3.txt").into())
5743            );
5744            workspace.activate_next_pane(cx);
5745            assert_eq!(
5746                workspace.active_item(cx).unwrap().project_path(cx),
5747                Some((worktree_id, "4.txt").into())
5748            );
5749        });
5750        workspace_b.update(cx_b, |workspace, cx| {
5751            assert_eq!(
5752                workspace.active_item(cx).unwrap().project_path(cx),
5753                Some((worktree_id, "4.txt").into())
5754            );
5755            workspace.activate_next_pane(cx);
5756            assert_eq!(
5757                workspace.active_item(cx).unwrap().project_path(cx),
5758                Some((worktree_id, "3.txt").into())
5759            );
5760        });
5761    }
5762
5763    #[gpui::test(iterations = 10)]
5764    async fn test_auto_unfollowing(cx_a: &mut TestAppContext, cx_b: &mut TestAppContext) {
5765        cx_a.foreground().forbid_parking();
5766        let fs = FakeFs::new(cx_a.background());
5767
5768        // 2 clients connect to a server.
5769        let mut server = TestServer::start(cx_a.foreground(), cx_a.background()).await;
5770        let mut client_a = server.create_client(cx_a, "user_a").await;
5771        let mut client_b = server.create_client(cx_b, "user_b").await;
5772        server
5773            .make_contacts(vec![(&client_a, cx_a), (&client_b, cx_b)])
5774            .await;
5775        cx_a.update(editor::init);
5776        cx_b.update(editor::init);
5777
5778        // Client A shares a project.
5779        fs.insert_tree(
5780            "/a",
5781            json!({
5782                "1.txt": "one",
5783                "2.txt": "two",
5784                "3.txt": "three",
5785            }),
5786        )
5787        .await;
5788        let (project_a, worktree_id) = client_a.build_local_project(fs.clone(), "/a", cx_a).await;
5789        project_a
5790            .update(cx_a, |project, cx| project.share(cx))
5791            .await
5792            .unwrap();
5793
5794        // Client B joins the project.
5795        let project_b = client_b
5796            .build_remote_project(
5797                project_a
5798                    .read_with(cx_a, |project, _| project.remote_id())
5799                    .unwrap(),
5800                cx_b,
5801            )
5802            .await;
5803
5804        // Client A opens some editors.
5805        let workspace_a = client_a.build_workspace(&project_a, cx_a);
5806        let _editor_a1 = workspace_a
5807            .update(cx_a, |workspace, cx| {
5808                workspace.open_path((worktree_id, "1.txt"), true, cx)
5809            })
5810            .await
5811            .unwrap()
5812            .downcast::<Editor>()
5813            .unwrap();
5814
5815        // Client B starts following client A.
5816        let workspace_b = client_b.build_workspace(&project_b, cx_b);
5817        let pane_b = workspace_b.read_with(cx_b, |workspace, _| workspace.active_pane().clone());
5818        let leader_id = project_b.read_with(cx_b, |project, _| {
5819            project.collaborators().values().next().unwrap().peer_id
5820        });
5821        workspace_b
5822            .update(cx_b, |workspace, cx| {
5823                workspace
5824                    .toggle_follow(&ToggleFollow(leader_id), cx)
5825                    .unwrap()
5826            })
5827            .await
5828            .unwrap();
5829        assert_eq!(
5830            workspace_b.read_with(cx_b, |workspace, _| workspace.leader_for_pane(&pane_b)),
5831            Some(leader_id)
5832        );
5833        let editor_b2 = workspace_b.read_with(cx_b, |workspace, cx| {
5834            workspace
5835                .active_item(cx)
5836                .unwrap()
5837                .downcast::<Editor>()
5838                .unwrap()
5839        });
5840
5841        // When client B moves, it automatically stops following client A.
5842        editor_b2.update(cx_b, |editor, cx| editor.move_right(&editor::MoveRight, cx));
5843        assert_eq!(
5844            workspace_b.read_with(cx_b, |workspace, _| workspace.leader_for_pane(&pane_b)),
5845            None
5846        );
5847
5848        workspace_b
5849            .update(cx_b, |workspace, cx| {
5850                workspace
5851                    .toggle_follow(&ToggleFollow(leader_id), cx)
5852                    .unwrap()
5853            })
5854            .await
5855            .unwrap();
5856        assert_eq!(
5857            workspace_b.read_with(cx_b, |workspace, _| workspace.leader_for_pane(&pane_b)),
5858            Some(leader_id)
5859        );
5860
5861        // When client B edits, it automatically stops following client A.
5862        editor_b2.update(cx_b, |editor, cx| editor.insert("X", cx));
5863        assert_eq!(
5864            workspace_b.read_with(cx_b, |workspace, _| workspace.leader_for_pane(&pane_b)),
5865            None
5866        );
5867
5868        workspace_b
5869            .update(cx_b, |workspace, cx| {
5870                workspace
5871                    .toggle_follow(&ToggleFollow(leader_id), cx)
5872                    .unwrap()
5873            })
5874            .await
5875            .unwrap();
5876        assert_eq!(
5877            workspace_b.read_with(cx_b, |workspace, _| workspace.leader_for_pane(&pane_b)),
5878            Some(leader_id)
5879        );
5880
5881        // When client B scrolls, it automatically stops following client A.
5882        editor_b2.update(cx_b, |editor, cx| {
5883            editor.set_scroll_position(vec2f(0., 3.), cx)
5884        });
5885        assert_eq!(
5886            workspace_b.read_with(cx_b, |workspace, _| workspace.leader_for_pane(&pane_b)),
5887            None
5888        );
5889
5890        workspace_b
5891            .update(cx_b, |workspace, cx| {
5892                workspace
5893                    .toggle_follow(&ToggleFollow(leader_id), cx)
5894                    .unwrap()
5895            })
5896            .await
5897            .unwrap();
5898        assert_eq!(
5899            workspace_b.read_with(cx_b, |workspace, _| workspace.leader_for_pane(&pane_b)),
5900            Some(leader_id)
5901        );
5902
5903        // When client B activates a different pane, it continues following client A in the original pane.
5904        workspace_b.update(cx_b, |workspace, cx| {
5905            workspace.split_pane(pane_b.clone(), SplitDirection::Right, cx)
5906        });
5907        assert_eq!(
5908            workspace_b.read_with(cx_b, |workspace, _| workspace.leader_for_pane(&pane_b)),
5909            Some(leader_id)
5910        );
5911
5912        workspace_b.update(cx_b, |workspace, cx| workspace.activate_next_pane(cx));
5913        assert_eq!(
5914            workspace_b.read_with(cx_b, |workspace, _| workspace.leader_for_pane(&pane_b)),
5915            Some(leader_id)
5916        );
5917
5918        // When client B activates a different item in the original pane, it automatically stops following client A.
5919        workspace_b
5920            .update(cx_b, |workspace, cx| {
5921                workspace.open_path((worktree_id, "2.txt"), true, cx)
5922            })
5923            .await
5924            .unwrap();
5925        assert_eq!(
5926            workspace_b.read_with(cx_b, |workspace, _| workspace.leader_for_pane(&pane_b)),
5927            None
5928        );
5929    }
5930
5931    #[gpui::test(iterations = 100)]
5932    async fn test_random_collaboration(
5933        cx: &mut TestAppContext,
5934        deterministic: Arc<Deterministic>,
5935        rng: StdRng,
5936    ) {
5937        cx.foreground().forbid_parking();
5938        let max_peers = env::var("MAX_PEERS")
5939            .map(|i| i.parse().expect("invalid `MAX_PEERS` variable"))
5940            .unwrap_or(5);
5941        assert!(max_peers <= 5);
5942
5943        let max_operations = env::var("OPERATIONS")
5944            .map(|i| i.parse().expect("invalid `OPERATIONS` variable"))
5945            .unwrap_or(10);
5946
5947        let rng = Arc::new(Mutex::new(rng));
5948
5949        let guest_lang_registry = Arc::new(LanguageRegistry::test());
5950        let host_language_registry = Arc::new(LanguageRegistry::test());
5951
5952        let fs = FakeFs::new(cx.background());
5953        fs.insert_tree("/_collab", json!({"init": ""})).await;
5954
5955        let mut server = TestServer::start(cx.foreground(), cx.background()).await;
5956        let db = server.app_state.db.clone();
5957        let host_user_id = db.create_user("host", false).await.unwrap();
5958        for username in ["guest-1", "guest-2", "guest-3", "guest-4"] {
5959            let guest_user_id = db.create_user(username, false).await.unwrap();
5960            server
5961                .app_state
5962                .db
5963                .send_contact_request(guest_user_id, host_user_id)
5964                .await
5965                .unwrap();
5966            server
5967                .app_state
5968                .db
5969                .respond_to_contact_request(host_user_id, guest_user_id, true)
5970                .await
5971                .unwrap();
5972        }
5973
5974        let mut clients = Vec::new();
5975        let mut user_ids = Vec::new();
5976        let mut op_start_signals = Vec::new();
5977
5978        let mut next_entity_id = 100000;
5979        let mut host_cx = TestAppContext::new(
5980            cx.foreground_platform(),
5981            cx.platform(),
5982            deterministic.build_foreground(next_entity_id),
5983            deterministic.build_background(),
5984            cx.font_cache(),
5985            cx.leak_detector(),
5986            next_entity_id,
5987        );
5988        let host = server.create_client(&mut host_cx, "host").await;
5989        let host_project = host_cx.update(|cx| {
5990            Project::local(
5991                host.client.clone(),
5992                host.user_store.clone(),
5993                host_language_registry.clone(),
5994                fs.clone(),
5995                cx,
5996            )
5997        });
5998        let host_project_id = host_project
5999            .update(&mut host_cx, |p, _| p.next_remote_id())
6000            .await;
6001
6002        let (collab_worktree, _) = host_project
6003            .update(&mut host_cx, |project, cx| {
6004                project.find_or_create_local_worktree("/_collab", true, cx)
6005            })
6006            .await
6007            .unwrap();
6008        collab_worktree
6009            .read_with(&host_cx, |tree, _| tree.as_local().unwrap().scan_complete())
6010            .await;
6011        host_project
6012            .update(&mut host_cx, |project, cx| project.share(cx))
6013            .await
6014            .unwrap();
6015
6016        // Set up fake language servers.
6017        let mut language = Language::new(
6018            LanguageConfig {
6019                name: "Rust".into(),
6020                path_suffixes: vec!["rs".to_string()],
6021                ..Default::default()
6022            },
6023            None,
6024        );
6025        let _fake_servers = language.set_fake_lsp_adapter(FakeLspAdapter {
6026            name: "the-fake-language-server",
6027            capabilities: lsp::LanguageServer::full_capabilities(),
6028            initializer: Some(Box::new({
6029                let rng = rng.clone();
6030                let fs = fs.clone();
6031                let project = host_project.downgrade();
6032                move |fake_server: &mut FakeLanguageServer| {
6033                    fake_server.handle_request::<lsp::request::Completion, _, _>(
6034                        |_, _| async move {
6035                            Ok(Some(lsp::CompletionResponse::Array(vec![
6036                                lsp::CompletionItem {
6037                                    text_edit: Some(lsp::CompletionTextEdit::Edit(lsp::TextEdit {
6038                                        range: lsp::Range::new(
6039                                            lsp::Position::new(0, 0),
6040                                            lsp::Position::new(0, 0),
6041                                        ),
6042                                        new_text: "the-new-text".to_string(),
6043                                    })),
6044                                    ..Default::default()
6045                                },
6046                            ])))
6047                        },
6048                    );
6049
6050                    fake_server.handle_request::<lsp::request::CodeActionRequest, _, _>(
6051                        |_, _| async move {
6052                            Ok(Some(vec![lsp::CodeActionOrCommand::CodeAction(
6053                                lsp::CodeAction {
6054                                    title: "the-code-action".to_string(),
6055                                    ..Default::default()
6056                                },
6057                            )]))
6058                        },
6059                    );
6060
6061                    fake_server.handle_request::<lsp::request::PrepareRenameRequest, _, _>(
6062                        |params, _| async move {
6063                            Ok(Some(lsp::PrepareRenameResponse::Range(lsp::Range::new(
6064                                params.position,
6065                                params.position,
6066                            ))))
6067                        },
6068                    );
6069
6070                    fake_server.handle_request::<lsp::request::GotoDefinition, _, _>({
6071                        let fs = fs.clone();
6072                        let rng = rng.clone();
6073                        move |_, _| {
6074                            let fs = fs.clone();
6075                            let rng = rng.clone();
6076                            async move {
6077                                let files = fs.files().await;
6078                                let mut rng = rng.lock();
6079                                let count = rng.gen_range::<usize, _>(1..3);
6080                                let files = (0..count)
6081                                    .map(|_| files.choose(&mut *rng).unwrap())
6082                                    .collect::<Vec<_>>();
6083                                log::info!("LSP: Returning definitions in files {:?}", &files);
6084                                Ok(Some(lsp::GotoDefinitionResponse::Array(
6085                                    files
6086                                        .into_iter()
6087                                        .map(|file| lsp::Location {
6088                                            uri: lsp::Url::from_file_path(file).unwrap(),
6089                                            range: Default::default(),
6090                                        })
6091                                        .collect(),
6092                                )))
6093                            }
6094                        }
6095                    });
6096
6097                    fake_server.handle_request::<lsp::request::DocumentHighlightRequest, _, _>({
6098                        let rng = rng.clone();
6099                        let project = project.clone();
6100                        move |params, mut cx| {
6101                            let highlights = if let Some(project) = project.upgrade(&cx) {
6102                                project.update(&mut cx, |project, cx| {
6103                                    let path = params
6104                                        .text_document_position_params
6105                                        .text_document
6106                                        .uri
6107                                        .to_file_path()
6108                                        .unwrap();
6109                                    let (worktree, relative_path) =
6110                                        project.find_local_worktree(&path, cx)?;
6111                                    let project_path =
6112                                        ProjectPath::from((worktree.read(cx).id(), relative_path));
6113                                    let buffer =
6114                                        project.get_open_buffer(&project_path, cx)?.read(cx);
6115
6116                                    let mut highlights = Vec::new();
6117                                    let highlight_count = rng.lock().gen_range(1..=5);
6118                                    let mut prev_end = 0;
6119                                    for _ in 0..highlight_count {
6120                                        let range =
6121                                            buffer.random_byte_range(prev_end, &mut *rng.lock());
6122
6123                                        highlights.push(lsp::DocumentHighlight {
6124                                            range: range_to_lsp(range.to_point_utf16(buffer)),
6125                                            kind: Some(lsp::DocumentHighlightKind::READ),
6126                                        });
6127                                        prev_end = range.end;
6128                                    }
6129                                    Some(highlights)
6130                                })
6131                            } else {
6132                                None
6133                            };
6134                            async move { Ok(highlights) }
6135                        }
6136                    });
6137                }
6138            })),
6139            ..Default::default()
6140        });
6141        host_language_registry.add(Arc::new(language));
6142
6143        let op_start_signal = futures::channel::mpsc::unbounded();
6144        user_ids.push(host.current_user_id(&host_cx));
6145        op_start_signals.push(op_start_signal.0);
6146        clients.push(host_cx.foreground().spawn(host.simulate_host(
6147            host_project,
6148            op_start_signal.1,
6149            rng.clone(),
6150            host_cx,
6151        )));
6152
6153        let disconnect_host_at = if rng.lock().gen_bool(0.2) {
6154            rng.lock().gen_range(0..max_operations)
6155        } else {
6156            max_operations
6157        };
6158        let mut available_guests = vec![
6159            "guest-1".to_string(),
6160            "guest-2".to_string(),
6161            "guest-3".to_string(),
6162            "guest-4".to_string(),
6163        ];
6164        let mut operations = 0;
6165        while operations < max_operations {
6166            if operations == disconnect_host_at {
6167                server.disconnect_client(user_ids[0]);
6168                cx.foreground().advance_clock(RECEIVE_TIMEOUT);
6169                drop(op_start_signals);
6170                let mut clients = futures::future::join_all(clients).await;
6171                cx.foreground().run_until_parked();
6172
6173                let (host, mut host_cx, host_err) = clients.remove(0);
6174                if let Some(host_err) = host_err {
6175                    log::error!("host error - {}", host_err);
6176                }
6177                host.project
6178                    .as_ref()
6179                    .unwrap()
6180                    .read_with(&host_cx, |project, _| assert!(!project.is_shared()));
6181                for (guest, mut guest_cx, guest_err) in clients {
6182                    if let Some(guest_err) = guest_err {
6183                        log::error!("{} error - {}", guest.username, guest_err);
6184                    }
6185                    // TODO
6186                    // let contacts = server
6187                    //     .store
6188                    //     .read()
6189                    //     .await
6190                    //     .contacts_for_user(guest.current_user_id(&guest_cx));
6191                    // assert!(!contacts
6192                    //     .iter()
6193                    //     .flat_map(|contact| &contact.projects)
6194                    //     .any(|project| project.id == host_project_id));
6195                    guest
6196                        .project
6197                        .as_ref()
6198                        .unwrap()
6199                        .read_with(&guest_cx, |project, _| assert!(project.is_read_only()));
6200                    guest_cx.update(|_| drop(guest));
6201                }
6202                host_cx.update(|_| drop(host));
6203
6204                return;
6205            }
6206
6207            let distribution = rng.lock().gen_range(0..100);
6208            match distribution {
6209                0..=19 if !available_guests.is_empty() => {
6210                    let guest_ix = rng.lock().gen_range(0..available_guests.len());
6211                    let guest_username = available_guests.remove(guest_ix);
6212                    log::info!("Adding new connection for {}", guest_username);
6213                    next_entity_id += 100000;
6214                    let mut guest_cx = TestAppContext::new(
6215                        cx.foreground_platform(),
6216                        cx.platform(),
6217                        deterministic.build_foreground(next_entity_id),
6218                        deterministic.build_background(),
6219                        cx.font_cache(),
6220                        cx.leak_detector(),
6221                        next_entity_id,
6222                    );
6223                    let guest = server.create_client(&mut guest_cx, &guest_username).await;
6224                    let guest_project = Project::remote(
6225                        host_project_id,
6226                        guest.client.clone(),
6227                        guest.user_store.clone(),
6228                        guest_lang_registry.clone(),
6229                        FakeFs::new(cx.background()),
6230                        &mut guest_cx.to_async(),
6231                    )
6232                    .await
6233                    .unwrap();
6234                    let op_start_signal = futures::channel::mpsc::unbounded();
6235                    user_ids.push(guest.current_user_id(&guest_cx));
6236                    op_start_signals.push(op_start_signal.0);
6237                    clients.push(guest_cx.foreground().spawn(guest.simulate_guest(
6238                        guest_username.clone(),
6239                        guest_project,
6240                        op_start_signal.1,
6241                        rng.clone(),
6242                        guest_cx,
6243                    )));
6244
6245                    log::info!("Added connection for {}", guest_username);
6246                    operations += 1;
6247                }
6248                20..=29 if clients.len() > 1 => {
6249                    let guest_ix = rng.lock().gen_range(1..clients.len());
6250                    log::info!("Removing guest {}", user_ids[guest_ix]);
6251                    let removed_guest_id = user_ids.remove(guest_ix);
6252                    let guest = clients.remove(guest_ix);
6253                    op_start_signals.remove(guest_ix);
6254                    server.disconnect_client(removed_guest_id);
6255                    cx.foreground().advance_clock(RECEIVE_TIMEOUT);
6256                    let (guest, mut guest_cx, guest_err) = guest.await;
6257                    if let Some(guest_err) = guest_err {
6258                        log::error!("{} error - {}", guest.username, guest_err);
6259                    }
6260                    guest
6261                        .project
6262                        .as_ref()
6263                        .unwrap()
6264                        .read_with(&guest_cx, |project, _| assert!(project.is_read_only()));
6265                    // TODO
6266                    // for user_id in &user_ids {
6267                    //     for contact in server.store.read().await.contacts_for_user(*user_id) {
6268                    //         assert_ne!(
6269                    //             contact.user_id, removed_guest_id.0 as u64,
6270                    //             "removed guest is still a contact of another peer"
6271                    //         );
6272                    //         for project in contact.projects {
6273                    //             for project_guest_id in project.guests {
6274                    //                 assert_ne!(
6275                    //                     project_guest_id, removed_guest_id.0 as u64,
6276                    //                     "removed guest appears as still participating on a project"
6277                    //                 );
6278                    //             }
6279                    //         }
6280                    //     }
6281                    // }
6282
6283                    log::info!("{} removed", guest.username);
6284                    available_guests.push(guest.username.clone());
6285                    guest_cx.update(|_| drop(guest));
6286
6287                    operations += 1;
6288                }
6289                _ => {
6290                    while operations < max_operations && rng.lock().gen_bool(0.7) {
6291                        op_start_signals
6292                            .choose(&mut *rng.lock())
6293                            .unwrap()
6294                            .unbounded_send(())
6295                            .unwrap();
6296                        operations += 1;
6297                    }
6298
6299                    if rng.lock().gen_bool(0.8) {
6300                        cx.foreground().run_until_parked();
6301                    }
6302                }
6303            }
6304        }
6305
6306        drop(op_start_signals);
6307        let mut clients = futures::future::join_all(clients).await;
6308        cx.foreground().run_until_parked();
6309
6310        let (host_client, mut host_cx, host_err) = clients.remove(0);
6311        if let Some(host_err) = host_err {
6312            panic!("host error - {}", host_err);
6313        }
6314        let host_project = host_client.project.as_ref().unwrap();
6315        let host_worktree_snapshots = host_project.read_with(&host_cx, |project, cx| {
6316            project
6317                .worktrees(cx)
6318                .map(|worktree| {
6319                    let snapshot = worktree.read(cx).snapshot();
6320                    (snapshot.id(), snapshot)
6321                })
6322                .collect::<BTreeMap<_, _>>()
6323        });
6324
6325        host_client
6326            .project
6327            .as_ref()
6328            .unwrap()
6329            .read_with(&host_cx, |project, cx| project.check_invariants(cx));
6330
6331        for (guest_client, mut guest_cx, guest_err) in clients.into_iter() {
6332            if let Some(guest_err) = guest_err {
6333                panic!("{} error - {}", guest_client.username, guest_err);
6334            }
6335            let worktree_snapshots =
6336                guest_client
6337                    .project
6338                    .as_ref()
6339                    .unwrap()
6340                    .read_with(&guest_cx, |project, cx| {
6341                        project
6342                            .worktrees(cx)
6343                            .map(|worktree| {
6344                                let worktree = worktree.read(cx);
6345                                (worktree.id(), worktree.snapshot())
6346                            })
6347                            .collect::<BTreeMap<_, _>>()
6348                    });
6349
6350            assert_eq!(
6351                worktree_snapshots.keys().collect::<Vec<_>>(),
6352                host_worktree_snapshots.keys().collect::<Vec<_>>(),
6353                "{} has different worktrees than the host",
6354                guest_client.username
6355            );
6356            for (id, host_snapshot) in &host_worktree_snapshots {
6357                let guest_snapshot = &worktree_snapshots[id];
6358                assert_eq!(
6359                    guest_snapshot.root_name(),
6360                    host_snapshot.root_name(),
6361                    "{} has different root name than the host for worktree {}",
6362                    guest_client.username,
6363                    id
6364                );
6365                assert_eq!(
6366                    guest_snapshot.entries(false).collect::<Vec<_>>(),
6367                    host_snapshot.entries(false).collect::<Vec<_>>(),
6368                    "{} has different snapshot than the host for worktree {}",
6369                    guest_client.username,
6370                    id
6371                );
6372                assert_eq!(guest_snapshot.scan_id(), host_snapshot.scan_id());
6373            }
6374
6375            guest_client
6376                .project
6377                .as_ref()
6378                .unwrap()
6379                .read_with(&guest_cx, |project, cx| project.check_invariants(cx));
6380
6381            for guest_buffer in &guest_client.buffers {
6382                let buffer_id = guest_buffer.read_with(&guest_cx, |buffer, _| buffer.remote_id());
6383                let host_buffer = host_project.read_with(&host_cx, |project, cx| {
6384                    project.buffer_for_id(buffer_id, cx).expect(&format!(
6385                        "host does not have buffer for guest:{}, peer:{}, id:{}",
6386                        guest_client.username, guest_client.peer_id, buffer_id
6387                    ))
6388                });
6389                let path = host_buffer
6390                    .read_with(&host_cx, |buffer, cx| buffer.file().unwrap().full_path(cx));
6391
6392                assert_eq!(
6393                    guest_buffer.read_with(&guest_cx, |buffer, _| buffer.deferred_ops_len()),
6394                    0,
6395                    "{}, buffer {}, path {:?} has deferred operations",
6396                    guest_client.username,
6397                    buffer_id,
6398                    path,
6399                );
6400                assert_eq!(
6401                    guest_buffer.read_with(&guest_cx, |buffer, _| buffer.text()),
6402                    host_buffer.read_with(&host_cx, |buffer, _| buffer.text()),
6403                    "{}, buffer {}, path {:?}, differs from the host's buffer",
6404                    guest_client.username,
6405                    buffer_id,
6406                    path
6407                );
6408            }
6409
6410            guest_cx.update(|_| drop(guest_client));
6411        }
6412
6413        host_cx.update(|_| drop(host_client));
6414    }
6415
    /// Test harness that wraps a `Server` together with the state needed to connect
    /// in-memory clients to it and to forcibly disconnect them.
    struct TestServer {
        /// Peer created in `start`; `Peer::reset` is called on it when the harness is dropped.
        peer: Arc<Peer>,
        /// Application state built by `build_app_state` from the test database.
        app_state: Arc<AppState>,
        /// The server under test; also the `Deref` target of `TestServer`.
        server: Arc<Server>,
        /// Foreground executor, used by `condition` to check that parking is forbidden
        /// and to bracket each wait.
        foreground: Rc<executor::Foreground>,
        /// Receiving half of the channel whose sender is passed to `Server::new`;
        /// `condition` awaits it between predicate checks.
        notifications: mpsc::UnboundedReceiver<()>,
        /// For each connected user, the flag returned by `Connection::in_memory`;
        /// `disconnect_client` removes the entry and sets the flag to `true`.
        connection_killers: Arc<Mutex<HashMap<UserId, Arc<AtomicBool>>>>,
        /// While `true`, the establish-connection override installed by `create_client`
        /// fails instead of creating a connection.
        forbid_connections: Arc<AtomicBool>,
        /// The test database `app_state` was built from; presumably held here so that it
        /// lives as long as the server does.
        _test_db: TestDb,
    }
6426
6427    impl TestServer {
6428        async fn start(
6429            foreground: Rc<executor::Foreground>,
6430            background: Arc<executor::Background>,
6431        ) -> Self {
6432            let test_db = TestDb::fake(background);
6433            let app_state = Self::build_app_state(&test_db).await;
6434            let peer = Peer::new();
6435            let notifications = mpsc::unbounded();
6436            let server = Server::new(app_state.clone(), Some(notifications.0));
6437            Self {
6438                peer,
6439                app_state,
6440                server,
6441                foreground,
6442                notifications: notifications.1,
6443                connection_killers: Default::default(),
6444                forbid_connections: Default::default(),
6445                _test_db: test_db,
6446            }
6447        }
6448
6449        async fn create_client(&mut self, cx: &mut TestAppContext, name: &str) -> TestClient {
6450            cx.update(|cx| {
6451                let settings = Settings::test(cx);
6452                cx.set_global(settings);
6453            });
6454
6455            let http = FakeHttpClient::with_404_response();
6456            let user_id =
6457                if let Ok(Some(user)) = self.app_state.db.get_user_by_github_login(name).await {
6458                    user.id
6459                } else {
6460                    self.app_state.db.create_user(name, false).await.unwrap()
6461                };
6462            let client_name = name.to_string();
6463            let mut client = Client::new(http.clone());
6464            let server = self.server.clone();
6465            let connection_killers = self.connection_killers.clone();
6466            let forbid_connections = self.forbid_connections.clone();
6467            let (connection_id_tx, mut connection_id_rx) = mpsc::channel(16);
6468
6469            Arc::get_mut(&mut client)
6470                .unwrap()
6471                .override_authenticate(move |cx| {
6472                    cx.spawn(|_| async move {
6473                        let access_token = "the-token".to_string();
6474                        Ok(Credentials {
6475                            user_id: user_id.0 as u64,
6476                            access_token,
6477                        })
6478                    })
6479                })
6480                .override_establish_connection(move |credentials, cx| {
6481                    assert_eq!(credentials.user_id, user_id.0 as u64);
6482                    assert_eq!(credentials.access_token, "the-token");
6483
6484                    let server = server.clone();
6485                    let connection_killers = connection_killers.clone();
6486                    let forbid_connections = forbid_connections.clone();
6487                    let client_name = client_name.clone();
6488                    let connection_id_tx = connection_id_tx.clone();
6489                    cx.spawn(move |cx| async move {
6490                        if forbid_connections.load(SeqCst) {
6491                            Err(EstablishConnectionError::other(anyhow!(
6492                                "server is forbidding connections"
6493                            )))
6494                        } else {
6495                            let (client_conn, server_conn, killed) =
6496                                Connection::in_memory(cx.background());
6497                            connection_killers.lock().insert(user_id, killed);
6498                            cx.background()
6499                                .spawn(server.handle_connection(
6500                                    server_conn,
6501                                    client_name,
6502                                    user_id,
6503                                    Some(connection_id_tx),
6504                                    cx.background(),
6505                                ))
6506                                .detach();
6507                            Ok(client_conn)
6508                        }
6509                    })
6510                });
6511
6512            client
6513                .authenticate_and_connect(false, &cx.to_async())
6514                .await
6515                .unwrap();
6516
6517            Channel::init(&client);
6518            Project::init(&client);
6519            cx.update(|cx| {
6520                workspace::init(&client, cx);
6521            });
6522
6523            let peer_id = PeerId(connection_id_rx.next().await.unwrap().0);
6524            let user_store = cx.add_model(|cx| UserStore::new(client.clone(), http, cx));
6525
6526            let client = TestClient {
6527                client,
6528                peer_id,
6529                username: name.to_string(),
6530                user_store,
6531                language_registry: Arc::new(LanguageRegistry::test()),
6532                project: Default::default(),
6533                buffers: Default::default(),
6534            };
6535            client.wait_for_current_user(cx).await;
6536            client
6537        }
6538
6539        fn disconnect_client(&self, user_id: UserId) {
6540            self.connection_killers
6541                .lock()
6542                .remove(&user_id)
6543                .unwrap()
6544                .store(true, SeqCst);
6545        }
6546
        /// Sets the `forbid_connections` flag, which the establish-connection override
        /// installed by `create_client` checks before creating a connection.
        fn forbid_connections(&self) {
            self.forbid_connections.store(true, SeqCst);
        }
6550
        /// Clears the `forbid_connections` flag so that new connection attempts are
        /// accepted again.
        fn allow_connections(&self) {
            self.forbid_connections.store(false, SeqCst);
        }
6554
6555        async fn make_contacts(&self, mut clients: Vec<(&TestClient, &mut TestAppContext)>) {
6556            while let Some((client_a, cx_a)) = clients.pop() {
6557                for (client_b, cx_b) in &mut clients {
6558                    client_a
6559                        .user_store
6560                        .update(cx_a, |store, cx| {
6561                            store.request_contact(client_b.user_id().unwrap(), cx)
6562                        })
6563                        .await
6564                        .unwrap();
6565                    cx_a.foreground().run_until_parked();
6566                    client_b
6567                        .user_store
6568                        .update(*cx_b, |store, cx| {
6569                            store.respond_to_contact_request(client_a.user_id().unwrap(), true, cx)
6570                        })
6571                        .await
6572                        .unwrap();
6573                }
6574            }
6575        }
6576
6577        async fn build_app_state(test_db: &TestDb) -> Arc<AppState> {
6578            Arc::new(AppState {
6579                db: test_db.db().clone(),
6580                api_token: Default::default(),
6581            })
6582        }
6583
        /// Acquires a read lock on the server's store and returns the guard.
        async fn state<'a>(&'a self) -> RwLockReadGuard<'a, Store> {
            self.server.store.read().await
        }
6587
6588        async fn condition<F>(&mut self, mut predicate: F)
6589        where
6590            F: FnMut(&Store) -> bool,
6591        {
6592            assert!(
6593                self.foreground.parking_forbidden(),
6594                "you must call forbid_parking to use server conditions so we don't block indefinitely"
6595            );
6596            while !(predicate)(&*self.server.store.read().await) {
6597                self.foreground.start_waiting();
6598                self.notifications.next().await;
6599                self.foreground.finish_waiting();
6600            }
6601        }
6602    }
6603
    /// Lets a `TestServer` be used as a `&Server` by dereferencing to the wrapped server.
    impl Deref for TestServer {
        type Target = Server;

        fn deref(&self) -> &Self::Target {
            &self.server
        }
    }
6611
    /// Calls `Peer::reset` on the harness's peer when the test server is dropped.
    impl Drop for TestServer {
        fn drop(&mut self) {
            self.peer.reset();
        }
    }
6617
    /// A client connected to a `TestServer`, plus the per-client state the tests and
    /// simulations keep track of.
    struct TestClient {
        /// The underlying client; also the `Deref` target of `TestClient`.
        client: Arc<Client>,
        /// The name this client was created with.
        username: String,
        /// Peer id built from the connection id reported when the server accepted the
        /// client's connection.
        pub peer_id: PeerId,
        /// User store model created for this client.
        pub user_store: ModelHandle<UserStore>,
        /// Language registry handed to the projects and workspaces this client builds.
        language_registry: Arc<LanguageRegistry>,
        /// The project most recently built by, or assigned to, this client, if any.
        project: Option<ModelHandle<Project>>,
        /// Buffers the simulation helpers currently keep open for this client.
        buffers: HashSet<ModelHandle<language::Buffer>>,
    }
6627
    /// Lets a `TestClient` be used as an `&Arc<Client>` by dereferencing to the wrapped
    /// client handle.
    impl Deref for TestClient {
        type Target = Arc<Client>;

        fn deref(&self) -> &Self::Target {
            &self.client
        }
    }
6635
    /// Snapshot of a client's contact lists, as produced by
    /// `TestClient::summarize_contacts`; each entry is a GitHub login.
    struct ContactsSummary {
        /// Logins of the users that are currently contacts.
        pub current: Vec<String>,
        /// Logins of the users in the store's outgoing contact requests.
        pub outgoing_requests: Vec<String>,
        /// Logins of the users in the store's incoming contact requests.
        pub incoming_requests: Vec<String>,
    }
6641
6642    impl TestClient {
6643        pub fn current_user_id(&self, cx: &TestAppContext) -> UserId {
6644            UserId::from_proto(
6645                self.user_store
6646                    .read_with(cx, |user_store, _| user_store.current_user().unwrap().id),
6647            )
6648        }
6649
6650        async fn wait_for_current_user(&self, cx: &TestAppContext) {
6651            let mut authed_user = self
6652                .user_store
6653                .read_with(cx, |user_store, _| user_store.watch_current_user());
6654            while authed_user.next().await.unwrap().is_none() {}
6655        }
6656
6657        async fn clear_contacts(&self, cx: &mut TestAppContext) {
6658            self.user_store
6659                .update(cx, |store, _| store.clear_contacts())
6660                .await;
6661        }
6662
6663        fn summarize_contacts(&self, cx: &TestAppContext) -> ContactsSummary {
6664            self.user_store.read_with(cx, |store, _| ContactsSummary {
6665                current: store
6666                    .contacts()
6667                    .iter()
6668                    .map(|contact| contact.user.github_login.clone())
6669                    .collect(),
6670                outgoing_requests: store
6671                    .outgoing_contact_requests()
6672                    .iter()
6673                    .map(|user| user.github_login.clone())
6674                    .collect(),
6675                incoming_requests: store
6676                    .incoming_contact_requests()
6677                    .iter()
6678                    .map(|user| user.github_login.clone())
6679                    .collect(),
6680            })
6681        }
6682
6683        async fn build_local_project(
6684            &mut self,
6685            fs: Arc<FakeFs>,
6686            root_path: impl AsRef<Path>,
6687            cx: &mut TestAppContext,
6688        ) -> (ModelHandle<Project>, WorktreeId) {
6689            let project = cx.update(|cx| {
6690                Project::local(
6691                    self.client.clone(),
6692                    self.user_store.clone(),
6693                    self.language_registry.clone(),
6694                    fs,
6695                    cx,
6696                )
6697            });
6698            self.project = Some(project.clone());
6699            let (worktree, _) = project
6700                .update(cx, |p, cx| {
6701                    p.find_or_create_local_worktree(root_path, true, cx)
6702                })
6703                .await
6704                .unwrap();
6705            worktree
6706                .read_with(cx, |tree, _| tree.as_local().unwrap().scan_complete())
6707                .await;
6708            project
6709                .update(cx, |project, _| project.next_remote_id())
6710                .await;
6711            (project, worktree.read_with(cx, |tree, _| tree.id()))
6712        }
6713
6714        async fn build_remote_project(
6715            &mut self,
6716            project_id: u64,
6717            cx: &mut TestAppContext,
6718        ) -> ModelHandle<Project> {
6719            let project = Project::remote(
6720                project_id,
6721                self.client.clone(),
6722                self.user_store.clone(),
6723                self.language_registry.clone(),
6724                FakeFs::new(cx.background()),
6725                &mut cx.to_async(),
6726            )
6727            .await
6728            .unwrap();
6729            self.project = Some(project.clone());
6730            project
6731        }
6732
6733        fn build_workspace(
6734            &self,
6735            project: &ModelHandle<Project>,
6736            cx: &mut TestAppContext,
6737        ) -> ViewHandle<Workspace> {
6738            let (window_id, _) = cx.add_window(|_| EmptyView);
6739            cx.add_view(window_id, |cx| {
6740                let fs = project.read(cx).fs().clone();
6741                Workspace::new(
6742                    &WorkspaceParams {
6743                        fs,
6744                        project: project.clone(),
6745                        user_store: self.user_store.clone(),
6746                        languages: self.language_registry.clone(),
6747                        themes: ThemeRegistry::new((), cx.font_cache().clone()),
6748                        channel_list: cx.add_model(|cx| {
6749                            ChannelList::new(self.user_store.clone(), self.client.clone(), cx)
6750                        }),
6751                        client: self.client.clone(),
6752                    },
6753                    cx,
6754                )
6755            })
6756        }
6757
        /// Runs the host side of the randomized simulation.
        ///
        /// Each message received on `op_start_signal` triggers one randomly chosen host
        /// operation on `project`; the loop ends when the signal channel is closed.
        /// Returns the client (with `project` stored as its project), the app context,
        /// and the error that stopped the simulation, if any.
        async fn simulate_host(
            mut self,
            project: ModelHandle<Project>,
            op_start_signal: futures::channel::mpsc::UnboundedReceiver<()>,
            rng: Arc<Mutex<StdRng>>,
            mut cx: TestAppContext,
        ) -> (Self, TestAppContext, Option<anyhow::Error>) {
            // The loop lives in an inner function so it can use `?`, while the outer
            // function still returns the client and context when an operation fails.
            async fn simulate_host_internal(
                client: &mut TestClient,
                project: ModelHandle<Project>,
                mut op_start_signal: futures::channel::mpsc::UnboundedReceiver<()>,
                rng: Arc<Mutex<StdRng>>,
                cx: &mut TestAppContext,
            ) -> anyhow::Result<()> {
                let fs = project.read_with(cx, |project, _| project.fs().clone());

                while op_start_signal.next().await.is_some() {
                    let distribution = rng.lock().gen_range::<usize, _>(0..100);
                    let files = fs.as_fake().files().await;
                    match distribution {
                        // Find or create a worktree at a randomly chosen ancestor of a
                        // random existing file.
                        0..=20 if !files.is_empty() => {
                            let path = files.choose(&mut *rng.lock()).unwrap();
                            let mut path = path.as_path();
                            // Walk up the ancestors, stopping at a random one (or at the root).
                            while let Some(parent_path) = path.parent() {
                                path = parent_path;
                                if rng.lock().gen() {
                                    break;
                                }
                            }

                            log::info!("Host: find/create local worktree {:?}", path);
                            let find_or_create_worktree = project.update(cx, |project, cx| {
                                project.find_or_create_local_worktree(path, true, cx)
                            });
                            // Randomly either let the request run detached in the
                            // background or wait for it to finish.
                            if rng.lock().gen() {
                                cx.background().spawn(find_or_create_worktree).detach();
                            } else {
                                find_or_create_worktree.await?;
                            }
                        }
                        // Open a new buffer or pick an already open one, then drop or
                        // mutate it.
                        // NOTE(review): this range overlaps the arm above on 10..=20;
                        // since arms are tried in order, this arm only runs for 21..=80
                        // when files exist.
                        10..=80 if !files.is_empty() => {
                            let buffer = if client.buffers.is_empty() || rng.lock().gen() {
                                let file = files.choose(&mut *rng.lock()).unwrap();
                                let (worktree, path) = project
                                    .update(cx, |project, cx| {
                                        project.find_or_create_local_worktree(
                                            file.clone(),
                                            true,
                                            cx,
                                        )
                                    })
                                    .await?;
                                let project_path =
                                    worktree.read_with(cx, |worktree, _| (worktree.id(), path));
                                log::info!(
                                    "Host: opening path {:?}, worktree {}, relative_path {:?}",
                                    file,
                                    project_path.0,
                                    project_path.1
                                );
                                let buffer = project
                                    .update(cx, |project, cx| project.open_buffer(project_path, cx))
                                    .await
                                    .unwrap();
                                client.buffers.insert(buffer.clone());
                                buffer
                            } else {
                                client
                                    .buffers
                                    .iter()
                                    .choose(&mut *rng.lock())
                                    .unwrap()
                                    .clone()
                            };

                            if rng.lock().gen_bool(0.1) {
                                // The handle is dropped inside `cx.update`, i.e. while
                                // the app context is being updated.
                                cx.update(|cx| {
                                    log::info!(
                                        "Host: dropping buffer {:?}",
                                        buffer.read(cx).file().unwrap().full_path(cx)
                                    );
                                    client.buffers.remove(&buffer);
                                    drop(buffer);
                                });
                            } else {
                                buffer.update(cx, |buffer, cx| {
                                    log::info!(
                                        "Host: updating buffer {:?} ({})",
                                        buffer.file().unwrap().full_path(cx),
                                        buffer.remote_id()
                                    );

                                    if rng.lock().gen_bool(0.7) {
                                        buffer.randomly_edit(&mut *rng.lock(), 5, cx);
                                    } else {
                                        buffer.randomly_undo_redo(&mut *rng.lock(), cx);
                                    }
                                });
                            }
                        }
                        // Otherwise (including whenever no files exist yet): create a new
                        // `.rs` file at a random path of 1 to 5 single-letter components,
                        // retrying with a fresh path until creation succeeds.
                        _ => loop {
                            let path_component_count = rng.lock().gen_range::<usize, _>(1..=5);
                            let mut path = PathBuf::new();
                            path.push("/");
                            for _ in 0..path_component_count {
                                let letter = rng.lock().gen_range(b'a'..=b'z');
                                path.push(std::str::from_utf8(&[letter]).unwrap());
                            }
                            path.set_extension("rs");
                            let parent_path = path.parent().unwrap();

                            log::info!("Host: creating file {:?}", path,);

                            if fs.create_dir(&parent_path).await.is_ok()
                                && fs.create_file(&path, Default::default()).await.is_ok()
                            {
                                break;
                            } else {
                                log::info!("Host: cannot create file");
                            }
                        },
                    }

                    cx.background().simulate_random_delay().await;
                }

                Ok(())
            }

            let result =
                simulate_host_internal(&mut self, project.clone(), op_start_signal, rng, &mut cx)
                    .await;
            log::info!("Host done");
            self.project = Some(project);
            (self, cx, result.err())
        }
6894
6895        pub async fn simulate_guest(
6896            mut self,
6897            guest_username: String,
6898            project: ModelHandle<Project>,
6899            op_start_signal: futures::channel::mpsc::UnboundedReceiver<()>,
6900            rng: Arc<Mutex<StdRng>>,
6901            mut cx: TestAppContext,
6902        ) -> (Self, TestAppContext, Option<anyhow::Error>) {
6903            async fn simulate_guest_internal(
6904                client: &mut TestClient,
6905                guest_username: &str,
6906                project: ModelHandle<Project>,
6907                mut op_start_signal: futures::channel::mpsc::UnboundedReceiver<()>,
6908                rng: Arc<Mutex<StdRng>>,
6909                cx: &mut TestAppContext,
6910            ) -> anyhow::Result<()> {
6911                while op_start_signal.next().await.is_some() {
6912                    let buffer = if client.buffers.is_empty() || rng.lock().gen() {
6913                        let worktree = if let Some(worktree) =
6914                            project.read_with(cx, |project, cx| {
6915                                project
6916                                    .worktrees(&cx)
6917                                    .filter(|worktree| {
6918                                        let worktree = worktree.read(cx);
6919                                        worktree.is_visible()
6920                                            && worktree.entries(false).any(|e| e.is_file())
6921                                    })
6922                                    .choose(&mut *rng.lock())
6923                            }) {
6924                            worktree
6925                        } else {
6926                            cx.background().simulate_random_delay().await;
6927                            continue;
6928                        };
6929
6930                        let (worktree_root_name, project_path) =
6931                            worktree.read_with(cx, |worktree, _| {
6932                                let entry = worktree
6933                                    .entries(false)
6934                                    .filter(|e| e.is_file())
6935                                    .choose(&mut *rng.lock())
6936                                    .unwrap();
6937                                (
6938                                    worktree.root_name().to_string(),
6939                                    (worktree.id(), entry.path.clone()),
6940                                )
6941                            });
6942                        log::info!(
6943                            "{}: opening path {:?} in worktree {} ({})",
6944                            guest_username,
6945                            project_path.1,
6946                            project_path.0,
6947                            worktree_root_name,
6948                        );
6949                        let buffer = project
6950                            .update(cx, |project, cx| {
6951                                project.open_buffer(project_path.clone(), cx)
6952                            })
6953                            .await?;
6954                        log::info!(
6955                            "{}: opened path {:?} in worktree {} ({}) with buffer id {}",
6956                            guest_username,
6957                            project_path.1,
6958                            project_path.0,
6959                            worktree_root_name,
6960                            buffer.read_with(cx, |buffer, _| buffer.remote_id())
6961                        );
6962                        client.buffers.insert(buffer.clone());
6963                        buffer
6964                    } else {
6965                        client
6966                            .buffers
6967                            .iter()
6968                            .choose(&mut *rng.lock())
6969                            .unwrap()
6970                            .clone()
6971                    };
6972
6973                    let choice = rng.lock().gen_range(0..100);
6974                    match choice {
6975                        0..=9 => {
6976                            cx.update(|cx| {
6977                                log::info!(
6978                                    "{}: dropping buffer {:?}",
6979                                    guest_username,
6980                                    buffer.read(cx).file().unwrap().full_path(cx)
6981                                );
6982                                client.buffers.remove(&buffer);
6983                                drop(buffer);
6984                            });
6985                        }
6986                        10..=19 => {
6987                            let completions = project.update(cx, |project, cx| {
6988                                log::info!(
6989                                    "{}: requesting completions for buffer {} ({:?})",
6990                                    guest_username,
6991                                    buffer.read(cx).remote_id(),
6992                                    buffer.read(cx).file().unwrap().full_path(cx)
6993                                );
6994                                let offset = rng.lock().gen_range(0..=buffer.read(cx).len());
6995                                project.completions(&buffer, offset, cx)
6996                            });
6997                            let completions = cx.background().spawn(async move {
6998                                completions
6999                                    .await
7000                                    .map_err(|err| anyhow!("completions request failed: {:?}", err))
7001                            });
7002                            if rng.lock().gen_bool(0.3) {
7003                                log::info!("{}: detaching completions request", guest_username);
7004                                cx.update(|cx| completions.detach_and_log_err(cx));
7005                            } else {
7006                                completions.await?;
7007                            }
7008                        }
7009                        20..=29 => {
7010                            let code_actions = project.update(cx, |project, cx| {
7011                                log::info!(
7012                                    "{}: requesting code actions for buffer {} ({:?})",
7013                                    guest_username,
7014                                    buffer.read(cx).remote_id(),
7015                                    buffer.read(cx).file().unwrap().full_path(cx)
7016                                );
7017                                let range = buffer.read(cx).random_byte_range(0, &mut *rng.lock());
7018                                project.code_actions(&buffer, range, cx)
7019                            });
7020                            let code_actions = cx.background().spawn(async move {
7021                                code_actions.await.map_err(|err| {
7022                                    anyhow!("code actions request failed: {:?}", err)
7023                                })
7024                            });
7025                            if rng.lock().gen_bool(0.3) {
7026                                log::info!("{}: detaching code actions request", guest_username);
7027                                cx.update(|cx| code_actions.detach_and_log_err(cx));
7028                            } else {
7029                                code_actions.await?;
7030                            }
7031                        }
7032                        30..=39 if buffer.read_with(cx, |buffer, _| buffer.is_dirty()) => {
7033                            let (requested_version, save) = buffer.update(cx, |buffer, cx| {
7034                                log::info!(
7035                                    "{}: saving buffer {} ({:?})",
7036                                    guest_username,
7037                                    buffer.remote_id(),
7038                                    buffer.file().unwrap().full_path(cx)
7039                                );
7040                                (buffer.version(), buffer.save(cx))
7041                            });
7042                            let save = cx.background().spawn(async move {
7043                                let (saved_version, _) = save
7044                                    .await
7045                                    .map_err(|err| anyhow!("save request failed: {:?}", err))?;
7046                                assert!(saved_version.observed_all(&requested_version));
7047                                Ok::<_, anyhow::Error>(())
7048                            });
7049                            if rng.lock().gen_bool(0.3) {
7050                                log::info!("{}: detaching save request", guest_username);
7051                                cx.update(|cx| save.detach_and_log_err(cx));
7052                            } else {
7053                                save.await?;
7054                            }
7055                        }
7056                        40..=44 => {
7057                            let prepare_rename = project.update(cx, |project, cx| {
7058                                log::info!(
7059                                    "{}: preparing rename for buffer {} ({:?})",
7060                                    guest_username,
7061                                    buffer.read(cx).remote_id(),
7062                                    buffer.read(cx).file().unwrap().full_path(cx)
7063                                );
7064                                let offset = rng.lock().gen_range(0..=buffer.read(cx).len());
7065                                project.prepare_rename(buffer, offset, cx)
7066                            });
7067                            let prepare_rename = cx.background().spawn(async move {
7068                                prepare_rename.await.map_err(|err| {
7069                                    anyhow!("prepare rename request failed: {:?}", err)
7070                                })
7071                            });
7072                            if rng.lock().gen_bool(0.3) {
7073                                log::info!("{}: detaching prepare rename request", guest_username);
7074                                cx.update(|cx| prepare_rename.detach_and_log_err(cx));
7075                            } else {
7076                                prepare_rename.await?;
7077                            }
7078                        }
7079                        45..=49 => {
7080                            let definitions = project.update(cx, |project, cx| {
7081                                log::info!(
7082                                    "{}: requesting definitions for buffer {} ({:?})",
7083                                    guest_username,
7084                                    buffer.read(cx).remote_id(),
7085                                    buffer.read(cx).file().unwrap().full_path(cx)
7086                                );
7087                                let offset = rng.lock().gen_range(0..=buffer.read(cx).len());
7088                                project.definition(&buffer, offset, cx)
7089                            });
7090                            let definitions = cx.background().spawn(async move {
7091                                definitions
7092                                    .await
7093                                    .map_err(|err| anyhow!("definitions request failed: {:?}", err))
7094                            });
7095                            if rng.lock().gen_bool(0.3) {
7096                                log::info!("{}: detaching definitions request", guest_username);
7097                                cx.update(|cx| definitions.detach_and_log_err(cx));
7098                            } else {
7099                                client
7100                                    .buffers
7101                                    .extend(definitions.await?.into_iter().map(|loc| loc.buffer));
7102                            }
7103                        }
7104                        50..=54 => {
7105                            let highlights = project.update(cx, |project, cx| {
7106                                log::info!(
7107                                    "{}: requesting highlights for buffer {} ({:?})",
7108                                    guest_username,
7109                                    buffer.read(cx).remote_id(),
7110                                    buffer.read(cx).file().unwrap().full_path(cx)
7111                                );
7112                                let offset = rng.lock().gen_range(0..=buffer.read(cx).len());
7113                                project.document_highlights(&buffer, offset, cx)
7114                            });
7115                            let highlights = cx.background().spawn(async move {
7116                                highlights
7117                                    .await
7118                                    .map_err(|err| anyhow!("highlights request failed: {:?}", err))
7119                            });
7120                            if rng.lock().gen_bool(0.3) {
7121                                log::info!("{}: detaching highlights request", guest_username);
7122                                cx.update(|cx| highlights.detach_and_log_err(cx));
7123                            } else {
7124                                highlights.await?;
7125                            }
7126                        }
7127                        55..=59 => {
7128                            let search = project.update(cx, |project, cx| {
7129                                let query = rng.lock().gen_range('a'..='z');
7130                                log::info!("{}: project-wide search {:?}", guest_username, query);
7131                                project.search(SearchQuery::text(query, false, false), cx)
7132                            });
7133                            let search = cx.background().spawn(async move {
7134                                search
7135                                    .await
7136                                    .map_err(|err| anyhow!("search request failed: {:?}", err))
7137                            });
7138                            if rng.lock().gen_bool(0.3) {
7139                                log::info!("{}: detaching search request", guest_username);
7140                                cx.update(|cx| search.detach_and_log_err(cx));
7141                            } else {
7142                                client.buffers.extend(search.await?.into_keys());
7143                            }
7144                        }
7145                        60..=69 => {
7146                            let worktree = project
7147                                .read_with(cx, |project, cx| {
7148                                    project
7149                                        .worktrees(&cx)
7150                                        .filter(|worktree| {
7151                                            let worktree = worktree.read(cx);
7152                                            worktree.is_visible()
7153                                                && worktree.entries(false).any(|e| e.is_file())
7154                                                && worktree
7155                                                    .root_entry()
7156                                                    .map_or(false, |e| e.is_dir())
7157                                        })
7158                                        .choose(&mut *rng.lock())
7159                                })
7160                                .unwrap();
7161                            let (worktree_id, worktree_root_name) = worktree
7162                                .read_with(cx, |worktree, _| {
7163                                    (worktree.id(), worktree.root_name().to_string())
7164                                });
7165
7166                            let mut new_name = String::new();
7167                            for _ in 0..10 {
7168                                let letter = rng.lock().gen_range('a'..='z');
7169                                new_name.push(letter);
7170                            }
7171                            let mut new_path = PathBuf::new();
7172                            new_path.push(new_name);
7173                            new_path.set_extension("rs");
7174                            log::info!(
7175                                "{}: creating {:?} in worktree {} ({})",
7176                                guest_username,
7177                                new_path,
7178                                worktree_id,
7179                                worktree_root_name,
7180                            );
7181                            project
7182                                .update(cx, |project, cx| {
7183                                    project.create_entry((worktree_id, new_path), false, cx)
7184                                })
7185                                .unwrap()
7186                                .await?;
7187                        }
7188                        _ => {
7189                            buffer.update(cx, |buffer, cx| {
7190                                log::info!(
7191                                    "{}: updating buffer {} ({:?})",
7192                                    guest_username,
7193                                    buffer.remote_id(),
7194                                    buffer.file().unwrap().full_path(cx)
7195                                );
7196                                if rng.lock().gen_bool(0.7) {
7197                                    buffer.randomly_edit(&mut *rng.lock(), 5, cx);
7198                                } else {
7199                                    buffer.randomly_undo_redo(&mut *rng.lock(), cx);
7200                                }
7201                            });
7202                        }
7203                    }
7204                    cx.background().simulate_random_delay().await;
7205                }
7206                Ok(())
7207            }
7208
7209            let result = simulate_guest_internal(
7210                &mut self,
7211                &guest_username,
7212                project.clone(),
7213                op_start_signal,
7214                rng,
7215                &mut cx,
7216            )
7217            .await;
7218            log::info!("{}: done", guest_username);
7219
7220            self.project = Some(project);
7221            (self, cx, result.err())
7222        }
7223    }
7224
7225    impl Drop for TestClient {
7226        fn drop(&mut self) {
7227            self.client.tear_down();
7228        }
7229    }
7230
7231    impl Executor for Arc<gpui::executor::Background> {
7232        type Sleep = gpui::executor::Timer;
7233
7234        fn spawn_detached<F: 'static + Send + Future<Output = ()>>(&self, future: F) {
7235            self.spawn(future).detach();
7236        }
7237
7238        fn sleep(&self, duration: Duration) -> Self::Sleep {
7239            self.as_ref().timer(duration)
7240        }
7241    }
7242
7243    fn channel_messages(channel: &Channel) -> Vec<(String, String, bool)> {
7244        channel
7245            .messages()
7246            .cursor::<()>()
7247            .map(|m| {
7248                (
7249                    m.sender.github_login.clone(),
7250                    m.body.clone(),
7251                    m.is_pending(),
7252                )
7253            })
7254            .collect()
7255    }
7256
7257    struct EmptyView;
7258
7259    impl gpui::Entity for EmptyView {
7260        type Event = ();
7261    }
7262
7263    impl gpui::View for EmptyView {
7264        fn ui_name() -> &'static str {
7265            "empty view"
7266        }
7267
7268        fn render(&mut self, _: &mut gpui::RenderContext<Self>) -> gpui::ElementBox {
7269            gpui::Element::boxed(gpui::elements::Empty::new())
7270        }
7271    }
7272}