rpc.rs

   1mod store;
   2
   3use crate::{
   4    auth,
   5    db::{ChannelId, MessageId, UserId},
   6    AppState, Result,
   7};
   8use anyhow::anyhow;
   9use async_tungstenite::tungstenite::{
  10    protocol::CloseFrame as TungsteniteCloseFrame, Message as TungsteniteMessage,
  11};
  12use axum::{
  13    body::Body,
  14    extract::{
  15        ws::{CloseFrame as AxumCloseFrame, Message as AxumMessage},
  16        ConnectInfo, WebSocketUpgrade,
  17    },
  18    headers::{Header, HeaderName},
  19    http::StatusCode,
  20    middleware,
  21    response::IntoResponse,
  22    routing::get,
  23    Extension, Router, TypedHeader,
  24};
  25use collections::HashMap;
  26use futures::{channel::mpsc, future::BoxFuture, FutureExt, SinkExt, StreamExt, TryStreamExt};
  27use lazy_static::lazy_static;
  28use rpc::{
  29    proto::{self, AnyTypedEnvelope, EntityMessage, EnvelopedMessage, RequestMessage},
  30    Connection, ConnectionId, Peer, Receipt, TypedEnvelope,
  31};
  32use std::{
  33    any::TypeId,
  34    future::Future,
  35    marker::PhantomData,
  36    net::SocketAddr,
  37    ops::{Deref, DerefMut},
  38    rc::Rc,
  39    sync::{
  40        atomic::{AtomicBool, Ordering::SeqCst},
  41        Arc,
  42    },
  43    time::Duration,
  44};
  45use store::{Store, Worktree};
  46use time::OffsetDateTime;
  47use tokio::{
  48    sync::{RwLock, RwLockReadGuard, RwLockWriteGuard},
  49    time::Sleep,
  50};
  51use tower::ServiceBuilder;
  52use tracing::{info_span, Instrument};
  53
  54type MessageHandler =
  55    Box<dyn Send + Sync + Fn(Arc<Server>, Box<dyn AnyTypedEnvelope>) -> BoxFuture<'static, ()>>;
  56
  57struct Response<R> {
  58    server: Arc<Server>,
  59    receipt: Receipt<R>,
  60    responded: Arc<AtomicBool>,
  61}
  62
  63impl<R: RequestMessage> Response<R> {
  64    fn send(self, payload: R::Response) -> Result<()> {
  65        self.responded.store(true, SeqCst);
  66        self.server.peer.respond(self.receipt, payload)?;
  67        Ok(())
  68    }
  69}
  70
  71pub struct Server {
  72    peer: Arc<Peer>,
  73    store: RwLock<Store>,
  74    app_state: Arc<AppState>,
  75    handlers: HashMap<TypeId, MessageHandler>,
  76    notifications: Option<mpsc::UnboundedSender<()>>,
  77}
  78
  79pub trait Executor: Send + Clone {
  80    type Sleep: Send + Future;
  81    fn spawn_detached<F: 'static + Send + Future<Output = ()>>(&self, future: F);
  82    fn sleep(&self, duration: Duration) -> Self::Sleep;
  83}
  84
  85#[derive(Clone)]
  86pub struct RealExecutor;
  87
  88const MESSAGE_COUNT_PER_PAGE: usize = 100;
  89const MAX_MESSAGE_LEN: usize = 1024;
  90
  91struct StoreReadGuard<'a> {
  92    guard: RwLockReadGuard<'a, Store>,
  93    _not_send: PhantomData<Rc<()>>,
  94}
  95
  96struct StoreWriteGuard<'a> {
  97    guard: RwLockWriteGuard<'a, Store>,
  98    _not_send: PhantomData<Rc<()>>,
  99}
 100
 101impl Server {
 102    pub fn new(
 103        app_state: Arc<AppState>,
 104        notifications: Option<mpsc::UnboundedSender<()>>,
 105    ) -> Arc<Self> {
 106        let mut server = Self {
 107            peer: Peer::new(),
 108            app_state,
 109            store: Default::default(),
 110            handlers: Default::default(),
 111            notifications,
 112        };
 113
 114        server
 115            .add_request_handler(Server::ping)
 116            .add_request_handler(Server::register_project)
 117            .add_message_handler(Server::unregister_project)
 118            .add_request_handler(Server::share_project)
 119            .add_message_handler(Server::unshare_project)
 120            .add_request_handler(Server::join_project)
 121            .add_message_handler(Server::leave_project)
 122            .add_request_handler(Server::register_worktree)
 123            .add_message_handler(Server::unregister_worktree)
 124            .add_request_handler(Server::update_worktree)
 125            .add_message_handler(Server::start_language_server)
 126            .add_message_handler(Server::update_language_server)
 127            .add_message_handler(Server::update_diagnostic_summary)
 128            .add_request_handler(Server::forward_project_request::<proto::GetDefinition>)
 129            .add_request_handler(Server::forward_project_request::<proto::GetReferences>)
 130            .add_request_handler(Server::forward_project_request::<proto::SearchProject>)
 131            .add_request_handler(Server::forward_project_request::<proto::GetDocumentHighlights>)
 132            .add_request_handler(Server::forward_project_request::<proto::GetProjectSymbols>)
 133            .add_request_handler(Server::forward_project_request::<proto::OpenBufferForSymbol>)
 134            .add_request_handler(Server::forward_project_request::<proto::OpenBufferById>)
 135            .add_request_handler(Server::forward_project_request::<proto::OpenBufferByPath>)
 136            .add_request_handler(Server::forward_project_request::<proto::GetCompletions>)
 137            .add_request_handler(
 138                Server::forward_project_request::<proto::ApplyCompletionAdditionalEdits>,
 139            )
 140            .add_request_handler(Server::forward_project_request::<proto::GetCodeActions>)
 141            .add_request_handler(Server::forward_project_request::<proto::ApplyCodeAction>)
 142            .add_request_handler(Server::forward_project_request::<proto::PrepareRename>)
 143            .add_request_handler(Server::forward_project_request::<proto::PerformRename>)
 144            .add_request_handler(Server::forward_project_request::<proto::ReloadBuffers>)
 145            .add_request_handler(Server::forward_project_request::<proto::FormatBuffers>)
 146            .add_request_handler(Server::forward_project_request::<proto::CreateProjectEntry>)
 147            .add_request_handler(Server::forward_project_request::<proto::RenameProjectEntry>)
 148            .add_request_handler(Server::forward_project_request::<proto::DeleteProjectEntry>)
 149            .add_request_handler(Server::update_buffer)
 150            .add_message_handler(Server::update_buffer_file)
 151            .add_message_handler(Server::buffer_reloaded)
 152            .add_message_handler(Server::buffer_saved)
 153            .add_request_handler(Server::save_buffer)
 154            .add_request_handler(Server::get_channels)
 155            .add_request_handler(Server::get_users)
 156            .add_request_handler(Server::fuzzy_search_users)
 157            .add_request_handler(Server::request_contact)
 158            .add_request_handler(Server::remove_contact)
 159            .add_request_handler(Server::respond_to_contact_request)
 160            .add_request_handler(Server::join_channel)
 161            .add_message_handler(Server::leave_channel)
 162            .add_request_handler(Server::send_channel_message)
 163            .add_request_handler(Server::follow)
 164            .add_message_handler(Server::unfollow)
 165            .add_message_handler(Server::update_followers)
 166            .add_request_handler(Server::get_channel_messages);
 167
 168        Arc::new(server)
 169    }
 170
 171    fn add_message_handler<F, Fut, M>(&mut self, handler: F) -> &mut Self
 172    where
 173        F: 'static + Send + Sync + Fn(Arc<Self>, TypedEnvelope<M>) -> Fut,
 174        Fut: 'static + Send + Future<Output = Result<()>>,
 175        M: EnvelopedMessage,
 176    {
 177        let prev_handler = self.handlers.insert(
 178            TypeId::of::<M>(),
 179            Box::new(move |server, envelope| {
 180                let envelope = envelope.into_any().downcast::<TypedEnvelope<M>>().unwrap();
 181                let span = info_span!(
 182                    "handle message",
 183                    payload_type = envelope.payload_type_name(),
 184                    payload = format!("{:?}", envelope.payload).as_str(),
 185                );
 186                let future = (handler)(server, *envelope);
 187                async move {
 188                    if let Err(error) = future.await {
 189                        tracing::error!(%error, "error handling message");
 190                    }
 191                }
 192                .instrument(span)
 193                .boxed()
 194            }),
 195        );
 196        if prev_handler.is_some() {
 197            panic!("registered a handler for the same message twice");
 198        }
 199        self
 200    }
 201
 202    /// Handle a request while holding a lock to the store. This is useful when we're registering
 203    /// a connection but we want to respond on the connection before anybody else can send on it.
 204    fn add_request_handler<F, Fut, M>(&mut self, handler: F) -> &mut Self
 205    where
 206        F: 'static + Send + Sync + Fn(Arc<Self>, TypedEnvelope<M>, Response<M>) -> Fut,
 207        Fut: Send + Future<Output = Result<()>>,
 208        M: RequestMessage,
 209    {
 210        let handler = Arc::new(handler);
 211        self.add_message_handler(move |server, envelope| {
 212            let receipt = envelope.receipt();
 213            let handler = handler.clone();
 214            async move {
 215                let responded = Arc::new(AtomicBool::default());
 216                let response = Response {
 217                    server: server.clone(),
 218                    responded: responded.clone(),
 219                    receipt: envelope.receipt(),
 220                };
 221                match (handler)(server.clone(), envelope, response).await {
 222                    Ok(()) => {
 223                        if responded.load(std::sync::atomic::Ordering::SeqCst) {
 224                            Ok(())
 225                        } else {
 226                            Err(anyhow!("handler did not send a response"))?
 227                        }
 228                    }
 229                    Err(error) => {
 230                        server.peer.respond_with_error(
 231                            receipt,
 232                            proto::Error {
 233                                message: error.to_string(),
 234                            },
 235                        )?;
 236                        Err(error)
 237                    }
 238                }
 239            }
 240        })
 241    }
 242
 243    pub fn handle_connection<E: Executor>(
 244        self: &Arc<Self>,
 245        connection: Connection,
 246        address: String,
 247        user_id: UserId,
 248        mut send_connection_id: Option<mpsc::Sender<ConnectionId>>,
 249        executor: E,
 250    ) -> impl Future<Output = Result<()>> {
 251        let mut this = self.clone();
 252        let span = info_span!("handle connection", %user_id, %address);
 253        async move {
 254            let (connection_id, handle_io, mut incoming_rx) = this
 255                .peer
 256                .add_connection(connection, {
 257                    let executor = executor.clone();
 258                    move |duration| {
 259                        let timer = executor.sleep(duration);
 260                        async move {
 261                            timer.await;
 262                        }
 263                    }
 264                })
 265                .await;
 266
 267            tracing::info!(%user_id, %connection_id, %address, "connection opened");
 268
 269            if let Some(send_connection_id) = send_connection_id.as_mut() {
 270                let _ = send_connection_id.send(connection_id).await;
 271            }
 272
 273            let contacts = this.app_state.db.get_contacts(user_id).await?;
 274
 275            {
 276                let mut store = this.store_mut().await;
 277                store.add_connection(connection_id, user_id);
 278                this.peer.send(connection_id, store.build_initial_contacts_update(contacts))?;
 279            }
 280            this.update_user_contacts(user_id).await?;
 281
 282            let handle_io = handle_io.fuse();
 283            futures::pin_mut!(handle_io);
 284            loop {
 285                let next_message = incoming_rx.next().fuse();
 286                futures::pin_mut!(next_message);
 287                futures::select_biased! {
 288                    result = handle_io => {
 289                        if let Err(error) = result {
 290                            tracing::error!(%error, "error handling I/O");
 291                        }
 292                        break;
 293                    }
 294                    message = next_message => {
 295                        if let Some(message) = message {
 296                            let type_name = message.payload_type_name();
 297                            let span = tracing::info_span!("receive message", %user_id, %connection_id, %address, type_name);
 298                            async {
 299                                if let Some(handler) = this.handlers.get(&message.payload_type_id()) {
 300                                    let notifications = this.notifications.clone();
 301                                    let is_background = message.is_background();
 302                                    let handle_message = (handler)(this.clone(), message);
 303                                    let handle_message = async move {
 304                                        handle_message.await;
 305                                        if let Some(mut notifications) = notifications {
 306                                            let _ = notifications.send(()).await;
 307                                        }
 308                                    };
 309                                    if is_background {
 310                                        executor.spawn_detached(handle_message);
 311                                    } else {
 312                                        handle_message.await;
 313                                    }
 314                                } else {
 315                                    tracing::error!("no message handler");
 316                                }
 317                            }.instrument(span).await;
 318                        } else {
 319                            tracing::info!(%user_id, %connection_id, %address, "connection closed");
 320                            break;
 321                        }
 322                    }
 323                }
 324            }
 325
 326            if let Err(error) = this.sign_out(connection_id).await {
 327                tracing::error!(%error, "error signing out");
 328            }
 329
 330            Ok(())
 331        }.instrument(span)
 332    }
 333
 334    async fn sign_out(self: &mut Arc<Self>, connection_id: ConnectionId) -> Result<()> {
 335        self.peer.disconnect(connection_id);
 336        let removed_connection = self.store_mut().await.remove_connection(connection_id)?;
 337
 338        for (project_id, project) in removed_connection.hosted_projects {
 339            if let Some(share) = project.share {
 340                broadcast(connection_id, share.guests.keys().copied(), |conn_id| {
 341                    self.peer
 342                        .send(conn_id, proto::UnshareProject { project_id })
 343                });
 344            }
 345        }
 346
 347        for (project_id, peer_ids) in removed_connection.guest_project_ids {
 348            broadcast(connection_id, peer_ids, |conn_id| {
 349                self.peer.send(
 350                    conn_id,
 351                    proto::RemoveProjectCollaborator {
 352                        project_id,
 353                        peer_id: connection_id.0,
 354                    },
 355                )
 356            });
 357        }
 358
 359        self.update_user_contacts(removed_connection.user_id)
 360            .await?;
 361
 362        Ok(())
 363    }
 364
 365    async fn ping(
 366        self: Arc<Server>,
 367        _: TypedEnvelope<proto::Ping>,
 368        response: Response<proto::Ping>,
 369    ) -> Result<()> {
 370        response.send(proto::Ack {})?;
 371        Ok(())
 372    }
 373
 374    async fn register_project(
 375        self: Arc<Server>,
 376        request: TypedEnvelope<proto::RegisterProject>,
 377        response: Response<proto::RegisterProject>,
 378    ) -> Result<()> {
 379        let user_id;
 380        let project_id;
 381        {
 382            let mut state = self.store_mut().await;
 383            user_id = state.user_id_for_connection(request.sender_id)?;
 384            project_id = state.register_project(request.sender_id, user_id);
 385        };
 386        self.update_user_contacts(user_id).await?;
 387        response.send(proto::RegisterProjectResponse { project_id })?;
 388        Ok(())
 389    }
 390
 391    async fn unregister_project(
 392        self: Arc<Server>,
 393        request: TypedEnvelope<proto::UnregisterProject>,
 394    ) -> Result<()> {
 395        let user_id = {
 396            let mut state = self.store_mut().await;
 397            state.unregister_project(request.payload.project_id, request.sender_id)?;
 398            state.user_id_for_connection(request.sender_id)?
 399        };
 400
 401        self.update_user_contacts(user_id).await?;
 402        Ok(())
 403    }
 404
 405    async fn share_project(
 406        self: Arc<Server>,
 407        request: TypedEnvelope<proto::ShareProject>,
 408        response: Response<proto::ShareProject>,
 409    ) -> Result<()> {
 410        let user_id = {
 411            let mut state = self.store_mut().await;
 412            state.share_project(request.payload.project_id, request.sender_id)?;
 413            state.user_id_for_connection(request.sender_id)?
 414        };
 415        self.update_user_contacts(user_id).await?;
 416        response.send(proto::Ack {})?;
 417        Ok(())
 418    }
 419
 420    async fn update_user_contacts(self: &Arc<Server>, user_id: UserId) -> Result<()> {
 421        let contacts = self.app_state.db.get_contacts(user_id).await?;
 422        let store = self.store().await;
 423        let updated_contact = store.contact_for_user(user_id);
 424        for contact_user_id in contacts.current {
 425            for contact_conn_id in store.connection_ids_for_user(contact_user_id) {
 426                self.peer
 427                    .send(
 428                        contact_conn_id,
 429                        proto::UpdateContacts {
 430                            contacts: vec![updated_contact.clone()],
 431                            remove_contacts: Default::default(),
 432                            incoming_requests: Default::default(),
 433                            remove_incoming_requests: Default::default(),
 434                            outgoing_requests: Default::default(),
 435                            remove_outgoing_requests: Default::default(),
 436                        },
 437                    )
 438                    .trace_err();
 439            }
 440        }
 441        Ok(())
 442    }
 443
 444    async fn unshare_project(
 445        self: Arc<Server>,
 446        request: TypedEnvelope<proto::UnshareProject>,
 447    ) -> Result<()> {
 448        let project_id = request.payload.project_id;
 449        let project;
 450        {
 451            let mut state = self.store_mut().await;
 452            project = state.unshare_project(project_id, request.sender_id)?;
 453            broadcast(request.sender_id, project.connection_ids, |conn_id| {
 454                self.peer
 455                    .send(conn_id, proto::UnshareProject { project_id })
 456            });
 457        }
 458        self.update_user_contacts(project.host_user_id).await?;
 459        Ok(())
 460    }
 461
 462    async fn join_project(
 463        self: Arc<Server>,
 464        request: TypedEnvelope<proto::JoinProject>,
 465        response: Response<proto::JoinProject>,
 466    ) -> Result<()> {
 467        let project_id = request.payload.project_id;
 468        let host_user_id;
 469        let guest_user_id;
 470        {
 471            let state = self.store().await;
 472            host_user_id = state.project(project_id)?.host_user_id;
 473            guest_user_id = state.user_id_for_connection(request.sender_id)?;
 474        };
 475
 476        let guest_contacts = self.app_state.db.get_contacts(guest_user_id).await?;
 477        if !guest_contacts.current.contains(&host_user_id) {
 478            return Err(anyhow!("no such project"))?;
 479        }
 480
 481        {
 482            let state = &mut *self.store_mut().await;
 483            let joined = state.join_project(request.sender_id, guest_user_id, project_id)?;
 484            let share = joined.project.share()?;
 485            let peer_count = share.guests.len();
 486            let mut collaborators = Vec::with_capacity(peer_count);
 487            collaborators.push(proto::Collaborator {
 488                peer_id: joined.project.host_connection_id.0,
 489                replica_id: 0,
 490                user_id: joined.project.host_user_id.to_proto(),
 491            });
 492            let worktrees = share
 493                .worktrees
 494                .iter()
 495                .filter_map(|(id, shared_worktree)| {
 496                    let worktree = joined.project.worktrees.get(&id)?;
 497                    Some(proto::Worktree {
 498                        id: *id,
 499                        root_name: worktree.root_name.clone(),
 500                        entries: shared_worktree.entries.values().cloned().collect(),
 501                        diagnostic_summaries: shared_worktree
 502                            .diagnostic_summaries
 503                            .values()
 504                            .cloned()
 505                            .collect(),
 506                        visible: worktree.visible,
 507                        scan_id: shared_worktree.scan_id,
 508                    })
 509                })
 510                .collect();
 511            for (peer_conn_id, (peer_replica_id, peer_user_id)) in &share.guests {
 512                if *peer_conn_id != request.sender_id {
 513                    collaborators.push(proto::Collaborator {
 514                        peer_id: peer_conn_id.0,
 515                        replica_id: *peer_replica_id as u32,
 516                        user_id: peer_user_id.to_proto(),
 517                    });
 518                }
 519            }
 520            broadcast(
 521                request.sender_id,
 522                joined.project.connection_ids(),
 523                |conn_id| {
 524                    self.peer.send(
 525                        conn_id,
 526                        proto::AddProjectCollaborator {
 527                            project_id,
 528                            collaborator: Some(proto::Collaborator {
 529                                peer_id: request.sender_id.0,
 530                                replica_id: joined.replica_id as u32,
 531                                user_id: guest_user_id.to_proto(),
 532                            }),
 533                        },
 534                    )
 535                },
 536            );
 537            response.send(proto::JoinProjectResponse {
 538                worktrees,
 539                replica_id: joined.replica_id as u32,
 540                collaborators,
 541                language_servers: joined.project.language_servers.clone(),
 542            })?;
 543        }
 544        self.update_user_contacts(host_user_id).await?;
 545        Ok(())
 546    }
 547
 548    async fn leave_project(
 549        self: Arc<Server>,
 550        request: TypedEnvelope<proto::LeaveProject>,
 551    ) -> Result<()> {
 552        let sender_id = request.sender_id;
 553        let project_id = request.payload.project_id;
 554        let project;
 555        {
 556            let mut state = self.store_mut().await;
 557            project = state.leave_project(sender_id, project_id)?;
 558            broadcast(sender_id, project.connection_ids, |conn_id| {
 559                self.peer.send(
 560                    conn_id,
 561                    proto::RemoveProjectCollaborator {
 562                        project_id,
 563                        peer_id: sender_id.0,
 564                    },
 565                )
 566            });
 567        }
 568        self.update_user_contacts(project.host_user_id).await?;
 569        Ok(())
 570    }
 571
 572    async fn register_worktree(
 573        self: Arc<Server>,
 574        request: TypedEnvelope<proto::RegisterWorktree>,
 575        response: Response<proto::RegisterWorktree>,
 576    ) -> Result<()> {
 577        let host_user_id;
 578        {
 579            let mut state = self.store_mut().await;
 580            host_user_id = state.user_id_for_connection(request.sender_id)?;
 581
 582            let guest_connection_ids = state
 583                .read_project(request.payload.project_id, request.sender_id)?
 584                .guest_connection_ids();
 585            state.register_worktree(
 586                request.payload.project_id,
 587                request.payload.worktree_id,
 588                request.sender_id,
 589                Worktree {
 590                    root_name: request.payload.root_name.clone(),
 591                    visible: request.payload.visible,
 592                },
 593            )?;
 594
 595            broadcast(request.sender_id, guest_connection_ids, |connection_id| {
 596                self.peer
 597                    .forward_send(request.sender_id, connection_id, request.payload.clone())
 598            });
 599        }
 600        self.update_user_contacts(host_user_id).await?;
 601        response.send(proto::Ack {})?;
 602        Ok(())
 603    }
 604
 605    async fn unregister_worktree(
 606        self: Arc<Server>,
 607        request: TypedEnvelope<proto::UnregisterWorktree>,
 608    ) -> Result<()> {
 609        let host_user_id;
 610        let project_id = request.payload.project_id;
 611        let worktree_id = request.payload.worktree_id;
 612        {
 613            let mut state = self.store_mut().await;
 614            let (_, guest_connection_ids) =
 615                state.unregister_worktree(project_id, worktree_id, request.sender_id)?;
 616            host_user_id = state.user_id_for_connection(request.sender_id)?;
 617            broadcast(request.sender_id, guest_connection_ids, |conn_id| {
 618                self.peer.send(
 619                    conn_id,
 620                    proto::UnregisterWorktree {
 621                        project_id,
 622                        worktree_id,
 623                    },
 624                )
 625            });
 626        }
 627        self.update_user_contacts(host_user_id).await?;
 628        Ok(())
 629    }
 630
 631    async fn update_worktree(
 632        self: Arc<Server>,
 633        request: TypedEnvelope<proto::UpdateWorktree>,
 634        response: Response<proto::UpdateWorktree>,
 635    ) -> Result<()> {
 636        let connection_ids = self.store_mut().await.update_worktree(
 637            request.sender_id,
 638            request.payload.project_id,
 639            request.payload.worktree_id,
 640            &request.payload.removed_entries,
 641            &request.payload.updated_entries,
 642            request.payload.scan_id,
 643        )?;
 644
 645        broadcast(request.sender_id, connection_ids, |connection_id| {
 646            self.peer
 647                .forward_send(request.sender_id, connection_id, request.payload.clone())
 648        });
 649        response.send(proto::Ack {})?;
 650        Ok(())
 651    }
 652
 653    async fn update_diagnostic_summary(
 654        self: Arc<Server>,
 655        request: TypedEnvelope<proto::UpdateDiagnosticSummary>,
 656    ) -> Result<()> {
 657        let summary = request
 658            .payload
 659            .summary
 660            .clone()
 661            .ok_or_else(|| anyhow!("invalid summary"))?;
 662        let receiver_ids = self.store_mut().await.update_diagnostic_summary(
 663            request.payload.project_id,
 664            request.payload.worktree_id,
 665            request.sender_id,
 666            summary,
 667        )?;
 668
 669        broadcast(request.sender_id, receiver_ids, |connection_id| {
 670            self.peer
 671                .forward_send(request.sender_id, connection_id, request.payload.clone())
 672        });
 673        Ok(())
 674    }
 675
 676    async fn start_language_server(
 677        self: Arc<Server>,
 678        request: TypedEnvelope<proto::StartLanguageServer>,
 679    ) -> Result<()> {
 680        let receiver_ids = self.store_mut().await.start_language_server(
 681            request.payload.project_id,
 682            request.sender_id,
 683            request
 684                .payload
 685                .server
 686                .clone()
 687                .ok_or_else(|| anyhow!("invalid language server"))?,
 688        )?;
 689        broadcast(request.sender_id, receiver_ids, |connection_id| {
 690            self.peer
 691                .forward_send(request.sender_id, connection_id, request.payload.clone())
 692        });
 693        Ok(())
 694    }
 695
 696    async fn update_language_server(
 697        self: Arc<Server>,
 698        request: TypedEnvelope<proto::UpdateLanguageServer>,
 699    ) -> Result<()> {
 700        let receiver_ids = self
 701            .store()
 702            .await
 703            .project_connection_ids(request.payload.project_id, request.sender_id)?;
 704        broadcast(request.sender_id, receiver_ids, |connection_id| {
 705            self.peer
 706                .forward_send(request.sender_id, connection_id, request.payload.clone())
 707        });
 708        Ok(())
 709    }
 710
 711    async fn forward_project_request<T>(
 712        self: Arc<Server>,
 713        request: TypedEnvelope<T>,
 714        response: Response<T>,
 715    ) -> Result<()>
 716    where
 717        T: EntityMessage + RequestMessage,
 718    {
 719        let host_connection_id = self
 720            .store()
 721            .await
 722            .read_project(request.payload.remote_entity_id(), request.sender_id)?
 723            .host_connection_id;
 724
 725        response.send(
 726            self.peer
 727                .forward_request(request.sender_id, host_connection_id, request.payload)
 728                .await?,
 729        )?;
 730        Ok(())
 731    }
 732
 733    async fn save_buffer(
 734        self: Arc<Server>,
 735        request: TypedEnvelope<proto::SaveBuffer>,
 736        response: Response<proto::SaveBuffer>,
 737    ) -> Result<()> {
 738        let host = self
 739            .store()
 740            .await
 741            .read_project(request.payload.project_id, request.sender_id)?
 742            .host_connection_id;
 743        let response_payload = self
 744            .peer
 745            .forward_request(request.sender_id, host, request.payload.clone())
 746            .await?;
 747
 748        let mut guests = self
 749            .store()
 750            .await
 751            .read_project(request.payload.project_id, request.sender_id)?
 752            .connection_ids();
 753        guests.retain(|guest_connection_id| *guest_connection_id != request.sender_id);
 754        broadcast(host, guests, |conn_id| {
 755            self.peer
 756                .forward_send(host, conn_id, response_payload.clone())
 757        });
 758        response.send(response_payload)?;
 759        Ok(())
 760    }
 761
 762    async fn update_buffer(
 763        self: Arc<Server>,
 764        request: TypedEnvelope<proto::UpdateBuffer>,
 765        response: Response<proto::UpdateBuffer>,
 766    ) -> Result<()> {
 767        let receiver_ids = self
 768            .store()
 769            .await
 770            .project_connection_ids(request.payload.project_id, request.sender_id)?;
 771        broadcast(request.sender_id, receiver_ids, |connection_id| {
 772            self.peer
 773                .forward_send(request.sender_id, connection_id, request.payload.clone())
 774        });
 775        response.send(proto::Ack {})?;
 776        Ok(())
 777    }
 778
 779    async fn update_buffer_file(
 780        self: Arc<Server>,
 781        request: TypedEnvelope<proto::UpdateBufferFile>,
 782    ) -> Result<()> {
 783        let receiver_ids = self
 784            .store()
 785            .await
 786            .project_connection_ids(request.payload.project_id, request.sender_id)?;
 787        broadcast(request.sender_id, receiver_ids, |connection_id| {
 788            self.peer
 789                .forward_send(request.sender_id, connection_id, request.payload.clone())
 790        });
 791        Ok(())
 792    }
 793
 794    async fn buffer_reloaded(
 795        self: Arc<Server>,
 796        request: TypedEnvelope<proto::BufferReloaded>,
 797    ) -> Result<()> {
 798        let receiver_ids = self
 799            .store()
 800            .await
 801            .project_connection_ids(request.payload.project_id, request.sender_id)?;
 802        broadcast(request.sender_id, receiver_ids, |connection_id| {
 803            self.peer
 804                .forward_send(request.sender_id, connection_id, request.payload.clone())
 805        });
 806        Ok(())
 807    }
 808
 809    async fn buffer_saved(
 810        self: Arc<Server>,
 811        request: TypedEnvelope<proto::BufferSaved>,
 812    ) -> Result<()> {
 813        let receiver_ids = self
 814            .store()
 815            .await
 816            .project_connection_ids(request.payload.project_id, request.sender_id)?;
 817        broadcast(request.sender_id, receiver_ids, |connection_id| {
 818            self.peer
 819                .forward_send(request.sender_id, connection_id, request.payload.clone())
 820        });
 821        Ok(())
 822    }
 823
 824    async fn follow(
 825        self: Arc<Self>,
 826        request: TypedEnvelope<proto::Follow>,
 827        response: Response<proto::Follow>,
 828    ) -> Result<()> {
 829        let leader_id = ConnectionId(request.payload.leader_id);
 830        let follower_id = request.sender_id;
 831        if !self
 832            .store()
 833            .await
 834            .project_connection_ids(request.payload.project_id, follower_id)?
 835            .contains(&leader_id)
 836        {
 837            Err(anyhow!("no such peer"))?;
 838        }
 839        let mut response_payload = self
 840            .peer
 841            .forward_request(request.sender_id, leader_id, request.payload)
 842            .await?;
 843        response_payload
 844            .views
 845            .retain(|view| view.leader_id != Some(follower_id.0));
 846        response.send(response_payload)?;
 847        Ok(())
 848    }
 849
 850    async fn unfollow(self: Arc<Self>, request: TypedEnvelope<proto::Unfollow>) -> Result<()> {
 851        let leader_id = ConnectionId(request.payload.leader_id);
 852        if !self
 853            .store()
 854            .await
 855            .project_connection_ids(request.payload.project_id, request.sender_id)?
 856            .contains(&leader_id)
 857        {
 858            Err(anyhow!("no such peer"))?;
 859        }
 860        self.peer
 861            .forward_send(request.sender_id, leader_id, request.payload)?;
 862        Ok(())
 863    }
 864
 865    async fn update_followers(
 866        self: Arc<Self>,
 867        request: TypedEnvelope<proto::UpdateFollowers>,
 868    ) -> Result<()> {
 869        let connection_ids = self
 870            .store()
 871            .await
 872            .project_connection_ids(request.payload.project_id, request.sender_id)?;
 873        let leader_id = request
 874            .payload
 875            .variant
 876            .as_ref()
 877            .and_then(|variant| match variant {
 878                proto::update_followers::Variant::CreateView(payload) => payload.leader_id,
 879                proto::update_followers::Variant::UpdateView(payload) => payload.leader_id,
 880                proto::update_followers::Variant::UpdateActiveView(payload) => payload.leader_id,
 881            });
 882        for follower_id in &request.payload.follower_ids {
 883            let follower_id = ConnectionId(*follower_id);
 884            if connection_ids.contains(&follower_id) && Some(follower_id.0) != leader_id {
 885                self.peer
 886                    .forward_send(request.sender_id, follower_id, request.payload.clone())?;
 887            }
 888        }
 889        Ok(())
 890    }
 891
 892    async fn get_channels(
 893        self: Arc<Server>,
 894        request: TypedEnvelope<proto::GetChannels>,
 895        response: Response<proto::GetChannels>,
 896    ) -> Result<()> {
 897        let user_id = self
 898            .store()
 899            .await
 900            .user_id_for_connection(request.sender_id)?;
 901        let channels = self.app_state.db.get_accessible_channels(user_id).await?;
 902        response.send(proto::GetChannelsResponse {
 903            channels: channels
 904                .into_iter()
 905                .map(|chan| proto::Channel {
 906                    id: chan.id.to_proto(),
 907                    name: chan.name,
 908                })
 909                .collect(),
 910        })?;
 911        Ok(())
 912    }
 913
 914    async fn get_users(
 915        self: Arc<Server>,
 916        request: TypedEnvelope<proto::GetUsers>,
 917        response: Response<proto::GetUsers>,
 918    ) -> Result<()> {
 919        let user_ids = request
 920            .payload
 921            .user_ids
 922            .into_iter()
 923            .map(UserId::from_proto)
 924            .collect();
 925        let users = self
 926            .app_state
 927            .db
 928            .get_users_by_ids(user_ids)
 929            .await?
 930            .into_iter()
 931            .map(|user| proto::User {
 932                id: user.id.to_proto(),
 933                avatar_url: format!("https://github.com/{}.png?size=128", user.github_login),
 934                github_login: user.github_login,
 935            })
 936            .collect();
 937        response.send(proto::UsersResponse { users })?;
 938        Ok(())
 939    }
 940
 941    async fn fuzzy_search_users(
 942        self: Arc<Server>,
 943        request: TypedEnvelope<proto::FuzzySearchUsers>,
 944        response: Response<proto::FuzzySearchUsers>,
 945    ) -> Result<()> {
 946        let user_id = self
 947            .store()
 948            .await
 949            .user_id_for_connection(request.sender_id)?;
 950        let query = request.payload.query;
 951        let db = &self.app_state.db;
 952        let users = match query.len() {
 953            0 => vec![],
 954            1 | 2 => db
 955                .get_user_by_github_login(&query)
 956                .await?
 957                .into_iter()
 958                .collect(),
 959            _ => db.fuzzy_search_users(&query, 10).await?,
 960        };
 961        let users = users
 962            .into_iter()
 963            .filter(|user| user.id != user_id)
 964            .map(|user| proto::User {
 965                id: user.id.to_proto(),
 966                avatar_url: format!("https://github.com/{}.png?size=128", user.github_login),
 967                github_login: user.github_login,
 968            })
 969            .collect();
 970        response.send(proto::UsersResponse { users })?;
 971        Ok(())
 972    }
 973
 974    async fn request_contact(
 975        self: Arc<Server>,
 976        request: TypedEnvelope<proto::RequestContact>,
 977        response: Response<proto::RequestContact>,
 978    ) -> Result<()> {
 979        let requester_id = self
 980            .store()
 981            .await
 982            .user_id_for_connection(request.sender_id)?;
 983        let responder_id = UserId::from_proto(request.payload.responder_id);
 984        if requester_id == responder_id {
 985            return Err(anyhow!("cannot add yourself as a contact"))?;
 986        }
 987
 988        self.app_state
 989            .db
 990            .send_contact_request(requester_id, responder_id)
 991            .await?;
 992
 993        // Update outgoing contact requests of requester
 994        let mut update = proto::UpdateContacts::default();
 995        update.outgoing_requests.push(responder_id.to_proto());
 996        for connection_id in self.store().await.connection_ids_for_user(requester_id) {
 997            self.peer.send(connection_id, update.clone())?;
 998        }
 999
1000        // Update incoming contact requests of responder
1001        let mut update = proto::UpdateContacts::default();
1002        update
1003            .incoming_requests
1004            .push(proto::IncomingContactRequest {
1005                requester_id: requester_id.to_proto(),
1006                should_notify: true,
1007            });
1008        for connection_id in self.store().await.connection_ids_for_user(responder_id) {
1009            self.peer.send(connection_id, update.clone())?;
1010        }
1011
1012        response.send(proto::Ack {})?;
1013        Ok(())
1014    }
1015
1016    async fn respond_to_contact_request(
1017        self: Arc<Server>,
1018        request: TypedEnvelope<proto::RespondToContactRequest>,
1019        response: Response<proto::RespondToContactRequest>,
1020    ) -> Result<()> {
1021        let responder_id = self
1022            .store()
1023            .await
1024            .user_id_for_connection(request.sender_id)?;
1025        let requester_id = UserId::from_proto(request.payload.requester_id);
1026        let accept = request.payload.response == proto::ContactRequestResponse::Accept as i32;
1027        self.app_state
1028            .db
1029            .respond_to_contact_request(responder_id, requester_id, accept)
1030            .await?;
1031
1032        let store = self.store().await;
1033        // Update responder with new contact
1034        let mut update = proto::UpdateContacts::default();
1035        if accept {
1036            update.contacts.push(store.contact_for_user(requester_id));
1037        }
1038        update
1039            .remove_incoming_requests
1040            .push(requester_id.to_proto());
1041        for connection_id in store.connection_ids_for_user(responder_id) {
1042            self.peer.send(connection_id, update.clone())?;
1043        }
1044
1045        // Update requester with new contact
1046        let mut update = proto::UpdateContacts::default();
1047        if accept {
1048            update.contacts.push(store.contact_for_user(responder_id));
1049        }
1050        update
1051            .remove_outgoing_requests
1052            .push(responder_id.to_proto());
1053        for connection_id in store.connection_ids_for_user(requester_id) {
1054            self.peer.send(connection_id, update.clone())?;
1055        }
1056
1057        response.send(proto::Ack {})?;
1058        Ok(())
1059    }
1060
1061    async fn remove_contact(
1062        self: Arc<Server>,
1063        request: TypedEnvelope<proto::RemoveContact>,
1064        response: Response<proto::RemoveContact>,
1065    ) -> Result<()> {
1066        let requester_id = self
1067            .store()
1068            .await
1069            .user_id_for_connection(request.sender_id)?;
1070        let responder_id = UserId::from_proto(request.payload.user_id);
1071        self.app_state
1072            .db
1073            .remove_contact(requester_id, responder_id)
1074            .await?;
1075
1076        // Update outgoing contact requests of requester
1077        let mut update = proto::UpdateContacts::default();
1078        update
1079            .remove_outgoing_requests
1080            .push(responder_id.to_proto());
1081        for connection_id in self.store().await.connection_ids_for_user(requester_id) {
1082            self.peer.send(connection_id, update.clone())?;
1083        }
1084
1085        // Update incoming contact requests of responder
1086        let mut update = proto::UpdateContacts::default();
1087        update
1088            .remove_incoming_requests
1089            .push(requester_id.to_proto());
1090        for connection_id in self.store().await.connection_ids_for_user(responder_id) {
1091            self.peer.send(connection_id, update.clone())?;
1092        }
1093
1094        response.send(proto::Ack {})?;
1095        Ok(())
1096    }
1097
1098    // #[instrument(skip(self, state, user_ids))]
1099    // fn update_contacts_for_users<'a>(
1100    //     self: &Arc<Self>,
1101    //     state: &Store,
1102    //     user_ids: impl IntoIterator<Item = &'a UserId>,
1103    // ) {
1104    //     for user_id in user_ids {
1105    //         let contacts = state.contacts_for_user(*user_id);
1106    //         for connection_id in state.connection_ids_for_user(*user_id) {
1107    //             self.peer
1108    //                 .send(
1109    //                     connection_id,
1110    //                     proto::UpdateContacts {
1111    //                         contacts: contacts.clone(),
1112    //                         pending_requests_from_user_ids: Default::default(),
1113    //                         pending_requests_to_user_ids: Default::default(),
1114    //                     },
1115    //                 )
1116    //                 .trace_err();
1117    //         }
1118    //     }
1119    // }
1120
1121    async fn join_channel(
1122        self: Arc<Self>,
1123        request: TypedEnvelope<proto::JoinChannel>,
1124        response: Response<proto::JoinChannel>,
1125    ) -> Result<()> {
1126        let user_id = self
1127            .store()
1128            .await
1129            .user_id_for_connection(request.sender_id)?;
1130        let channel_id = ChannelId::from_proto(request.payload.channel_id);
1131        if !self
1132            .app_state
1133            .db
1134            .can_user_access_channel(user_id, channel_id)
1135            .await?
1136        {
1137            Err(anyhow!("access denied"))?;
1138        }
1139
1140        self.store_mut()
1141            .await
1142            .join_channel(request.sender_id, channel_id);
1143        let messages = self
1144            .app_state
1145            .db
1146            .get_channel_messages(channel_id, MESSAGE_COUNT_PER_PAGE, None)
1147            .await?
1148            .into_iter()
1149            .map(|msg| proto::ChannelMessage {
1150                id: msg.id.to_proto(),
1151                body: msg.body,
1152                timestamp: msg.sent_at.unix_timestamp() as u64,
1153                sender_id: msg.sender_id.to_proto(),
1154                nonce: Some(msg.nonce.as_u128().into()),
1155            })
1156            .collect::<Vec<_>>();
1157        response.send(proto::JoinChannelResponse {
1158            done: messages.len() < MESSAGE_COUNT_PER_PAGE,
1159            messages,
1160        })?;
1161        Ok(())
1162    }
1163
1164    async fn leave_channel(
1165        self: Arc<Self>,
1166        request: TypedEnvelope<proto::LeaveChannel>,
1167    ) -> Result<()> {
1168        let user_id = self
1169            .store()
1170            .await
1171            .user_id_for_connection(request.sender_id)?;
1172        let channel_id = ChannelId::from_proto(request.payload.channel_id);
1173        if !self
1174            .app_state
1175            .db
1176            .can_user_access_channel(user_id, channel_id)
1177            .await?
1178        {
1179            Err(anyhow!("access denied"))?;
1180        }
1181
1182        self.store_mut()
1183            .await
1184            .leave_channel(request.sender_id, channel_id);
1185
1186        Ok(())
1187    }
1188
1189    async fn send_channel_message(
1190        self: Arc<Self>,
1191        request: TypedEnvelope<proto::SendChannelMessage>,
1192        response: Response<proto::SendChannelMessage>,
1193    ) -> Result<()> {
1194        let channel_id = ChannelId::from_proto(request.payload.channel_id);
1195        let user_id;
1196        let connection_ids;
1197        {
1198            let state = self.store().await;
1199            user_id = state.user_id_for_connection(request.sender_id)?;
1200            connection_ids = state.channel_connection_ids(channel_id)?;
1201        }
1202
1203        // Validate the message body.
1204        let body = request.payload.body.trim().to_string();
1205        if body.len() > MAX_MESSAGE_LEN {
1206            return Err(anyhow!("message is too long"))?;
1207        }
1208        if body.is_empty() {
1209            return Err(anyhow!("message can't be blank"))?;
1210        }
1211
1212        let timestamp = OffsetDateTime::now_utc();
1213        let nonce = request
1214            .payload
1215            .nonce
1216            .ok_or_else(|| anyhow!("nonce can't be blank"))?;
1217
1218        let message_id = self
1219            .app_state
1220            .db
1221            .create_channel_message(channel_id, user_id, &body, timestamp, nonce.clone().into())
1222            .await?
1223            .to_proto();
1224        let message = proto::ChannelMessage {
1225            sender_id: user_id.to_proto(),
1226            id: message_id,
1227            body,
1228            timestamp: timestamp.unix_timestamp() as u64,
1229            nonce: Some(nonce),
1230        };
1231        broadcast(request.sender_id, connection_ids, |conn_id| {
1232            self.peer.send(
1233                conn_id,
1234                proto::ChannelMessageSent {
1235                    channel_id: channel_id.to_proto(),
1236                    message: Some(message.clone()),
1237                },
1238            )
1239        });
1240        response.send(proto::SendChannelMessageResponse {
1241            message: Some(message),
1242        })?;
1243        Ok(())
1244    }
1245
1246    async fn get_channel_messages(
1247        self: Arc<Self>,
1248        request: TypedEnvelope<proto::GetChannelMessages>,
1249        response: Response<proto::GetChannelMessages>,
1250    ) -> Result<()> {
1251        let user_id = self
1252            .store()
1253            .await
1254            .user_id_for_connection(request.sender_id)?;
1255        let channel_id = ChannelId::from_proto(request.payload.channel_id);
1256        if !self
1257            .app_state
1258            .db
1259            .can_user_access_channel(user_id, channel_id)
1260            .await?
1261        {
1262            Err(anyhow!("access denied"))?;
1263        }
1264
1265        let messages = self
1266            .app_state
1267            .db
1268            .get_channel_messages(
1269                channel_id,
1270                MESSAGE_COUNT_PER_PAGE,
1271                Some(MessageId::from_proto(request.payload.before_message_id)),
1272            )
1273            .await?
1274            .into_iter()
1275            .map(|msg| proto::ChannelMessage {
1276                id: msg.id.to_proto(),
1277                body: msg.body,
1278                timestamp: msg.sent_at.unix_timestamp() as u64,
1279                sender_id: msg.sender_id.to_proto(),
1280                nonce: Some(msg.nonce.as_u128().into()),
1281            })
1282            .collect::<Vec<_>>();
1283        response.send(proto::GetChannelMessagesResponse {
1284            done: messages.len() < MESSAGE_COUNT_PER_PAGE,
1285            messages,
1286        })?;
1287        Ok(())
1288    }
1289
1290    async fn store<'a>(self: &'a Arc<Self>) -> StoreReadGuard<'a> {
1291        #[cfg(test)]
1292        tokio::task::yield_now().await;
1293        let guard = self.store.read().await;
1294        #[cfg(test)]
1295        tokio::task::yield_now().await;
1296        StoreReadGuard {
1297            guard,
1298            _not_send: PhantomData,
1299        }
1300    }
1301
1302    async fn store_mut<'a>(self: &'a Arc<Self>) -> StoreWriteGuard<'a> {
1303        #[cfg(test)]
1304        tokio::task::yield_now().await;
1305        let guard = self.store.write().await;
1306        #[cfg(test)]
1307        tokio::task::yield_now().await;
1308        StoreWriteGuard {
1309            guard,
1310            _not_send: PhantomData,
1311        }
1312    }
1313}
1314
1315impl<'a> Deref for StoreReadGuard<'a> {
1316    type Target = Store;
1317
1318    fn deref(&self) -> &Self::Target {
1319        &*self.guard
1320    }
1321}
1322
1323impl<'a> Deref for StoreWriteGuard<'a> {
1324    type Target = Store;
1325
1326    fn deref(&self) -> &Self::Target {
1327        &*self.guard
1328    }
1329}
1330
1331impl<'a> DerefMut for StoreWriteGuard<'a> {
1332    fn deref_mut(&mut self) -> &mut Self::Target {
1333        &mut *self.guard
1334    }
1335}
1336
1337impl<'a> Drop for StoreWriteGuard<'a> {
1338    fn drop(&mut self) {
1339        #[cfg(test)]
1340        self.check_invariants();
1341
1342        let metrics = self.metrics();
1343        tracing::info!(
1344            connections = metrics.connections,
1345            registered_projects = metrics.registered_projects,
1346            shared_projects = metrics.shared_projects,
1347            "metrics"
1348        );
1349    }
1350}
1351
1352impl Executor for RealExecutor {
1353    type Sleep = Sleep;
1354
1355    fn spawn_detached<F: 'static + Send + Future<Output = ()>>(&self, future: F) {
1356        tokio::task::spawn(future);
1357    }
1358
1359    fn sleep(&self, duration: Duration) -> Self::Sleep {
1360        tokio::time::sleep(duration)
1361    }
1362}
1363
1364fn broadcast<F>(
1365    sender_id: ConnectionId,
1366    receiver_ids: impl IntoIterator<Item = ConnectionId>,
1367    mut f: F,
1368) where
1369    F: FnMut(ConnectionId) -> anyhow::Result<()>,
1370{
1371    for receiver_id in receiver_ids {
1372        if receiver_id != sender_id {
1373            f(receiver_id).trace_err();
1374        }
1375    }
1376}
1377
1378lazy_static! {
1379    static ref ZED_PROTOCOL_VERSION: HeaderName = HeaderName::from_static("x-zed-protocol-version");
1380}
1381
1382pub struct ProtocolVersion(u32);
1383
1384impl Header for ProtocolVersion {
1385    fn name() -> &'static HeaderName {
1386        &ZED_PROTOCOL_VERSION
1387    }
1388
1389    fn decode<'i, I>(values: &mut I) -> Result<Self, axum::headers::Error>
1390    where
1391        Self: Sized,
1392        I: Iterator<Item = &'i axum::http::HeaderValue>,
1393    {
1394        let version = values
1395            .next()
1396            .ok_or_else(|| axum::headers::Error::invalid())?
1397            .to_str()
1398            .map_err(|_| axum::headers::Error::invalid())?
1399            .parse()
1400            .map_err(|_| axum::headers::Error::invalid())?;
1401        Ok(Self(version))
1402    }
1403
1404    fn encode<E: Extend<axum::http::HeaderValue>>(&self, values: &mut E) {
1405        values.extend([self.0.to_string().parse().unwrap()]);
1406    }
1407}
1408
1409pub fn routes(app_state: Arc<AppState>) -> Router<Body> {
1410    let server = Server::new(app_state.clone(), None);
1411    Router::new()
1412        .route("/rpc", get(handle_websocket_request))
1413        .layer(
1414            ServiceBuilder::new()
1415                .layer(Extension(app_state))
1416                .layer(middleware::from_fn(auth::validate_header))
1417                .layer(Extension(server)),
1418        )
1419}
1420
1421pub async fn handle_websocket_request(
1422    TypedHeader(ProtocolVersion(protocol_version)): TypedHeader<ProtocolVersion>,
1423    ConnectInfo(socket_address): ConnectInfo<SocketAddr>,
1424    Extension(server): Extension<Arc<Server>>,
1425    Extension(user_id): Extension<UserId>,
1426    ws: WebSocketUpgrade,
1427) -> axum::response::Response {
1428    if protocol_version != rpc::PROTOCOL_VERSION {
1429        return (
1430            StatusCode::UPGRADE_REQUIRED,
1431            "client must be upgraded".to_string(),
1432        )
1433            .into_response();
1434    }
1435    let socket_address = socket_address.to_string();
1436    ws.on_upgrade(move |socket| {
1437        use util::ResultExt;
1438        let socket = socket
1439            .map_ok(to_tungstenite_message)
1440            .err_into()
1441            .with(|message| async move { Ok(to_axum_message(message)) });
1442        let connection = Connection::new(Box::pin(socket));
1443        async move {
1444            server
1445                .handle_connection(connection, socket_address, user_id, None, RealExecutor)
1446                .await
1447                .log_err();
1448        }
1449    })
1450}
1451
1452fn to_axum_message(message: TungsteniteMessage) -> AxumMessage {
1453    match message {
1454        TungsteniteMessage::Text(payload) => AxumMessage::Text(payload),
1455        TungsteniteMessage::Binary(payload) => AxumMessage::Binary(payload),
1456        TungsteniteMessage::Ping(payload) => AxumMessage::Ping(payload),
1457        TungsteniteMessage::Pong(payload) => AxumMessage::Pong(payload),
1458        TungsteniteMessage::Close(frame) => AxumMessage::Close(frame.map(|frame| AxumCloseFrame {
1459            code: frame.code.into(),
1460            reason: frame.reason,
1461        })),
1462    }
1463}
1464
1465fn to_tungstenite_message(message: AxumMessage) -> TungsteniteMessage {
1466    match message {
1467        AxumMessage::Text(payload) => TungsteniteMessage::Text(payload),
1468        AxumMessage::Binary(payload) => TungsteniteMessage::Binary(payload),
1469        AxumMessage::Ping(payload) => TungsteniteMessage::Ping(payload),
1470        AxumMessage::Pong(payload) => TungsteniteMessage::Pong(payload),
1471        AxumMessage::Close(frame) => {
1472            TungsteniteMessage::Close(frame.map(|frame| TungsteniteCloseFrame {
1473                code: frame.code.into(),
1474                reason: frame.reason,
1475            }))
1476        }
1477    }
1478}
1479
1480pub trait ResultExt {
1481    type Ok;
1482
1483    fn trace_err(self) -> Option<Self::Ok>;
1484}
1485
1486impl<T, E> ResultExt for Result<T, E>
1487where
1488    E: std::fmt::Debug,
1489{
1490    type Ok = T;
1491
1492    fn trace_err(self) -> Option<T> {
1493        match self {
1494            Ok(value) => Some(value),
1495            Err(error) => {
1496                tracing::error!("{:?}", error);
1497                None
1498            }
1499        }
1500    }
1501}
1502
1503#[cfg(test)]
1504mod tests {
1505    use super::*;
1506    use crate::{
1507        db::{tests::TestDb, UserId},
1508        AppState,
1509    };
1510    use ::rpc::Peer;
1511    use client::{
1512        self, test::FakeHttpClient, Channel, ChannelDetails, ChannelList, Client, Credentials,
1513        EstablishConnectionError, UserStore, RECEIVE_TIMEOUT,
1514    };
1515    use collections::{BTreeMap, HashSet};
1516    use editor::{
1517        self, ConfirmCodeAction, ConfirmCompletion, ConfirmRename, Editor, Input, Redo, Rename,
1518        ToOffset, ToggleCodeActions, Undo,
1519    };
1520    use gpui::{
1521        executor::{self, Deterministic},
1522        geometry::vector::vec2f,
1523        ModelHandle, TestAppContext, ViewHandle,
1524    };
1525    use language::{
1526        range_to_lsp, tree_sitter_rust, Diagnostic, DiagnosticEntry, FakeLspAdapter, Language,
1527        LanguageConfig, LanguageRegistry, OffsetRangeExt, Point, Rope,
1528    };
1529    use lsp::{self, FakeLanguageServer};
1530    use parking_lot::Mutex;
1531    use project::{
1532        fs::{FakeFs, Fs as _},
1533        search::SearchQuery,
1534        worktree::WorktreeHandle,
1535        DiagnosticSummary, Project, ProjectPath, WorktreeId,
1536    };
1537    use rand::prelude::*;
1538    use rpc::PeerId;
1539    use serde_json::json;
1540    use settings::Settings;
1541    use sqlx::types::time::OffsetDateTime;
1542    use std::{
1543        env,
1544        ops::Deref,
1545        path::{Path, PathBuf},
1546        rc::Rc,
1547        sync::{
1548            atomic::{AtomicBool, Ordering::SeqCst},
1549            Arc,
1550        },
1551        time::Duration,
1552    };
1553    use theme::ThemeRegistry;
1554    use workspace::{Item, SplitDirection, ToggleFollow, Workspace, WorkspaceParams};
1555
1556    #[cfg(test)]
1557    #[ctor::ctor]
1558    fn init_logger() {
1559        if std::env::var("RUST_LOG").is_ok() {
1560            env_logger::init();
1561        }
1562    }
1563
1564    #[gpui::test(iterations = 10)]
1565    async fn test_share_project(cx_a: &mut TestAppContext, cx_b: &mut TestAppContext) {
1566        let (window_b, _) = cx_b.add_window(|_| EmptyView);
1567        let lang_registry = Arc::new(LanguageRegistry::test());
1568        let fs = FakeFs::new(cx_a.background());
1569        cx_a.foreground().forbid_parking();
1570
1571        // Connect to a server as 2 clients.
1572        let mut server = TestServer::start(cx_a.foreground(), cx_a.background()).await;
1573        let client_a = server.create_client(cx_a, "user_a").await;
1574        let client_b = server.create_client(cx_b, "user_b").await;
1575        server
1576            .make_contacts(vec![(&client_a, cx_a), (&client_b, cx_b)])
1577            .await;
1578
1579        // Share a project as client A
1580        fs.insert_tree(
1581            "/a",
1582            json!({
1583                "a.txt": "a-contents",
1584                "b.txt": "b-contents",
1585            }),
1586        )
1587        .await;
1588        let project_a = cx_a.update(|cx| {
1589            Project::local(
1590                client_a.clone(),
1591                client_a.user_store.clone(),
1592                lang_registry.clone(),
1593                fs.clone(),
1594                cx,
1595            )
1596        });
1597        let (worktree_a, _) = project_a
1598            .update(cx_a, |p, cx| {
1599                p.find_or_create_local_worktree("/a", true, cx)
1600            })
1601            .await
1602            .unwrap();
1603        let worktree_id = worktree_a.read_with(cx_a, |tree, _| tree.id());
1604        worktree_a
1605            .read_with(cx_a, |tree, _| tree.as_local().unwrap().scan_complete())
1606            .await;
1607        let project_id = project_a.update(cx_a, |p, _| p.next_remote_id()).await;
1608        project_a.update(cx_a, |p, cx| p.share(cx)).await.unwrap();
1609
1610        // Join that project as client B
1611        let project_b = Project::remote(
1612            project_id,
1613            client_b.clone(),
1614            client_b.user_store.clone(),
1615            lang_registry.clone(),
1616            fs.clone(),
1617            &mut cx_b.to_async(),
1618        )
1619        .await
1620        .unwrap();
1621
1622        let replica_id_b = project_b.read_with(cx_b, |project, _| {
1623            assert_eq!(
1624                project
1625                    .collaborators()
1626                    .get(&client_a.peer_id)
1627                    .unwrap()
1628                    .user
1629                    .github_login,
1630                "user_a"
1631            );
1632            project.replica_id()
1633        });
1634        project_a
1635            .condition(&cx_a, |tree, _| {
1636                tree.collaborators()
1637                    .get(&client_b.peer_id)
1638                    .map_or(false, |collaborator| {
1639                        collaborator.replica_id == replica_id_b
1640                            && collaborator.user.github_login == "user_b"
1641                    })
1642            })
1643            .await;
1644
1645        // Open the same file as client B and client A.
1646        let buffer_b = project_b
1647            .update(cx_b, |p, cx| p.open_buffer((worktree_id, "b.txt"), cx))
1648            .await
1649            .unwrap();
1650        buffer_b.read_with(cx_b, |buf, _| assert_eq!(buf.text(), "b-contents"));
1651        project_a.read_with(cx_a, |project, cx| {
1652            assert!(project.has_open_buffer((worktree_id, "b.txt"), cx))
1653        });
1654        let buffer_a = project_a
1655            .update(cx_a, |p, cx| p.open_buffer((worktree_id, "b.txt"), cx))
1656            .await
1657            .unwrap();
1658
1659        let editor_b = cx_b.add_view(window_b, |cx| Editor::for_buffer(buffer_b, None, cx));
1660
1661        // TODO
1662        // // Create a selection set as client B and see that selection set as client A.
1663        // buffer_a
1664        //     .condition(&cx_a, |buffer, _| buffer.selection_sets().count() == 1)
1665        //     .await;
1666
1667        // Edit the buffer as client B and see that edit as client A.
1668        editor_b.update(cx_b, |editor, cx| {
1669            editor.handle_input(&Input("ok, ".into()), cx)
1670        });
1671        buffer_a
1672            .condition(&cx_a, |buffer, _| buffer.text() == "ok, b-contents")
1673            .await;
1674
1675        // TODO
1676        // // Remove the selection set as client B, see those selections disappear as client A.
1677        cx_b.update(move |_| drop(editor_b));
1678        // buffer_a
1679        //     .condition(&cx_a, |buffer, _| buffer.selection_sets().count() == 0)
1680        //     .await;
1681
1682        // Dropping the client B's project removes client B from client A's collaborators.
1683        cx_b.update(move |_| drop(project_b));
1684        project_a
1685            .condition(&cx_a, |project, _| project.collaborators().is_empty())
1686            .await;
1687    }
1688
1689    #[gpui::test(iterations = 10)]
1690    async fn test_unshare_project(cx_a: &mut TestAppContext, cx_b: &mut TestAppContext) {
1691        let lang_registry = Arc::new(LanguageRegistry::test());
1692        let fs = FakeFs::new(cx_a.background());
1693        cx_a.foreground().forbid_parking();
1694
1695        // Connect to a server as 2 clients.
1696        let mut server = TestServer::start(cx_a.foreground(), cx_a.background()).await;
1697        let client_a = server.create_client(cx_a, "user_a").await;
1698        let client_b = server.create_client(cx_b, "user_b").await;
1699        server
1700            .make_contacts(vec![(&client_a, cx_a), (&client_b, cx_b)])
1701            .await;
1702
1703        // Share a project as client A
1704        fs.insert_tree(
1705            "/a",
1706            json!({
1707                "a.txt": "a-contents",
1708                "b.txt": "b-contents",
1709            }),
1710        )
1711        .await;
1712        let project_a = cx_a.update(|cx| {
1713            Project::local(
1714                client_a.clone(),
1715                client_a.user_store.clone(),
1716                lang_registry.clone(),
1717                fs.clone(),
1718                cx,
1719            )
1720        });
1721        let (worktree_a, _) = project_a
1722            .update(cx_a, |p, cx| {
1723                p.find_or_create_local_worktree("/a", true, cx)
1724            })
1725            .await
1726            .unwrap();
1727        worktree_a
1728            .read_with(cx_a, |tree, _| tree.as_local().unwrap().scan_complete())
1729            .await;
1730        let project_id = project_a.update(cx_a, |p, _| p.next_remote_id()).await;
1731        let worktree_id = worktree_a.read_with(cx_a, |tree, _| tree.id());
1732        project_a.update(cx_a, |p, cx| p.share(cx)).await.unwrap();
1733        assert!(worktree_a.read_with(cx_a, |tree, _| tree.as_local().unwrap().is_shared()));
1734
1735        // Join that project as client B
1736        let project_b = Project::remote(
1737            project_id,
1738            client_b.clone(),
1739            client_b.user_store.clone(),
1740            lang_registry.clone(),
1741            fs.clone(),
1742            &mut cx_b.to_async(),
1743        )
1744        .await
1745        .unwrap();
1746        project_b
1747            .update(cx_b, |p, cx| p.open_buffer((worktree_id, "a.txt"), cx))
1748            .await
1749            .unwrap();
1750
1751        // Unshare the project as client A
1752        project_a.update(cx_a, |project, cx| project.unshare(cx));
1753        project_b
1754            .condition(cx_b, |project, _| project.is_read_only())
1755            .await;
1756        assert!(worktree_a.read_with(cx_a, |tree, _| !tree.as_local().unwrap().is_shared()));
1757        cx_b.update(|_| {
1758            drop(project_b);
1759        });
1760
1761        // Share the project again and ensure guests can still join.
1762        project_a
1763            .update(cx_a, |project, cx| project.share(cx))
1764            .await
1765            .unwrap();
1766        assert!(worktree_a.read_with(cx_a, |tree, _| tree.as_local().unwrap().is_shared()));
1767
1768        let project_b2 = Project::remote(
1769            project_id,
1770            client_b.clone(),
1771            client_b.user_store.clone(),
1772            lang_registry.clone(),
1773            fs.clone(),
1774            &mut cx_b.to_async(),
1775        )
1776        .await
1777        .unwrap();
1778        project_b2
1779            .update(cx_b, |p, cx| p.open_buffer((worktree_id, "a.txt"), cx))
1780            .await
1781            .unwrap();
1782    }
1783
1784    #[gpui::test(iterations = 10)]
1785    async fn test_host_disconnect(cx_a: &mut TestAppContext, cx_b: &mut TestAppContext) {
1786        let lang_registry = Arc::new(LanguageRegistry::test());
1787        let fs = FakeFs::new(cx_a.background());
1788        cx_a.foreground().forbid_parking();
1789
1790        // Connect to a server as 2 clients.
1791        let mut server = TestServer::start(cx_a.foreground(), cx_a.background()).await;
1792        let client_a = server.create_client(cx_a, "user_a").await;
1793        let client_b = server.create_client(cx_b, "user_b").await;
1794        server
1795            .make_contacts(vec![(&client_a, cx_a), (&client_b, cx_b)])
1796            .await;
1797
1798        // Share a project as client A
1799        fs.insert_tree(
1800            "/a",
1801            json!({
1802                "a.txt": "a-contents",
1803                "b.txt": "b-contents",
1804            }),
1805        )
1806        .await;
1807        let project_a = cx_a.update(|cx| {
1808            Project::local(
1809                client_a.clone(),
1810                client_a.user_store.clone(),
1811                lang_registry.clone(),
1812                fs.clone(),
1813                cx,
1814            )
1815        });
1816        let (worktree_a, _) = project_a
1817            .update(cx_a, |p, cx| {
1818                p.find_or_create_local_worktree("/a", true, cx)
1819            })
1820            .await
1821            .unwrap();
1822        worktree_a
1823            .read_with(cx_a, |tree, _| tree.as_local().unwrap().scan_complete())
1824            .await;
1825        let project_id = project_a.update(cx_a, |p, _| p.next_remote_id()).await;
1826        let worktree_id = worktree_a.read_with(cx_a, |tree, _| tree.id());
1827        project_a.update(cx_a, |p, cx| p.share(cx)).await.unwrap();
1828        assert!(worktree_a.read_with(cx_a, |tree, _| tree.as_local().unwrap().is_shared()));
1829
1830        // Join that project as client B
1831        let project_b = Project::remote(
1832            project_id,
1833            client_b.clone(),
1834            client_b.user_store.clone(),
1835            lang_registry.clone(),
1836            fs.clone(),
1837            &mut cx_b.to_async(),
1838        )
1839        .await
1840        .unwrap();
1841        project_b
1842            .update(cx_b, |p, cx| p.open_buffer((worktree_id, "a.txt"), cx))
1843            .await
1844            .unwrap();
1845
1846        // Drop client A's connection. Collaborators should disappear and the project should not be shown as shared.
1847        server.disconnect_client(client_a.current_user_id(cx_a));
1848        cx_a.foreground().advance_clock(rpc::RECEIVE_TIMEOUT);
1849        project_a
1850            .condition(cx_a, |project, _| project.collaborators().is_empty())
1851            .await;
1852        project_a.read_with(cx_a, |project, _| assert!(!project.is_shared()));
1853        project_b
1854            .condition(cx_b, |project, _| project.is_read_only())
1855            .await;
1856        assert!(worktree_a.read_with(cx_a, |tree, _| !tree.as_local().unwrap().is_shared()));
1857        cx_b.update(|_| {
1858            drop(project_b);
1859        });
1860
1861        // Await reconnection
1862        let project_id = project_a.update(cx_a, |p, _| p.next_remote_id()).await;
1863
1864        // Share the project again and ensure guests can still join.
1865        project_a
1866            .update(cx_a, |project, cx| project.share(cx))
1867            .await
1868            .unwrap();
1869        assert!(worktree_a.read_with(cx_a, |tree, _| tree.as_local().unwrap().is_shared()));
1870
1871        let project_b2 = Project::remote(
1872            project_id,
1873            client_b.clone(),
1874            client_b.user_store.clone(),
1875            lang_registry.clone(),
1876            fs.clone(),
1877            &mut cx_b.to_async(),
1878        )
1879        .await
1880        .unwrap();
1881        project_b2
1882            .update(cx_b, |p, cx| p.open_buffer((worktree_id, "a.txt"), cx))
1883            .await
1884            .unwrap();
1885    }
1886
1887    #[gpui::test(iterations = 10)]
1888    async fn test_propagate_saves_and_fs_changes(
1889        cx_a: &mut TestAppContext,
1890        cx_b: &mut TestAppContext,
1891        cx_c: &mut TestAppContext,
1892    ) {
1893        let lang_registry = Arc::new(LanguageRegistry::test());
1894        let fs = FakeFs::new(cx_a.background());
1895        cx_a.foreground().forbid_parking();
1896
1897        // Connect to a server as 3 clients.
1898        let mut server = TestServer::start(cx_a.foreground(), cx_a.background()).await;
1899        let client_a = server.create_client(cx_a, "user_a").await;
1900        let client_b = server.create_client(cx_b, "user_b").await;
1901        let client_c = server.create_client(cx_c, "user_c").await;
1902        server
1903            .make_contacts(vec![
1904                (&client_a, cx_a),
1905                (&client_b, cx_b),
1906                (&client_c, cx_c),
1907            ])
1908            .await;
1909
1910        // Share a worktree as client A.
1911        fs.insert_tree(
1912            "/a",
1913            json!({
1914                "file1": "",
1915                "file2": ""
1916            }),
1917        )
1918        .await;
1919        let project_a = cx_a.update(|cx| {
1920            Project::local(
1921                client_a.clone(),
1922                client_a.user_store.clone(),
1923                lang_registry.clone(),
1924                fs.clone(),
1925                cx,
1926            )
1927        });
1928        let (worktree_a, _) = project_a
1929            .update(cx_a, |p, cx| {
1930                p.find_or_create_local_worktree("/a", true, cx)
1931            })
1932            .await
1933            .unwrap();
1934        worktree_a
1935            .read_with(cx_a, |tree, _| tree.as_local().unwrap().scan_complete())
1936            .await;
1937        let project_id = project_a.update(cx_a, |p, _| p.next_remote_id()).await;
1938        let worktree_id = worktree_a.read_with(cx_a, |tree, _| tree.id());
1939        project_a.update(cx_a, |p, cx| p.share(cx)).await.unwrap();
1940
1941        // Join that worktree as clients B and C.
1942        let project_b = Project::remote(
1943            project_id,
1944            client_b.clone(),
1945            client_b.user_store.clone(),
1946            lang_registry.clone(),
1947            fs.clone(),
1948            &mut cx_b.to_async(),
1949        )
1950        .await
1951        .unwrap();
1952        let project_c = Project::remote(
1953            project_id,
1954            client_c.clone(),
1955            client_c.user_store.clone(),
1956            lang_registry.clone(),
1957            fs.clone(),
1958            &mut cx_c.to_async(),
1959        )
1960        .await
1961        .unwrap();
1962        let worktree_b = project_b.read_with(cx_b, |p, cx| p.worktrees(cx).next().unwrap());
1963        let worktree_c = project_c.read_with(cx_c, |p, cx| p.worktrees(cx).next().unwrap());
1964
1965        // Open and edit a buffer as both guests B and C.
1966        let buffer_b = project_b
1967            .update(cx_b, |p, cx| p.open_buffer((worktree_id, "file1"), cx))
1968            .await
1969            .unwrap();
1970        let buffer_c = project_c
1971            .update(cx_c, |p, cx| p.open_buffer((worktree_id, "file1"), cx))
1972            .await
1973            .unwrap();
1974        buffer_b.update(cx_b, |buf, cx| buf.edit([(0..0, "i-am-b, ")], cx));
1975        buffer_c.update(cx_c, |buf, cx| buf.edit([(0..0, "i-am-c, ")], cx));
1976
1977        // Open and edit that buffer as the host.
1978        let buffer_a = project_a
1979            .update(cx_a, |p, cx| p.open_buffer((worktree_id, "file1"), cx))
1980            .await
1981            .unwrap();
1982
1983        buffer_a
1984            .condition(cx_a, |buf, _| buf.text() == "i-am-c, i-am-b, ")
1985            .await;
1986        buffer_a.update(cx_a, |buf, cx| {
1987            buf.edit([(buf.len()..buf.len(), "i-am-a")], cx)
1988        });
1989
1990        // Wait for edits to propagate
1991        buffer_a
1992            .condition(cx_a, |buf, _| buf.text() == "i-am-c, i-am-b, i-am-a")
1993            .await;
1994        buffer_b
1995            .condition(cx_b, |buf, _| buf.text() == "i-am-c, i-am-b, i-am-a")
1996            .await;
1997        buffer_c
1998            .condition(cx_c, |buf, _| buf.text() == "i-am-c, i-am-b, i-am-a")
1999            .await;
2000
2001        // Edit the buffer as the host and concurrently save as guest B.
2002        let save_b = buffer_b.update(cx_b, |buf, cx| buf.save(cx));
2003        buffer_a.update(cx_a, |buf, cx| buf.edit([(0..0, "hi-a, ")], cx));
2004        save_b.await.unwrap();
2005        assert_eq!(
2006            fs.load("/a/file1".as_ref()).await.unwrap(),
2007            "hi-a, i-am-c, i-am-b, i-am-a"
2008        );
2009        buffer_a.read_with(cx_a, |buf, _| assert!(!buf.is_dirty()));
2010        buffer_b.read_with(cx_b, |buf, _| assert!(!buf.is_dirty()));
2011        buffer_c.condition(cx_c, |buf, _| !buf.is_dirty()).await;
2012
2013        worktree_a.flush_fs_events(cx_a).await;
2014
2015        // Make changes on host's file system, see those changes on guest worktrees.
2016        fs.rename(
2017            "/a/file1".as_ref(),
2018            "/a/file1-renamed".as_ref(),
2019            Default::default(),
2020        )
2021        .await
2022        .unwrap();
2023
2024        fs.rename("/a/file2".as_ref(), "/a/file3".as_ref(), Default::default())
2025            .await
2026            .unwrap();
2027        fs.insert_file(Path::new("/a/file4"), "4".into()).await;
2028
2029        worktree_a
2030            .condition(&cx_a, |tree, _| {
2031                tree.paths()
2032                    .map(|p| p.to_string_lossy())
2033                    .collect::<Vec<_>>()
2034                    == ["file1-renamed", "file3", "file4"]
2035            })
2036            .await;
2037        worktree_b
2038            .condition(&cx_b, |tree, _| {
2039                tree.paths()
2040                    .map(|p| p.to_string_lossy())
2041                    .collect::<Vec<_>>()
2042                    == ["file1-renamed", "file3", "file4"]
2043            })
2044            .await;
2045        worktree_c
2046            .condition(&cx_c, |tree, _| {
2047                tree.paths()
2048                    .map(|p| p.to_string_lossy())
2049                    .collect::<Vec<_>>()
2050                    == ["file1-renamed", "file3", "file4"]
2051            })
2052            .await;
2053
2054        // Ensure buffer files are updated as well.
2055        buffer_a
2056            .condition(&cx_a, |buf, _| {
2057                buf.file().unwrap().path().to_str() == Some("file1-renamed")
2058            })
2059            .await;
2060        buffer_b
2061            .condition(&cx_b, |buf, _| {
2062                buf.file().unwrap().path().to_str() == Some("file1-renamed")
2063            })
2064            .await;
2065        buffer_c
2066            .condition(&cx_c, |buf, _| {
2067                buf.file().unwrap().path().to_str() == Some("file1-renamed")
2068            })
2069            .await;
2070    }
2071
2072    #[gpui::test(iterations = 10)]
2073    async fn test_fs_operations(
2074        executor: Arc<Deterministic>,
2075        cx_a: &mut TestAppContext,
2076        cx_b: &mut TestAppContext,
2077    ) {
2078        executor.forbid_parking();
2079        let fs = FakeFs::new(cx_a.background());
2080
2081        // Connect to a server as 2 clients.
2082        let mut server = TestServer::start(cx_a.foreground(), cx_a.background()).await;
2083        let mut client_a = server.create_client(cx_a, "user_a").await;
2084        let mut client_b = server.create_client(cx_b, "user_b").await;
2085        server
2086            .make_contacts(vec![(&client_a, cx_a), (&client_b, cx_b)])
2087            .await;
2088
2089        // Share a project as client A
2090        fs.insert_tree(
2091            "/dir",
2092            json!({
2093                "a.txt": "a-contents",
2094                "b.txt": "b-contents",
2095            }),
2096        )
2097        .await;
2098
2099        let (project_a, worktree_id) = client_a.build_local_project(fs, "/dir", cx_a).await;
2100        let project_id = project_a.read_with(cx_a, |project, _| project.remote_id().unwrap());
2101        project_a
2102            .update(cx_a, |project, cx| project.share(cx))
2103            .await
2104            .unwrap();
2105
2106        let project_b = client_b.build_remote_project(project_id, cx_b).await;
2107
2108        let worktree_a =
2109            project_a.read_with(cx_a, |project, cx| project.worktrees(cx).next().unwrap());
2110        let worktree_b =
2111            project_b.read_with(cx_b, |project, cx| project.worktrees(cx).next().unwrap());
2112
2113        let entry = project_b
2114            .update(cx_b, |project, cx| {
2115                project
2116                    .create_entry((worktree_id, "c.txt"), false, cx)
2117                    .unwrap()
2118            })
2119            .await
2120            .unwrap();
2121        worktree_a.read_with(cx_a, |worktree, _| {
2122            assert_eq!(
2123                worktree
2124                    .paths()
2125                    .map(|p| p.to_string_lossy())
2126                    .collect::<Vec<_>>(),
2127                ["a.txt", "b.txt", "c.txt"]
2128            );
2129        });
2130        worktree_b.read_with(cx_b, |worktree, _| {
2131            assert_eq!(
2132                worktree
2133                    .paths()
2134                    .map(|p| p.to_string_lossy())
2135                    .collect::<Vec<_>>(),
2136                ["a.txt", "b.txt", "c.txt"]
2137            );
2138        });
2139
2140        project_b
2141            .update(cx_b, |project, cx| {
2142                project.rename_entry(entry.id, Path::new("d.txt"), cx)
2143            })
2144            .unwrap()
2145            .await
2146            .unwrap();
2147        worktree_a.read_with(cx_a, |worktree, _| {
2148            assert_eq!(
2149                worktree
2150                    .paths()
2151                    .map(|p| p.to_string_lossy())
2152                    .collect::<Vec<_>>(),
2153                ["a.txt", "b.txt", "d.txt"]
2154            );
2155        });
2156        worktree_b.read_with(cx_b, |worktree, _| {
2157            assert_eq!(
2158                worktree
2159                    .paths()
2160                    .map(|p| p.to_string_lossy())
2161                    .collect::<Vec<_>>(),
2162                ["a.txt", "b.txt", "d.txt"]
2163            );
2164        });
2165
2166        let dir_entry = project_b
2167            .update(cx_b, |project, cx| {
2168                project
2169                    .create_entry((worktree_id, "DIR"), true, cx)
2170                    .unwrap()
2171            })
2172            .await
2173            .unwrap();
2174        worktree_a.read_with(cx_a, |worktree, _| {
2175            assert_eq!(
2176                worktree
2177                    .paths()
2178                    .map(|p| p.to_string_lossy())
2179                    .collect::<Vec<_>>(),
2180                ["DIR", "a.txt", "b.txt", "d.txt"]
2181            );
2182        });
2183        worktree_b.read_with(cx_b, |worktree, _| {
2184            assert_eq!(
2185                worktree
2186                    .paths()
2187                    .map(|p| p.to_string_lossy())
2188                    .collect::<Vec<_>>(),
2189                ["DIR", "a.txt", "b.txt", "d.txt"]
2190            );
2191        });
2192
2193        project_b
2194            .update(cx_b, |project, cx| {
2195                project.delete_entry(dir_entry.id, cx).unwrap()
2196            })
2197            .await
2198            .unwrap();
2199        worktree_a.read_with(cx_a, |worktree, _| {
2200            assert_eq!(
2201                worktree
2202                    .paths()
2203                    .map(|p| p.to_string_lossy())
2204                    .collect::<Vec<_>>(),
2205                ["a.txt", "b.txt", "d.txt"]
2206            );
2207        });
2208        worktree_b.read_with(cx_b, |worktree, _| {
2209            assert_eq!(
2210                worktree
2211                    .paths()
2212                    .map(|p| p.to_string_lossy())
2213                    .collect::<Vec<_>>(),
2214                ["a.txt", "b.txt", "d.txt"]
2215            );
2216        });
2217
2218        project_b
2219            .update(cx_b, |project, cx| {
2220                project.delete_entry(entry.id, cx).unwrap()
2221            })
2222            .await
2223            .unwrap();
2224        worktree_a.read_with(cx_a, |worktree, _| {
2225            assert_eq!(
2226                worktree
2227                    .paths()
2228                    .map(|p| p.to_string_lossy())
2229                    .collect::<Vec<_>>(),
2230                ["a.txt", "b.txt"]
2231            );
2232        });
2233        worktree_b.read_with(cx_b, |worktree, _| {
2234            assert_eq!(
2235                worktree
2236                    .paths()
2237                    .map(|p| p.to_string_lossy())
2238                    .collect::<Vec<_>>(),
2239                ["a.txt", "b.txt"]
2240            );
2241        });
2242    }
2243
2244    #[gpui::test(iterations = 10)]
2245    async fn test_buffer_conflict_after_save(cx_a: &mut TestAppContext, cx_b: &mut TestAppContext) {
2246        cx_a.foreground().forbid_parking();
2247        let lang_registry = Arc::new(LanguageRegistry::test());
2248        let fs = FakeFs::new(cx_a.background());
2249
2250        // Connect to a server as 2 clients.
2251        let mut server = TestServer::start(cx_a.foreground(), cx_a.background()).await;
2252        let client_a = server.create_client(cx_a, "user_a").await;
2253        let client_b = server.create_client(cx_b, "user_b").await;
2254        server
2255            .make_contacts(vec![(&client_a, cx_a), (&client_b, cx_b)])
2256            .await;
2257
2258        // Share a project as client A
2259        fs.insert_tree(
2260            "/dir",
2261            json!({
2262                "a.txt": "a-contents",
2263            }),
2264        )
2265        .await;
2266
2267        let project_a = cx_a.update(|cx| {
2268            Project::local(
2269                client_a.clone(),
2270                client_a.user_store.clone(),
2271                lang_registry.clone(),
2272                fs.clone(),
2273                cx,
2274            )
2275        });
2276        let (worktree_a, _) = project_a
2277            .update(cx_a, |p, cx| {
2278                p.find_or_create_local_worktree("/dir", true, cx)
2279            })
2280            .await
2281            .unwrap();
2282        worktree_a
2283            .read_with(cx_a, |tree, _| tree.as_local().unwrap().scan_complete())
2284            .await;
2285        let project_id = project_a.update(cx_a, |p, _| p.next_remote_id()).await;
2286        let worktree_id = worktree_a.read_with(cx_a, |tree, _| tree.id());
2287        project_a.update(cx_a, |p, cx| p.share(cx)).await.unwrap();
2288
2289        // Join that project as client B
2290        let project_b = Project::remote(
2291            project_id,
2292            client_b.clone(),
2293            client_b.user_store.clone(),
2294            lang_registry.clone(),
2295            fs.clone(),
2296            &mut cx_b.to_async(),
2297        )
2298        .await
2299        .unwrap();
2300
2301        // Open a buffer as client B
2302        let buffer_b = project_b
2303            .update(cx_b, |p, cx| p.open_buffer((worktree_id, "a.txt"), cx))
2304            .await
2305            .unwrap();
2306
2307        buffer_b.update(cx_b, |buf, cx| buf.edit([(0..0, "world ")], cx));
2308        buffer_b.read_with(cx_b, |buf, _| {
2309            assert!(buf.is_dirty());
2310            assert!(!buf.has_conflict());
2311        });
2312
2313        buffer_b.update(cx_b, |buf, cx| buf.save(cx)).await.unwrap();
2314        buffer_b
2315            .condition(&cx_b, |buffer_b, _| !buffer_b.is_dirty())
2316            .await;
2317        buffer_b.read_with(cx_b, |buf, _| {
2318            assert!(!buf.has_conflict());
2319        });
2320
2321        buffer_b.update(cx_b, |buf, cx| buf.edit([(0..0, "hello ")], cx));
2322        buffer_b.read_with(cx_b, |buf, _| {
2323            assert!(buf.is_dirty());
2324            assert!(!buf.has_conflict());
2325        });
2326    }
2327
2328    #[gpui::test(iterations = 10)]
2329    async fn test_buffer_reloading(cx_a: &mut TestAppContext, cx_b: &mut TestAppContext) {
2330        cx_a.foreground().forbid_parking();
2331        let lang_registry = Arc::new(LanguageRegistry::test());
2332        let fs = FakeFs::new(cx_a.background());
2333
2334        // Connect to a server as 2 clients.
2335        let mut server = TestServer::start(cx_a.foreground(), cx_a.background()).await;
2336        let client_a = server.create_client(cx_a, "user_a").await;
2337        let client_b = server.create_client(cx_b, "user_b").await;
2338        server
2339            .make_contacts(vec![(&client_a, cx_a), (&client_b, cx_b)])
2340            .await;
2341
2342        // Share a project as client A
2343        fs.insert_tree(
2344            "/dir",
2345            json!({
2346                "a.txt": "a-contents",
2347            }),
2348        )
2349        .await;
2350
2351        let project_a = cx_a.update(|cx| {
2352            Project::local(
2353                client_a.clone(),
2354                client_a.user_store.clone(),
2355                lang_registry.clone(),
2356                fs.clone(),
2357                cx,
2358            )
2359        });
2360        let (worktree_a, _) = project_a
2361            .update(cx_a, |p, cx| {
2362                p.find_or_create_local_worktree("/dir", true, cx)
2363            })
2364            .await
2365            .unwrap();
2366        worktree_a
2367            .read_with(cx_a, |tree, _| tree.as_local().unwrap().scan_complete())
2368            .await;
2369        let project_id = project_a.update(cx_a, |p, _| p.next_remote_id()).await;
2370        let worktree_id = worktree_a.read_with(cx_a, |tree, _| tree.id());
2371        project_a.update(cx_a, |p, cx| p.share(cx)).await.unwrap();
2372
2373        // Join that project as client B
2374        let project_b = Project::remote(
2375            project_id,
2376            client_b.clone(),
2377            client_b.user_store.clone(),
2378            lang_registry.clone(),
2379            fs.clone(),
2380            &mut cx_b.to_async(),
2381        )
2382        .await
2383        .unwrap();
2384        let _worktree_b = project_b.update(cx_b, |p, cx| p.worktrees(cx).next().unwrap());
2385
2386        // Open a buffer as client B
2387        let buffer_b = project_b
2388            .update(cx_b, |p, cx| p.open_buffer((worktree_id, "a.txt"), cx))
2389            .await
2390            .unwrap();
2391        buffer_b.read_with(cx_b, |buf, _| {
2392            assert!(!buf.is_dirty());
2393            assert!(!buf.has_conflict());
2394        });
2395
2396        fs.save(Path::new("/dir/a.txt"), &"new contents".into())
2397            .await
2398            .unwrap();
2399        buffer_b
2400            .condition(&cx_b, |buf, _| {
2401                buf.text() == "new contents" && !buf.is_dirty()
2402            })
2403            .await;
2404        buffer_b.read_with(cx_b, |buf, _| {
2405            assert!(!buf.has_conflict());
2406        });
2407    }
2408
2409    #[gpui::test(iterations = 10)]
2410    async fn test_editing_while_guest_opens_buffer(
2411        cx_a: &mut TestAppContext,
2412        cx_b: &mut TestAppContext,
2413    ) {
2414        cx_a.foreground().forbid_parking();
2415        let lang_registry = Arc::new(LanguageRegistry::test());
2416        let fs = FakeFs::new(cx_a.background());
2417
2418        // Connect to a server as 2 clients.
2419        let mut server = TestServer::start(cx_a.foreground(), cx_a.background()).await;
2420        let client_a = server.create_client(cx_a, "user_a").await;
2421        let client_b = server.create_client(cx_b, "user_b").await;
2422        server
2423            .make_contacts(vec![(&client_a, cx_a), (&client_b, cx_b)])
2424            .await;
2425
2426        // Share a project as client A
2427        fs.insert_tree(
2428            "/dir",
2429            json!({
2430                "a.txt": "a-contents",
2431            }),
2432        )
2433        .await;
2434        let project_a = cx_a.update(|cx| {
2435            Project::local(
2436                client_a.clone(),
2437                client_a.user_store.clone(),
2438                lang_registry.clone(),
2439                fs.clone(),
2440                cx,
2441            )
2442        });
2443        let (worktree_a, _) = project_a
2444            .update(cx_a, |p, cx| {
2445                p.find_or_create_local_worktree("/dir", true, cx)
2446            })
2447            .await
2448            .unwrap();
2449        worktree_a
2450            .read_with(cx_a, |tree, _| tree.as_local().unwrap().scan_complete())
2451            .await;
2452        let project_id = project_a.update(cx_a, |p, _| p.next_remote_id()).await;
2453        let worktree_id = worktree_a.read_with(cx_a, |tree, _| tree.id());
2454        project_a.update(cx_a, |p, cx| p.share(cx)).await.unwrap();
2455
2456        // Join that project as client B
2457        let project_b = Project::remote(
2458            project_id,
2459            client_b.clone(),
2460            client_b.user_store.clone(),
2461            lang_registry.clone(),
2462            fs.clone(),
2463            &mut cx_b.to_async(),
2464        )
2465        .await
2466        .unwrap();
2467
2468        // Open a buffer as client A
2469        let buffer_a = project_a
2470            .update(cx_a, |p, cx| p.open_buffer((worktree_id, "a.txt"), cx))
2471            .await
2472            .unwrap();
2473
2474        // Start opening the same buffer as client B
2475        let buffer_b = cx_b
2476            .background()
2477            .spawn(project_b.update(cx_b, |p, cx| p.open_buffer((worktree_id, "a.txt"), cx)));
2478
2479        // Edit the buffer as client A while client B is still opening it.
2480        cx_b.background().simulate_random_delay().await;
2481        buffer_a.update(cx_a, |buf, cx| buf.edit([(0..0, "X")], cx));
2482        cx_b.background().simulate_random_delay().await;
2483        buffer_a.update(cx_a, |buf, cx| buf.edit([(1..1, "Y")], cx));
2484
2485        let text = buffer_a.read_with(cx_a, |buf, _| buf.text());
2486        let buffer_b = buffer_b.await.unwrap();
2487        buffer_b.condition(&cx_b, |buf, _| buf.text() == text).await;
2488    }
2489
2490    #[gpui::test(iterations = 10)]
2491    async fn test_leaving_worktree_while_opening_buffer(
2492        cx_a: &mut TestAppContext,
2493        cx_b: &mut TestAppContext,
2494    ) {
2495        cx_a.foreground().forbid_parking();
2496        let lang_registry = Arc::new(LanguageRegistry::test());
2497        let fs = FakeFs::new(cx_a.background());
2498
2499        // Connect to a server as 2 clients.
2500        let mut server = TestServer::start(cx_a.foreground(), cx_a.background()).await;
2501        let client_a = server.create_client(cx_a, "user_a").await;
2502        let client_b = server.create_client(cx_b, "user_b").await;
2503        server
2504            .make_contacts(vec![(&client_a, cx_a), (&client_b, cx_b)])
2505            .await;
2506
2507        // Share a project as client A
2508        fs.insert_tree(
2509            "/dir",
2510            json!({
2511                "a.txt": "a-contents",
2512            }),
2513        )
2514        .await;
2515        let project_a = cx_a.update(|cx| {
2516            Project::local(
2517                client_a.clone(),
2518                client_a.user_store.clone(),
2519                lang_registry.clone(),
2520                fs.clone(),
2521                cx,
2522            )
2523        });
2524        let (worktree_a, _) = project_a
2525            .update(cx_a, |p, cx| {
2526                p.find_or_create_local_worktree("/dir", true, cx)
2527            })
2528            .await
2529            .unwrap();
2530        worktree_a
2531            .read_with(cx_a, |tree, _| tree.as_local().unwrap().scan_complete())
2532            .await;
2533        let project_id = project_a.update(cx_a, |p, _| p.next_remote_id()).await;
2534        let worktree_id = worktree_a.read_with(cx_a, |tree, _| tree.id());
2535        project_a.update(cx_a, |p, cx| p.share(cx)).await.unwrap();
2536
2537        // Join that project as client B
2538        let project_b = Project::remote(
2539            project_id,
2540            client_b.clone(),
2541            client_b.user_store.clone(),
2542            lang_registry.clone(),
2543            fs.clone(),
2544            &mut cx_b.to_async(),
2545        )
2546        .await
2547        .unwrap();
2548
2549        // See that a guest has joined as client A.
2550        project_a
2551            .condition(&cx_a, |p, _| p.collaborators().len() == 1)
2552            .await;
2553
2554        // Begin opening a buffer as client B, but leave the project before the open completes.
2555        let buffer_b = cx_b
2556            .background()
2557            .spawn(project_b.update(cx_b, |p, cx| p.open_buffer((worktree_id, "a.txt"), cx)));
2558        cx_b.update(|_| drop(project_b));
2559        drop(buffer_b);
2560
2561        // See that the guest has left.
2562        project_a
2563            .condition(&cx_a, |p, _| p.collaborators().len() == 0)
2564            .await;
2565    }
2566
2567    #[gpui::test(iterations = 10)]
2568    async fn test_leaving_project(cx_a: &mut TestAppContext, cx_b: &mut TestAppContext) {
2569        cx_a.foreground().forbid_parking();
2570        let lang_registry = Arc::new(LanguageRegistry::test());
2571        let fs = FakeFs::new(cx_a.background());
2572
2573        // Connect to a server as 2 clients.
2574        let mut server = TestServer::start(cx_a.foreground(), cx_a.background()).await;
2575        let client_a = server.create_client(cx_a, "user_a").await;
2576        let client_b = server.create_client(cx_b, "user_b").await;
2577        server
2578            .make_contacts(vec![(&client_a, cx_a), (&client_b, cx_b)])
2579            .await;
2580
2581        // Share a project as client A
2582        fs.insert_tree(
2583            "/a",
2584            json!({
2585                "a.txt": "a-contents",
2586                "b.txt": "b-contents",
2587            }),
2588        )
2589        .await;
2590        let project_a = cx_a.update(|cx| {
2591            Project::local(
2592                client_a.clone(),
2593                client_a.user_store.clone(),
2594                lang_registry.clone(),
2595                fs.clone(),
2596                cx,
2597            )
2598        });
2599        let (worktree_a, _) = project_a
2600            .update(cx_a, |p, cx| {
2601                p.find_or_create_local_worktree("/a", true, cx)
2602            })
2603            .await
2604            .unwrap();
2605        worktree_a
2606            .read_with(cx_a, |tree, _| tree.as_local().unwrap().scan_complete())
2607            .await;
2608        let project_id = project_a
2609            .update(cx_a, |project, _| project.next_remote_id())
2610            .await;
2611        project_a
2612            .update(cx_a, |project, cx| project.share(cx))
2613            .await
2614            .unwrap();
2615
2616        // Join that project as client B
2617        let _project_b = Project::remote(
2618            project_id,
2619            client_b.clone(),
2620            client_b.user_store.clone(),
2621            lang_registry.clone(),
2622            fs.clone(),
2623            &mut cx_b.to_async(),
2624        )
2625        .await
2626        .unwrap();
2627
2628        // Client A sees that a guest has joined.
2629        project_a
2630            .condition(cx_a, |p, _| p.collaborators().len() == 1)
2631            .await;
2632
2633        // Drop client B's connection and ensure client A observes client B leaving the project.
2634        client_b.disconnect(&cx_b.to_async()).unwrap();
2635        project_a
2636            .condition(cx_a, |p, _| p.collaborators().len() == 0)
2637            .await;
2638
2639        // Rejoin the project as client B
2640        let _project_b = Project::remote(
2641            project_id,
2642            client_b.clone(),
2643            client_b.user_store.clone(),
2644            lang_registry.clone(),
2645            fs.clone(),
2646            &mut cx_b.to_async(),
2647        )
2648        .await
2649        .unwrap();
2650
2651        // Client A sees that a guest has re-joined.
2652        project_a
2653            .condition(cx_a, |p, _| p.collaborators().len() == 1)
2654            .await;
2655
2656        // Simulate connection loss for client B and ensure client A observes client B leaving the project.
2657        client_b.wait_for_current_user(cx_b).await;
2658        server.disconnect_client(client_b.current_user_id(cx_b));
2659        cx_a.foreground().advance_clock(rpc::RECEIVE_TIMEOUT);
2660        project_a
2661            .condition(cx_a, |p, _| p.collaborators().len() == 0)
2662            .await;
2663    }
2664
2665    #[gpui::test(iterations = 10)]
2666    async fn test_collaborating_with_diagnostics(
2667        cx_a: &mut TestAppContext,
2668        cx_b: &mut TestAppContext,
2669    ) {
2670        cx_a.foreground().forbid_parking();
2671        let lang_registry = Arc::new(LanguageRegistry::test());
2672        let fs = FakeFs::new(cx_a.background());
2673
2674        // Set up a fake language server.
2675        let mut language = Language::new(
2676            LanguageConfig {
2677                name: "Rust".into(),
2678                path_suffixes: vec!["rs".to_string()],
2679                ..Default::default()
2680            },
2681            Some(tree_sitter_rust::language()),
2682        );
2683        let mut fake_language_servers = language.set_fake_lsp_adapter(Default::default());
2684        lang_registry.add(Arc::new(language));
2685
2686        // Connect to a server as 2 clients.
2687        let mut server = TestServer::start(cx_a.foreground(), cx_a.background()).await;
2688        let client_a = server.create_client(cx_a, "user_a").await;
2689        let client_b = server.create_client(cx_b, "user_b").await;
2690        server
2691            .make_contacts(vec![(&client_a, cx_a), (&client_b, cx_b)])
2692            .await;
2693
2694        // Share a project as client A
2695        fs.insert_tree(
2696            "/a",
2697            json!({
2698                "a.rs": "let one = two",
2699                "other.rs": "",
2700            }),
2701        )
2702        .await;
2703        let project_a = cx_a.update(|cx| {
2704            Project::local(
2705                client_a.clone(),
2706                client_a.user_store.clone(),
2707                lang_registry.clone(),
2708                fs.clone(),
2709                cx,
2710            )
2711        });
2712        let (worktree_a, _) = project_a
2713            .update(cx_a, |p, cx| {
2714                p.find_or_create_local_worktree("/a", true, cx)
2715            })
2716            .await
2717            .unwrap();
2718        worktree_a
2719            .read_with(cx_a, |tree, _| tree.as_local().unwrap().scan_complete())
2720            .await;
2721        let project_id = project_a.update(cx_a, |p, _| p.next_remote_id()).await;
2722        let worktree_id = worktree_a.read_with(cx_a, |tree, _| tree.id());
2723        project_a.update(cx_a, |p, cx| p.share(cx)).await.unwrap();
2724
2725        // Cause the language server to start.
2726        let _ = cx_a
2727            .background()
2728            .spawn(project_a.update(cx_a, |project, cx| {
2729                project.open_buffer(
2730                    ProjectPath {
2731                        worktree_id,
2732                        path: Path::new("other.rs").into(),
2733                    },
2734                    cx,
2735                )
2736            }))
2737            .await
2738            .unwrap();
2739
2740        // Simulate a language server reporting errors for a file.
2741        let mut fake_language_server = fake_language_servers.next().await.unwrap();
2742        fake_language_server
2743            .receive_notification::<lsp::notification::DidOpenTextDocument>()
2744            .await;
2745        fake_language_server.notify::<lsp::notification::PublishDiagnostics>(
2746            lsp::PublishDiagnosticsParams {
2747                uri: lsp::Url::from_file_path("/a/a.rs").unwrap(),
2748                version: None,
2749                diagnostics: vec![lsp::Diagnostic {
2750                    severity: Some(lsp::DiagnosticSeverity::ERROR),
2751                    range: lsp::Range::new(lsp::Position::new(0, 4), lsp::Position::new(0, 7)),
2752                    message: "message 1".to_string(),
2753                    ..Default::default()
2754                }],
2755            },
2756        );
2757
2758        // Wait for server to see the diagnostics update.
2759        server
2760            .condition(|store| {
2761                let worktree = store
2762                    .project(project_id)
2763                    .unwrap()
2764                    .share
2765                    .as_ref()
2766                    .unwrap()
2767                    .worktrees
2768                    .get(&worktree_id.to_proto())
2769                    .unwrap();
2770
2771                !worktree.diagnostic_summaries.is_empty()
2772            })
2773            .await;
2774
2775        // Join the worktree as client B.
2776        let project_b = Project::remote(
2777            project_id,
2778            client_b.clone(),
2779            client_b.user_store.clone(),
2780            lang_registry.clone(),
2781            fs.clone(),
2782            &mut cx_b.to_async(),
2783        )
2784        .await
2785        .unwrap();
2786
2787        project_b.read_with(cx_b, |project, cx| {
2788            assert_eq!(
2789                project.diagnostic_summaries(cx).collect::<Vec<_>>(),
2790                &[(
2791                    ProjectPath {
2792                        worktree_id,
2793                        path: Arc::from(Path::new("a.rs")),
2794                    },
2795                    DiagnosticSummary {
2796                        error_count: 1,
2797                        warning_count: 0,
2798                        ..Default::default()
2799                    },
2800                )]
2801            )
2802        });
2803
2804        // Simulate a language server reporting more errors for a file.
2805        fake_language_server.notify::<lsp::notification::PublishDiagnostics>(
2806            lsp::PublishDiagnosticsParams {
2807                uri: lsp::Url::from_file_path("/a/a.rs").unwrap(),
2808                version: None,
2809                diagnostics: vec![
2810                    lsp::Diagnostic {
2811                        severity: Some(lsp::DiagnosticSeverity::ERROR),
2812                        range: lsp::Range::new(lsp::Position::new(0, 4), lsp::Position::new(0, 7)),
2813                        message: "message 1".to_string(),
2814                        ..Default::default()
2815                    },
2816                    lsp::Diagnostic {
2817                        severity: Some(lsp::DiagnosticSeverity::WARNING),
2818                        range: lsp::Range::new(
2819                            lsp::Position::new(0, 10),
2820                            lsp::Position::new(0, 13),
2821                        ),
2822                        message: "message 2".to_string(),
2823                        ..Default::default()
2824                    },
2825                ],
2826            },
2827        );
2828
2829        // Client b gets the updated summaries
2830        project_b
2831            .condition(&cx_b, |project, cx| {
2832                project.diagnostic_summaries(cx).collect::<Vec<_>>()
2833                    == &[(
2834                        ProjectPath {
2835                            worktree_id,
2836                            path: Arc::from(Path::new("a.rs")),
2837                        },
2838                        DiagnosticSummary {
2839                            error_count: 1,
2840                            warning_count: 1,
2841                            ..Default::default()
2842                        },
2843                    )]
2844            })
2845            .await;
2846
2847        // Open the file with the errors on client B. They should be present.
2848        let buffer_b = cx_b
2849            .background()
2850            .spawn(project_b.update(cx_b, |p, cx| p.open_buffer((worktree_id, "a.rs"), cx)))
2851            .await
2852            .unwrap();
2853
2854        buffer_b.read_with(cx_b, |buffer, _| {
2855            assert_eq!(
2856                buffer
2857                    .snapshot()
2858                    .diagnostics_in_range::<_, Point>(0..buffer.len(), false)
2859                    .map(|entry| entry)
2860                    .collect::<Vec<_>>(),
2861                &[
2862                    DiagnosticEntry {
2863                        range: Point::new(0, 4)..Point::new(0, 7),
2864                        diagnostic: Diagnostic {
2865                            group_id: 0,
2866                            message: "message 1".to_string(),
2867                            severity: lsp::DiagnosticSeverity::ERROR,
2868                            is_primary: true,
2869                            ..Default::default()
2870                        }
2871                    },
2872                    DiagnosticEntry {
2873                        range: Point::new(0, 10)..Point::new(0, 13),
2874                        diagnostic: Diagnostic {
2875                            group_id: 1,
2876                            severity: lsp::DiagnosticSeverity::WARNING,
2877                            message: "message 2".to_string(),
2878                            is_primary: true,
2879                            ..Default::default()
2880                        }
2881                    }
2882                ]
2883            );
2884        });
2885
2886        // Simulate a language server reporting no errors for a file.
2887        fake_language_server.notify::<lsp::notification::PublishDiagnostics>(
2888            lsp::PublishDiagnosticsParams {
2889                uri: lsp::Url::from_file_path("/a/a.rs").unwrap(),
2890                version: None,
2891                diagnostics: vec![],
2892            },
2893        );
2894        project_a
2895            .condition(cx_a, |project, cx| {
2896                project.diagnostic_summaries(cx).collect::<Vec<_>>() == &[]
2897            })
2898            .await;
2899        project_b
2900            .condition(cx_b, |project, cx| {
2901                project.diagnostic_summaries(cx).collect::<Vec<_>>() == &[]
2902            })
2903            .await;
2904    }
2905
2906    #[gpui::test(iterations = 10)]
2907    async fn test_collaborating_with_completion(
2908        cx_a: &mut TestAppContext,
2909        cx_b: &mut TestAppContext,
2910    ) {
2911        cx_a.foreground().forbid_parking();
2912        let lang_registry = Arc::new(LanguageRegistry::test());
2913        let fs = FakeFs::new(cx_a.background());
2914
2915        // Set up a fake language server.
2916        let mut language = Language::new(
2917            LanguageConfig {
2918                name: "Rust".into(),
2919                path_suffixes: vec!["rs".to_string()],
2920                ..Default::default()
2921            },
2922            Some(tree_sitter_rust::language()),
2923        );
2924        let mut fake_language_servers = language.set_fake_lsp_adapter(FakeLspAdapter {
2925            capabilities: lsp::ServerCapabilities {
2926                completion_provider: Some(lsp::CompletionOptions {
2927                    trigger_characters: Some(vec![".".to_string()]),
2928                    ..Default::default()
2929                }),
2930                ..Default::default()
2931            },
2932            ..Default::default()
2933        });
2934        lang_registry.add(Arc::new(language));
2935
2936        // Connect to a server as 2 clients.
2937        let mut server = TestServer::start(cx_a.foreground(), cx_a.background()).await;
2938        let client_a = server.create_client(cx_a, "user_a").await;
2939        let client_b = server.create_client(cx_b, "user_b").await;
2940        server
2941            .make_contacts(vec![(&client_a, cx_a), (&client_b, cx_b)])
2942            .await;
2943
2944        // Share a project as client A
2945        fs.insert_tree(
2946            "/a",
2947            json!({
2948                "main.rs": "fn main() { a }",
2949                "other.rs": "",
2950            }),
2951        )
2952        .await;
2953        let project_a = cx_a.update(|cx| {
2954            Project::local(
2955                client_a.clone(),
2956                client_a.user_store.clone(),
2957                lang_registry.clone(),
2958                fs.clone(),
2959                cx,
2960            )
2961        });
2962        let (worktree_a, _) = project_a
2963            .update(cx_a, |p, cx| {
2964                p.find_or_create_local_worktree("/a", true, cx)
2965            })
2966            .await
2967            .unwrap();
2968        worktree_a
2969            .read_with(cx_a, |tree, _| tree.as_local().unwrap().scan_complete())
2970            .await;
2971        let project_id = project_a.update(cx_a, |p, _| p.next_remote_id()).await;
2972        let worktree_id = worktree_a.read_with(cx_a, |tree, _| tree.id());
2973        project_a.update(cx_a, |p, cx| p.share(cx)).await.unwrap();
2974
2975        // Join the worktree as client B.
2976        let project_b = Project::remote(
2977            project_id,
2978            client_b.clone(),
2979            client_b.user_store.clone(),
2980            lang_registry.clone(),
2981            fs.clone(),
2982            &mut cx_b.to_async(),
2983        )
2984        .await
2985        .unwrap();
2986
2987        // Open a file in an editor as the guest.
2988        let buffer_b = project_b
2989            .update(cx_b, |p, cx| p.open_buffer((worktree_id, "main.rs"), cx))
2990            .await
2991            .unwrap();
2992        let (window_b, _) = cx_b.add_window(|_| EmptyView);
2993        let editor_b = cx_b.add_view(window_b, |cx| {
2994            Editor::for_buffer(buffer_b.clone(), Some(project_b.clone()), cx)
2995        });
2996
2997        let fake_language_server = fake_language_servers.next().await.unwrap();
2998        buffer_b
2999            .condition(&cx_b, |buffer, _| !buffer.completion_triggers().is_empty())
3000            .await;
3001
3002        // Type a completion trigger character as the guest.
3003        editor_b.update(cx_b, |editor, cx| {
3004            editor.change_selections(None, cx, |s| s.select_ranges([13..13]));
3005            editor.handle_input(&Input(".".into()), cx);
3006            cx.focus(&editor_b);
3007        });
3008
3009        // Receive a completion request as the host's language server.
3010        // Return some completions from the host's language server.
3011        cx_a.foreground().start_waiting();
3012        fake_language_server
3013            .handle_request::<lsp::request::Completion, _, _>(|params, _| async move {
3014                assert_eq!(
3015                    params.text_document_position.text_document.uri,
3016                    lsp::Url::from_file_path("/a/main.rs").unwrap(),
3017                );
3018                assert_eq!(
3019                    params.text_document_position.position,
3020                    lsp::Position::new(0, 14),
3021                );
3022
3023                Ok(Some(lsp::CompletionResponse::Array(vec![
3024                    lsp::CompletionItem {
3025                        label: "first_method(…)".into(),
3026                        detail: Some("fn(&mut self, B) -> C".into()),
3027                        text_edit: Some(lsp::CompletionTextEdit::Edit(lsp::TextEdit {
3028                            new_text: "first_method($1)".to_string(),
3029                            range: lsp::Range::new(
3030                                lsp::Position::new(0, 14),
3031                                lsp::Position::new(0, 14),
3032                            ),
3033                        })),
3034                        insert_text_format: Some(lsp::InsertTextFormat::SNIPPET),
3035                        ..Default::default()
3036                    },
3037                    lsp::CompletionItem {
3038                        label: "second_method(…)".into(),
3039                        detail: Some("fn(&mut self, C) -> D<E>".into()),
3040                        text_edit: Some(lsp::CompletionTextEdit::Edit(lsp::TextEdit {
3041                            new_text: "second_method()".to_string(),
3042                            range: lsp::Range::new(
3043                                lsp::Position::new(0, 14),
3044                                lsp::Position::new(0, 14),
3045                            ),
3046                        })),
3047                        insert_text_format: Some(lsp::InsertTextFormat::SNIPPET),
3048                        ..Default::default()
3049                    },
3050                ])))
3051            })
3052            .next()
3053            .await
3054            .unwrap();
3055        cx_a.foreground().finish_waiting();
3056
3057        // Open the buffer on the host.
3058        let buffer_a = project_a
3059            .update(cx_a, |p, cx| p.open_buffer((worktree_id, "main.rs"), cx))
3060            .await
3061            .unwrap();
3062        buffer_a
3063            .condition(&cx_a, |buffer, _| buffer.text() == "fn main() { a. }")
3064            .await;
3065
3066        // Confirm a completion on the guest.
3067        editor_b
3068            .condition(&cx_b, |editor, _| editor.context_menu_visible())
3069            .await;
3070        editor_b.update(cx_b, |editor, cx| {
3071            editor.confirm_completion(&ConfirmCompletion { item_ix: Some(0) }, cx);
3072            assert_eq!(editor.text(cx), "fn main() { a.first_method() }");
3073        });
3074
3075        // Return a resolved completion from the host's language server.
3076        // The resolved completion has an additional text edit.
3077        fake_language_server.handle_request::<lsp::request::ResolveCompletionItem, _, _>(
3078            |params, _| async move {
3079                assert_eq!(params.label, "first_method(…)");
3080                Ok(lsp::CompletionItem {
3081                    label: "first_method(…)".into(),
3082                    detail: Some("fn(&mut self, B) -> C".into()),
3083                    text_edit: Some(lsp::CompletionTextEdit::Edit(lsp::TextEdit {
3084                        new_text: "first_method($1)".to_string(),
3085                        range: lsp::Range::new(
3086                            lsp::Position::new(0, 14),
3087                            lsp::Position::new(0, 14),
3088                        ),
3089                    })),
3090                    additional_text_edits: Some(vec![lsp::TextEdit {
3091                        new_text: "use d::SomeTrait;\n".to_string(),
3092                        range: lsp::Range::new(lsp::Position::new(0, 0), lsp::Position::new(0, 0)),
3093                    }]),
3094                    insert_text_format: Some(lsp::InsertTextFormat::SNIPPET),
3095                    ..Default::default()
3096                })
3097            },
3098        );
3099
3100        // The additional edit is applied.
3101        buffer_a
3102            .condition(&cx_a, |buffer, _| {
3103                buffer.text() == "use d::SomeTrait;\nfn main() { a.first_method() }"
3104            })
3105            .await;
3106        buffer_b
3107            .condition(&cx_b, |buffer, _| {
3108                buffer.text() == "use d::SomeTrait;\nfn main() { a.first_method() }"
3109            })
3110            .await;
3111    }
3112
3113    #[gpui::test(iterations = 10)]
3114    async fn test_reloading_buffer_manually(cx_a: &mut TestAppContext, cx_b: &mut TestAppContext) {
3115        cx_a.foreground().forbid_parking();
3116        let lang_registry = Arc::new(LanguageRegistry::test());
3117        let fs = FakeFs::new(cx_a.background());
3118
3119        // Connect to a server as 2 clients.
3120        let mut server = TestServer::start(cx_a.foreground(), cx_a.background()).await;
3121        let client_a = server.create_client(cx_a, "user_a").await;
3122        let client_b = server.create_client(cx_b, "user_b").await;
3123        server
3124            .make_contacts(vec![(&client_a, cx_a), (&client_b, cx_b)])
3125            .await;
3126
3127        // Share a project as client A
3128        fs.insert_tree(
3129            "/a",
3130            json!({
3131                "a.rs": "let one = 1;",
3132            }),
3133        )
3134        .await;
3135        let project_a = cx_a.update(|cx| {
3136            Project::local(
3137                client_a.clone(),
3138                client_a.user_store.clone(),
3139                lang_registry.clone(),
3140                fs.clone(),
3141                cx,
3142            )
3143        });
3144        let (worktree_a, _) = project_a
3145            .update(cx_a, |p, cx| {
3146                p.find_or_create_local_worktree("/a", true, cx)
3147            })
3148            .await
3149            .unwrap();
3150        worktree_a
3151            .read_with(cx_a, |tree, _| tree.as_local().unwrap().scan_complete())
3152            .await;
3153        let project_id = project_a.update(cx_a, |p, _| p.next_remote_id()).await;
3154        let worktree_id = worktree_a.read_with(cx_a, |tree, _| tree.id());
3155        project_a.update(cx_a, |p, cx| p.share(cx)).await.unwrap();
3156        let buffer_a = project_a
3157            .update(cx_a, |p, cx| p.open_buffer((worktree_id, "a.rs"), cx))
3158            .await
3159            .unwrap();
3160
3161        // Join the worktree as client B.
3162        let project_b = Project::remote(
3163            project_id,
3164            client_b.clone(),
3165            client_b.user_store.clone(),
3166            lang_registry.clone(),
3167            fs.clone(),
3168            &mut cx_b.to_async(),
3169        )
3170        .await
3171        .unwrap();
3172
3173        let buffer_b = cx_b
3174            .background()
3175            .spawn(project_b.update(cx_b, |p, cx| p.open_buffer((worktree_id, "a.rs"), cx)))
3176            .await
3177            .unwrap();
3178        buffer_b.update(cx_b, |buffer, cx| {
3179            buffer.edit([(4..7, "six")], cx);
3180            buffer.edit([(10..11, "6")], cx);
3181            assert_eq!(buffer.text(), "let six = 6;");
3182            assert!(buffer.is_dirty());
3183            assert!(!buffer.has_conflict());
3184        });
3185        buffer_a
3186            .condition(cx_a, |buffer, _| buffer.text() == "let six = 6;")
3187            .await;
3188
3189        fs.save(Path::new("/a/a.rs"), &Rope::from("let seven = 7;"))
3190            .await
3191            .unwrap();
3192        buffer_a
3193            .condition(cx_a, |buffer, _| buffer.has_conflict())
3194            .await;
3195        buffer_b
3196            .condition(cx_b, |buffer, _| buffer.has_conflict())
3197            .await;
3198
3199        project_b
3200            .update(cx_b, |project, cx| {
3201                project.reload_buffers(HashSet::from_iter([buffer_b.clone()]), true, cx)
3202            })
3203            .await
3204            .unwrap();
3205        buffer_a.read_with(cx_a, |buffer, _| {
3206            assert_eq!(buffer.text(), "let seven = 7;");
3207            assert!(!buffer.is_dirty());
3208            assert!(!buffer.has_conflict());
3209        });
3210        buffer_b.read_with(cx_b, |buffer, _| {
3211            assert_eq!(buffer.text(), "let seven = 7;");
3212            assert!(!buffer.is_dirty());
3213            assert!(!buffer.has_conflict());
3214        });
3215
3216        buffer_a.update(cx_a, |buffer, cx| {
3217            // Undoing on the host is a no-op when the reload was initiated by the guest.
3218            buffer.undo(cx);
3219            assert_eq!(buffer.text(), "let seven = 7;");
3220            assert!(!buffer.is_dirty());
3221            assert!(!buffer.has_conflict());
3222        });
3223        buffer_b.update(cx_b, |buffer, cx| {
3224            // Undoing on the guest rolls back the buffer to before it was reloaded but the conflict gets cleared.
3225            buffer.undo(cx);
3226            assert_eq!(buffer.text(), "let six = 6;");
3227            assert!(buffer.is_dirty());
3228            assert!(!buffer.has_conflict());
3229        });
3230    }
3231
3232    #[gpui::test(iterations = 10)]
3233    async fn test_formatting_buffer(cx_a: &mut TestAppContext, cx_b: &mut TestAppContext) {
3234        cx_a.foreground().forbid_parking();
3235        let lang_registry = Arc::new(LanguageRegistry::test());
3236        let fs = FakeFs::new(cx_a.background());
3237
3238        // Set up a fake language server.
3239        let mut language = Language::new(
3240            LanguageConfig {
3241                name: "Rust".into(),
3242                path_suffixes: vec!["rs".to_string()],
3243                ..Default::default()
3244            },
3245            Some(tree_sitter_rust::language()),
3246        );
3247        let mut fake_language_servers = language.set_fake_lsp_adapter(Default::default());
3248        lang_registry.add(Arc::new(language));
3249
3250        // Connect to a server as 2 clients.
3251        let mut server = TestServer::start(cx_a.foreground(), cx_a.background()).await;
3252        let client_a = server.create_client(cx_a, "user_a").await;
3253        let client_b = server.create_client(cx_b, "user_b").await;
3254        server
3255            .make_contacts(vec![(&client_a, cx_a), (&client_b, cx_b)])
3256            .await;
3257
3258        // Share a project as client A
3259        fs.insert_tree(
3260            "/a",
3261            json!({
3262                "a.rs": "let one = two",
3263            }),
3264        )
3265        .await;
3266        let project_a = cx_a.update(|cx| {
3267            Project::local(
3268                client_a.clone(),
3269                client_a.user_store.clone(),
3270                lang_registry.clone(),
3271                fs.clone(),
3272                cx,
3273            )
3274        });
3275        let (worktree_a, _) = project_a
3276            .update(cx_a, |p, cx| {
3277                p.find_or_create_local_worktree("/a", true, cx)
3278            })
3279            .await
3280            .unwrap();
3281        worktree_a
3282            .read_with(cx_a, |tree, _| tree.as_local().unwrap().scan_complete())
3283            .await;
3284        let project_id = project_a.update(cx_a, |p, _| p.next_remote_id()).await;
3285        let worktree_id = worktree_a.read_with(cx_a, |tree, _| tree.id());
3286        project_a.update(cx_a, |p, cx| p.share(cx)).await.unwrap();
3287
3288        // Join the worktree as client B.
3289        let project_b = Project::remote(
3290            project_id,
3291            client_b.clone(),
3292            client_b.user_store.clone(),
3293            lang_registry.clone(),
3294            fs.clone(),
3295            &mut cx_b.to_async(),
3296        )
3297        .await
3298        .unwrap();
3299
3300        let buffer_b = cx_b
3301            .background()
3302            .spawn(project_b.update(cx_b, |p, cx| p.open_buffer((worktree_id, "a.rs"), cx)))
3303            .await
3304            .unwrap();
3305
3306        let fake_language_server = fake_language_servers.next().await.unwrap();
3307        fake_language_server.handle_request::<lsp::request::Formatting, _, _>(|_, _| async move {
3308            Ok(Some(vec![
3309                lsp::TextEdit {
3310                    range: lsp::Range::new(lsp::Position::new(0, 4), lsp::Position::new(0, 4)),
3311                    new_text: "h".to_string(),
3312                },
3313                lsp::TextEdit {
3314                    range: lsp::Range::new(lsp::Position::new(0, 7), lsp::Position::new(0, 7)),
3315                    new_text: "y".to_string(),
3316                },
3317            ]))
3318        });
3319
3320        project_b
3321            .update(cx_b, |project, cx| {
3322                project.format(HashSet::from_iter([buffer_b.clone()]), true, cx)
3323            })
3324            .await
3325            .unwrap();
3326        assert_eq!(
3327            buffer_b.read_with(cx_b, |buffer, _| buffer.text()),
3328            "let honey = two"
3329        );
3330    }
3331
3332    #[gpui::test(iterations = 10)]
3333    async fn test_definition(cx_a: &mut TestAppContext, cx_b: &mut TestAppContext) {
3334        cx_a.foreground().forbid_parking();
3335        let lang_registry = Arc::new(LanguageRegistry::test());
3336        let fs = FakeFs::new(cx_a.background());
3337        fs.insert_tree(
3338            "/root-1",
3339            json!({
3340                "a.rs": "const ONE: usize = b::TWO + b::THREE;",
3341            }),
3342        )
3343        .await;
3344        fs.insert_tree(
3345            "/root-2",
3346            json!({
3347                "b.rs": "const TWO: usize = 2;\nconst THREE: usize = 3;",
3348            }),
3349        )
3350        .await;
3351
3352        // Set up a fake language server.
3353        let mut language = Language::new(
3354            LanguageConfig {
3355                name: "Rust".into(),
3356                path_suffixes: vec!["rs".to_string()],
3357                ..Default::default()
3358            },
3359            Some(tree_sitter_rust::language()),
3360        );
3361        let mut fake_language_servers = language.set_fake_lsp_adapter(Default::default());
3362        lang_registry.add(Arc::new(language));
3363
3364        // Connect to a server as 2 clients.
3365        let mut server = TestServer::start(cx_a.foreground(), cx_a.background()).await;
3366        let client_a = server.create_client(cx_a, "user_a").await;
3367        let client_b = server.create_client(cx_b, "user_b").await;
3368        server
3369            .make_contacts(vec![(&client_a, cx_a), (&client_b, cx_b)])
3370            .await;
3371
3372        // Share a project as client A
3373        let project_a = cx_a.update(|cx| {
3374            Project::local(
3375                client_a.clone(),
3376                client_a.user_store.clone(),
3377                lang_registry.clone(),
3378                fs.clone(),
3379                cx,
3380            )
3381        });
3382        let (worktree_a, _) = project_a
3383            .update(cx_a, |p, cx| {
3384                p.find_or_create_local_worktree("/root-1", true, cx)
3385            })
3386            .await
3387            .unwrap();
3388        worktree_a
3389            .read_with(cx_a, |tree, _| tree.as_local().unwrap().scan_complete())
3390            .await;
3391        let project_id = project_a.update(cx_a, |p, _| p.next_remote_id()).await;
3392        let worktree_id = worktree_a.read_with(cx_a, |tree, _| tree.id());
3393        project_a.update(cx_a, |p, cx| p.share(cx)).await.unwrap();
3394
3395        // Join the worktree as client B.
3396        let project_b = Project::remote(
3397            project_id,
3398            client_b.clone(),
3399            client_b.user_store.clone(),
3400            lang_registry.clone(),
3401            fs.clone(),
3402            &mut cx_b.to_async(),
3403        )
3404        .await
3405        .unwrap();
3406
3407        // Open the file on client B.
3408        let buffer_b = cx_b
3409            .background()
3410            .spawn(project_b.update(cx_b, |p, cx| p.open_buffer((worktree_id, "a.rs"), cx)))
3411            .await
3412            .unwrap();
3413
3414        // Request the definition of a symbol as the guest.
3415        let fake_language_server = fake_language_servers.next().await.unwrap();
3416        fake_language_server.handle_request::<lsp::request::GotoDefinition, _, _>(
3417            |_, _| async move {
3418                Ok(Some(lsp::GotoDefinitionResponse::Scalar(
3419                    lsp::Location::new(
3420                        lsp::Url::from_file_path("/root-2/b.rs").unwrap(),
3421                        lsp::Range::new(lsp::Position::new(0, 6), lsp::Position::new(0, 9)),
3422                    ),
3423                )))
3424            },
3425        );
3426
3427        let definitions_1 = project_b
3428            .update(cx_b, |p, cx| p.definition(&buffer_b, 23, cx))
3429            .await
3430            .unwrap();
3431        cx_b.read(|cx| {
3432            assert_eq!(definitions_1.len(), 1);
3433            assert_eq!(project_b.read(cx).worktrees(cx).count(), 2);
3434            let target_buffer = definitions_1[0].buffer.read(cx);
3435            assert_eq!(
3436                target_buffer.text(),
3437                "const TWO: usize = 2;\nconst THREE: usize = 3;"
3438            );
3439            assert_eq!(
3440                definitions_1[0].range.to_point(target_buffer),
3441                Point::new(0, 6)..Point::new(0, 9)
3442            );
3443        });
3444
3445        // Try getting more definitions for the same buffer, ensuring the buffer gets reused from
3446        // the previous call to `definition`.
3447        fake_language_server.handle_request::<lsp::request::GotoDefinition, _, _>(
3448            |_, _| async move {
3449                Ok(Some(lsp::GotoDefinitionResponse::Scalar(
3450                    lsp::Location::new(
3451                        lsp::Url::from_file_path("/root-2/b.rs").unwrap(),
3452                        lsp::Range::new(lsp::Position::new(1, 6), lsp::Position::new(1, 11)),
3453                    ),
3454                )))
3455            },
3456        );
3457
3458        let definitions_2 = project_b
3459            .update(cx_b, |p, cx| p.definition(&buffer_b, 33, cx))
3460            .await
3461            .unwrap();
3462        cx_b.read(|cx| {
3463            assert_eq!(definitions_2.len(), 1);
3464            assert_eq!(project_b.read(cx).worktrees(cx).count(), 2);
3465            let target_buffer = definitions_2[0].buffer.read(cx);
3466            assert_eq!(
3467                target_buffer.text(),
3468                "const TWO: usize = 2;\nconst THREE: usize = 3;"
3469            );
3470            assert_eq!(
3471                definitions_2[0].range.to_point(target_buffer),
3472                Point::new(1, 6)..Point::new(1, 11)
3473            );
3474        });
3475        assert_eq!(definitions_1[0].buffer, definitions_2[0].buffer);
3476    }
3477
3478    #[gpui::test(iterations = 10)]
3479    async fn test_references(cx_a: &mut TestAppContext, cx_b: &mut TestAppContext) {
3480        cx_a.foreground().forbid_parking();
3481        let lang_registry = Arc::new(LanguageRegistry::test());
3482        let fs = FakeFs::new(cx_a.background());
3483        fs.insert_tree(
3484            "/root-1",
3485            json!({
3486                "one.rs": "const ONE: usize = 1;",
3487                "two.rs": "const TWO: usize = one::ONE + one::ONE;",
3488            }),
3489        )
3490        .await;
3491        fs.insert_tree(
3492            "/root-2",
3493            json!({
3494                "three.rs": "const THREE: usize = two::TWO + one::ONE;",
3495            }),
3496        )
3497        .await;
3498
3499        // Set up a fake language server.
3500        let mut language = Language::new(
3501            LanguageConfig {
3502                name: "Rust".into(),
3503                path_suffixes: vec!["rs".to_string()],
3504                ..Default::default()
3505            },
3506            Some(tree_sitter_rust::language()),
3507        );
3508        let mut fake_language_servers = language.set_fake_lsp_adapter(Default::default());
3509        lang_registry.add(Arc::new(language));
3510
3511        // Connect to a server as 2 clients.
3512        let mut server = TestServer::start(cx_a.foreground(), cx_a.background()).await;
3513        let client_a = server.create_client(cx_a, "user_a").await;
3514        let client_b = server.create_client(cx_b, "user_b").await;
3515        server
3516            .make_contacts(vec![(&client_a, cx_a), (&client_b, cx_b)])
3517            .await;
3518
3519        // Share a project as client A
3520        let project_a = cx_a.update(|cx| {
3521            Project::local(
3522                client_a.clone(),
3523                client_a.user_store.clone(),
3524                lang_registry.clone(),
3525                fs.clone(),
3526                cx,
3527            )
3528        });
3529        let (worktree_a, _) = project_a
3530            .update(cx_a, |p, cx| {
3531                p.find_or_create_local_worktree("/root-1", true, cx)
3532            })
3533            .await
3534            .unwrap();
3535        worktree_a
3536            .read_with(cx_a, |tree, _| tree.as_local().unwrap().scan_complete())
3537            .await;
3538        let project_id = project_a.update(cx_a, |p, _| p.next_remote_id()).await;
3539        let worktree_id = worktree_a.read_with(cx_a, |tree, _| tree.id());
3540        project_a.update(cx_a, |p, cx| p.share(cx)).await.unwrap();
3541
3542        // Join the worktree as client B.
3543        let project_b = Project::remote(
3544            project_id,
3545            client_b.clone(),
3546            client_b.user_store.clone(),
3547            lang_registry.clone(),
3548            fs.clone(),
3549            &mut cx_b.to_async(),
3550        )
3551        .await
3552        .unwrap();
3553
3554        // Open the file on client B.
3555        let buffer_b = cx_b
3556            .background()
3557            .spawn(project_b.update(cx_b, |p, cx| p.open_buffer((worktree_id, "one.rs"), cx)))
3558            .await
3559            .unwrap();
3560
3561        // Request references to a symbol as the guest.
3562        let fake_language_server = fake_language_servers.next().await.unwrap();
3563        fake_language_server.handle_request::<lsp::request::References, _, _>(
3564            |params, _| async move {
3565                assert_eq!(
3566                    params.text_document_position.text_document.uri.as_str(),
3567                    "file:///root-1/one.rs"
3568                );
3569                Ok(Some(vec![
3570                    lsp::Location {
3571                        uri: lsp::Url::from_file_path("/root-1/two.rs").unwrap(),
3572                        range: lsp::Range::new(
3573                            lsp::Position::new(0, 24),
3574                            lsp::Position::new(0, 27),
3575                        ),
3576                    },
3577                    lsp::Location {
3578                        uri: lsp::Url::from_file_path("/root-1/two.rs").unwrap(),
3579                        range: lsp::Range::new(
3580                            lsp::Position::new(0, 35),
3581                            lsp::Position::new(0, 38),
3582                        ),
3583                    },
3584                    lsp::Location {
3585                        uri: lsp::Url::from_file_path("/root-2/three.rs").unwrap(),
3586                        range: lsp::Range::new(
3587                            lsp::Position::new(0, 37),
3588                            lsp::Position::new(0, 40),
3589                        ),
3590                    },
3591                ]))
3592            },
3593        );
3594
3595        let references = project_b
3596            .update(cx_b, |p, cx| p.references(&buffer_b, 7, cx))
3597            .await
3598            .unwrap();
3599        cx_b.read(|cx| {
3600            assert_eq!(references.len(), 3);
3601            assert_eq!(project_b.read(cx).worktrees(cx).count(), 2);
3602
3603            let two_buffer = references[0].buffer.read(cx);
3604            let three_buffer = references[2].buffer.read(cx);
3605            assert_eq!(
3606                two_buffer.file().unwrap().path().as_ref(),
3607                Path::new("two.rs")
3608            );
3609            assert_eq!(references[1].buffer, references[0].buffer);
3610            assert_eq!(
3611                three_buffer.file().unwrap().full_path(cx),
3612                Path::new("three.rs")
3613            );
3614
3615            assert_eq!(references[0].range.to_offset(&two_buffer), 24..27);
3616            assert_eq!(references[1].range.to_offset(&two_buffer), 35..38);
3617            assert_eq!(references[2].range.to_offset(&three_buffer), 37..40);
3618        });
3619    }
3620
3621    #[gpui::test(iterations = 10)]
3622    async fn test_project_search(cx_a: &mut TestAppContext, cx_b: &mut TestAppContext) {
3623        cx_a.foreground().forbid_parking();
3624        let lang_registry = Arc::new(LanguageRegistry::test());
3625        let fs = FakeFs::new(cx_a.background());
3626        fs.insert_tree(
3627            "/root-1",
3628            json!({
3629                "a": "hello world",
3630                "b": "goodnight moon",
3631                "c": "a world of goo",
3632                "d": "world champion of clown world",
3633            }),
3634        )
3635        .await;
3636        fs.insert_tree(
3637            "/root-2",
3638            json!({
3639                "e": "disney world is fun",
3640            }),
3641        )
3642        .await;
3643
3644        // Connect to a server as 2 clients.
3645        let mut server = TestServer::start(cx_a.foreground(), cx_a.background()).await;
3646        let client_a = server.create_client(cx_a, "user_a").await;
3647        let client_b = server.create_client(cx_b, "user_b").await;
3648        server
3649            .make_contacts(vec![(&client_a, cx_a), (&client_b, cx_b)])
3650            .await;
3651
3652        // Share a project as client A
3653        let project_a = cx_a.update(|cx| {
3654            Project::local(
3655                client_a.clone(),
3656                client_a.user_store.clone(),
3657                lang_registry.clone(),
3658                fs.clone(),
3659                cx,
3660            )
3661        });
3662        let project_id = project_a.update(cx_a, |p, _| p.next_remote_id()).await;
3663
3664        let (worktree_1, _) = project_a
3665            .update(cx_a, |p, cx| {
3666                p.find_or_create_local_worktree("/root-1", true, cx)
3667            })
3668            .await
3669            .unwrap();
3670        worktree_1
3671            .read_with(cx_a, |tree, _| tree.as_local().unwrap().scan_complete())
3672            .await;
3673        let (worktree_2, _) = project_a
3674            .update(cx_a, |p, cx| {
3675                p.find_or_create_local_worktree("/root-2", true, cx)
3676            })
3677            .await
3678            .unwrap();
3679        worktree_2
3680            .read_with(cx_a, |tree, _| tree.as_local().unwrap().scan_complete())
3681            .await;
3682
3683        project_a.update(cx_a, |p, cx| p.share(cx)).await.unwrap();
3684
3685        // Join the worktree as client B.
3686        let project_b = Project::remote(
3687            project_id,
3688            client_b.clone(),
3689            client_b.user_store.clone(),
3690            lang_registry.clone(),
3691            fs.clone(),
3692            &mut cx_b.to_async(),
3693        )
3694        .await
3695        .unwrap();
3696
3697        let results = project_b
3698            .update(cx_b, |project, cx| {
3699                project.search(SearchQuery::text("world", false, false), cx)
3700            })
3701            .await
3702            .unwrap();
3703
3704        let mut ranges_by_path = results
3705            .into_iter()
3706            .map(|(buffer, ranges)| {
3707                buffer.read_with(cx_b, |buffer, cx| {
3708                    let path = buffer.file().unwrap().full_path(cx);
3709                    let offset_ranges = ranges
3710                        .into_iter()
3711                        .map(|range| range.to_offset(buffer))
3712                        .collect::<Vec<_>>();
3713                    (path, offset_ranges)
3714                })
3715            })
3716            .collect::<Vec<_>>();
3717        ranges_by_path.sort_by_key(|(path, _)| path.clone());
3718
3719        assert_eq!(
3720            ranges_by_path,
3721            &[
3722                (PathBuf::from("root-1/a"), vec![6..11]),
3723                (PathBuf::from("root-1/c"), vec![2..7]),
3724                (PathBuf::from("root-1/d"), vec![0..5, 24..29]),
3725                (PathBuf::from("root-2/e"), vec![7..12]),
3726            ]
3727        );
3728    }
3729
3730    #[gpui::test(iterations = 10)]
3731    async fn test_document_highlights(cx_a: &mut TestAppContext, cx_b: &mut TestAppContext) {
3732        cx_a.foreground().forbid_parking();
3733        let lang_registry = Arc::new(LanguageRegistry::test());
3734        let fs = FakeFs::new(cx_a.background());
3735        fs.insert_tree(
3736            "/root-1",
3737            json!({
3738                "main.rs": "fn double(number: i32) -> i32 { number + number }",
3739            }),
3740        )
3741        .await;
3742
3743        // Set up a fake language server.
3744        let mut language = Language::new(
3745            LanguageConfig {
3746                name: "Rust".into(),
3747                path_suffixes: vec!["rs".to_string()],
3748                ..Default::default()
3749            },
3750            Some(tree_sitter_rust::language()),
3751        );
3752        let mut fake_language_servers = language.set_fake_lsp_adapter(Default::default());
3753        lang_registry.add(Arc::new(language));
3754
3755        // Connect to a server as 2 clients.
3756        let mut server = TestServer::start(cx_a.foreground(), cx_a.background()).await;
3757        let client_a = server.create_client(cx_a, "user_a").await;
3758        let client_b = server.create_client(cx_b, "user_b").await;
3759        server
3760            .make_contacts(vec![(&client_a, cx_a), (&client_b, cx_b)])
3761            .await;
3762
3763        // Share a project as client A
3764        let project_a = cx_a.update(|cx| {
3765            Project::local(
3766                client_a.clone(),
3767                client_a.user_store.clone(),
3768                lang_registry.clone(),
3769                fs.clone(),
3770                cx,
3771            )
3772        });
3773        let (worktree_a, _) = project_a
3774            .update(cx_a, |p, cx| {
3775                p.find_or_create_local_worktree("/root-1", true, cx)
3776            })
3777            .await
3778            .unwrap();
3779        worktree_a
3780            .read_with(cx_a, |tree, _| tree.as_local().unwrap().scan_complete())
3781            .await;
3782        let project_id = project_a.update(cx_a, |p, _| p.next_remote_id()).await;
3783        let worktree_id = worktree_a.read_with(cx_a, |tree, _| tree.id());
3784        project_a.update(cx_a, |p, cx| p.share(cx)).await.unwrap();
3785
3786        // Join the worktree as client B.
3787        let project_b = Project::remote(
3788            project_id,
3789            client_b.clone(),
3790            client_b.user_store.clone(),
3791            lang_registry.clone(),
3792            fs.clone(),
3793            &mut cx_b.to_async(),
3794        )
3795        .await
3796        .unwrap();
3797
3798        // Open the file on client B.
3799        let buffer_b = cx_b
3800            .background()
3801            .spawn(project_b.update(cx_b, |p, cx| p.open_buffer((worktree_id, "main.rs"), cx)))
3802            .await
3803            .unwrap();
3804
3805        // Request document highlights as the guest.
3806        let fake_language_server = fake_language_servers.next().await.unwrap();
3807        fake_language_server.handle_request::<lsp::request::DocumentHighlightRequest, _, _>(
3808            |params, _| async move {
3809                assert_eq!(
3810                    params
3811                        .text_document_position_params
3812                        .text_document
3813                        .uri
3814                        .as_str(),
3815                    "file:///root-1/main.rs"
3816                );
3817                assert_eq!(
3818                    params.text_document_position_params.position,
3819                    lsp::Position::new(0, 34)
3820                );
3821                Ok(Some(vec![
3822                    lsp::DocumentHighlight {
3823                        kind: Some(lsp::DocumentHighlightKind::WRITE),
3824                        range: lsp::Range::new(
3825                            lsp::Position::new(0, 10),
3826                            lsp::Position::new(0, 16),
3827                        ),
3828                    },
3829                    lsp::DocumentHighlight {
3830                        kind: Some(lsp::DocumentHighlightKind::READ),
3831                        range: lsp::Range::new(
3832                            lsp::Position::new(0, 32),
3833                            lsp::Position::new(0, 38),
3834                        ),
3835                    },
3836                    lsp::DocumentHighlight {
3837                        kind: Some(lsp::DocumentHighlightKind::READ),
3838                        range: lsp::Range::new(
3839                            lsp::Position::new(0, 41),
3840                            lsp::Position::new(0, 47),
3841                        ),
3842                    },
3843                ]))
3844            },
3845        );
3846
3847        let highlights = project_b
3848            .update(cx_b, |p, cx| p.document_highlights(&buffer_b, 34, cx))
3849            .await
3850            .unwrap();
3851        buffer_b.read_with(cx_b, |buffer, _| {
3852            let snapshot = buffer.snapshot();
3853
3854            let highlights = highlights
3855                .into_iter()
3856                .map(|highlight| (highlight.kind, highlight.range.to_offset(&snapshot)))
3857                .collect::<Vec<_>>();
3858            assert_eq!(
3859                highlights,
3860                &[
3861                    (lsp::DocumentHighlightKind::WRITE, 10..16),
3862                    (lsp::DocumentHighlightKind::READ, 32..38),
3863                    (lsp::DocumentHighlightKind::READ, 41..47)
3864                ]
3865            )
3866        });
3867    }
3868
3869    #[gpui::test(iterations = 10)]
3870    async fn test_project_symbols(cx_a: &mut TestAppContext, cx_b: &mut TestAppContext) {
3871        cx_a.foreground().forbid_parking();
3872        let lang_registry = Arc::new(LanguageRegistry::test());
3873        let fs = FakeFs::new(cx_a.background());
3874        fs.insert_tree(
3875            "/code",
3876            json!({
3877                "crate-1": {
3878                    "one.rs": "const ONE: usize = 1;",
3879                },
3880                "crate-2": {
3881                    "two.rs": "const TWO: usize = 2; const THREE: usize = 3;",
3882                },
3883                "private": {
3884                    "passwords.txt": "the-password",
3885                }
3886            }),
3887        )
3888        .await;
3889
3890        // Set up a fake language server.
3891        let mut language = Language::new(
3892            LanguageConfig {
3893                name: "Rust".into(),
3894                path_suffixes: vec!["rs".to_string()],
3895                ..Default::default()
3896            },
3897            Some(tree_sitter_rust::language()),
3898        );
3899        let mut fake_language_servers = language.set_fake_lsp_adapter(Default::default());
3900        lang_registry.add(Arc::new(language));
3901
3902        // Connect to a server as 2 clients.
3903        let mut server = TestServer::start(cx_a.foreground(), cx_a.background()).await;
3904        let client_a = server.create_client(cx_a, "user_a").await;
3905        let client_b = server.create_client(cx_b, "user_b").await;
3906        server
3907            .make_contacts(vec![(&client_a, cx_a), (&client_b, cx_b)])
3908            .await;
3909
3910        // Share a project as client A
3911        let project_a = cx_a.update(|cx| {
3912            Project::local(
3913                client_a.clone(),
3914                client_a.user_store.clone(),
3915                lang_registry.clone(),
3916                fs.clone(),
3917                cx,
3918            )
3919        });
3920        let (worktree_a, _) = project_a
3921            .update(cx_a, |p, cx| {
3922                p.find_or_create_local_worktree("/code/crate-1", true, cx)
3923            })
3924            .await
3925            .unwrap();
3926        worktree_a
3927            .read_with(cx_a, |tree, _| tree.as_local().unwrap().scan_complete())
3928            .await;
3929        let project_id = project_a.update(cx_a, |p, _| p.next_remote_id()).await;
3930        let worktree_id = worktree_a.read_with(cx_a, |tree, _| tree.id());
3931        project_a.update(cx_a, |p, cx| p.share(cx)).await.unwrap();
3932
3933        // Join the worktree as client B.
3934        let project_b = Project::remote(
3935            project_id,
3936            client_b.clone(),
3937            client_b.user_store.clone(),
3938            lang_registry.clone(),
3939            fs.clone(),
3940            &mut cx_b.to_async(),
3941        )
3942        .await
3943        .unwrap();
3944
3945        // Cause the language server to start.
3946        let _buffer = cx_b
3947            .background()
3948            .spawn(project_b.update(cx_b, |p, cx| p.open_buffer((worktree_id, "one.rs"), cx)))
3949            .await
3950            .unwrap();
3951
3952        let fake_language_server = fake_language_servers.next().await.unwrap();
3953        fake_language_server.handle_request::<lsp::request::WorkspaceSymbol, _, _>(
3954            |_, _| async move {
3955                #[allow(deprecated)]
3956                Ok(Some(vec![lsp::SymbolInformation {
3957                    name: "TWO".into(),
3958                    location: lsp::Location {
3959                        uri: lsp::Url::from_file_path("/code/crate-2/two.rs").unwrap(),
3960                        range: lsp::Range::new(lsp::Position::new(0, 6), lsp::Position::new(0, 9)),
3961                    },
3962                    kind: lsp::SymbolKind::CONSTANT,
3963                    tags: None,
3964                    container_name: None,
3965                    deprecated: None,
3966                }]))
3967            },
3968        );
3969
3970        // Request the definition of a symbol as the guest.
3971        let symbols = project_b
3972            .update(cx_b, |p, cx| p.symbols("two", cx))
3973            .await
3974            .unwrap();
3975        assert_eq!(symbols.len(), 1);
3976        assert_eq!(symbols[0].name, "TWO");
3977
3978        // Open one of the returned symbols.
3979        let buffer_b_2 = project_b
3980            .update(cx_b, |project, cx| {
3981                project.open_buffer_for_symbol(&symbols[0], cx)
3982            })
3983            .await
3984            .unwrap();
3985        buffer_b_2.read_with(cx_b, |buffer, _| {
3986            assert_eq!(
3987                buffer.file().unwrap().path().as_ref(),
3988                Path::new("../crate-2/two.rs")
3989            );
3990        });
3991
3992        // Attempt to craft a symbol and violate host's privacy by opening an arbitrary file.
3993        let mut fake_symbol = symbols[0].clone();
3994        fake_symbol.path = Path::new("/code/secrets").into();
3995        let error = project_b
3996            .update(cx_b, |project, cx| {
3997                project.open_buffer_for_symbol(&fake_symbol, cx)
3998            })
3999            .await
4000            .unwrap_err();
4001        assert!(error.to_string().contains("invalid symbol signature"));
4002    }
4003
4004    #[gpui::test(iterations = 10)]
4005    async fn test_open_buffer_while_getting_definition_pointing_to_it(
4006        cx_a: &mut TestAppContext,
4007        cx_b: &mut TestAppContext,
4008        mut rng: StdRng,
4009    ) {
4010        cx_a.foreground().forbid_parking();
4011        let lang_registry = Arc::new(LanguageRegistry::test());
4012        let fs = FakeFs::new(cx_a.background());
4013        fs.insert_tree(
4014            "/root",
4015            json!({
4016                "a.rs": "const ONE: usize = b::TWO;",
4017                "b.rs": "const TWO: usize = 2",
4018            }),
4019        )
4020        .await;
4021
4022        // Set up a fake language server.
4023        let mut language = Language::new(
4024            LanguageConfig {
4025                name: "Rust".into(),
4026                path_suffixes: vec!["rs".to_string()],
4027                ..Default::default()
4028            },
4029            Some(tree_sitter_rust::language()),
4030        );
4031        let mut fake_language_servers = language.set_fake_lsp_adapter(Default::default());
4032        lang_registry.add(Arc::new(language));
4033
4034        // Connect to a server as 2 clients.
4035        let mut server = TestServer::start(cx_a.foreground(), cx_a.background()).await;
4036        let client_a = server.create_client(cx_a, "user_a").await;
4037        let client_b = server.create_client(cx_b, "user_b").await;
4038        server
4039            .make_contacts(vec![(&client_a, cx_a), (&client_b, cx_b)])
4040            .await;
4041
4042        // Share a project as client A
4043        let project_a = cx_a.update(|cx| {
4044            Project::local(
4045                client_a.clone(),
4046                client_a.user_store.clone(),
4047                lang_registry.clone(),
4048                fs.clone(),
4049                cx,
4050            )
4051        });
4052
4053        let (worktree_a, _) = project_a
4054            .update(cx_a, |p, cx| {
4055                p.find_or_create_local_worktree("/root", true, cx)
4056            })
4057            .await
4058            .unwrap();
4059        worktree_a
4060            .read_with(cx_a, |tree, _| tree.as_local().unwrap().scan_complete())
4061            .await;
4062        let project_id = project_a.update(cx_a, |p, _| p.next_remote_id()).await;
4063        let worktree_id = worktree_a.read_with(cx_a, |tree, _| tree.id());
4064        project_a.update(cx_a, |p, cx| p.share(cx)).await.unwrap();
4065
4066        // Join the worktree as client B.
4067        let project_b = Project::remote(
4068            project_id,
4069            client_b.clone(),
4070            client_b.user_store.clone(),
4071            lang_registry.clone(),
4072            fs.clone(),
4073            &mut cx_b.to_async(),
4074        )
4075        .await
4076        .unwrap();
4077
4078        let buffer_b1 = cx_b
4079            .background()
4080            .spawn(project_b.update(cx_b, |p, cx| p.open_buffer((worktree_id, "a.rs"), cx)))
4081            .await
4082            .unwrap();
4083
4084        let fake_language_server = fake_language_servers.next().await.unwrap();
4085        fake_language_server.handle_request::<lsp::request::GotoDefinition, _, _>(
4086            |_, _| async move {
4087                Ok(Some(lsp::GotoDefinitionResponse::Scalar(
4088                    lsp::Location::new(
4089                        lsp::Url::from_file_path("/root/b.rs").unwrap(),
4090                        lsp::Range::new(lsp::Position::new(0, 6), lsp::Position::new(0, 9)),
4091                    ),
4092                )))
4093            },
4094        );
4095
4096        let definitions;
4097        let buffer_b2;
4098        if rng.gen() {
4099            definitions = project_b.update(cx_b, |p, cx| p.definition(&buffer_b1, 23, cx));
4100            buffer_b2 = project_b.update(cx_b, |p, cx| p.open_buffer((worktree_id, "b.rs"), cx));
4101        } else {
4102            buffer_b2 = project_b.update(cx_b, |p, cx| p.open_buffer((worktree_id, "b.rs"), cx));
4103            definitions = project_b.update(cx_b, |p, cx| p.definition(&buffer_b1, 23, cx));
4104        }
4105
4106        let buffer_b2 = buffer_b2.await.unwrap();
4107        let definitions = definitions.await.unwrap();
4108        assert_eq!(definitions.len(), 1);
4109        assert_eq!(definitions[0].buffer, buffer_b2);
4110    }
4111
4112    #[gpui::test(iterations = 10)]
4113    async fn test_collaborating_with_code_actions(
4114        cx_a: &mut TestAppContext,
4115        cx_b: &mut TestAppContext,
4116    ) {
4117        cx_a.foreground().forbid_parking();
4118        let lang_registry = Arc::new(LanguageRegistry::test());
4119        let fs = FakeFs::new(cx_a.background());
4120        cx_b.update(|cx| editor::init(cx));
4121
4122        // Set up a fake language server.
4123        let mut language = Language::new(
4124            LanguageConfig {
4125                name: "Rust".into(),
4126                path_suffixes: vec!["rs".to_string()],
4127                ..Default::default()
4128            },
4129            Some(tree_sitter_rust::language()),
4130        );
4131        let mut fake_language_servers = language.set_fake_lsp_adapter(Default::default());
4132        lang_registry.add(Arc::new(language));
4133
4134        // Connect to a server as 2 clients.
4135        let mut server = TestServer::start(cx_a.foreground(), cx_a.background()).await;
4136        let client_a = server.create_client(cx_a, "user_a").await;
4137        let client_b = server.create_client(cx_b, "user_b").await;
4138        server
4139            .make_contacts(vec![(&client_a, cx_a), (&client_b, cx_b)])
4140            .await;
4141
4142        // Share a project as client A
4143        fs.insert_tree(
4144            "/a",
4145            json!({
4146                "main.rs": "mod other;\nfn main() { let foo = other::foo(); }",
4147                "other.rs": "pub fn foo() -> usize { 4 }",
4148            }),
4149        )
4150        .await;
4151        let project_a = cx_a.update(|cx| {
4152            Project::local(
4153                client_a.clone(),
4154                client_a.user_store.clone(),
4155                lang_registry.clone(),
4156                fs.clone(),
4157                cx,
4158            )
4159        });
4160        let (worktree_a, _) = project_a
4161            .update(cx_a, |p, cx| {
4162                p.find_or_create_local_worktree("/a", true, cx)
4163            })
4164            .await
4165            .unwrap();
4166        worktree_a
4167            .read_with(cx_a, |tree, _| tree.as_local().unwrap().scan_complete())
4168            .await;
4169        let project_id = project_a.update(cx_a, |p, _| p.next_remote_id()).await;
4170        let worktree_id = worktree_a.read_with(cx_a, |tree, _| tree.id());
4171        project_a.update(cx_a, |p, cx| p.share(cx)).await.unwrap();
4172
4173        // Join the worktree as client B.
4174        let project_b = Project::remote(
4175            project_id,
4176            client_b.clone(),
4177            client_b.user_store.clone(),
4178            lang_registry.clone(),
4179            fs.clone(),
4180            &mut cx_b.to_async(),
4181        )
4182        .await
4183        .unwrap();
4184        let mut params = cx_b.update(WorkspaceParams::test);
4185        params.languages = lang_registry.clone();
4186        params.client = client_b.client.clone();
4187        params.user_store = client_b.user_store.clone();
4188        params.project = project_b;
4189
4190        let (_window_b, workspace_b) = cx_b.add_window(|cx| Workspace::new(&params, cx));
4191        let editor_b = workspace_b
4192            .update(cx_b, |workspace, cx| {
4193                workspace.open_path((worktree_id, "main.rs"), true, cx)
4194            })
4195            .await
4196            .unwrap()
4197            .downcast::<Editor>()
4198            .unwrap();
4199
4200        let mut fake_language_server = fake_language_servers.next().await.unwrap();
4201        fake_language_server
4202            .handle_request::<lsp::request::CodeActionRequest, _, _>(|params, _| async move {
4203                assert_eq!(
4204                    params.text_document.uri,
4205                    lsp::Url::from_file_path("/a/main.rs").unwrap(),
4206                );
4207                assert_eq!(params.range.start, lsp::Position::new(0, 0));
4208                assert_eq!(params.range.end, lsp::Position::new(0, 0));
4209                Ok(None)
4210            })
4211            .next()
4212            .await;
4213
4214        // Move cursor to a location that contains code actions.
4215        editor_b.update(cx_b, |editor, cx| {
4216            editor.change_selections(None, cx, |s| {
4217                s.select_ranges([Point::new(1, 31)..Point::new(1, 31)])
4218            });
4219            cx.focus(&editor_b);
4220        });
4221
4222        fake_language_server
4223            .handle_request::<lsp::request::CodeActionRequest, _, _>(|params, _| async move {
4224                assert_eq!(
4225                    params.text_document.uri,
4226                    lsp::Url::from_file_path("/a/main.rs").unwrap(),
4227                );
4228                assert_eq!(params.range.start, lsp::Position::new(1, 31));
4229                assert_eq!(params.range.end, lsp::Position::new(1, 31));
4230
4231                Ok(Some(vec![lsp::CodeActionOrCommand::CodeAction(
4232                    lsp::CodeAction {
4233                        title: "Inline into all callers".to_string(),
4234                        edit: Some(lsp::WorkspaceEdit {
4235                            changes: Some(
4236                                [
4237                                    (
4238                                        lsp::Url::from_file_path("/a/main.rs").unwrap(),
4239                                        vec![lsp::TextEdit::new(
4240                                            lsp::Range::new(
4241                                                lsp::Position::new(1, 22),
4242                                                lsp::Position::new(1, 34),
4243                                            ),
4244                                            "4".to_string(),
4245                                        )],
4246                                    ),
4247                                    (
4248                                        lsp::Url::from_file_path("/a/other.rs").unwrap(),
4249                                        vec![lsp::TextEdit::new(
4250                                            lsp::Range::new(
4251                                                lsp::Position::new(0, 0),
4252                                                lsp::Position::new(0, 27),
4253                                            ),
4254                                            "".to_string(),
4255                                        )],
4256                                    ),
4257                                ]
4258                                .into_iter()
4259                                .collect(),
4260                            ),
4261                            ..Default::default()
4262                        }),
4263                        data: Some(json!({
4264                            "codeActionParams": {
4265                                "range": {
4266                                    "start": {"line": 1, "column": 31},
4267                                    "end": {"line": 1, "column": 31},
4268                                }
4269                            }
4270                        })),
4271                        ..Default::default()
4272                    },
4273                )]))
4274            })
4275            .next()
4276            .await;
4277
4278        // Toggle code actions and wait for them to display.
4279        editor_b.update(cx_b, |editor, cx| {
4280            editor.toggle_code_actions(
4281                &ToggleCodeActions {
4282                    deployed_from_indicator: false,
4283                },
4284                cx,
4285            );
4286        });
4287        editor_b
4288            .condition(&cx_b, |editor, _| editor.context_menu_visible())
4289            .await;
4290
4291        fake_language_server.remove_request_handler::<lsp::request::CodeActionRequest>();
4292
4293        // Confirming the code action will trigger a resolve request.
4294        let confirm_action = workspace_b
4295            .update(cx_b, |workspace, cx| {
4296                Editor::confirm_code_action(workspace, &ConfirmCodeAction { item_ix: Some(0) }, cx)
4297            })
4298            .unwrap();
4299        fake_language_server.handle_request::<lsp::request::CodeActionResolveRequest, _, _>(
4300            |_, _| async move {
4301                Ok(lsp::CodeAction {
4302                    title: "Inline into all callers".to_string(),
4303                    edit: Some(lsp::WorkspaceEdit {
4304                        changes: Some(
4305                            [
4306                                (
4307                                    lsp::Url::from_file_path("/a/main.rs").unwrap(),
4308                                    vec![lsp::TextEdit::new(
4309                                        lsp::Range::new(
4310                                            lsp::Position::new(1, 22),
4311                                            lsp::Position::new(1, 34),
4312                                        ),
4313                                        "4".to_string(),
4314                                    )],
4315                                ),
4316                                (
4317                                    lsp::Url::from_file_path("/a/other.rs").unwrap(),
4318                                    vec![lsp::TextEdit::new(
4319                                        lsp::Range::new(
4320                                            lsp::Position::new(0, 0),
4321                                            lsp::Position::new(0, 27),
4322                                        ),
4323                                        "".to_string(),
4324                                    )],
4325                                ),
4326                            ]
4327                            .into_iter()
4328                            .collect(),
4329                        ),
4330                        ..Default::default()
4331                    }),
4332                    ..Default::default()
4333                })
4334            },
4335        );
4336
4337        // After the action is confirmed, an editor containing both modified files is opened.
4338        confirm_action.await.unwrap();
4339        let code_action_editor = workspace_b.read_with(cx_b, |workspace, cx| {
4340            workspace
4341                .active_item(cx)
4342                .unwrap()
4343                .downcast::<Editor>()
4344                .unwrap()
4345        });
4346        code_action_editor.update(cx_b, |editor, cx| {
4347            assert_eq!(editor.text(cx), "mod other;\nfn main() { let foo = 4; }\n");
4348            editor.undo(&Undo, cx);
4349            assert_eq!(
4350                editor.text(cx),
4351                "mod other;\nfn main() { let foo = other::foo(); }\npub fn foo() -> usize { 4 }"
4352            );
4353            editor.redo(&Redo, cx);
4354            assert_eq!(editor.text(cx), "mod other;\nfn main() { let foo = 4; }\n");
4355        });
4356    }
4357
4358    #[gpui::test(iterations = 10)]
4359    async fn test_collaborating_with_renames(cx_a: &mut TestAppContext, cx_b: &mut TestAppContext) {
4360        cx_a.foreground().forbid_parking();
4361        let lang_registry = Arc::new(LanguageRegistry::test());
4362        let fs = FakeFs::new(cx_a.background());
4363        cx_b.update(|cx| editor::init(cx));
4364
4365        // Set up a fake language server.
4366        let mut language = Language::new(
4367            LanguageConfig {
4368                name: "Rust".into(),
4369                path_suffixes: vec!["rs".to_string()],
4370                ..Default::default()
4371            },
4372            Some(tree_sitter_rust::language()),
4373        );
4374        let mut fake_language_servers = language.set_fake_lsp_adapter(FakeLspAdapter {
4375            capabilities: lsp::ServerCapabilities {
4376                rename_provider: Some(lsp::OneOf::Right(lsp::RenameOptions {
4377                    prepare_provider: Some(true),
4378                    work_done_progress_options: Default::default(),
4379                })),
4380                ..Default::default()
4381            },
4382            ..Default::default()
4383        });
4384        lang_registry.add(Arc::new(language));
4385
4386        // Connect to a server as 2 clients.
4387        let mut server = TestServer::start(cx_a.foreground(), cx_a.background()).await;
4388        let client_a = server.create_client(cx_a, "user_a").await;
4389        let client_b = server.create_client(cx_b, "user_b").await;
4390        server
4391            .make_contacts(vec![(&client_a, cx_a), (&client_b, cx_b)])
4392            .await;
4393
4394        // Share a project as client A
4395        fs.insert_tree(
4396            "/dir",
4397            json!({
4398                "one.rs": "const ONE: usize = 1;",
4399                "two.rs": "const TWO: usize = one::ONE + one::ONE;"
4400            }),
4401        )
4402        .await;
4403        let project_a = cx_a.update(|cx| {
4404            Project::local(
4405                client_a.clone(),
4406                client_a.user_store.clone(),
4407                lang_registry.clone(),
4408                fs.clone(),
4409                cx,
4410            )
4411        });
4412        let (worktree_a, _) = project_a
4413            .update(cx_a, |p, cx| {
4414                p.find_or_create_local_worktree("/dir", true, cx)
4415            })
4416            .await
4417            .unwrap();
4418        worktree_a
4419            .read_with(cx_a, |tree, _| tree.as_local().unwrap().scan_complete())
4420            .await;
4421        let project_id = project_a.update(cx_a, |p, _| p.next_remote_id()).await;
4422        let worktree_id = worktree_a.read_with(cx_a, |tree, _| tree.id());
4423        project_a.update(cx_a, |p, cx| p.share(cx)).await.unwrap();
4424
4425        // Join the worktree as client B.
4426        let project_b = Project::remote(
4427            project_id,
4428            client_b.clone(),
4429            client_b.user_store.clone(),
4430            lang_registry.clone(),
4431            fs.clone(),
4432            &mut cx_b.to_async(),
4433        )
4434        .await
4435        .unwrap();
4436        let mut params = cx_b.update(WorkspaceParams::test);
4437        params.languages = lang_registry.clone();
4438        params.client = client_b.client.clone();
4439        params.user_store = client_b.user_store.clone();
4440        params.project = project_b;
4441
4442        let (_window_b, workspace_b) = cx_b.add_window(|cx| Workspace::new(&params, cx));
4443        let editor_b = workspace_b
4444            .update(cx_b, |workspace, cx| {
4445                workspace.open_path((worktree_id, "one.rs"), true, cx)
4446            })
4447            .await
4448            .unwrap()
4449            .downcast::<Editor>()
4450            .unwrap();
4451        let fake_language_server = fake_language_servers.next().await.unwrap();
4452
4453        // Move cursor to a location that can be renamed.
4454        let prepare_rename = editor_b.update(cx_b, |editor, cx| {
4455            editor.change_selections(None, cx, |s| s.select_ranges([7..7]));
4456            editor.rename(&Rename, cx).unwrap()
4457        });
4458
4459        fake_language_server
4460            .handle_request::<lsp::request::PrepareRenameRequest, _, _>(|params, _| async move {
4461                assert_eq!(params.text_document.uri.as_str(), "file:///dir/one.rs");
4462                assert_eq!(params.position, lsp::Position::new(0, 7));
4463                Ok(Some(lsp::PrepareRenameResponse::Range(lsp::Range::new(
4464                    lsp::Position::new(0, 6),
4465                    lsp::Position::new(0, 9),
4466                ))))
4467            })
4468            .next()
4469            .await
4470            .unwrap();
4471        prepare_rename.await.unwrap();
4472        editor_b.update(cx_b, |editor, cx| {
4473            let rename = editor.pending_rename().unwrap();
4474            let buffer = editor.buffer().read(cx).snapshot(cx);
4475            assert_eq!(
4476                rename.range.start.to_offset(&buffer)..rename.range.end.to_offset(&buffer),
4477                6..9
4478            );
4479            rename.editor.update(cx, |rename_editor, cx| {
4480                rename_editor.buffer().update(cx, |rename_buffer, cx| {
4481                    rename_buffer.edit([(0..3, "THREE")], cx);
4482                });
4483            });
4484        });
4485
4486        let confirm_rename = workspace_b.update(cx_b, |workspace, cx| {
4487            Editor::confirm_rename(workspace, &ConfirmRename, cx).unwrap()
4488        });
4489        fake_language_server
4490            .handle_request::<lsp::request::Rename, _, _>(|params, _| async move {
4491                assert_eq!(
4492                    params.text_document_position.text_document.uri.as_str(),
4493                    "file:///dir/one.rs"
4494                );
4495                assert_eq!(
4496                    params.text_document_position.position,
4497                    lsp::Position::new(0, 6)
4498                );
4499                assert_eq!(params.new_name, "THREE");
4500                Ok(Some(lsp::WorkspaceEdit {
4501                    changes: Some(
4502                        [
4503                            (
4504                                lsp::Url::from_file_path("/dir/one.rs").unwrap(),
4505                                vec![lsp::TextEdit::new(
4506                                    lsp::Range::new(
4507                                        lsp::Position::new(0, 6),
4508                                        lsp::Position::new(0, 9),
4509                                    ),
4510                                    "THREE".to_string(),
4511                                )],
4512                            ),
4513                            (
4514                                lsp::Url::from_file_path("/dir/two.rs").unwrap(),
4515                                vec![
4516                                    lsp::TextEdit::new(
4517                                        lsp::Range::new(
4518                                            lsp::Position::new(0, 24),
4519                                            lsp::Position::new(0, 27),
4520                                        ),
4521                                        "THREE".to_string(),
4522                                    ),
4523                                    lsp::TextEdit::new(
4524                                        lsp::Range::new(
4525                                            lsp::Position::new(0, 35),
4526                                            lsp::Position::new(0, 38),
4527                                        ),
4528                                        "THREE".to_string(),
4529                                    ),
4530                                ],
4531                            ),
4532                        ]
4533                        .into_iter()
4534                        .collect(),
4535                    ),
4536                    ..Default::default()
4537                }))
4538            })
4539            .next()
4540            .await
4541            .unwrap();
4542        confirm_rename.await.unwrap();
4543
4544        let rename_editor = workspace_b.read_with(cx_b, |workspace, cx| {
4545            workspace
4546                .active_item(cx)
4547                .unwrap()
4548                .downcast::<Editor>()
4549                .unwrap()
4550        });
4551        rename_editor.update(cx_b, |editor, cx| {
4552            assert_eq!(
4553                editor.text(cx),
4554                "const THREE: usize = 1;\nconst TWO: usize = one::THREE + one::THREE;"
4555            );
4556            editor.undo(&Undo, cx);
4557            assert_eq!(
4558                editor.text(cx),
4559                "const ONE: usize = 1;\nconst TWO: usize = one::ONE + one::ONE;"
4560            );
4561            editor.redo(&Redo, cx);
4562            assert_eq!(
4563                editor.text(cx),
4564                "const THREE: usize = 1;\nconst TWO: usize = one::THREE + one::THREE;"
4565            );
4566        });
4567
4568        // Ensure temporary rename edits cannot be undone/redone.
4569        editor_b.update(cx_b, |editor, cx| {
4570            editor.undo(&Undo, cx);
4571            assert_eq!(editor.text(cx), "const ONE: usize = 1;");
4572            editor.undo(&Undo, cx);
4573            assert_eq!(editor.text(cx), "const ONE: usize = 1;");
4574            editor.redo(&Redo, cx);
4575            assert_eq!(editor.text(cx), "const THREE: usize = 1;");
4576        })
4577    }
4578
4579    #[gpui::test(iterations = 10)]
4580    async fn test_basic_chat(cx_a: &mut TestAppContext, cx_b: &mut TestAppContext) {
4581        cx_a.foreground().forbid_parking();
4582
4583        // Connect to a server as 2 clients.
4584        let mut server = TestServer::start(cx_a.foreground(), cx_a.background()).await;
4585        let client_a = server.create_client(cx_a, "user_a").await;
4586        let client_b = server.create_client(cx_b, "user_b").await;
4587
4588        // Create an org that includes these 2 users.
4589        let db = &server.app_state.db;
4590        let org_id = db.create_org("Test Org", "test-org").await.unwrap();
4591        db.add_org_member(org_id, client_a.current_user_id(&cx_a), false)
4592            .await
4593            .unwrap();
4594        db.add_org_member(org_id, client_b.current_user_id(&cx_b), false)
4595            .await
4596            .unwrap();
4597
4598        // Create a channel that includes all the users.
4599        let channel_id = db.create_org_channel(org_id, "test-channel").await.unwrap();
4600        db.add_channel_member(channel_id, client_a.current_user_id(&cx_a), false)
4601            .await
4602            .unwrap();
4603        db.add_channel_member(channel_id, client_b.current_user_id(&cx_b), false)
4604            .await
4605            .unwrap();
4606        db.create_channel_message(
4607            channel_id,
4608            client_b.current_user_id(&cx_b),
4609            "hello A, it's B.",
4610            OffsetDateTime::now_utc(),
4611            1,
4612        )
4613        .await
4614        .unwrap();
4615
4616        let channels_a = cx_a
4617            .add_model(|cx| ChannelList::new(client_a.user_store.clone(), client_a.clone(), cx));
4618        channels_a
4619            .condition(cx_a, |list, _| list.available_channels().is_some())
4620            .await;
4621        channels_a.read_with(cx_a, |list, _| {
4622            assert_eq!(
4623                list.available_channels().unwrap(),
4624                &[ChannelDetails {
4625                    id: channel_id.to_proto(),
4626                    name: "test-channel".to_string()
4627                }]
4628            )
4629        });
4630        let channel_a = channels_a.update(cx_a, |this, cx| {
4631            this.get_channel(channel_id.to_proto(), cx).unwrap()
4632        });
4633        channel_a.read_with(cx_a, |channel, _| assert!(channel.messages().is_empty()));
4634        channel_a
4635            .condition(&cx_a, |channel, _| {
4636                channel_messages(channel)
4637                    == [("user_b".to_string(), "hello A, it's B.".to_string(), false)]
4638            })
4639            .await;
4640
4641        let channels_b = cx_b
4642            .add_model(|cx| ChannelList::new(client_b.user_store.clone(), client_b.clone(), cx));
4643        channels_b
4644            .condition(cx_b, |list, _| list.available_channels().is_some())
4645            .await;
4646        channels_b.read_with(cx_b, |list, _| {
4647            assert_eq!(
4648                list.available_channels().unwrap(),
4649                &[ChannelDetails {
4650                    id: channel_id.to_proto(),
4651                    name: "test-channel".to_string()
4652                }]
4653            )
4654        });
4655
4656        let channel_b = channels_b.update(cx_b, |this, cx| {
4657            this.get_channel(channel_id.to_proto(), cx).unwrap()
4658        });
4659        channel_b.read_with(cx_b, |channel, _| assert!(channel.messages().is_empty()));
4660        channel_b
4661            .condition(&cx_b, |channel, _| {
4662                channel_messages(channel)
4663                    == [("user_b".to_string(), "hello A, it's B.".to_string(), false)]
4664            })
4665            .await;
4666
4667        channel_a
4668            .update(cx_a, |channel, cx| {
4669                channel
4670                    .send_message("oh, hi B.".to_string(), cx)
4671                    .unwrap()
4672                    .detach();
4673                let task = channel.send_message("sup".to_string(), cx).unwrap();
4674                assert_eq!(
4675                    channel_messages(channel),
4676                    &[
4677                        ("user_b".to_string(), "hello A, it's B.".to_string(), false),
4678                        ("user_a".to_string(), "oh, hi B.".to_string(), true),
4679                        ("user_a".to_string(), "sup".to_string(), true)
4680                    ]
4681                );
4682                task
4683            })
4684            .await
4685            .unwrap();
4686
4687        channel_b
4688            .condition(&cx_b, |channel, _| {
4689                channel_messages(channel)
4690                    == [
4691                        ("user_b".to_string(), "hello A, it's B.".to_string(), false),
4692                        ("user_a".to_string(), "oh, hi B.".to_string(), false),
4693                        ("user_a".to_string(), "sup".to_string(), false),
4694                    ]
4695            })
4696            .await;
4697
4698        assert_eq!(
4699            server
4700                .state()
4701                .await
4702                .channel(channel_id)
4703                .unwrap()
4704                .connection_ids
4705                .len(),
4706            2
4707        );
4708        cx_b.update(|_| drop(channel_b));
4709        server
4710            .condition(|state| state.channel(channel_id).unwrap().connection_ids.len() == 1)
4711            .await;
4712
4713        cx_a.update(|_| drop(channel_a));
4714        server
4715            .condition(|state| state.channel(channel_id).is_none())
4716            .await;
4717    }
4718
4719    #[gpui::test(iterations = 10)]
4720    async fn test_chat_message_validation(cx_a: &mut TestAppContext) {
4721        cx_a.foreground().forbid_parking();
4722
4723        let mut server = TestServer::start(cx_a.foreground(), cx_a.background()).await;
4724        let client_a = server.create_client(cx_a, "user_a").await;
4725
4726        let db = &server.app_state.db;
4727        let org_id = db.create_org("Test Org", "test-org").await.unwrap();
4728        let channel_id = db.create_org_channel(org_id, "test-channel").await.unwrap();
4729        db.add_org_member(org_id, client_a.current_user_id(&cx_a), false)
4730            .await
4731            .unwrap();
4732        db.add_channel_member(channel_id, client_a.current_user_id(&cx_a), false)
4733            .await
4734            .unwrap();
4735
4736        let channels_a = cx_a
4737            .add_model(|cx| ChannelList::new(client_a.user_store.clone(), client_a.clone(), cx));
4738        channels_a
4739            .condition(cx_a, |list, _| list.available_channels().is_some())
4740            .await;
4741        let channel_a = channels_a.update(cx_a, |this, cx| {
4742            this.get_channel(channel_id.to_proto(), cx).unwrap()
4743        });
4744
4745        // Messages aren't allowed to be too long.
4746        channel_a
4747            .update(cx_a, |channel, cx| {
4748                let long_body = "this is long.\n".repeat(1024);
4749                channel.send_message(long_body, cx).unwrap()
4750            })
4751            .await
4752            .unwrap_err();
4753
4754        // Messages aren't allowed to be blank.
4755        channel_a.update(cx_a, |channel, cx| {
4756            channel.send_message(String::new(), cx).unwrap_err()
4757        });
4758
4759        // Leading and trailing whitespace are trimmed.
4760        channel_a
4761            .update(cx_a, |channel, cx| {
4762                channel
4763                    .send_message("\n surrounded by whitespace  \n".to_string(), cx)
4764                    .unwrap()
4765            })
4766            .await
4767            .unwrap();
4768        assert_eq!(
4769            db.get_channel_messages(channel_id, 10, None)
4770                .await
4771                .unwrap()
4772                .iter()
4773                .map(|m| &m.body)
4774                .collect::<Vec<_>>(),
4775            &["surrounded by whitespace"]
4776        );
4777    }
4778
4779    #[gpui::test(iterations = 10)]
4780    async fn test_chat_reconnection(cx_a: &mut TestAppContext, cx_b: &mut TestAppContext) {
4781        cx_a.foreground().forbid_parking();
4782
4783        // Connect to a server as 2 clients.
4784        let mut server = TestServer::start(cx_a.foreground(), cx_a.background()).await;
4785        let client_a = server.create_client(cx_a, "user_a").await;
4786        let client_b = server.create_client(cx_b, "user_b").await;
4787        let mut status_b = client_b.status();
4788
4789        // Create an org that includes these 2 users.
4790        let db = &server.app_state.db;
4791        let org_id = db.create_org("Test Org", "test-org").await.unwrap();
4792        db.add_org_member(org_id, client_a.current_user_id(&cx_a), false)
4793            .await
4794            .unwrap();
4795        db.add_org_member(org_id, client_b.current_user_id(&cx_b), false)
4796            .await
4797            .unwrap();
4798
4799        // Create a channel that includes all the users.
4800        let channel_id = db.create_org_channel(org_id, "test-channel").await.unwrap();
4801        db.add_channel_member(channel_id, client_a.current_user_id(&cx_a), false)
4802            .await
4803            .unwrap();
4804        db.add_channel_member(channel_id, client_b.current_user_id(&cx_b), false)
4805            .await
4806            .unwrap();
4807        db.create_channel_message(
4808            channel_id,
4809            client_b.current_user_id(&cx_b),
4810            "hello A, it's B.",
4811            OffsetDateTime::now_utc(),
4812            2,
4813        )
4814        .await
4815        .unwrap();
4816
4817        let channels_a = cx_a
4818            .add_model(|cx| ChannelList::new(client_a.user_store.clone(), client_a.clone(), cx));
4819        channels_a
4820            .condition(cx_a, |list, _| list.available_channels().is_some())
4821            .await;
4822
4823        channels_a.read_with(cx_a, |list, _| {
4824            assert_eq!(
4825                list.available_channels().unwrap(),
4826                &[ChannelDetails {
4827                    id: channel_id.to_proto(),
4828                    name: "test-channel".to_string()
4829                }]
4830            )
4831        });
4832        let channel_a = channels_a.update(cx_a, |this, cx| {
4833            this.get_channel(channel_id.to_proto(), cx).unwrap()
4834        });
4835        channel_a.read_with(cx_a, |channel, _| assert!(channel.messages().is_empty()));
4836        channel_a
4837            .condition(&cx_a, |channel, _| {
4838                channel_messages(channel)
4839                    == [("user_b".to_string(), "hello A, it's B.".to_string(), false)]
4840            })
4841            .await;
4842
4843        let channels_b = cx_b
4844            .add_model(|cx| ChannelList::new(client_b.user_store.clone(), client_b.clone(), cx));
4845        channels_b
4846            .condition(cx_b, |list, _| list.available_channels().is_some())
4847            .await;
4848        channels_b.read_with(cx_b, |list, _| {
4849            assert_eq!(
4850                list.available_channels().unwrap(),
4851                &[ChannelDetails {
4852                    id: channel_id.to_proto(),
4853                    name: "test-channel".to_string()
4854                }]
4855            )
4856        });
4857
4858        let channel_b = channels_b.update(cx_b, |this, cx| {
4859            this.get_channel(channel_id.to_proto(), cx).unwrap()
4860        });
4861        channel_b.read_with(cx_b, |channel, _| assert!(channel.messages().is_empty()));
4862        channel_b
4863            .condition(&cx_b, |channel, _| {
4864                channel_messages(channel)
4865                    == [("user_b".to_string(), "hello A, it's B.".to_string(), false)]
4866            })
4867            .await;
4868
4869        // Disconnect client B, ensuring we can still access its cached channel data.
4870        server.forbid_connections();
4871        server.disconnect_client(client_b.current_user_id(&cx_b));
4872        cx_b.foreground().advance_clock(rpc::RECEIVE_TIMEOUT);
4873        while !matches!(
4874            status_b.next().await,
4875            Some(client::Status::ReconnectionError { .. })
4876        ) {}
4877
4878        channels_b.read_with(cx_b, |channels, _| {
4879            assert_eq!(
4880                channels.available_channels().unwrap(),
4881                [ChannelDetails {
4882                    id: channel_id.to_proto(),
4883                    name: "test-channel".to_string()
4884                }]
4885            )
4886        });
4887        channel_b.read_with(cx_b, |channel, _| {
4888            assert_eq!(
4889                channel_messages(channel),
4890                [("user_b".to_string(), "hello A, it's B.".to_string(), false)]
4891            )
4892        });
4893
4894        // Send a message from client B while it is disconnected.
4895        channel_b
4896            .update(cx_b, |channel, cx| {
4897                let task = channel
4898                    .send_message("can you see this?".to_string(), cx)
4899                    .unwrap();
4900                assert_eq!(
4901                    channel_messages(channel),
4902                    &[
4903                        ("user_b".to_string(), "hello A, it's B.".to_string(), false),
4904                        ("user_b".to_string(), "can you see this?".to_string(), true)
4905                    ]
4906                );
4907                task
4908            })
4909            .await
4910            .unwrap_err();
4911
4912        // Send a message from client A while B is disconnected.
4913        channel_a
4914            .update(cx_a, |channel, cx| {
4915                channel
4916                    .send_message("oh, hi B.".to_string(), cx)
4917                    .unwrap()
4918                    .detach();
4919                let task = channel.send_message("sup".to_string(), cx).unwrap();
4920                assert_eq!(
4921                    channel_messages(channel),
4922                    &[
4923                        ("user_b".to_string(), "hello A, it's B.".to_string(), false),
4924                        ("user_a".to_string(), "oh, hi B.".to_string(), true),
4925                        ("user_a".to_string(), "sup".to_string(), true)
4926                    ]
4927                );
4928                task
4929            })
4930            .await
4931            .unwrap();
4932
4933        // Give client B a chance to reconnect.
4934        server.allow_connections();
4935        cx_b.foreground().advance_clock(Duration::from_secs(10));
4936
4937        // Verify that B sees the new messages upon reconnection, as well as the message client B
4938        // sent while offline.
4939        channel_b
4940            .condition(&cx_b, |channel, _| {
4941                channel_messages(channel)
4942                    == [
4943                        ("user_b".to_string(), "hello A, it's B.".to_string(), false),
4944                        ("user_a".to_string(), "oh, hi B.".to_string(), false),
4945                        ("user_a".to_string(), "sup".to_string(), false),
4946                        ("user_b".to_string(), "can you see this?".to_string(), false),
4947                    ]
4948            })
4949            .await;
4950
4951        // Ensure client A and B can communicate normally after reconnection.
4952        channel_a
4953            .update(cx_a, |channel, cx| {
4954                channel.send_message("you online?".to_string(), cx).unwrap()
4955            })
4956            .await
4957            .unwrap();
4958        channel_b
4959            .condition(&cx_b, |channel, _| {
4960                channel_messages(channel)
4961                    == [
4962                        ("user_b".to_string(), "hello A, it's B.".to_string(), false),
4963                        ("user_a".to_string(), "oh, hi B.".to_string(), false),
4964                        ("user_a".to_string(), "sup".to_string(), false),
4965                        ("user_b".to_string(), "can you see this?".to_string(), false),
4966                        ("user_a".to_string(), "you online?".to_string(), false),
4967                    ]
4968            })
4969            .await;
4970
4971        channel_b
4972            .update(cx_b, |channel, cx| {
4973                channel.send_message("yep".to_string(), cx).unwrap()
4974            })
4975            .await
4976            .unwrap();
4977        channel_a
4978            .condition(&cx_a, |channel, _| {
4979                channel_messages(channel)
4980                    == [
4981                        ("user_b".to_string(), "hello A, it's B.".to_string(), false),
4982                        ("user_a".to_string(), "oh, hi B.".to_string(), false),
4983                        ("user_a".to_string(), "sup".to_string(), false),
4984                        ("user_b".to_string(), "can you see this?".to_string(), false),
4985                        ("user_a".to_string(), "you online?".to_string(), false),
4986                        ("user_b".to_string(), "yep".to_string(), false),
4987                    ]
4988            })
4989            .await;
4990    }
4991
4992    #[gpui::test(iterations = 10)]
4993    async fn test_contacts(
4994        deterministic: Arc<Deterministic>,
4995        cx_a: &mut TestAppContext,
4996        cx_b: &mut TestAppContext,
4997        cx_c: &mut TestAppContext,
4998    ) {
4999        cx_a.foreground().forbid_parking();
5000        let lang_registry = Arc::new(LanguageRegistry::test());
5001        let fs = FakeFs::new(cx_a.background());
5002
5003        // Connect to a server as 3 clients.
5004        let mut server = TestServer::start(cx_a.foreground(), cx_a.background()).await;
5005        let client_a = server.create_client(cx_a, "user_a").await;
5006        let client_b = server.create_client(cx_b, "user_b").await;
5007        let client_c = server.create_client(cx_c, "user_c").await;
5008        server
5009            .make_contacts(vec![
5010                (&client_a, cx_a),
5011                (&client_b, cx_b),
5012                (&client_c, cx_c),
5013            ])
5014            .await;
5015
5016        deterministic.run_until_parked();
5017        for (client, cx) in [(&client_a, &cx_a), (&client_b, &cx_b), (&client_c, &cx_c)] {
5018            client.user_store.read_with(*cx, |store, _| {
5019                assert_eq!(
5020                    contacts(store),
5021                    [
5022                        ("user_a", true, vec![]),
5023                        ("user_b", true, vec![]),
5024                        ("user_c", true, vec![])
5025                    ]
5026                )
5027            });
5028        }
5029
5030        // Share a worktree as client A.
5031        fs.create_dir(Path::new("/a")).await.unwrap();
5032
5033        let project_a = cx_a.update(|cx| {
5034            Project::local(
5035                client_a.clone(),
5036                client_a.user_store.clone(),
5037                lang_registry.clone(),
5038                fs.clone(),
5039                cx,
5040            )
5041        });
5042        let (worktree_a, _) = project_a
5043            .update(cx_a, |p, cx| {
5044                p.find_or_create_local_worktree("/a", true, cx)
5045            })
5046            .await
5047            .unwrap();
5048        worktree_a
5049            .read_with(cx_a, |tree, _| tree.as_local().unwrap().scan_complete())
5050            .await;
5051
5052        deterministic.run_until_parked();
5053        for (client, cx) in [(&client_a, &cx_a), (&client_b, &cx_b), (&client_c, &cx_c)] {
5054            client.user_store.read_with(*cx, |store, _| {
5055                assert_eq!(
5056                    contacts(store),
5057                    [
5058                        ("user_a", true, vec![("a", false, vec![])]),
5059                        ("user_b", true, vec![]),
5060                        ("user_c", true, vec![])
5061                    ]
5062                )
5063            });
5064        }
5065
5066        let project_id = project_a
5067            .update(cx_a, |project, _| project.next_remote_id())
5068            .await;
5069        project_a
5070            .update(cx_a, |project, cx| project.share(cx))
5071            .await
5072            .unwrap();
5073
5074        deterministic.run_until_parked();
5075        for (client, cx) in [(&client_a, &cx_a), (&client_b, &cx_b), (&client_c, &cx_c)] {
5076            client.user_store.read_with(*cx, |store, _| {
5077                assert_eq!(
5078                    contacts(store),
5079                    [
5080                        ("user_a", true, vec![("a", true, vec![])]),
5081                        ("user_b", true, vec![]),
5082                        ("user_c", true, vec![])
5083                    ]
5084                )
5085            });
5086        }
5087
5088        let _project_b = Project::remote(
5089            project_id,
5090            client_b.clone(),
5091            client_b.user_store.clone(),
5092            lang_registry.clone(),
5093            fs.clone(),
5094            &mut cx_b.to_async(),
5095        )
5096        .await
5097        .unwrap();
5098
5099        deterministic.run_until_parked();
5100        for (client, cx) in [(&client_a, &cx_a), (&client_b, &cx_b), (&client_c, &cx_c)] {
5101            client.user_store.read_with(*cx, |store, _| {
5102                assert_eq!(
5103                    contacts(store),
5104                    [
5105                        ("user_a", true, vec![("a", true, vec!["user_b"])]),
5106                        ("user_b", true, vec![]),
5107                        ("user_c", true, vec![])
5108                    ]
5109                )
5110            });
5111        }
5112
5113        project_a
5114            .condition(&cx_a, |project, _| {
5115                project.collaborators().contains_key(&client_b.peer_id)
5116            })
5117            .await;
5118
5119        cx_a.update(move |_| drop(project_a));
5120        deterministic.run_until_parked();
5121        for (client, cx) in [(&client_a, &cx_a), (&client_b, &cx_b), (&client_c, &cx_c)] {
5122            client.user_store.read_with(*cx, |store, _| {
5123                assert_eq!(
5124                    contacts(store),
5125                    [
5126                        ("user_a", true, vec![]),
5127                        ("user_b", true, vec![]),
5128                        ("user_c", true, vec![])
5129                    ]
5130                )
5131            });
5132        }
5133
5134        server.disconnect_client(client_c.current_user_id(cx_c));
5135        server.forbid_connections();
5136        deterministic.advance_clock(rpc::RECEIVE_TIMEOUT);
5137        for (client, cx) in [(&client_a, &cx_a), (&client_b, &cx_b)] {
5138            client.user_store.read_with(*cx, |store, _| {
5139                assert_eq!(
5140                    contacts(store),
5141                    [
5142                        ("user_a", true, vec![]),
5143                        ("user_b", true, vec![]),
5144                        ("user_c", false, vec![])
5145                    ]
5146                )
5147            });
5148        }
5149        client_c
5150            .user_store
5151            .read_with(cx_c, |store, _| assert_eq!(contacts(store), []));
5152
5153        server.allow_connections();
5154        client_c
5155            .authenticate_and_connect(false, &cx_c.to_async())
5156            .await
5157            .unwrap();
5158
5159        deterministic.run_until_parked();
5160        for (client, cx) in [(&client_a, &cx_a), (&client_b, &cx_b), (&client_c, &cx_c)] {
5161            client.user_store.read_with(*cx, |store, _| {
5162                assert_eq!(
5163                    contacts(store),
5164                    [
5165                        ("user_a", true, vec![]),
5166                        ("user_b", true, vec![]),
5167                        ("user_c", true, vec![])
5168                    ]
5169                )
5170            });
5171        }
5172
5173        fn contacts(user_store: &UserStore) -> Vec<(&str, bool, Vec<(&str, bool, Vec<&str>)>)> {
5174            user_store
5175                .contacts()
5176                .iter()
5177                .map(|contact| {
5178                    let worktrees = contact
5179                        .projects
5180                        .iter()
5181                        .map(|p| {
5182                            (
5183                                p.worktree_root_names[0].as_str(),
5184                                p.is_shared,
5185                                p.guests.iter().map(|p| p.github_login.as_str()).collect(),
5186                            )
5187                        })
5188                        .collect();
5189                    (
5190                        contact.user.github_login.as_str(),
5191                        contact.online,
5192                        worktrees,
5193                    )
5194                })
5195                .collect()
5196        }
5197    }
5198
5199    #[gpui::test(iterations = 10)]
5200    async fn test_contact_requests(
5201        executor: Arc<Deterministic>,
5202        cx_a: &mut TestAppContext,
5203        cx_a2: &mut TestAppContext,
5204        cx_b: &mut TestAppContext,
5205        cx_b2: &mut TestAppContext,
5206        cx_c: &mut TestAppContext,
5207        cx_c2: &mut TestAppContext,
5208    ) {
5209        cx_a.foreground().forbid_parking();
5210
5211        // Connect to a server as 3 clients.
5212        let mut server = TestServer::start(cx_a.foreground(), cx_a.background()).await;
5213        let client_a = server.create_client(cx_a, "user_a").await;
5214        let client_a2 = server.create_client(cx_a2, "user_a").await;
5215        let client_b = server.create_client(cx_b, "user_b").await;
5216        let client_b2 = server.create_client(cx_b2, "user_b").await;
5217        let client_c = server.create_client(cx_c, "user_c").await;
5218        let client_c2 = server.create_client(cx_c2, "user_c").await;
5219
5220        assert_eq!(client_a.user_id().unwrap(), client_a2.user_id().unwrap());
5221        assert_eq!(client_b.user_id().unwrap(), client_b2.user_id().unwrap());
5222        assert_eq!(client_c.user_id().unwrap(), client_c2.user_id().unwrap());
5223
5224        // User A and User C request that user B become their contact.
5225        client_a
5226            .user_store
5227            .update(cx_a, |store, cx| {
5228                store.request_contact(client_b.user_id().unwrap(), cx)
5229            })
5230            .await
5231            .unwrap();
5232        client_c
5233            .user_store
5234            .update(cx_c, |store, cx| {
5235                store.request_contact(client_b.user_id().unwrap(), cx)
5236            })
5237            .await
5238            .unwrap();
5239        executor.run_until_parked();
5240
5241        // All users see the pending request appear in all their clients.
5242        assert_eq!(
5243            client_a.summarize_contacts(&cx_a).outgoing_requests,
5244            &["user_b"]
5245        );
5246        assert_eq!(
5247            client_a2.summarize_contacts(&cx_a2).outgoing_requests,
5248            &["user_b"]
5249        );
5250        assert_eq!(
5251            client_b.summarize_contacts(&cx_b).incoming_requests,
5252            &["user_a", "user_c"]
5253        );
5254        assert_eq!(
5255            client_b2.summarize_contacts(&cx_b2).incoming_requests,
5256            &["user_a", "user_c"]
5257        );
5258        assert_eq!(
5259            client_c.summarize_contacts(&cx_c).outgoing_requests,
5260            &["user_b"]
5261        );
5262        assert_eq!(
5263            client_c2.summarize_contacts(&cx_c2).outgoing_requests,
5264            &["user_b"]
5265        );
5266
5267        // Contact requests are present upon connecting (tested here via disconnect/reconnect)
5268        disconnect_and_reconnect(&client_a, cx_a).await;
5269        disconnect_and_reconnect(&client_b, cx_b).await;
5270        disconnect_and_reconnect(&client_c, cx_c).await;
5271        executor.run_until_parked();
5272        assert_eq!(
5273            client_a.summarize_contacts(&cx_a).outgoing_requests,
5274            &["user_b"]
5275        );
5276        assert_eq!(
5277            client_b.summarize_contacts(&cx_b).incoming_requests,
5278            &["user_a", "user_c"]
5279        );
5280        assert_eq!(
5281            client_c.summarize_contacts(&cx_c).outgoing_requests,
5282            &["user_b"]
5283        );
5284
5285        // User B accepts the request from user A.
5286        client_b
5287            .user_store
5288            .update(cx_b, |store, cx| {
5289                store.respond_to_contact_request(client_a.user_id().unwrap(), true, cx)
5290            })
5291            .await
5292            .unwrap();
5293
5294        executor.run_until_parked();
5295
5296        // User B sees user A as their contact now in all client, and the incoming request from them is removed.
5297        let contacts_b = client_b.summarize_contacts(&cx_b);
5298        assert_eq!(contacts_b.current, &["user_a", "user_b"]);
5299        assert_eq!(contacts_b.incoming_requests, &["user_c"]);
5300        let contacts_b2 = client_b2.summarize_contacts(&cx_b2);
5301        assert_eq!(contacts_b2.current, &["user_a", "user_b"]);
5302        assert_eq!(contacts_b2.incoming_requests, &["user_c"]);
5303
5304        // User A sees user B as their contact now in all clients, and the outgoing request to them is removed.
5305        let contacts_a = client_a.summarize_contacts(&cx_a);
5306        assert_eq!(contacts_a.current, &["user_a", "user_b"]);
5307        assert!(contacts_a.outgoing_requests.is_empty());
5308        let contacts_a2 = client_a2.summarize_contacts(&cx_a2);
5309        assert_eq!(contacts_a2.current, &["user_a", "user_b"]);
5310        assert!(contacts_a2.outgoing_requests.is_empty());
5311
5312        // Contacts are present upon connecting (tested here via disconnect/reconnect)
5313        disconnect_and_reconnect(&client_a, cx_a).await;
5314        disconnect_and_reconnect(&client_b, cx_b).await;
5315        disconnect_and_reconnect(&client_c, cx_c).await;
5316        executor.run_until_parked();
5317        assert_eq!(
5318            client_a.summarize_contacts(&cx_a).current,
5319            &["user_a", "user_b"]
5320        );
5321        assert_eq!(
5322            client_b.summarize_contacts(&cx_b).current,
5323            &["user_a", "user_b"]
5324        );
5325        assert_eq!(
5326            client_b.summarize_contacts(&cx_b).incoming_requests,
5327            &["user_c"]
5328        );
5329        assert_eq!(client_c.summarize_contacts(&cx_c).current, &["user_c"]);
5330        assert_eq!(
5331            client_c.summarize_contacts(&cx_c).outgoing_requests,
5332            &["user_b"]
5333        );
5334
5335        // User B rejects the request from user C.
5336        client_b
5337            .user_store
5338            .update(cx_b, |store, cx| {
5339                store.respond_to_contact_request(client_c.user_id().unwrap(), false, cx)
5340            })
5341            .await
5342            .unwrap();
5343
5344        executor.run_until_parked();
5345
5346        // User B doesn't see user C as their contact, and the incoming request from them is removed.
5347        let contacts_b = client_b.summarize_contacts(&cx_b);
5348        assert_eq!(contacts_b.current, &["user_a", "user_b"]);
5349        assert!(contacts_b.incoming_requests.is_empty());
5350        let contacts_b2 = client_b2.summarize_contacts(&cx_b2);
5351        assert_eq!(contacts_b2.current, &["user_a", "user_b"]);
5352        assert!(contacts_b2.incoming_requests.is_empty());
5353
5354        // User C doesn't see user B as their contact, and the outgoing request to them is removed.
5355        let contacts_c = client_c.summarize_contacts(&cx_c);
5356        assert_eq!(contacts_c.current, &["user_c"]);
5357        assert!(contacts_c.outgoing_requests.is_empty());
5358        let contacts_c2 = client_c2.summarize_contacts(&cx_c2);
5359        assert_eq!(contacts_c2.current, &["user_c"]);
5360        assert!(contacts_c2.outgoing_requests.is_empty());
5361
5362        // Incoming/outgoing requests are not present upon connecting (tested here via disconnect/reconnect)
5363        disconnect_and_reconnect(&client_a, cx_a).await;
5364        disconnect_and_reconnect(&client_b, cx_b).await;
5365        disconnect_and_reconnect(&client_c, cx_c).await;
5366        executor.run_until_parked();
5367        assert_eq!(
5368            client_a.summarize_contacts(&cx_a).current,
5369            &["user_a", "user_b"]
5370        );
5371        assert_eq!(
5372            client_b.summarize_contacts(&cx_b).current,
5373            &["user_a", "user_b"]
5374        );
5375        assert!(client_b
5376            .summarize_contacts(&cx_b)
5377            .incoming_requests
5378            .is_empty());
5379        assert_eq!(client_c.summarize_contacts(&cx_c).current, &["user_c"]);
5380        assert!(client_c
5381            .summarize_contacts(&cx_c)
5382            .outgoing_requests
5383            .is_empty());
5384
5385        async fn disconnect_and_reconnect(client: &TestClient, cx: &mut TestAppContext) {
5386            client.disconnect(&cx.to_async()).unwrap();
5387            client.clear_contacts(cx).await;
5388            client
5389                .authenticate_and_connect(false, &cx.to_async())
5390                .await
5391                .unwrap();
5392        }
5393    }
5394
5395    #[gpui::test(iterations = 10)]
5396    async fn test_following(cx_a: &mut TestAppContext, cx_b: &mut TestAppContext) {
5397        cx_a.foreground().forbid_parking();
5398        let fs = FakeFs::new(cx_a.background());
5399
5400        // 2 clients connect to a server.
5401        let mut server = TestServer::start(cx_a.foreground(), cx_a.background()).await;
5402        let mut client_a = server.create_client(cx_a, "user_a").await;
5403        let mut client_b = server.create_client(cx_b, "user_b").await;
5404        server
5405            .make_contacts(vec![(&client_a, cx_a), (&client_b, cx_b)])
5406            .await;
5407        cx_a.update(editor::init);
5408        cx_b.update(editor::init);
5409
5410        // Client A shares a project.
5411        fs.insert_tree(
5412            "/a",
5413            json!({
5414                "1.txt": "one",
5415                "2.txt": "two",
5416                "3.txt": "three",
5417            }),
5418        )
5419        .await;
5420        let (project_a, worktree_id) = client_a.build_local_project(fs.clone(), "/a", cx_a).await;
5421        project_a
5422            .update(cx_a, |project, cx| project.share(cx))
5423            .await
5424            .unwrap();
5425
5426        // Client B joins the project.
5427        let project_b = client_b
5428            .build_remote_project(
5429                project_a
5430                    .read_with(cx_a, |project, _| project.remote_id())
5431                    .unwrap(),
5432                cx_b,
5433            )
5434            .await;
5435
5436        // Client A opens some editors.
5437        let workspace_a = client_a.build_workspace(&project_a, cx_a);
5438        let pane_a = workspace_a.read_with(cx_a, |workspace, _| workspace.active_pane().clone());
5439        let editor_a1 = workspace_a
5440            .update(cx_a, |workspace, cx| {
5441                workspace.open_path((worktree_id, "1.txt"), true, cx)
5442            })
5443            .await
5444            .unwrap()
5445            .downcast::<Editor>()
5446            .unwrap();
5447        let editor_a2 = workspace_a
5448            .update(cx_a, |workspace, cx| {
5449                workspace.open_path((worktree_id, "2.txt"), true, cx)
5450            })
5451            .await
5452            .unwrap()
5453            .downcast::<Editor>()
5454            .unwrap();
5455
5456        // Client B opens an editor.
5457        let workspace_b = client_b.build_workspace(&project_b, cx_b);
5458        let editor_b1 = workspace_b
5459            .update(cx_b, |workspace, cx| {
5460                workspace.open_path((worktree_id, "1.txt"), true, cx)
5461            })
5462            .await
5463            .unwrap()
5464            .downcast::<Editor>()
5465            .unwrap();
5466
5467        let client_a_id = project_b.read_with(cx_b, |project, _| {
5468            project.collaborators().values().next().unwrap().peer_id
5469        });
5470        let client_b_id = project_a.read_with(cx_a, |project, _| {
5471            project.collaborators().values().next().unwrap().peer_id
5472        });
5473
5474        // When client B starts following client A, all visible view states are replicated to client B.
5475        editor_a1.update(cx_a, |editor, cx| {
5476            editor.change_selections(None, cx, |s| s.select_ranges([0..1]))
5477        });
5478        editor_a2.update(cx_a, |editor, cx| {
5479            editor.change_selections(None, cx, |s| s.select_ranges([2..3]))
5480        });
5481        workspace_b
5482            .update(cx_b, |workspace, cx| {
5483                workspace
5484                    .toggle_follow(&ToggleFollow(client_a_id), cx)
5485                    .unwrap()
5486            })
5487            .await
5488            .unwrap();
5489        let editor_b2 = workspace_b.read_with(cx_b, |workspace, cx| {
5490            workspace
5491                .active_item(cx)
5492                .unwrap()
5493                .downcast::<Editor>()
5494                .unwrap()
5495        });
5496        assert!(cx_b.read(|cx| editor_b2.is_focused(cx)));
5497        assert_eq!(
5498            editor_b2.read_with(cx_b, |editor, cx| editor.project_path(cx)),
5499            Some((worktree_id, "2.txt").into())
5500        );
5501        assert_eq!(
5502            editor_b2.read_with(cx_b, |editor, cx| editor.selections.ranges(cx)),
5503            vec![2..3]
5504        );
5505        assert_eq!(
5506            editor_b1.read_with(cx_b, |editor, cx| editor.selections.ranges(cx)),
5507            vec![0..1]
5508        );
5509
5510        // When client A activates a different editor, client B does so as well.
5511        workspace_a.update(cx_a, |workspace, cx| {
5512            workspace.activate_item(&editor_a1, cx)
5513        });
5514        workspace_b
5515            .condition(cx_b, |workspace, cx| {
5516                workspace.active_item(cx).unwrap().id() == editor_b1.id()
5517            })
5518            .await;
5519
5520        // When client A navigates back and forth, client B does so as well.
5521        workspace_a
5522            .update(cx_a, |workspace, cx| {
5523                workspace::Pane::go_back(workspace, None, cx)
5524            })
5525            .await;
5526        workspace_b
5527            .condition(cx_b, |workspace, cx| {
5528                workspace.active_item(cx).unwrap().id() == editor_b2.id()
5529            })
5530            .await;
5531
5532        workspace_a
5533            .update(cx_a, |workspace, cx| {
5534                workspace::Pane::go_forward(workspace, None, cx)
5535            })
5536            .await;
5537        workspace_b
5538            .condition(cx_b, |workspace, cx| {
5539                workspace.active_item(cx).unwrap().id() == editor_b1.id()
5540            })
5541            .await;
5542
5543        // Changes to client A's editor are reflected on client B.
5544        editor_a1.update(cx_a, |editor, cx| {
5545            editor.change_selections(None, cx, |s| s.select_ranges([1..1, 2..2]));
5546        });
5547        editor_b1
5548            .condition(cx_b, |editor, cx| {
5549                editor.selections.ranges(cx) == vec![1..1, 2..2]
5550            })
5551            .await;
5552
5553        editor_a1.update(cx_a, |editor, cx| editor.set_text("TWO", cx));
5554        editor_b1
5555            .condition(cx_b, |editor, cx| editor.text(cx) == "TWO")
5556            .await;
5557
5558        editor_a1.update(cx_a, |editor, cx| {
5559            editor.change_selections(None, cx, |s| s.select_ranges([3..3]));
5560            editor.set_scroll_position(vec2f(0., 100.), cx);
5561        });
5562        editor_b1
5563            .condition(cx_b, |editor, cx| {
5564                editor.selections.ranges(cx) == vec![3..3]
5565            })
5566            .await;
5567
5568        // After unfollowing, client B stops receiving updates from client A.
5569        workspace_b.update(cx_b, |workspace, cx| {
5570            workspace.unfollow(&workspace.active_pane().clone(), cx)
5571        });
5572        workspace_a.update(cx_a, |workspace, cx| {
5573            workspace.activate_item(&editor_a2, cx)
5574        });
5575        cx_a.foreground().run_until_parked();
5576        assert_eq!(
5577            workspace_b.read_with(cx_b, |workspace, cx| workspace
5578                .active_item(cx)
5579                .unwrap()
5580                .id()),
5581            editor_b1.id()
5582        );
5583
5584        // Client A starts following client B.
5585        workspace_a
5586            .update(cx_a, |workspace, cx| {
5587                workspace
5588                    .toggle_follow(&ToggleFollow(client_b_id), cx)
5589                    .unwrap()
5590            })
5591            .await
5592            .unwrap();
5593        assert_eq!(
5594            workspace_a.read_with(cx_a, |workspace, _| workspace.leader_for_pane(&pane_a)),
5595            Some(client_b_id)
5596        );
5597        assert_eq!(
5598            workspace_a.read_with(cx_a, |workspace, cx| workspace
5599                .active_item(cx)
5600                .unwrap()
5601                .id()),
5602            editor_a1.id()
5603        );
5604
5605        // Following interrupts when client B disconnects.
5606        client_b.disconnect(&cx_b.to_async()).unwrap();
5607        cx_a.foreground().run_until_parked();
5608        assert_eq!(
5609            workspace_a.read_with(cx_a, |workspace, _| workspace.leader_for_pane(&pane_a)),
5610            None
5611        );
5612    }
5613
5614    #[gpui::test(iterations = 10)]
5615    async fn test_peers_following_each_other(cx_a: &mut TestAppContext, cx_b: &mut TestAppContext) {
5616        cx_a.foreground().forbid_parking();
5617        let fs = FakeFs::new(cx_a.background());
5618
5619        // 2 clients connect to a server.
5620        let mut server = TestServer::start(cx_a.foreground(), cx_a.background()).await;
5621        let mut client_a = server.create_client(cx_a, "user_a").await;
5622        let mut client_b = server.create_client(cx_b, "user_b").await;
5623        server
5624            .make_contacts(vec![(&client_a, cx_a), (&client_b, cx_b)])
5625            .await;
5626        cx_a.update(editor::init);
5627        cx_b.update(editor::init);
5628
5629        // Client A shares a project.
5630        fs.insert_tree(
5631            "/a",
5632            json!({
5633                "1.txt": "one",
5634                "2.txt": "two",
5635                "3.txt": "three",
5636                "4.txt": "four",
5637            }),
5638        )
5639        .await;
5640        let (project_a, worktree_id) = client_a.build_local_project(fs.clone(), "/a", cx_a).await;
5641        project_a
5642            .update(cx_a, |project, cx| project.share(cx))
5643            .await
5644            .unwrap();
5645
5646        // Client B joins the project.
5647        let project_b = client_b
5648            .build_remote_project(
5649                project_a
5650                    .read_with(cx_a, |project, _| project.remote_id())
5651                    .unwrap(),
5652                cx_b,
5653            )
5654            .await;
5655
5656        // Client A opens some editors.
5657        let workspace_a = client_a.build_workspace(&project_a, cx_a);
5658        let pane_a1 = workspace_a.read_with(cx_a, |workspace, _| workspace.active_pane().clone());
5659        let _editor_a1 = workspace_a
5660            .update(cx_a, |workspace, cx| {
5661                workspace.open_path((worktree_id, "1.txt"), true, cx)
5662            })
5663            .await
5664            .unwrap()
5665            .downcast::<Editor>()
5666            .unwrap();
5667
5668        // Client B opens an editor.
5669        let workspace_b = client_b.build_workspace(&project_b, cx_b);
5670        let pane_b1 = workspace_b.read_with(cx_b, |workspace, _| workspace.active_pane().clone());
5671        let _editor_b1 = workspace_b
5672            .update(cx_b, |workspace, cx| {
5673                workspace.open_path((worktree_id, "2.txt"), true, cx)
5674            })
5675            .await
5676            .unwrap()
5677            .downcast::<Editor>()
5678            .unwrap();
5679
5680        // Clients A and B follow each other in split panes
5681        workspace_a
5682            .update(cx_a, |workspace, cx| {
5683                workspace.split_pane(workspace.active_pane().clone(), SplitDirection::Right, cx);
5684                assert_ne!(*workspace.active_pane(), pane_a1);
5685                let leader_id = *project_a.read(cx).collaborators().keys().next().unwrap();
5686                workspace
5687                    .toggle_follow(&workspace::ToggleFollow(leader_id), cx)
5688                    .unwrap()
5689            })
5690            .await
5691            .unwrap();
5692        workspace_b
5693            .update(cx_b, |workspace, cx| {
5694                workspace.split_pane(workspace.active_pane().clone(), SplitDirection::Right, cx);
5695                assert_ne!(*workspace.active_pane(), pane_b1);
5696                let leader_id = *project_b.read(cx).collaborators().keys().next().unwrap();
5697                workspace
5698                    .toggle_follow(&workspace::ToggleFollow(leader_id), cx)
5699                    .unwrap()
5700            })
5701            .await
5702            .unwrap();
5703
5704        workspace_a
5705            .update(cx_a, |workspace, cx| {
5706                workspace.activate_next_pane(cx);
5707                assert_eq!(*workspace.active_pane(), pane_a1);
5708                workspace.open_path((worktree_id, "3.txt"), true, cx)
5709            })
5710            .await
5711            .unwrap();
5712        workspace_b
5713            .update(cx_b, |workspace, cx| {
5714                workspace.activate_next_pane(cx);
5715                assert_eq!(*workspace.active_pane(), pane_b1);
5716                workspace.open_path((worktree_id, "4.txt"), true, cx)
5717            })
5718            .await
5719            .unwrap();
5720        cx_a.foreground().run_until_parked();
5721
5722        // Ensure leader updates don't change the active pane of followers
5723        workspace_a.read_with(cx_a, |workspace, _| {
5724            assert_eq!(*workspace.active_pane(), pane_a1);
5725        });
5726        workspace_b.read_with(cx_b, |workspace, _| {
5727            assert_eq!(*workspace.active_pane(), pane_b1);
5728        });
5729
5730        // Ensure peers following each other doesn't cause an infinite loop.
5731        assert_eq!(
5732            workspace_a.read_with(cx_a, |workspace, cx| workspace
5733                .active_item(cx)
5734                .unwrap()
5735                .project_path(cx)),
5736            Some((worktree_id, "3.txt").into())
5737        );
5738        workspace_a.update(cx_a, |workspace, cx| {
5739            assert_eq!(
5740                workspace.active_item(cx).unwrap().project_path(cx),
5741                Some((worktree_id, "3.txt").into())
5742            );
5743            workspace.activate_next_pane(cx);
5744            assert_eq!(
5745                workspace.active_item(cx).unwrap().project_path(cx),
5746                Some((worktree_id, "4.txt").into())
5747            );
5748        });
5749        workspace_b.update(cx_b, |workspace, cx| {
5750            assert_eq!(
5751                workspace.active_item(cx).unwrap().project_path(cx),
5752                Some((worktree_id, "4.txt").into())
5753            );
5754            workspace.activate_next_pane(cx);
5755            assert_eq!(
5756                workspace.active_item(cx).unwrap().project_path(cx),
5757                Some((worktree_id, "3.txt").into())
5758            );
5759        });
5760    }
5761
5762    #[gpui::test(iterations = 10)]
5763    async fn test_auto_unfollowing(cx_a: &mut TestAppContext, cx_b: &mut TestAppContext) {
5764        cx_a.foreground().forbid_parking();
5765        let fs = FakeFs::new(cx_a.background());
5766
5767        // 2 clients connect to a server.
5768        let mut server = TestServer::start(cx_a.foreground(), cx_a.background()).await;
5769        let mut client_a = server.create_client(cx_a, "user_a").await;
5770        let mut client_b = server.create_client(cx_b, "user_b").await;
5771        server
5772            .make_contacts(vec![(&client_a, cx_a), (&client_b, cx_b)])
5773            .await;
5774        cx_a.update(editor::init);
5775        cx_b.update(editor::init);
5776
5777        // Client A shares a project.
5778        fs.insert_tree(
5779            "/a",
5780            json!({
5781                "1.txt": "one",
5782                "2.txt": "two",
5783                "3.txt": "three",
5784            }),
5785        )
5786        .await;
5787        let (project_a, worktree_id) = client_a.build_local_project(fs.clone(), "/a", cx_a).await;
5788        project_a
5789            .update(cx_a, |project, cx| project.share(cx))
5790            .await
5791            .unwrap();
5792
5793        // Client B joins the project.
5794        let project_b = client_b
5795            .build_remote_project(
5796                project_a
5797                    .read_with(cx_a, |project, _| project.remote_id())
5798                    .unwrap(),
5799                cx_b,
5800            )
5801            .await;
5802
5803        // Client A opens some editors.
5804        let workspace_a = client_a.build_workspace(&project_a, cx_a);
5805        let _editor_a1 = workspace_a
5806            .update(cx_a, |workspace, cx| {
5807                workspace.open_path((worktree_id, "1.txt"), true, cx)
5808            })
5809            .await
5810            .unwrap()
5811            .downcast::<Editor>()
5812            .unwrap();
5813
5814        // Client B starts following client A.
5815        let workspace_b = client_b.build_workspace(&project_b, cx_b);
5816        let pane_b = workspace_b.read_with(cx_b, |workspace, _| workspace.active_pane().clone());
5817        let leader_id = project_b.read_with(cx_b, |project, _| {
5818            project.collaborators().values().next().unwrap().peer_id
5819        });
5820        workspace_b
5821            .update(cx_b, |workspace, cx| {
5822                workspace
5823                    .toggle_follow(&ToggleFollow(leader_id), cx)
5824                    .unwrap()
5825            })
5826            .await
5827            .unwrap();
5828        assert_eq!(
5829            workspace_b.read_with(cx_b, |workspace, _| workspace.leader_for_pane(&pane_b)),
5830            Some(leader_id)
5831        );
5832        let editor_b2 = workspace_b.read_with(cx_b, |workspace, cx| {
5833            workspace
5834                .active_item(cx)
5835                .unwrap()
5836                .downcast::<Editor>()
5837                .unwrap()
5838        });
5839
5840        // When client B moves, it automatically stops following client A.
5841        editor_b2.update(cx_b, |editor, cx| editor.move_right(&editor::MoveRight, cx));
5842        assert_eq!(
5843            workspace_b.read_with(cx_b, |workspace, _| workspace.leader_for_pane(&pane_b)),
5844            None
5845        );
5846
5847        workspace_b
5848            .update(cx_b, |workspace, cx| {
5849                workspace
5850                    .toggle_follow(&ToggleFollow(leader_id), cx)
5851                    .unwrap()
5852            })
5853            .await
5854            .unwrap();
5855        assert_eq!(
5856            workspace_b.read_with(cx_b, |workspace, _| workspace.leader_for_pane(&pane_b)),
5857            Some(leader_id)
5858        );
5859
5860        // When client B edits, it automatically stops following client A.
5861        editor_b2.update(cx_b, |editor, cx| editor.insert("X", cx));
5862        assert_eq!(
5863            workspace_b.read_with(cx_b, |workspace, _| workspace.leader_for_pane(&pane_b)),
5864            None
5865        );
5866
5867        workspace_b
5868            .update(cx_b, |workspace, cx| {
5869                workspace
5870                    .toggle_follow(&ToggleFollow(leader_id), cx)
5871                    .unwrap()
5872            })
5873            .await
5874            .unwrap();
5875        assert_eq!(
5876            workspace_b.read_with(cx_b, |workspace, _| workspace.leader_for_pane(&pane_b)),
5877            Some(leader_id)
5878        );
5879
5880        // When client B scrolls, it automatically stops following client A.
5881        editor_b2.update(cx_b, |editor, cx| {
5882            editor.set_scroll_position(vec2f(0., 3.), cx)
5883        });
5884        assert_eq!(
5885            workspace_b.read_with(cx_b, |workspace, _| workspace.leader_for_pane(&pane_b)),
5886            None
5887        );
5888
5889        workspace_b
5890            .update(cx_b, |workspace, cx| {
5891                workspace
5892                    .toggle_follow(&ToggleFollow(leader_id), cx)
5893                    .unwrap()
5894            })
5895            .await
5896            .unwrap();
5897        assert_eq!(
5898            workspace_b.read_with(cx_b, |workspace, _| workspace.leader_for_pane(&pane_b)),
5899            Some(leader_id)
5900        );
5901
5902        // When client B activates a different pane, it continues following client A in the original pane.
5903        workspace_b.update(cx_b, |workspace, cx| {
5904            workspace.split_pane(pane_b.clone(), SplitDirection::Right, cx)
5905        });
5906        assert_eq!(
5907            workspace_b.read_with(cx_b, |workspace, _| workspace.leader_for_pane(&pane_b)),
5908            Some(leader_id)
5909        );
5910
5911        workspace_b.update(cx_b, |workspace, cx| workspace.activate_next_pane(cx));
5912        assert_eq!(
5913            workspace_b.read_with(cx_b, |workspace, _| workspace.leader_for_pane(&pane_b)),
5914            Some(leader_id)
5915        );
5916
5917        // When client B activates a different item in the original pane, it automatically stops following client A.
5918        workspace_b
5919            .update(cx_b, |workspace, cx| {
5920                workspace.open_path((worktree_id, "2.txt"), true, cx)
5921            })
5922            .await
5923            .unwrap();
5924        assert_eq!(
5925            workspace_b.read_with(cx_b, |workspace, _| workspace.leader_for_pane(&pane_b)),
5926            None
5927        );
5928    }
5929
5930    #[gpui::test(iterations = 100)]
5931    async fn test_random_collaboration(
5932        cx: &mut TestAppContext,
5933        deterministic: Arc<Deterministic>,
5934        rng: StdRng,
5935    ) {
5936        cx.foreground().forbid_parking();
5937        let max_peers = env::var("MAX_PEERS")
5938            .map(|i| i.parse().expect("invalid `MAX_PEERS` variable"))
5939            .unwrap_or(5);
5940        assert!(max_peers <= 5);
5941
5942        let max_operations = env::var("OPERATIONS")
5943            .map(|i| i.parse().expect("invalid `OPERATIONS` variable"))
5944            .unwrap_or(10);
5945
5946        let rng = Arc::new(Mutex::new(rng));
5947
5948        let guest_lang_registry = Arc::new(LanguageRegistry::test());
5949        let host_language_registry = Arc::new(LanguageRegistry::test());
5950
5951        let fs = FakeFs::new(cx.background());
5952        fs.insert_tree("/_collab", json!({"init": ""})).await;
5953
5954        let mut server = TestServer::start(cx.foreground(), cx.background()).await;
5955        let db = server.app_state.db.clone();
5956        let host_user_id = db.create_user("host", false).await.unwrap();
5957        for username in ["guest-1", "guest-2", "guest-3", "guest-4"] {
5958            let guest_user_id = db.create_user(username, false).await.unwrap();
5959            server
5960                .app_state
5961                .db
5962                .send_contact_request(guest_user_id, host_user_id)
5963                .await
5964                .unwrap();
5965            server
5966                .app_state
5967                .db
5968                .respond_to_contact_request(host_user_id, guest_user_id, true)
5969                .await
5970                .unwrap();
5971        }
5972
5973        let mut clients = Vec::new();
5974        let mut user_ids = Vec::new();
5975        let mut op_start_signals = Vec::new();
5976
5977        let mut next_entity_id = 100000;
5978        let mut host_cx = TestAppContext::new(
5979            cx.foreground_platform(),
5980            cx.platform(),
5981            deterministic.build_foreground(next_entity_id),
5982            deterministic.build_background(),
5983            cx.font_cache(),
5984            cx.leak_detector(),
5985            next_entity_id,
5986        );
5987        let host = server.create_client(&mut host_cx, "host").await;
5988        let host_project = host_cx.update(|cx| {
5989            Project::local(
5990                host.client.clone(),
5991                host.user_store.clone(),
5992                host_language_registry.clone(),
5993                fs.clone(),
5994                cx,
5995            )
5996        });
5997        let host_project_id = host_project
5998            .update(&mut host_cx, |p, _| p.next_remote_id())
5999            .await;
6000
6001        let (collab_worktree, _) = host_project
6002            .update(&mut host_cx, |project, cx| {
6003                project.find_or_create_local_worktree("/_collab", true, cx)
6004            })
6005            .await
6006            .unwrap();
6007        collab_worktree
6008            .read_with(&host_cx, |tree, _| tree.as_local().unwrap().scan_complete())
6009            .await;
6010        host_project
6011            .update(&mut host_cx, |project, cx| project.share(cx))
6012            .await
6013            .unwrap();
6014
6015        // Set up fake language servers.
6016        let mut language = Language::new(
6017            LanguageConfig {
6018                name: "Rust".into(),
6019                path_suffixes: vec!["rs".to_string()],
6020                ..Default::default()
6021            },
6022            None,
6023        );
6024        let _fake_servers = language.set_fake_lsp_adapter(FakeLspAdapter {
6025            name: "the-fake-language-server",
6026            capabilities: lsp::LanguageServer::full_capabilities(),
6027            initializer: Some(Box::new({
6028                let rng = rng.clone();
6029                let fs = fs.clone();
6030                let project = host_project.downgrade();
6031                move |fake_server: &mut FakeLanguageServer| {
6032                    fake_server.handle_request::<lsp::request::Completion, _, _>(
6033                        |_, _| async move {
6034                            Ok(Some(lsp::CompletionResponse::Array(vec![
6035                                lsp::CompletionItem {
6036                                    text_edit: Some(lsp::CompletionTextEdit::Edit(lsp::TextEdit {
6037                                        range: lsp::Range::new(
6038                                            lsp::Position::new(0, 0),
6039                                            lsp::Position::new(0, 0),
6040                                        ),
6041                                        new_text: "the-new-text".to_string(),
6042                                    })),
6043                                    ..Default::default()
6044                                },
6045                            ])))
6046                        },
6047                    );
6048
6049                    fake_server.handle_request::<lsp::request::CodeActionRequest, _, _>(
6050                        |_, _| async move {
6051                            Ok(Some(vec![lsp::CodeActionOrCommand::CodeAction(
6052                                lsp::CodeAction {
6053                                    title: "the-code-action".to_string(),
6054                                    ..Default::default()
6055                                },
6056                            )]))
6057                        },
6058                    );
6059
6060                    fake_server.handle_request::<lsp::request::PrepareRenameRequest, _, _>(
6061                        |params, _| async move {
6062                            Ok(Some(lsp::PrepareRenameResponse::Range(lsp::Range::new(
6063                                params.position,
6064                                params.position,
6065                            ))))
6066                        },
6067                    );
6068
6069                    fake_server.handle_request::<lsp::request::GotoDefinition, _, _>({
6070                        let fs = fs.clone();
6071                        let rng = rng.clone();
6072                        move |_, _| {
6073                            let fs = fs.clone();
6074                            let rng = rng.clone();
6075                            async move {
6076                                let files = fs.files().await;
6077                                let mut rng = rng.lock();
6078                                let count = rng.gen_range::<usize, _>(1..3);
6079                                let files = (0..count)
6080                                    .map(|_| files.choose(&mut *rng).unwrap())
6081                                    .collect::<Vec<_>>();
6082                                log::info!("LSP: Returning definitions in files {:?}", &files);
6083                                Ok(Some(lsp::GotoDefinitionResponse::Array(
6084                                    files
6085                                        .into_iter()
6086                                        .map(|file| lsp::Location {
6087                                            uri: lsp::Url::from_file_path(file).unwrap(),
6088                                            range: Default::default(),
6089                                        })
6090                                        .collect(),
6091                                )))
6092                            }
6093                        }
6094                    });
6095
6096                    fake_server.handle_request::<lsp::request::DocumentHighlightRequest, _, _>({
6097                        let rng = rng.clone();
6098                        let project = project.clone();
6099                        move |params, mut cx| {
6100                            let highlights = if let Some(project) = project.upgrade(&cx) {
6101                                project.update(&mut cx, |project, cx| {
6102                                    let path = params
6103                                        .text_document_position_params
6104                                        .text_document
6105                                        .uri
6106                                        .to_file_path()
6107                                        .unwrap();
6108                                    let (worktree, relative_path) =
6109                                        project.find_local_worktree(&path, cx)?;
6110                                    let project_path =
6111                                        ProjectPath::from((worktree.read(cx).id(), relative_path));
6112                                    let buffer =
6113                                        project.get_open_buffer(&project_path, cx)?.read(cx);
6114
6115                                    let mut highlights = Vec::new();
6116                                    let highlight_count = rng.lock().gen_range(1..=5);
6117                                    let mut prev_end = 0;
6118                                    for _ in 0..highlight_count {
6119                                        let range =
6120                                            buffer.random_byte_range(prev_end, &mut *rng.lock());
6121
6122                                        highlights.push(lsp::DocumentHighlight {
6123                                            range: range_to_lsp(range.to_point_utf16(buffer)),
6124                                            kind: Some(lsp::DocumentHighlightKind::READ),
6125                                        });
6126                                        prev_end = range.end;
6127                                    }
6128                                    Some(highlights)
6129                                })
6130                            } else {
6131                                None
6132                            };
6133                            async move { Ok(highlights) }
6134                        }
6135                    });
6136                }
6137            })),
6138            ..Default::default()
6139        });
6140        host_language_registry.add(Arc::new(language));
6141
6142        let op_start_signal = futures::channel::mpsc::unbounded();
6143        user_ids.push(host.current_user_id(&host_cx));
6144        op_start_signals.push(op_start_signal.0);
6145        clients.push(host_cx.foreground().spawn(host.simulate_host(
6146            host_project,
6147            op_start_signal.1,
6148            rng.clone(),
6149            host_cx,
6150        )));
6151
6152        let disconnect_host_at = if rng.lock().gen_bool(0.2) {
6153            rng.lock().gen_range(0..max_operations)
6154        } else {
6155            max_operations
6156        };
6157        let mut available_guests = vec![
6158            "guest-1".to_string(),
6159            "guest-2".to_string(),
6160            "guest-3".to_string(),
6161            "guest-4".to_string(),
6162        ];
6163        let mut operations = 0;
6164        while operations < max_operations {
6165            if operations == disconnect_host_at {
6166                server.disconnect_client(user_ids[0]);
6167                cx.foreground().advance_clock(RECEIVE_TIMEOUT);
6168                drop(op_start_signals);
6169                let mut clients = futures::future::join_all(clients).await;
6170                cx.foreground().run_until_parked();
6171
6172                let (host, mut host_cx, host_err) = clients.remove(0);
6173                if let Some(host_err) = host_err {
6174                    log::error!("host error - {}", host_err);
6175                }
6176                host.project
6177                    .as_ref()
6178                    .unwrap()
6179                    .read_with(&host_cx, |project, _| assert!(!project.is_shared()));
6180                for (guest, mut guest_cx, guest_err) in clients {
6181                    if let Some(guest_err) = guest_err {
6182                        log::error!("{} error - {}", guest.username, guest_err);
6183                    }
6184                    // TODO
6185                    // let contacts = server
6186                    //     .store
6187                    //     .read()
6188                    //     .await
6189                    //     .contacts_for_user(guest.current_user_id(&guest_cx));
6190                    // assert!(!contacts
6191                    //     .iter()
6192                    //     .flat_map(|contact| &contact.projects)
6193                    //     .any(|project| project.id == host_project_id));
6194                    guest
6195                        .project
6196                        .as_ref()
6197                        .unwrap()
6198                        .read_with(&guest_cx, |project, _| assert!(project.is_read_only()));
6199                    guest_cx.update(|_| drop(guest));
6200                }
6201                host_cx.update(|_| drop(host));
6202
6203                return;
6204            }
6205
6206            let distribution = rng.lock().gen_range(0..100);
6207            match distribution {
6208                0..=19 if !available_guests.is_empty() => {
6209                    let guest_ix = rng.lock().gen_range(0..available_guests.len());
6210                    let guest_username = available_guests.remove(guest_ix);
6211                    log::info!("Adding new connection for {}", guest_username);
6212                    next_entity_id += 100000;
6213                    let mut guest_cx = TestAppContext::new(
6214                        cx.foreground_platform(),
6215                        cx.platform(),
6216                        deterministic.build_foreground(next_entity_id),
6217                        deterministic.build_background(),
6218                        cx.font_cache(),
6219                        cx.leak_detector(),
6220                        next_entity_id,
6221                    );
6222                    let guest = server.create_client(&mut guest_cx, &guest_username).await;
6223                    let guest_project = Project::remote(
6224                        host_project_id,
6225                        guest.client.clone(),
6226                        guest.user_store.clone(),
6227                        guest_lang_registry.clone(),
6228                        FakeFs::new(cx.background()),
6229                        &mut guest_cx.to_async(),
6230                    )
6231                    .await
6232                    .unwrap();
6233                    let op_start_signal = futures::channel::mpsc::unbounded();
6234                    user_ids.push(guest.current_user_id(&guest_cx));
6235                    op_start_signals.push(op_start_signal.0);
6236                    clients.push(guest_cx.foreground().spawn(guest.simulate_guest(
6237                        guest_username.clone(),
6238                        guest_project,
6239                        op_start_signal.1,
6240                        rng.clone(),
6241                        guest_cx,
6242                    )));
6243
6244                    log::info!("Added connection for {}", guest_username);
6245                    operations += 1;
6246                }
6247                20..=29 if clients.len() > 1 => {
6248                    let guest_ix = rng.lock().gen_range(1..clients.len());
6249                    log::info!("Removing guest {}", user_ids[guest_ix]);
6250                    let removed_guest_id = user_ids.remove(guest_ix);
6251                    let guest = clients.remove(guest_ix);
6252                    op_start_signals.remove(guest_ix);
6253                    server.disconnect_client(removed_guest_id);
6254                    cx.foreground().advance_clock(RECEIVE_TIMEOUT);
6255                    let (guest, mut guest_cx, guest_err) = guest.await;
6256                    if let Some(guest_err) = guest_err {
6257                        log::error!("{} error - {}", guest.username, guest_err);
6258                    }
6259                    guest
6260                        .project
6261                        .as_ref()
6262                        .unwrap()
6263                        .read_with(&guest_cx, |project, _| assert!(project.is_read_only()));
6264                    // TODO
6265                    // for user_id in &user_ids {
6266                    //     for contact in server.store.read().await.contacts_for_user(*user_id) {
6267                    //         assert_ne!(
6268                    //             contact.user_id, removed_guest_id.0 as u64,
6269                    //             "removed guest is still a contact of another peer"
6270                    //         );
6271                    //         for project in contact.projects {
6272                    //             for project_guest_id in project.guests {
6273                    //                 assert_ne!(
6274                    //                     project_guest_id, removed_guest_id.0 as u64,
6275                    //                     "removed guest appears as still participating on a project"
6276                    //                 );
6277                    //             }
6278                    //         }
6279                    //     }
6280                    // }
6281
6282                    log::info!("{} removed", guest.username);
6283                    available_guests.push(guest.username.clone());
6284                    guest_cx.update(|_| drop(guest));
6285
6286                    operations += 1;
6287                }
6288                _ => {
6289                    while operations < max_operations && rng.lock().gen_bool(0.7) {
6290                        op_start_signals
6291                            .choose(&mut *rng.lock())
6292                            .unwrap()
6293                            .unbounded_send(())
6294                            .unwrap();
6295                        operations += 1;
6296                    }
6297
6298                    if rng.lock().gen_bool(0.8) {
6299                        cx.foreground().run_until_parked();
6300                    }
6301                }
6302            }
6303        }
6304
6305        drop(op_start_signals);
6306        let mut clients = futures::future::join_all(clients).await;
6307        cx.foreground().run_until_parked();
6308
6309        let (host_client, mut host_cx, host_err) = clients.remove(0);
6310        if let Some(host_err) = host_err {
6311            panic!("host error - {}", host_err);
6312        }
6313        let host_project = host_client.project.as_ref().unwrap();
6314        let host_worktree_snapshots = host_project.read_with(&host_cx, |project, cx| {
6315            project
6316                .worktrees(cx)
6317                .map(|worktree| {
6318                    let snapshot = worktree.read(cx).snapshot();
6319                    (snapshot.id(), snapshot)
6320                })
6321                .collect::<BTreeMap<_, _>>()
6322        });
6323
6324        host_client
6325            .project
6326            .as_ref()
6327            .unwrap()
6328            .read_with(&host_cx, |project, cx| project.check_invariants(cx));
6329
6330        for (guest_client, mut guest_cx, guest_err) in clients.into_iter() {
6331            if let Some(guest_err) = guest_err {
6332                panic!("{} error - {}", guest_client.username, guest_err);
6333            }
6334            let worktree_snapshots =
6335                guest_client
6336                    .project
6337                    .as_ref()
6338                    .unwrap()
6339                    .read_with(&guest_cx, |project, cx| {
6340                        project
6341                            .worktrees(cx)
6342                            .map(|worktree| {
6343                                let worktree = worktree.read(cx);
6344                                (worktree.id(), worktree.snapshot())
6345                            })
6346                            .collect::<BTreeMap<_, _>>()
6347                    });
6348
6349            assert_eq!(
6350                worktree_snapshots.keys().collect::<Vec<_>>(),
6351                host_worktree_snapshots.keys().collect::<Vec<_>>(),
6352                "{} has different worktrees than the host",
6353                guest_client.username
6354            );
6355            for (id, host_snapshot) in &host_worktree_snapshots {
6356                let guest_snapshot = &worktree_snapshots[id];
6357                assert_eq!(
6358                    guest_snapshot.root_name(),
6359                    host_snapshot.root_name(),
6360                    "{} has different root name than the host for worktree {}",
6361                    guest_client.username,
6362                    id
6363                );
6364                assert_eq!(
6365                    guest_snapshot.entries(false).collect::<Vec<_>>(),
6366                    host_snapshot.entries(false).collect::<Vec<_>>(),
6367                    "{} has different snapshot than the host for worktree {}",
6368                    guest_client.username,
6369                    id
6370                );
6371                assert_eq!(guest_snapshot.scan_id(), host_snapshot.scan_id());
6372            }
6373
6374            guest_client
6375                .project
6376                .as_ref()
6377                .unwrap()
6378                .read_with(&guest_cx, |project, cx| project.check_invariants(cx));
6379
6380            for guest_buffer in &guest_client.buffers {
6381                let buffer_id = guest_buffer.read_with(&guest_cx, |buffer, _| buffer.remote_id());
6382                let host_buffer = host_project.read_with(&host_cx, |project, cx| {
6383                    project.buffer_for_id(buffer_id, cx).expect(&format!(
6384                        "host does not have buffer for guest:{}, peer:{}, id:{}",
6385                        guest_client.username, guest_client.peer_id, buffer_id
6386                    ))
6387                });
6388                let path = host_buffer
6389                    .read_with(&host_cx, |buffer, cx| buffer.file().unwrap().full_path(cx));
6390
6391                assert_eq!(
6392                    guest_buffer.read_with(&guest_cx, |buffer, _| buffer.deferred_ops_len()),
6393                    0,
6394                    "{}, buffer {}, path {:?} has deferred operations",
6395                    guest_client.username,
6396                    buffer_id,
6397                    path,
6398                );
6399                assert_eq!(
6400                    guest_buffer.read_with(&guest_cx, |buffer, _| buffer.text()),
6401                    host_buffer.read_with(&host_cx, |buffer, _| buffer.text()),
6402                    "{}, buffer {}, path {:?}, differs from the host's buffer",
6403                    guest_client.username,
6404                    buffer_id,
6405                    path
6406                );
6407            }
6408
6409            guest_cx.update(|_| drop(guest_client));
6410        }
6411
6412        host_cx.update(|_| drop(host_client));
6413    }
6414
6415    struct TestServer {
6416        peer: Arc<Peer>,
6417        app_state: Arc<AppState>,
6418        server: Arc<Server>,
6419        foreground: Rc<executor::Foreground>,
6420        notifications: mpsc::UnboundedReceiver<()>,
6421        connection_killers: Arc<Mutex<HashMap<UserId, Arc<AtomicBool>>>>,
6422        forbid_connections: Arc<AtomicBool>,
6423        _test_db: TestDb,
6424    }
6425
6426    impl TestServer {
6427        async fn start(
6428            foreground: Rc<executor::Foreground>,
6429            background: Arc<executor::Background>,
6430        ) -> Self {
6431            let test_db = TestDb::fake(background);
6432            let app_state = Self::build_app_state(&test_db).await;
6433            let peer = Peer::new();
6434            let notifications = mpsc::unbounded();
6435            let server = Server::new(app_state.clone(), Some(notifications.0));
6436            Self {
6437                peer,
6438                app_state,
6439                server,
6440                foreground,
6441                notifications: notifications.1,
6442                connection_killers: Default::default(),
6443                forbid_connections: Default::default(),
6444                _test_db: test_db,
6445            }
6446        }
6447
6448        async fn create_client(&mut self, cx: &mut TestAppContext, name: &str) -> TestClient {
6449            cx.update(|cx| {
6450                let settings = Settings::test(cx);
6451                cx.set_global(settings);
6452            });
6453
6454            let http = FakeHttpClient::with_404_response();
6455            let user_id =
6456                if let Ok(Some(user)) = self.app_state.db.get_user_by_github_login(name).await {
6457                    user.id
6458                } else {
6459                    self.app_state.db.create_user(name, false).await.unwrap()
6460                };
6461            let client_name = name.to_string();
6462            let mut client = Client::new(http.clone());
6463            let server = self.server.clone();
6464            let connection_killers = self.connection_killers.clone();
6465            let forbid_connections = self.forbid_connections.clone();
6466            let (connection_id_tx, mut connection_id_rx) = mpsc::channel(16);
6467
6468            Arc::get_mut(&mut client)
6469                .unwrap()
6470                .override_authenticate(move |cx| {
6471                    cx.spawn(|_| async move {
6472                        let access_token = "the-token".to_string();
6473                        Ok(Credentials {
6474                            user_id: user_id.0 as u64,
6475                            access_token,
6476                        })
6477                    })
6478                })
6479                .override_establish_connection(move |credentials, cx| {
6480                    assert_eq!(credentials.user_id, user_id.0 as u64);
6481                    assert_eq!(credentials.access_token, "the-token");
6482
6483                    let server = server.clone();
6484                    let connection_killers = connection_killers.clone();
6485                    let forbid_connections = forbid_connections.clone();
6486                    let client_name = client_name.clone();
6487                    let connection_id_tx = connection_id_tx.clone();
6488                    cx.spawn(move |cx| async move {
6489                        if forbid_connections.load(SeqCst) {
6490                            Err(EstablishConnectionError::other(anyhow!(
6491                                "server is forbidding connections"
6492                            )))
6493                        } else {
6494                            let (client_conn, server_conn, killed) =
6495                                Connection::in_memory(cx.background());
6496                            connection_killers.lock().insert(user_id, killed);
6497                            cx.background()
6498                                .spawn(server.handle_connection(
6499                                    server_conn,
6500                                    client_name,
6501                                    user_id,
6502                                    Some(connection_id_tx),
6503                                    cx.background(),
6504                                ))
6505                                .detach();
6506                            Ok(client_conn)
6507                        }
6508                    })
6509                });
6510
6511            client
6512                .authenticate_and_connect(false, &cx.to_async())
6513                .await
6514                .unwrap();
6515
6516            Channel::init(&client);
6517            Project::init(&client);
6518            cx.update(|cx| {
6519                workspace::init(&client, cx);
6520            });
6521
6522            let peer_id = PeerId(connection_id_rx.next().await.unwrap().0);
6523            let user_store = cx.add_model(|cx| UserStore::new(client.clone(), http, cx));
6524
6525            let client = TestClient {
6526                client,
6527                peer_id,
6528                username: name.to_string(),
6529                user_store,
6530                language_registry: Arc::new(LanguageRegistry::test()),
6531                project: Default::default(),
6532                buffers: Default::default(),
6533            };
6534            client.wait_for_current_user(cx).await;
6535            client
6536        }
6537
6538        fn disconnect_client(&self, user_id: UserId) {
6539            self.connection_killers
6540                .lock()
6541                .remove(&user_id)
6542                .unwrap()
6543                .store(true, SeqCst);
6544        }
6545
6546        fn forbid_connections(&self) {
6547            self.forbid_connections.store(true, SeqCst);
6548        }
6549
6550        fn allow_connections(&self) {
6551            self.forbid_connections.store(false, SeqCst);
6552        }
6553
6554        async fn make_contacts(&self, mut clients: Vec<(&TestClient, &mut TestAppContext)>) {
6555            while let Some((client_a, cx_a)) = clients.pop() {
6556                for (client_b, cx_b) in &mut clients {
6557                    client_a
6558                        .user_store
6559                        .update(cx_a, |store, cx| {
6560                            store.request_contact(client_b.user_id().unwrap(), cx)
6561                        })
6562                        .await
6563                        .unwrap();
6564                    cx_a.foreground().run_until_parked();
6565                    client_b
6566                        .user_store
6567                        .update(*cx_b, |store, cx| {
6568                            store.respond_to_contact_request(client_a.user_id().unwrap(), true, cx)
6569                        })
6570                        .await
6571                        .unwrap();
6572                }
6573            }
6574        }
6575
6576        async fn build_app_state(test_db: &TestDb) -> Arc<AppState> {
6577            Arc::new(AppState {
6578                db: test_db.db().clone(),
6579                api_token: Default::default(),
6580            })
6581        }
6582
6583        async fn state<'a>(&'a self) -> RwLockReadGuard<'a, Store> {
6584            self.server.store.read().await
6585        }
6586
6587        async fn condition<F>(&mut self, mut predicate: F)
6588        where
6589            F: FnMut(&Store) -> bool,
6590        {
6591            assert!(
6592                self.foreground.parking_forbidden(),
6593                "you must call forbid_parking to use server conditions so we don't block indefinitely"
6594            );
6595            while !(predicate)(&*self.server.store.read().await) {
6596                self.foreground.start_waiting();
6597                self.notifications.next().await;
6598                self.foreground.finish_waiting();
6599            }
6600        }
6601    }
6602
6603    impl Deref for TestServer {
6604        type Target = Server;
6605
6606        fn deref(&self) -> &Self::Target {
6607            &self.server
6608        }
6609    }
6610
6611    impl Drop for TestServer {
6612        fn drop(&mut self) {
6613            self.peer.reset();
6614        }
6615    }
6616
6617    struct TestClient {
6618        client: Arc<Client>,
6619        username: String,
6620        pub peer_id: PeerId,
6621        pub user_store: ModelHandle<UserStore>,
6622        language_registry: Arc<LanguageRegistry>,
6623        project: Option<ModelHandle<Project>>,
6624        buffers: HashSet<ModelHandle<language::Buffer>>,
6625    }
6626
6627    impl Deref for TestClient {
6628        type Target = Arc<Client>;
6629
6630        fn deref(&self) -> &Self::Target {
6631            &self.client
6632        }
6633    }
6634
6635    struct ContactsSummary {
6636        pub current: Vec<String>,
6637        pub outgoing_requests: Vec<String>,
6638        pub incoming_requests: Vec<String>,
6639    }
6640
6641    impl TestClient {
6642        pub fn current_user_id(&self, cx: &TestAppContext) -> UserId {
6643            UserId::from_proto(
6644                self.user_store
6645                    .read_with(cx, |user_store, _| user_store.current_user().unwrap().id),
6646            )
6647        }
6648
6649        async fn wait_for_current_user(&self, cx: &TestAppContext) {
6650            let mut authed_user = self
6651                .user_store
6652                .read_with(cx, |user_store, _| user_store.watch_current_user());
6653            while authed_user.next().await.unwrap().is_none() {}
6654        }
6655
6656        async fn clear_contacts(&self, cx: &mut TestAppContext) {
6657            self.user_store
6658                .update(cx, |store, _| store.clear_contacts())
6659                .await;
6660        }
6661
6662        fn summarize_contacts(&self, cx: &TestAppContext) -> ContactsSummary {
6663            self.user_store.read_with(cx, |store, _| ContactsSummary {
6664                current: store
6665                    .contacts()
6666                    .iter()
6667                    .map(|contact| contact.user.github_login.clone())
6668                    .collect(),
6669                outgoing_requests: store
6670                    .outgoing_contact_requests()
6671                    .iter()
6672                    .map(|user| user.github_login.clone())
6673                    .collect(),
6674                incoming_requests: store
6675                    .incoming_contact_requests()
6676                    .iter()
6677                    .map(|user| user.github_login.clone())
6678                    .collect(),
6679            })
6680        }
6681
6682        async fn build_local_project(
6683            &mut self,
6684            fs: Arc<FakeFs>,
6685            root_path: impl AsRef<Path>,
6686            cx: &mut TestAppContext,
6687        ) -> (ModelHandle<Project>, WorktreeId) {
6688            let project = cx.update(|cx| {
6689                Project::local(
6690                    self.client.clone(),
6691                    self.user_store.clone(),
6692                    self.language_registry.clone(),
6693                    fs,
6694                    cx,
6695                )
6696            });
6697            self.project = Some(project.clone());
6698            let (worktree, _) = project
6699                .update(cx, |p, cx| {
6700                    p.find_or_create_local_worktree(root_path, true, cx)
6701                })
6702                .await
6703                .unwrap();
6704            worktree
6705                .read_with(cx, |tree, _| tree.as_local().unwrap().scan_complete())
6706                .await;
6707            project
6708                .update(cx, |project, _| project.next_remote_id())
6709                .await;
6710            (project, worktree.read_with(cx, |tree, _| tree.id()))
6711        }
6712
6713        async fn build_remote_project(
6714            &mut self,
6715            project_id: u64,
6716            cx: &mut TestAppContext,
6717        ) -> ModelHandle<Project> {
6718            let project = Project::remote(
6719                project_id,
6720                self.client.clone(),
6721                self.user_store.clone(),
6722                self.language_registry.clone(),
6723                FakeFs::new(cx.background()),
6724                &mut cx.to_async(),
6725            )
6726            .await
6727            .unwrap();
6728            self.project = Some(project.clone());
6729            project
6730        }
6731
6732        fn build_workspace(
6733            &self,
6734            project: &ModelHandle<Project>,
6735            cx: &mut TestAppContext,
6736        ) -> ViewHandle<Workspace> {
6737            let (window_id, _) = cx.add_window(|_| EmptyView);
6738            cx.add_view(window_id, |cx| {
6739                let fs = project.read(cx).fs().clone();
6740                Workspace::new(
6741                    &WorkspaceParams {
6742                        fs,
6743                        project: project.clone(),
6744                        user_store: self.user_store.clone(),
6745                        languages: self.language_registry.clone(),
6746                        themes: ThemeRegistry::new((), cx.font_cache().clone()),
6747                        channel_list: cx.add_model(|cx| {
6748                            ChannelList::new(self.user_store.clone(), self.client.clone(), cx)
6749                        }),
6750                        client: self.client.clone(),
6751                    },
6752                    cx,
6753                )
6754            })
6755        }
6756
6757        async fn simulate_host(
6758            mut self,
6759            project: ModelHandle<Project>,
6760            op_start_signal: futures::channel::mpsc::UnboundedReceiver<()>,
6761            rng: Arc<Mutex<StdRng>>,
6762            mut cx: TestAppContext,
6763        ) -> (Self, TestAppContext, Option<anyhow::Error>) {
6764            async fn simulate_host_internal(
6765                client: &mut TestClient,
6766                project: ModelHandle<Project>,
6767                mut op_start_signal: futures::channel::mpsc::UnboundedReceiver<()>,
6768                rng: Arc<Mutex<StdRng>>,
6769                cx: &mut TestAppContext,
6770            ) -> anyhow::Result<()> {
6771                let fs = project.read_with(cx, |project, _| project.fs().clone());
6772
6773                while op_start_signal.next().await.is_some() {
6774                    let distribution = rng.lock().gen_range::<usize, _>(0..100);
6775                    let files = fs.as_fake().files().await;
6776                    match distribution {
6777                        0..=20 if !files.is_empty() => {
6778                            let path = files.choose(&mut *rng.lock()).unwrap();
6779                            let mut path = path.as_path();
6780                            while let Some(parent_path) = path.parent() {
6781                                path = parent_path;
6782                                if rng.lock().gen() {
6783                                    break;
6784                                }
6785                            }
6786
6787                            log::info!("Host: find/create local worktree {:?}", path);
6788                            let find_or_create_worktree = project.update(cx, |project, cx| {
6789                                project.find_or_create_local_worktree(path, true, cx)
6790                            });
6791                            if rng.lock().gen() {
6792                                cx.background().spawn(find_or_create_worktree).detach();
6793                            } else {
6794                                find_or_create_worktree.await?;
6795                            }
6796                        }
6797                        10..=80 if !files.is_empty() => {
6798                            let buffer = if client.buffers.is_empty() || rng.lock().gen() {
6799                                let file = files.choose(&mut *rng.lock()).unwrap();
6800                                let (worktree, path) = project
6801                                    .update(cx, |project, cx| {
6802                                        project.find_or_create_local_worktree(
6803                                            file.clone(),
6804                                            true,
6805                                            cx,
6806                                        )
6807                                    })
6808                                    .await?;
6809                                let project_path =
6810                                    worktree.read_with(cx, |worktree, _| (worktree.id(), path));
6811                                log::info!(
6812                                    "Host: opening path {:?}, worktree {}, relative_path {:?}",
6813                                    file,
6814                                    project_path.0,
6815                                    project_path.1
6816                                );
6817                                let buffer = project
6818                                    .update(cx, |project, cx| project.open_buffer(project_path, cx))
6819                                    .await
6820                                    .unwrap();
6821                                client.buffers.insert(buffer.clone());
6822                                buffer
6823                            } else {
6824                                client
6825                                    .buffers
6826                                    .iter()
6827                                    .choose(&mut *rng.lock())
6828                                    .unwrap()
6829                                    .clone()
6830                            };
6831
6832                            if rng.lock().gen_bool(0.1) {
6833                                cx.update(|cx| {
6834                                    log::info!(
6835                                        "Host: dropping buffer {:?}",
6836                                        buffer.read(cx).file().unwrap().full_path(cx)
6837                                    );
6838                                    client.buffers.remove(&buffer);
6839                                    drop(buffer);
6840                                });
6841                            } else {
6842                                buffer.update(cx, |buffer, cx| {
6843                                    log::info!(
6844                                        "Host: updating buffer {:?} ({})",
6845                                        buffer.file().unwrap().full_path(cx),
6846                                        buffer.remote_id()
6847                                    );
6848
6849                                    if rng.lock().gen_bool(0.7) {
6850                                        buffer.randomly_edit(&mut *rng.lock(), 5, cx);
6851                                    } else {
6852                                        buffer.randomly_undo_redo(&mut *rng.lock(), cx);
6853                                    }
6854                                });
6855                            }
6856                        }
6857                        _ => loop {
6858                            let path_component_count = rng.lock().gen_range::<usize, _>(1..=5);
6859                            let mut path = PathBuf::new();
6860                            path.push("/");
6861                            for _ in 0..path_component_count {
6862                                let letter = rng.lock().gen_range(b'a'..=b'z');
6863                                path.push(std::str::from_utf8(&[letter]).unwrap());
6864                            }
6865                            path.set_extension("rs");
6866                            let parent_path = path.parent().unwrap();
6867
6868                            log::info!("Host: creating file {:?}", path,);
6869
6870                            if fs.create_dir(&parent_path).await.is_ok()
6871                                && fs.create_file(&path, Default::default()).await.is_ok()
6872                            {
6873                                break;
6874                            } else {
6875                                log::info!("Host: cannot create file");
6876                            }
6877                        },
6878                    }
6879
6880                    cx.background().simulate_random_delay().await;
6881                }
6882
6883                Ok(())
6884            }
6885
6886            let result =
6887                simulate_host_internal(&mut self, project.clone(), op_start_signal, rng, &mut cx)
6888                    .await;
6889            log::info!("Host done");
6890            self.project = Some(project);
6891            (self, cx, result.err())
6892        }
6893
6894        pub async fn simulate_guest(
6895            mut self,
6896            guest_username: String,
6897            project: ModelHandle<Project>,
6898            op_start_signal: futures::channel::mpsc::UnboundedReceiver<()>,
6899            rng: Arc<Mutex<StdRng>>,
6900            mut cx: TestAppContext,
6901        ) -> (Self, TestAppContext, Option<anyhow::Error>) {
6902            async fn simulate_guest_internal(
6903                client: &mut TestClient,
6904                guest_username: &str,
6905                project: ModelHandle<Project>,
6906                mut op_start_signal: futures::channel::mpsc::UnboundedReceiver<()>,
6907                rng: Arc<Mutex<StdRng>>,
6908                cx: &mut TestAppContext,
6909            ) -> anyhow::Result<()> {
6910                while op_start_signal.next().await.is_some() {
6911                    let buffer = if client.buffers.is_empty() || rng.lock().gen() {
6912                        let worktree = if let Some(worktree) =
6913                            project.read_with(cx, |project, cx| {
6914                                project
6915                                    .worktrees(&cx)
6916                                    .filter(|worktree| {
6917                                        let worktree = worktree.read(cx);
6918                                        worktree.is_visible()
6919                                            && worktree.entries(false).any(|e| e.is_file())
6920                                    })
6921                                    .choose(&mut *rng.lock())
6922                            }) {
6923                            worktree
6924                        } else {
6925                            cx.background().simulate_random_delay().await;
6926                            continue;
6927                        };
6928
6929                        let (worktree_root_name, project_path) =
6930                            worktree.read_with(cx, |worktree, _| {
6931                                let entry = worktree
6932                                    .entries(false)
6933                                    .filter(|e| e.is_file())
6934                                    .choose(&mut *rng.lock())
6935                                    .unwrap();
6936                                (
6937                                    worktree.root_name().to_string(),
6938                                    (worktree.id(), entry.path.clone()),
6939                                )
6940                            });
6941                        log::info!(
6942                            "{}: opening path {:?} in worktree {} ({})",
6943                            guest_username,
6944                            project_path.1,
6945                            project_path.0,
6946                            worktree_root_name,
6947                        );
6948                        let buffer = project
6949                            .update(cx, |project, cx| {
6950                                project.open_buffer(project_path.clone(), cx)
6951                            })
6952                            .await?;
6953                        log::info!(
6954                            "{}: opened path {:?} in worktree {} ({}) with buffer id {}",
6955                            guest_username,
6956                            project_path.1,
6957                            project_path.0,
6958                            worktree_root_name,
6959                            buffer.read_with(cx, |buffer, _| buffer.remote_id())
6960                        );
6961                        client.buffers.insert(buffer.clone());
6962                        buffer
6963                    } else {
6964                        client
6965                            .buffers
6966                            .iter()
6967                            .choose(&mut *rng.lock())
6968                            .unwrap()
6969                            .clone()
6970                    };
6971
6972                    let choice = rng.lock().gen_range(0..100);
6973                    match choice {
6974                        0..=9 => {
6975                            cx.update(|cx| {
6976                                log::info!(
6977                                    "{}: dropping buffer {:?}",
6978                                    guest_username,
6979                                    buffer.read(cx).file().unwrap().full_path(cx)
6980                                );
6981                                client.buffers.remove(&buffer);
6982                                drop(buffer);
6983                            });
6984                        }
6985                        10..=19 => {
6986                            let completions = project.update(cx, |project, cx| {
6987                                log::info!(
6988                                    "{}: requesting completions for buffer {} ({:?})",
6989                                    guest_username,
6990                                    buffer.read(cx).remote_id(),
6991                                    buffer.read(cx).file().unwrap().full_path(cx)
6992                                );
6993                                let offset = rng.lock().gen_range(0..=buffer.read(cx).len());
6994                                project.completions(&buffer, offset, cx)
6995                            });
6996                            let completions = cx.background().spawn(async move {
6997                                completions
6998                                    .await
6999                                    .map_err(|err| anyhow!("completions request failed: {:?}", err))
7000                            });
7001                            if rng.lock().gen_bool(0.3) {
7002                                log::info!("{}: detaching completions request", guest_username);
7003                                cx.update(|cx| completions.detach_and_log_err(cx));
7004                            } else {
7005                                completions.await?;
7006                            }
7007                        }
7008                        20..=29 => {
7009                            let code_actions = project.update(cx, |project, cx| {
7010                                log::info!(
7011                                    "{}: requesting code actions for buffer {} ({:?})",
7012                                    guest_username,
7013                                    buffer.read(cx).remote_id(),
7014                                    buffer.read(cx).file().unwrap().full_path(cx)
7015                                );
7016                                let range = buffer.read(cx).random_byte_range(0, &mut *rng.lock());
7017                                project.code_actions(&buffer, range, cx)
7018                            });
7019                            let code_actions = cx.background().spawn(async move {
7020                                code_actions.await.map_err(|err| {
7021                                    anyhow!("code actions request failed: {:?}", err)
7022                                })
7023                            });
7024                            if rng.lock().gen_bool(0.3) {
7025                                log::info!("{}: detaching code actions request", guest_username);
7026                                cx.update(|cx| code_actions.detach_and_log_err(cx));
7027                            } else {
7028                                code_actions.await?;
7029                            }
7030                        }
7031                        30..=39 if buffer.read_with(cx, |buffer, _| buffer.is_dirty()) => {
7032                            let (requested_version, save) = buffer.update(cx, |buffer, cx| {
7033                                log::info!(
7034                                    "{}: saving buffer {} ({:?})",
7035                                    guest_username,
7036                                    buffer.remote_id(),
7037                                    buffer.file().unwrap().full_path(cx)
7038                                );
7039                                (buffer.version(), buffer.save(cx))
7040                            });
7041                            let save = cx.background().spawn(async move {
7042                                let (saved_version, _) = save
7043                                    .await
7044                                    .map_err(|err| anyhow!("save request failed: {:?}", err))?;
7045                                assert!(saved_version.observed_all(&requested_version));
7046                                Ok::<_, anyhow::Error>(())
7047                            });
7048                            if rng.lock().gen_bool(0.3) {
7049                                log::info!("{}: detaching save request", guest_username);
7050                                cx.update(|cx| save.detach_and_log_err(cx));
7051                            } else {
7052                                save.await?;
7053                            }
7054                        }
7055                        40..=44 => {
7056                            let prepare_rename = project.update(cx, |project, cx| {
7057                                log::info!(
7058                                    "{}: preparing rename for buffer {} ({:?})",
7059                                    guest_username,
7060                                    buffer.read(cx).remote_id(),
7061                                    buffer.read(cx).file().unwrap().full_path(cx)
7062                                );
7063                                let offset = rng.lock().gen_range(0..=buffer.read(cx).len());
7064                                project.prepare_rename(buffer, offset, cx)
7065                            });
7066                            let prepare_rename = cx.background().spawn(async move {
7067                                prepare_rename.await.map_err(|err| {
7068                                    anyhow!("prepare rename request failed: {:?}", err)
7069                                })
7070                            });
7071                            if rng.lock().gen_bool(0.3) {
7072                                log::info!("{}: detaching prepare rename request", guest_username);
7073                                cx.update(|cx| prepare_rename.detach_and_log_err(cx));
7074                            } else {
7075                                prepare_rename.await?;
7076                            }
7077                        }
7078                        45..=49 => {
7079                            let definitions = project.update(cx, |project, cx| {
7080                                log::info!(
7081                                    "{}: requesting definitions for buffer {} ({:?})",
7082                                    guest_username,
7083                                    buffer.read(cx).remote_id(),
7084                                    buffer.read(cx).file().unwrap().full_path(cx)
7085                                );
7086                                let offset = rng.lock().gen_range(0..=buffer.read(cx).len());
7087                                project.definition(&buffer, offset, cx)
7088                            });
7089                            let definitions = cx.background().spawn(async move {
7090                                definitions
7091                                    .await
7092                                    .map_err(|err| anyhow!("definitions request failed: {:?}", err))
7093                            });
7094                            if rng.lock().gen_bool(0.3) {
7095                                log::info!("{}: detaching definitions request", guest_username);
7096                                cx.update(|cx| definitions.detach_and_log_err(cx));
7097                            } else {
7098                                client
7099                                    .buffers
7100                                    .extend(definitions.await?.into_iter().map(|loc| loc.buffer));
7101                            }
7102                        }
7103                        50..=54 => {
7104                            let highlights = project.update(cx, |project, cx| {
7105                                log::info!(
7106                                    "{}: requesting highlights for buffer {} ({:?})",
7107                                    guest_username,
7108                                    buffer.read(cx).remote_id(),
7109                                    buffer.read(cx).file().unwrap().full_path(cx)
7110                                );
7111                                let offset = rng.lock().gen_range(0..=buffer.read(cx).len());
7112                                project.document_highlights(&buffer, offset, cx)
7113                            });
7114                            let highlights = cx.background().spawn(async move {
7115                                highlights
7116                                    .await
7117                                    .map_err(|err| anyhow!("highlights request failed: {:?}", err))
7118                            });
7119                            if rng.lock().gen_bool(0.3) {
7120                                log::info!("{}: detaching highlights request", guest_username);
7121                                cx.update(|cx| highlights.detach_and_log_err(cx));
7122                            } else {
7123                                highlights.await?;
7124                            }
7125                        }
7126                        55..=59 => {
7127                            let search = project.update(cx, |project, cx| {
7128                                let query = rng.lock().gen_range('a'..='z');
7129                                log::info!("{}: project-wide search {:?}", guest_username, query);
7130                                project.search(SearchQuery::text(query, false, false), cx)
7131                            });
7132                            let search = cx.background().spawn(async move {
7133                                search
7134                                    .await
7135                                    .map_err(|err| anyhow!("search request failed: {:?}", err))
7136                            });
7137                            if rng.lock().gen_bool(0.3) {
7138                                log::info!("{}: detaching search request", guest_username);
7139                                cx.update(|cx| search.detach_and_log_err(cx));
7140                            } else {
7141                                client.buffers.extend(search.await?.into_keys());
7142                            }
7143                        }
7144                        60..=69 => {
7145                            let worktree = project
7146                                .read_with(cx, |project, cx| {
7147                                    project
7148                                        .worktrees(&cx)
7149                                        .filter(|worktree| {
7150                                            let worktree = worktree.read(cx);
7151                                            worktree.is_visible()
7152                                                && worktree.entries(false).any(|e| e.is_file())
7153                                                && worktree
7154                                                    .root_entry()
7155                                                    .map_or(false, |e| e.is_dir())
7156                                        })
7157                                        .choose(&mut *rng.lock())
7158                                })
7159                                .unwrap();
7160                            let (worktree_id, worktree_root_name) = worktree
7161                                .read_with(cx, |worktree, _| {
7162                                    (worktree.id(), worktree.root_name().to_string())
7163                                });
7164
7165                            let mut new_name = String::new();
7166                            for _ in 0..10 {
7167                                let letter = rng.lock().gen_range('a'..='z');
7168                                new_name.push(letter);
7169                            }
7170                            let mut new_path = PathBuf::new();
7171                            new_path.push(new_name);
7172                            new_path.set_extension("rs");
7173                            log::info!(
7174                                "{}: creating {:?} in worktree {} ({})",
7175                                guest_username,
7176                                new_path,
7177                                worktree_id,
7178                                worktree_root_name,
7179                            );
7180                            project
7181                                .update(cx, |project, cx| {
7182                                    project.create_entry((worktree_id, new_path), false, cx)
7183                                })
7184                                .unwrap()
7185                                .await?;
7186                        }
7187                        _ => {
7188                            buffer.update(cx, |buffer, cx| {
7189                                log::info!(
7190                                    "{}: updating buffer {} ({:?})",
7191                                    guest_username,
7192                                    buffer.remote_id(),
7193                                    buffer.file().unwrap().full_path(cx)
7194                                );
7195                                if rng.lock().gen_bool(0.7) {
7196                                    buffer.randomly_edit(&mut *rng.lock(), 5, cx);
7197                                } else {
7198                                    buffer.randomly_undo_redo(&mut *rng.lock(), cx);
7199                                }
7200                            });
7201                        }
7202                    }
7203                    cx.background().simulate_random_delay().await;
7204                }
7205                Ok(())
7206            }
7207
7208            let result = simulate_guest_internal(
7209                &mut self,
7210                &guest_username,
7211                project.clone(),
7212                op_start_signal,
7213                rng,
7214                &mut cx,
7215            )
7216            .await;
7217            log::info!("{}: done", guest_username);
7218
7219            self.project = Some(project);
7220            (self, cx, result.err())
7221        }
7222    }
7223
7224    impl Drop for TestClient {
7225        fn drop(&mut self) {
7226            self.client.tear_down();
7227        }
7228    }
7229
7230    impl Executor for Arc<gpui::executor::Background> {
7231        type Sleep = gpui::executor::Timer;
7232
7233        fn spawn_detached<F: 'static + Send + Future<Output = ()>>(&self, future: F) {
7234            self.spawn(future).detach();
7235        }
7236
7237        fn sleep(&self, duration: Duration) -> Self::Sleep {
7238            self.as_ref().timer(duration)
7239        }
7240    }
7241
7242    fn channel_messages(channel: &Channel) -> Vec<(String, String, bool)> {
7243        channel
7244            .messages()
7245            .cursor::<()>()
7246            .map(|m| {
7247                (
7248                    m.sender.github_login.clone(),
7249                    m.body.clone(),
7250                    m.is_pending(),
7251                )
7252            })
7253            .collect()
7254    }
7255
7256    struct EmptyView;
7257
7258    impl gpui::Entity for EmptyView {
7259        type Event = ();
7260    }
7261
7262    impl gpui::View for EmptyView {
7263        fn ui_name() -> &'static str {
7264            "empty view"
7265        }
7266
7267        fn render(&mut self, _: &mut gpui::RenderContext<Self>) -> gpui::ElementBox {
7268            gpui::Element::boxed(gpui::elements::Empty)
7269        }
7270    }
7271}