rpc.rs

   1mod store;
   2
   3use crate::{
   4    auth,
   5    db::{ChannelId, MessageId, UserId},
   6    AppState, Result,
   7};
   8use anyhow::anyhow;
   9use async_tungstenite::tungstenite::{
  10    protocol::CloseFrame as TungsteniteCloseFrame, Message as TungsteniteMessage,
  11};
  12use axum::{
  13    body::Body,
  14    extract::{
  15        ws::{CloseFrame as AxumCloseFrame, Message as AxumMessage},
  16        ConnectInfo, WebSocketUpgrade,
  17    },
  18    headers::{Header, HeaderName},
  19    http::StatusCode,
  20    middleware,
  21    response::IntoResponse,
  22    routing::get,
  23    Extension, Router, TypedHeader,
  24};
  25use collections::HashMap;
  26use futures::{channel::mpsc, future::BoxFuture, FutureExt, SinkExt, StreamExt, TryStreamExt};
  27use lazy_static::lazy_static;
  28use rpc::{
  29    proto::{self, AnyTypedEnvelope, EntityMessage, EnvelopedMessage, RequestMessage},
  30    Connection, ConnectionId, Peer, Receipt, TypedEnvelope,
  31};
  32use std::{
  33    any::TypeId,
  34    future::Future,
  35    marker::PhantomData,
  36    net::SocketAddr,
  37    ops::{Deref, DerefMut},
  38    rc::Rc,
  39    sync::{
  40        atomic::{AtomicBool, Ordering::SeqCst},
  41        Arc,
  42    },
  43    time::Duration,
  44};
  45use store::{Store, Worktree};
  46use time::OffsetDateTime;
  47use tokio::{
  48    sync::{RwLock, RwLockReadGuard, RwLockWriteGuard},
  49    time::Sleep,
  50};
  51use tower::ServiceBuilder;
  52use tracing::{info_span, Instrument};
  53
  54type MessageHandler =
  55    Box<dyn Send + Sync + Fn(Arc<Server>, Box<dyn AnyTypedEnvelope>) -> BoxFuture<'static, ()>>;
  56
  57struct Response<R> {
  58    server: Arc<Server>,
  59    receipt: Receipt<R>,
  60    responded: Arc<AtomicBool>,
  61}
  62
  63impl<R: RequestMessage> Response<R> {
  64    fn send(self, payload: R::Response) -> Result<()> {
  65        self.responded.store(true, SeqCst);
  66        self.server.peer.respond(self.receipt, payload)?;
  67        Ok(())
  68    }
  69}
  70
  71pub struct Server {
  72    peer: Arc<Peer>,
  73    store: RwLock<Store>,
  74    app_state: Arc<AppState>,
  75    handlers: HashMap<TypeId, MessageHandler>,
  76    notifications: Option<mpsc::UnboundedSender<()>>,
  77}
  78
  79pub trait Executor: Send + Clone {
  80    type Sleep: Send + Future;
  81    fn spawn_detached<F: 'static + Send + Future<Output = ()>>(&self, future: F);
  82    fn sleep(&self, duration: Duration) -> Self::Sleep;
  83}
  84
  85#[derive(Clone)]
  86pub struct RealExecutor;
  87
  88const MESSAGE_COUNT_PER_PAGE: usize = 100;
  89const MAX_MESSAGE_LEN: usize = 1024;
  90
  91struct StoreReadGuard<'a> {
  92    guard: RwLockReadGuard<'a, Store>,
  93    _not_send: PhantomData<Rc<()>>,
  94}
  95
  96struct StoreWriteGuard<'a> {
  97    guard: RwLockWriteGuard<'a, Store>,
  98    _not_send: PhantomData<Rc<()>>,
  99}
 100
 101impl Server {
 102    pub fn new(
 103        app_state: Arc<AppState>,
 104        notifications: Option<mpsc::UnboundedSender<()>>,
 105    ) -> Arc<Self> {
 106        let mut server = Self {
 107            peer: Peer::new(),
 108            app_state,
 109            store: Default::default(),
 110            handlers: Default::default(),
 111            notifications,
 112        };
 113
 114        server
 115            .add_request_handler(Server::ping)
 116            .add_request_handler(Server::register_project)
 117            .add_message_handler(Server::unregister_project)
 118            .add_request_handler(Server::share_project)
 119            .add_message_handler(Server::unshare_project)
 120            .add_request_handler(Server::join_project)
 121            .add_message_handler(Server::leave_project)
 122            .add_request_handler(Server::register_worktree)
 123            .add_message_handler(Server::unregister_worktree)
 124            .add_request_handler(Server::update_worktree)
 125            .add_message_handler(Server::start_language_server)
 126            .add_message_handler(Server::update_language_server)
 127            .add_message_handler(Server::update_diagnostic_summary)
 128            .add_request_handler(Server::forward_project_request::<proto::GetDefinition>)
 129            .add_request_handler(Server::forward_project_request::<proto::GetReferences>)
 130            .add_request_handler(Server::forward_project_request::<proto::SearchProject>)
 131            .add_request_handler(Server::forward_project_request::<proto::GetDocumentHighlights>)
 132            .add_request_handler(Server::forward_project_request::<proto::GetProjectSymbols>)
 133            .add_request_handler(Server::forward_project_request::<proto::OpenBufferForSymbol>)
 134            .add_request_handler(Server::forward_project_request::<proto::OpenBufferById>)
 135            .add_request_handler(Server::forward_project_request::<proto::OpenBufferByPath>)
 136            .add_request_handler(Server::forward_project_request::<proto::GetCompletions>)
 137            .add_request_handler(
 138                Server::forward_project_request::<proto::ApplyCompletionAdditionalEdits>,
 139            )
 140            .add_request_handler(Server::forward_project_request::<proto::GetCodeActions>)
 141            .add_request_handler(Server::forward_project_request::<proto::ApplyCodeAction>)
 142            .add_request_handler(Server::forward_project_request::<proto::PrepareRename>)
 143            .add_request_handler(Server::forward_project_request::<proto::PerformRename>)
 144            .add_request_handler(Server::forward_project_request::<proto::ReloadBuffers>)
 145            .add_request_handler(Server::forward_project_request::<proto::FormatBuffers>)
 146            .add_request_handler(Server::forward_project_request::<proto::CreateProjectEntry>)
 147            .add_request_handler(Server::forward_project_request::<proto::RenameProjectEntry>)
 148            .add_request_handler(Server::forward_project_request::<proto::DeleteProjectEntry>)
 149            .add_request_handler(Server::update_buffer)
 150            .add_message_handler(Server::update_buffer_file)
 151            .add_message_handler(Server::buffer_reloaded)
 152            .add_message_handler(Server::buffer_saved)
 153            .add_request_handler(Server::save_buffer)
 154            .add_request_handler(Server::get_channels)
 155            .add_request_handler(Server::get_users)
 156            .add_request_handler(Server::fuzzy_search_users)
 157            .add_request_handler(Server::request_contact)
 158            .add_request_handler(Server::remove_contact)
 159            .add_request_handler(Server::respond_to_contact_request)
 160            .add_request_handler(Server::join_channel)
 161            .add_message_handler(Server::leave_channel)
 162            .add_request_handler(Server::send_channel_message)
 163            .add_request_handler(Server::follow)
 164            .add_message_handler(Server::unfollow)
 165            .add_message_handler(Server::update_followers)
 166            .add_request_handler(Server::get_channel_messages);
 167
 168        Arc::new(server)
 169    }
 170
 171    fn add_message_handler<F, Fut, M>(&mut self, handler: F) -> &mut Self
 172    where
 173        F: 'static + Send + Sync + Fn(Arc<Self>, TypedEnvelope<M>) -> Fut,
 174        Fut: 'static + Send + Future<Output = Result<()>>,
 175        M: EnvelopedMessage,
 176    {
 177        let prev_handler = self.handlers.insert(
 178            TypeId::of::<M>(),
 179            Box::new(move |server, envelope| {
 180                let envelope = envelope.into_any().downcast::<TypedEnvelope<M>>().unwrap();
 181                let span = info_span!(
 182                    "handle message",
 183                    payload_type = envelope.payload_type_name(),
 184                    payload = format!("{:?}", envelope.payload).as_str(),
 185                );
 186                let future = (handler)(server, *envelope);
 187                async move {
 188                    if let Err(error) = future.await {
 189                        tracing::error!(%error, "error handling message");
 190                    }
 191                }
 192                .instrument(span)
 193                .boxed()
 194            }),
 195        );
 196        if prev_handler.is_some() {
 197            panic!("registered a handler for the same message twice");
 198        }
 199        self
 200    }
 201
 202    /// Handle a request while holding a lock to the store. This is useful when we're registering
 203    /// a connection but we want to respond on the connection before anybody else can send on it.
 204    fn add_request_handler<F, Fut, M>(&mut self, handler: F) -> &mut Self
 205    where
 206        F: 'static + Send + Sync + Fn(Arc<Self>, TypedEnvelope<M>, Response<M>) -> Fut,
 207        Fut: Send + Future<Output = Result<()>>,
 208        M: RequestMessage,
 209    {
 210        let handler = Arc::new(handler);
 211        self.add_message_handler(move |server, envelope| {
 212            let receipt = envelope.receipt();
 213            let handler = handler.clone();
 214            async move {
 215                let responded = Arc::new(AtomicBool::default());
 216                let response = Response {
 217                    server: server.clone(),
 218                    responded: responded.clone(),
 219                    receipt: envelope.receipt(),
 220                };
 221                match (handler)(server.clone(), envelope, response).await {
 222                    Ok(()) => {
 223                        if responded.load(std::sync::atomic::Ordering::SeqCst) {
 224                            Ok(())
 225                        } else {
 226                            Err(anyhow!("handler did not send a response"))?
 227                        }
 228                    }
 229                    Err(error) => {
 230                        server.peer.respond_with_error(
 231                            receipt,
 232                            proto::Error {
 233                                message: error.to_string(),
 234                            },
 235                        )?;
 236                        Err(error)
 237                    }
 238                }
 239            }
 240        })
 241    }
 242
 243    pub fn handle_connection<E: Executor>(
 244        self: &Arc<Self>,
 245        connection: Connection,
 246        address: String,
 247        user_id: UserId,
 248        mut send_connection_id: Option<mpsc::Sender<ConnectionId>>,
 249        executor: E,
 250    ) -> impl Future<Output = Result<()>> {
 251        let mut this = self.clone();
 252        let span = info_span!("handle connection", %user_id, %address);
 253        async move {
 254            let (connection_id, handle_io, mut incoming_rx) = this
 255                .peer
 256                .add_connection(connection, {
 257                    let executor = executor.clone();
 258                    move |duration| {
 259                        let timer = executor.sleep(duration);
 260                        async move {
 261                            timer.await;
 262                        }
 263                    }
 264                })
 265                .await;
 266
 267            tracing::info!(%user_id, %connection_id, %address, "connection opened");
 268
 269            if let Some(send_connection_id) = send_connection_id.as_mut() {
 270                let _ = send_connection_id.send(connection_id).await;
 271            }
 272
 273            let contacts = this.app_state.db.get_contacts(user_id).await?;
 274
 275            {
 276                let mut store = this.store_mut().await;
 277                store.add_connection(connection_id, user_id);
 278                this.peer.send(connection_id, store.build_initial_contacts_update(contacts))?;
 279            }
 280            this.update_user_contacts(user_id).await?;
 281
 282            let handle_io = handle_io.fuse();
 283            futures::pin_mut!(handle_io);
 284            loop {
 285                let next_message = incoming_rx.next().fuse();
 286                futures::pin_mut!(next_message);
 287                futures::select_biased! {
 288                    result = handle_io => {
 289                        if let Err(error) = result {
 290                            tracing::error!(%error, "error handling I/O");
 291                        }
 292                        break;
 293                    }
 294                    message = next_message => {
 295                        if let Some(message) = message {
 296                            let type_name = message.payload_type_name();
 297                            let span = tracing::info_span!("receive message", %user_id, %connection_id, %address, type_name);
 298                            async {
 299                                if let Some(handler) = this.handlers.get(&message.payload_type_id()) {
 300                                    let notifications = this.notifications.clone();
 301                                    let is_background = message.is_background();
 302                                    let handle_message = (handler)(this.clone(), message);
 303                                    let handle_message = async move {
 304                                        handle_message.await;
 305                                        if let Some(mut notifications) = notifications {
 306                                            let _ = notifications.send(()).await;
 307                                        }
 308                                    };
 309                                    if is_background {
 310                                        executor.spawn_detached(handle_message);
 311                                    } else {
 312                                        handle_message.await;
 313                                    }
 314                                } else {
 315                                    tracing::error!("no message handler");
 316                                }
 317                            }.instrument(span).await;
 318                        } else {
 319                            tracing::info!(%user_id, %connection_id, %address, "connection closed");
 320                            break;
 321                        }
 322                    }
 323                }
 324            }
 325
 326            if let Err(error) = this.sign_out(connection_id).await {
 327                tracing::error!(%error, "error signing out");
 328            }
 329
 330            Ok(())
 331        }.instrument(span)
 332    }
 333
 334    async fn sign_out(self: &mut Arc<Self>, connection_id: ConnectionId) -> Result<()> {
 335        self.peer.disconnect(connection_id);
 336        let removed_connection = self.store_mut().await.remove_connection(connection_id)?;
 337
 338        for (project_id, project) in removed_connection.hosted_projects {
 339            if let Some(share) = project.share {
 340                broadcast(connection_id, share.guests.keys().copied(), |conn_id| {
 341                    self.peer
 342                        .send(conn_id, proto::UnshareProject { project_id })
 343                });
 344            }
 345        }
 346
 347        for (project_id, peer_ids) in removed_connection.guest_project_ids {
 348            broadcast(connection_id, peer_ids, |conn_id| {
 349                self.peer.send(
 350                    conn_id,
 351                    proto::RemoveProjectCollaborator {
 352                        project_id,
 353                        peer_id: connection_id.0,
 354                    },
 355                )
 356            });
 357        }
 358
 359        self.update_user_contacts(removed_connection.user_id)
 360            .await?;
 361
 362        Ok(())
 363    }
 364
 365    async fn ping(
 366        self: Arc<Server>,
 367        _: TypedEnvelope<proto::Ping>,
 368        response: Response<proto::Ping>,
 369    ) -> Result<()> {
 370        response.send(proto::Ack {})?;
 371        Ok(())
 372    }
 373
 374    async fn register_project(
 375        self: Arc<Server>,
 376        request: TypedEnvelope<proto::RegisterProject>,
 377        response: Response<proto::RegisterProject>,
 378    ) -> Result<()> {
 379        let user_id;
 380        let project_id;
 381        {
 382            let mut state = self.store_mut().await;
 383            user_id = state.user_id_for_connection(request.sender_id)?;
 384            project_id = state.register_project(request.sender_id, user_id);
 385        };
 386        self.update_user_contacts(user_id).await?;
 387        response.send(proto::RegisterProjectResponse { project_id })?;
 388        Ok(())
 389    }
 390
 391    async fn unregister_project(
 392        self: Arc<Server>,
 393        request: TypedEnvelope<proto::UnregisterProject>,
 394    ) -> Result<()> {
 395        let user_id = {
 396            let mut state = self.store_mut().await;
 397            state.unregister_project(request.payload.project_id, request.sender_id)?;
 398            state.user_id_for_connection(request.sender_id)?
 399        };
 400
 401        self.update_user_contacts(user_id).await?;
 402        Ok(())
 403    }
 404
 405    async fn share_project(
 406        self: Arc<Server>,
 407        request: TypedEnvelope<proto::ShareProject>,
 408        response: Response<proto::ShareProject>,
 409    ) -> Result<()> {
 410        let user_id = {
 411            let mut state = self.store_mut().await;
 412            state.share_project(request.payload.project_id, request.sender_id)?;
 413            state.user_id_for_connection(request.sender_id)?
 414        };
 415        self.update_user_contacts(user_id).await?;
 416        response.send(proto::Ack {})?;
 417        Ok(())
 418    }
 419
 420    async fn update_user_contacts(self: &Arc<Server>, user_id: UserId) -> Result<()> {
 421        let contacts = self.app_state.db.get_contacts(user_id).await?;
 422        let store = self.store().await;
 423        let updated_contact = store.contact_for_user(user_id);
 424        for contact_user_id in contacts.current {
 425            for contact_conn_id in store.connection_ids_for_user(contact_user_id) {
 426                self.peer
 427                    .send(
 428                        contact_conn_id,
 429                        proto::UpdateContacts {
 430                            contacts: vec![updated_contact.clone()],
 431                            remove_contacts: Default::default(),
 432                            incoming_requests: Default::default(),
 433                            remove_incoming_requests: Default::default(),
 434                            outgoing_requests: Default::default(),
 435                            remove_outgoing_requests: Default::default(),
 436                        },
 437                    )
 438                    .trace_err();
 439            }
 440        }
 441        Ok(())
 442    }
 443
 444    async fn unshare_project(
 445        self: Arc<Server>,
 446        request: TypedEnvelope<proto::UnshareProject>,
 447    ) -> Result<()> {
 448        let project_id = request.payload.project_id;
 449        let project;
 450        {
 451            let mut state = self.store_mut().await;
 452            project = state.unshare_project(project_id, request.sender_id)?;
 453            broadcast(request.sender_id, project.connection_ids, |conn_id| {
 454                self.peer
 455                    .send(conn_id, proto::UnshareProject { project_id })
 456            });
 457        }
 458        self.update_user_contacts(project.host_user_id).await?;
 459        Ok(())
 460    }
 461
 462    async fn join_project(
 463        self: Arc<Server>,
 464        request: TypedEnvelope<proto::JoinProject>,
 465        response: Response<proto::JoinProject>,
 466    ) -> Result<()> {
 467        let project_id = request.payload.project_id;
 468        let host_user_id;
 469        let guest_user_id;
 470        {
 471            let state = self.store().await;
 472            host_user_id = state.project(project_id)?.host_user_id;
 473            guest_user_id = state.user_id_for_connection(request.sender_id)?;
 474        };
 475
 476        let guest_contacts = self.app_state.db.get_contacts(guest_user_id).await?;
 477        if !guest_contacts.current.contains(&host_user_id) {
 478            return Err(anyhow!("no such project"))?;
 479        }
 480
 481        {
 482            let state = &mut *self.store_mut().await;
 483            let joined = state.join_project(request.sender_id, guest_user_id, project_id)?;
 484            let share = joined.project.share()?;
 485            let peer_count = share.guests.len();
 486            let mut collaborators = Vec::with_capacity(peer_count);
 487            collaborators.push(proto::Collaborator {
 488                peer_id: joined.project.host_connection_id.0,
 489                replica_id: 0,
 490                user_id: joined.project.host_user_id.to_proto(),
 491            });
 492            let worktrees = share
 493                .worktrees
 494                .iter()
 495                .filter_map(|(id, shared_worktree)| {
 496                    let worktree = joined.project.worktrees.get(&id)?;
 497                    Some(proto::Worktree {
 498                        id: *id,
 499                        root_name: worktree.root_name.clone(),
 500                        entries: shared_worktree.entries.values().cloned().collect(),
 501                        diagnostic_summaries: shared_worktree
 502                            .diagnostic_summaries
 503                            .values()
 504                            .cloned()
 505                            .collect(),
 506                        visible: worktree.visible,
 507                        scan_id: shared_worktree.scan_id,
 508                    })
 509                })
 510                .collect();
 511            for (peer_conn_id, (peer_replica_id, peer_user_id)) in &share.guests {
 512                if *peer_conn_id != request.sender_id {
 513                    collaborators.push(proto::Collaborator {
 514                        peer_id: peer_conn_id.0,
 515                        replica_id: *peer_replica_id as u32,
 516                        user_id: peer_user_id.to_proto(),
 517                    });
 518                }
 519            }
 520            broadcast(
 521                request.sender_id,
 522                joined.project.connection_ids(),
 523                |conn_id| {
 524                    self.peer.send(
 525                        conn_id,
 526                        proto::AddProjectCollaborator {
 527                            project_id,
 528                            collaborator: Some(proto::Collaborator {
 529                                peer_id: request.sender_id.0,
 530                                replica_id: joined.replica_id as u32,
 531                                user_id: guest_user_id.to_proto(),
 532                            }),
 533                        },
 534                    )
 535                },
 536            );
 537            response.send(proto::JoinProjectResponse {
 538                worktrees,
 539                replica_id: joined.replica_id as u32,
 540                collaborators,
 541                language_servers: joined.project.language_servers.clone(),
 542            })?;
 543        }
 544        self.update_user_contacts(host_user_id).await?;
 545        Ok(())
 546    }
 547
 548    async fn leave_project(
 549        self: Arc<Server>,
 550        request: TypedEnvelope<proto::LeaveProject>,
 551    ) -> Result<()> {
 552        let sender_id = request.sender_id;
 553        let project_id = request.payload.project_id;
 554        let project;
 555        {
 556            let mut state = self.store_mut().await;
 557            project = state.leave_project(sender_id, project_id)?;
 558            broadcast(sender_id, project.connection_ids, |conn_id| {
 559                self.peer.send(
 560                    conn_id,
 561                    proto::RemoveProjectCollaborator {
 562                        project_id,
 563                        peer_id: sender_id.0,
 564                    },
 565                )
 566            });
 567        }
 568        self.update_user_contacts(project.host_user_id).await?;
 569        Ok(())
 570    }
 571
 572    async fn register_worktree(
 573        self: Arc<Server>,
 574        request: TypedEnvelope<proto::RegisterWorktree>,
 575        response: Response<proto::RegisterWorktree>,
 576    ) -> Result<()> {
 577        let host_user_id;
 578        {
 579            let mut state = self.store_mut().await;
 580            host_user_id = state.user_id_for_connection(request.sender_id)?;
 581
 582            let guest_connection_ids = state
 583                .read_project(request.payload.project_id, request.sender_id)?
 584                .guest_connection_ids();
 585            state.register_worktree(
 586                request.payload.project_id,
 587                request.payload.worktree_id,
 588                request.sender_id,
 589                Worktree {
 590                    root_name: request.payload.root_name.clone(),
 591                    visible: request.payload.visible,
 592                },
 593            )?;
 594
 595            broadcast(request.sender_id, guest_connection_ids, |connection_id| {
 596                self.peer
 597                    .forward_send(request.sender_id, connection_id, request.payload.clone())
 598            });
 599        }
 600        self.update_user_contacts(host_user_id).await?;
 601        response.send(proto::Ack {})?;
 602        Ok(())
 603    }
 604
 605    async fn unregister_worktree(
 606        self: Arc<Server>,
 607        request: TypedEnvelope<proto::UnregisterWorktree>,
 608    ) -> Result<()> {
 609        let host_user_id;
 610        let project_id = request.payload.project_id;
 611        let worktree_id = request.payload.worktree_id;
 612        {
 613            let mut state = self.store_mut().await;
 614            let (_, guest_connection_ids) =
 615                state.unregister_worktree(project_id, worktree_id, request.sender_id)?;
 616            host_user_id = state.user_id_for_connection(request.sender_id)?;
 617            broadcast(request.sender_id, guest_connection_ids, |conn_id| {
 618                self.peer.send(
 619                    conn_id,
 620                    proto::UnregisterWorktree {
 621                        project_id,
 622                        worktree_id,
 623                    },
 624                )
 625            });
 626        }
 627        self.update_user_contacts(host_user_id).await?;
 628        Ok(())
 629    }
 630
 631    async fn update_worktree(
 632        self: Arc<Server>,
 633        request: TypedEnvelope<proto::UpdateWorktree>,
 634        response: Response<proto::UpdateWorktree>,
 635    ) -> Result<()> {
 636        let connection_ids = self.store_mut().await.update_worktree(
 637            request.sender_id,
 638            request.payload.project_id,
 639            request.payload.worktree_id,
 640            &request.payload.removed_entries,
 641            &request.payload.updated_entries,
 642            request.payload.scan_id,
 643        )?;
 644
 645        broadcast(request.sender_id, connection_ids, |connection_id| {
 646            self.peer
 647                .forward_send(request.sender_id, connection_id, request.payload.clone())
 648        });
 649        response.send(proto::Ack {})?;
 650        Ok(())
 651    }
 652
 653    async fn update_diagnostic_summary(
 654        self: Arc<Server>,
 655        request: TypedEnvelope<proto::UpdateDiagnosticSummary>,
 656    ) -> Result<()> {
 657        let summary = request
 658            .payload
 659            .summary
 660            .clone()
 661            .ok_or_else(|| anyhow!("invalid summary"))?;
 662        let receiver_ids = self.store_mut().await.update_diagnostic_summary(
 663            request.payload.project_id,
 664            request.payload.worktree_id,
 665            request.sender_id,
 666            summary,
 667        )?;
 668
 669        broadcast(request.sender_id, receiver_ids, |connection_id| {
 670            self.peer
 671                .forward_send(request.sender_id, connection_id, request.payload.clone())
 672        });
 673        Ok(())
 674    }
 675
 676    async fn start_language_server(
 677        self: Arc<Server>,
 678        request: TypedEnvelope<proto::StartLanguageServer>,
 679    ) -> Result<()> {
 680        let receiver_ids = self.store_mut().await.start_language_server(
 681            request.payload.project_id,
 682            request.sender_id,
 683            request
 684                .payload
 685                .server
 686                .clone()
 687                .ok_or_else(|| anyhow!("invalid language server"))?,
 688        )?;
 689        broadcast(request.sender_id, receiver_ids, |connection_id| {
 690            self.peer
 691                .forward_send(request.sender_id, connection_id, request.payload.clone())
 692        });
 693        Ok(())
 694    }
 695
 696    async fn update_language_server(
 697        self: Arc<Server>,
 698        request: TypedEnvelope<proto::UpdateLanguageServer>,
 699    ) -> Result<()> {
 700        let receiver_ids = self
 701            .store()
 702            .await
 703            .project_connection_ids(request.payload.project_id, request.sender_id)?;
 704        broadcast(request.sender_id, receiver_ids, |connection_id| {
 705            self.peer
 706                .forward_send(request.sender_id, connection_id, request.payload.clone())
 707        });
 708        Ok(())
 709    }
 710
 711    async fn forward_project_request<T>(
 712        self: Arc<Server>,
 713        request: TypedEnvelope<T>,
 714        response: Response<T>,
 715    ) -> Result<()>
 716    where
 717        T: EntityMessage + RequestMessage,
 718    {
 719        let host_connection_id = self
 720            .store()
 721            .await
 722            .read_project(request.payload.remote_entity_id(), request.sender_id)?
 723            .host_connection_id;
 724
 725        response.send(
 726            self.peer
 727                .forward_request(request.sender_id, host_connection_id, request.payload)
 728                .await?,
 729        )?;
 730        Ok(())
 731    }
 732
 733    async fn save_buffer(
 734        self: Arc<Server>,
 735        request: TypedEnvelope<proto::SaveBuffer>,
 736        response: Response<proto::SaveBuffer>,
 737    ) -> Result<()> {
 738        let host = self
 739            .store()
 740            .await
 741            .read_project(request.payload.project_id, request.sender_id)?
 742            .host_connection_id;
 743        let response_payload = self
 744            .peer
 745            .forward_request(request.sender_id, host, request.payload.clone())
 746            .await?;
 747
 748        let mut guests = self
 749            .store()
 750            .await
 751            .read_project(request.payload.project_id, request.sender_id)?
 752            .connection_ids();
 753        guests.retain(|guest_connection_id| *guest_connection_id != request.sender_id);
 754        broadcast(host, guests, |conn_id| {
 755            self.peer
 756                .forward_send(host, conn_id, response_payload.clone())
 757        });
 758        response.send(response_payload)?;
 759        Ok(())
 760    }
 761
 762    async fn update_buffer(
 763        self: Arc<Server>,
 764        request: TypedEnvelope<proto::UpdateBuffer>,
 765        response: Response<proto::UpdateBuffer>,
 766    ) -> Result<()> {
 767        let receiver_ids = self
 768            .store()
 769            .await
 770            .project_connection_ids(request.payload.project_id, request.sender_id)?;
 771        broadcast(request.sender_id, receiver_ids, |connection_id| {
 772            self.peer
 773                .forward_send(request.sender_id, connection_id, request.payload.clone())
 774        });
 775        response.send(proto::Ack {})?;
 776        Ok(())
 777    }
 778
 779    async fn update_buffer_file(
 780        self: Arc<Server>,
 781        request: TypedEnvelope<proto::UpdateBufferFile>,
 782    ) -> Result<()> {
 783        let receiver_ids = self
 784            .store()
 785            .await
 786            .project_connection_ids(request.payload.project_id, request.sender_id)?;
 787        broadcast(request.sender_id, receiver_ids, |connection_id| {
 788            self.peer
 789                .forward_send(request.sender_id, connection_id, request.payload.clone())
 790        });
 791        Ok(())
 792    }
 793
 794    async fn buffer_reloaded(
 795        self: Arc<Server>,
 796        request: TypedEnvelope<proto::BufferReloaded>,
 797    ) -> Result<()> {
 798        let receiver_ids = self
 799            .store()
 800            .await
 801            .project_connection_ids(request.payload.project_id, request.sender_id)?;
 802        broadcast(request.sender_id, receiver_ids, |connection_id| {
 803            self.peer
 804                .forward_send(request.sender_id, connection_id, request.payload.clone())
 805        });
 806        Ok(())
 807    }
 808
 809    async fn buffer_saved(
 810        self: Arc<Server>,
 811        request: TypedEnvelope<proto::BufferSaved>,
 812    ) -> Result<()> {
 813        let receiver_ids = self
 814            .store()
 815            .await
 816            .project_connection_ids(request.payload.project_id, request.sender_id)?;
 817        broadcast(request.sender_id, receiver_ids, |connection_id| {
 818            self.peer
 819                .forward_send(request.sender_id, connection_id, request.payload.clone())
 820        });
 821        Ok(())
 822    }
 823
 824    async fn follow(
 825        self: Arc<Self>,
 826        request: TypedEnvelope<proto::Follow>,
 827        response: Response<proto::Follow>,
 828    ) -> Result<()> {
 829        let leader_id = ConnectionId(request.payload.leader_id);
 830        let follower_id = request.sender_id;
 831        if !self
 832            .store()
 833            .await
 834            .project_connection_ids(request.payload.project_id, follower_id)?
 835            .contains(&leader_id)
 836        {
 837            Err(anyhow!("no such peer"))?;
 838        }
 839        let mut response_payload = self
 840            .peer
 841            .forward_request(request.sender_id, leader_id, request.payload)
 842            .await?;
 843        response_payload
 844            .views
 845            .retain(|view| view.leader_id != Some(follower_id.0));
 846        response.send(response_payload)?;
 847        Ok(())
 848    }
 849
 850    async fn unfollow(self: Arc<Self>, request: TypedEnvelope<proto::Unfollow>) -> Result<()> {
 851        let leader_id = ConnectionId(request.payload.leader_id);
 852        if !self
 853            .store()
 854            .await
 855            .project_connection_ids(request.payload.project_id, request.sender_id)?
 856            .contains(&leader_id)
 857        {
 858            Err(anyhow!("no such peer"))?;
 859        }
 860        self.peer
 861            .forward_send(request.sender_id, leader_id, request.payload)?;
 862        Ok(())
 863    }
 864
 865    async fn update_followers(
 866        self: Arc<Self>,
 867        request: TypedEnvelope<proto::UpdateFollowers>,
 868    ) -> Result<()> {
 869        let connection_ids = self
 870            .store()
 871            .await
 872            .project_connection_ids(request.payload.project_id, request.sender_id)?;
 873        let leader_id = request
 874            .payload
 875            .variant
 876            .as_ref()
 877            .and_then(|variant| match variant {
 878                proto::update_followers::Variant::CreateView(payload) => payload.leader_id,
 879                proto::update_followers::Variant::UpdateView(payload) => payload.leader_id,
 880                proto::update_followers::Variant::UpdateActiveView(payload) => payload.leader_id,
 881            });
 882        for follower_id in &request.payload.follower_ids {
 883            let follower_id = ConnectionId(*follower_id);
 884            if connection_ids.contains(&follower_id) && Some(follower_id.0) != leader_id {
 885                self.peer
 886                    .forward_send(request.sender_id, follower_id, request.payload.clone())?;
 887            }
 888        }
 889        Ok(())
 890    }
 891
 892    async fn get_channels(
 893        self: Arc<Server>,
 894        request: TypedEnvelope<proto::GetChannels>,
 895        response: Response<proto::GetChannels>,
 896    ) -> Result<()> {
 897        let user_id = self
 898            .store()
 899            .await
 900            .user_id_for_connection(request.sender_id)?;
 901        let channels = self.app_state.db.get_accessible_channels(user_id).await?;
 902        response.send(proto::GetChannelsResponse {
 903            channels: channels
 904                .into_iter()
 905                .map(|chan| proto::Channel {
 906                    id: chan.id.to_proto(),
 907                    name: chan.name,
 908                })
 909                .collect(),
 910        })?;
 911        Ok(())
 912    }
 913
 914    async fn get_users(
 915        self: Arc<Server>,
 916        request: TypedEnvelope<proto::GetUsers>,
 917        response: Response<proto::GetUsers>,
 918    ) -> Result<()> {
 919        let user_ids = request
 920            .payload
 921            .user_ids
 922            .into_iter()
 923            .map(UserId::from_proto)
 924            .collect();
 925        let users = self
 926            .app_state
 927            .db
 928            .get_users_by_ids(user_ids)
 929            .await?
 930            .into_iter()
 931            .map(|user| proto::User {
 932                id: user.id.to_proto(),
 933                avatar_url: format!("https://github.com/{}.png?size=128", user.github_login),
 934                github_login: user.github_login,
 935            })
 936            .collect();
 937        response.send(proto::UsersResponse { users })?;
 938        Ok(())
 939    }
 940
 941    async fn fuzzy_search_users(
 942        self: Arc<Server>,
 943        request: TypedEnvelope<proto::FuzzySearchUsers>,
 944        response: Response<proto::FuzzySearchUsers>,
 945    ) -> Result<()> {
 946        let user_id = self
 947            .store()
 948            .await
 949            .user_id_for_connection(request.sender_id)?;
 950        let query = request.payload.query;
 951        let db = &self.app_state.db;
 952        let users = match query.len() {
 953            0 => vec![],
 954            1 | 2 => db
 955                .get_user_by_github_login(&query)
 956                .await?
 957                .into_iter()
 958                .collect(),
 959            _ => db.fuzzy_search_users(&query, 10).await?,
 960        };
 961        let users = users
 962            .into_iter()
 963            .filter(|user| user.id != user_id)
 964            .map(|user| proto::User {
 965                id: user.id.to_proto(),
 966                avatar_url: format!("https://github.com/{}.png?size=128", user.github_login),
 967                github_login: user.github_login,
 968            })
 969            .collect();
 970        response.send(proto::UsersResponse { users })?;
 971        Ok(())
 972    }
 973
 974    async fn request_contact(
 975        self: Arc<Server>,
 976        request: TypedEnvelope<proto::RequestContact>,
 977        response: Response<proto::RequestContact>,
 978    ) -> Result<()> {
 979        let requester_id = self
 980            .store()
 981            .await
 982            .user_id_for_connection(request.sender_id)?;
 983        let responder_id = UserId::from_proto(request.payload.responder_id);
 984        if requester_id == responder_id {
 985            return Err(anyhow!("cannot add yourself as a contact"))?;
 986        }
 987
 988        self.app_state
 989            .db
 990            .send_contact_request(requester_id, responder_id)
 991            .await?;
 992
 993        // Update outgoing contact requests of requester
 994        let mut update = proto::UpdateContacts::default();
 995        update.outgoing_requests.push(responder_id.to_proto());
 996        for connection_id in self.store().await.connection_ids_for_user(requester_id) {
 997            self.peer.send(connection_id, update.clone())?;
 998        }
 999
1000        // Update incoming contact requests of responder
1001        let mut update = proto::UpdateContacts::default();
1002        update
1003            .incoming_requests
1004            .push(proto::IncomingContactRequest {
1005                requester_id: requester_id.to_proto(),
1006                should_notify: true,
1007            });
1008        for connection_id in self.store().await.connection_ids_for_user(responder_id) {
1009            self.peer.send(connection_id, update.clone())?;
1010        }
1011
1012        response.send(proto::Ack {})?;
1013        Ok(())
1014    }
1015
1016    async fn respond_to_contact_request(
1017        self: Arc<Server>,
1018        request: TypedEnvelope<proto::RespondToContactRequest>,
1019        response: Response<proto::RespondToContactRequest>,
1020    ) -> Result<()> {
1021        let responder_id = self
1022            .store()
1023            .await
1024            .user_id_for_connection(request.sender_id)?;
1025        let requester_id = UserId::from_proto(request.payload.requester_id);
1026        let accept = request.payload.response == proto::ContactRequestResponse::Accept as i32;
1027        self.app_state
1028            .db
1029            .respond_to_contact_request(responder_id, requester_id, accept)
1030            .await?;
1031
1032        let store = self.store().await;
1033        // Update responder with new contact
1034        let mut update = proto::UpdateContacts::default();
1035        if accept {
1036            update.contacts.push(store.contact_for_user(requester_id));
1037        }
1038        update
1039            .remove_incoming_requests
1040            .push(requester_id.to_proto());
1041        for connection_id in store.connection_ids_for_user(responder_id) {
1042            self.peer.send(connection_id, update.clone())?;
1043        }
1044
1045        // Update requester with new contact
1046        let mut update = proto::UpdateContacts::default();
1047        if accept {
1048            update.contacts.push(store.contact_for_user(responder_id));
1049        }
1050        update
1051            .remove_outgoing_requests
1052            .push(responder_id.to_proto());
1053        for connection_id in store.connection_ids_for_user(requester_id) {
1054            self.peer.send(connection_id, update.clone())?;
1055        }
1056
1057        response.send(proto::Ack {})?;
1058        Ok(())
1059    }
1060
1061    async fn remove_contact(
1062        self: Arc<Server>,
1063        request: TypedEnvelope<proto::RemoveContact>,
1064        response: Response<proto::RemoveContact>,
1065    ) -> Result<()> {
1066        let requester_id = self
1067            .store()
1068            .await
1069            .user_id_for_connection(request.sender_id)?;
1070        let responder_id = UserId::from_proto(request.payload.user_id);
1071        self.app_state
1072            .db
1073            .remove_contact(requester_id, responder_id)
1074            .await?;
1075
1076        // Update outgoing contact requests of requester
1077        let mut update = proto::UpdateContacts::default();
1078        update
1079            .remove_outgoing_requests
1080            .push(responder_id.to_proto());
1081        for connection_id in self.store().await.connection_ids_for_user(requester_id) {
1082            self.peer.send(connection_id, update.clone())?;
1083        }
1084
1085        // Update incoming contact requests of responder
1086        let mut update = proto::UpdateContacts::default();
1087        update
1088            .remove_incoming_requests
1089            .push(requester_id.to_proto());
1090        for connection_id in self.store().await.connection_ids_for_user(responder_id) {
1091            self.peer.send(connection_id, update.clone())?;
1092        }
1093
1094        response.send(proto::Ack {})?;
1095        Ok(())
1096    }
1097
1098    // #[instrument(skip(self, state, user_ids))]
1099    // fn update_contacts_for_users<'a>(
1100    //     self: &Arc<Self>,
1101    //     state: &Store,
1102    //     user_ids: impl IntoIterator<Item = &'a UserId>,
1103    // ) {
1104    //     for user_id in user_ids {
1105    //         let contacts = state.contacts_for_user(*user_id);
1106    //         for connection_id in state.connection_ids_for_user(*user_id) {
1107    //             self.peer
1108    //                 .send(
1109    //                     connection_id,
1110    //                     proto::UpdateContacts {
1111    //                         contacts: contacts.clone(),
1112    //                         pending_requests_from_user_ids: Default::default(),
1113    //                         pending_requests_to_user_ids: Default::default(),
1114    //                     },
1115    //                 )
1116    //                 .trace_err();
1117    //         }
1118    //     }
1119    // }
1120
1121    async fn join_channel(
1122        self: Arc<Self>,
1123        request: TypedEnvelope<proto::JoinChannel>,
1124        response: Response<proto::JoinChannel>,
1125    ) -> Result<()> {
1126        let user_id = self
1127            .store()
1128            .await
1129            .user_id_for_connection(request.sender_id)?;
1130        let channel_id = ChannelId::from_proto(request.payload.channel_id);
1131        if !self
1132            .app_state
1133            .db
1134            .can_user_access_channel(user_id, channel_id)
1135            .await?
1136        {
1137            Err(anyhow!("access denied"))?;
1138        }
1139
1140        self.store_mut()
1141            .await
1142            .join_channel(request.sender_id, channel_id);
1143        let messages = self
1144            .app_state
1145            .db
1146            .get_channel_messages(channel_id, MESSAGE_COUNT_PER_PAGE, None)
1147            .await?
1148            .into_iter()
1149            .map(|msg| proto::ChannelMessage {
1150                id: msg.id.to_proto(),
1151                body: msg.body,
1152                timestamp: msg.sent_at.unix_timestamp() as u64,
1153                sender_id: msg.sender_id.to_proto(),
1154                nonce: Some(msg.nonce.as_u128().into()),
1155            })
1156            .collect::<Vec<_>>();
1157        response.send(proto::JoinChannelResponse {
1158            done: messages.len() < MESSAGE_COUNT_PER_PAGE,
1159            messages,
1160        })?;
1161        Ok(())
1162    }
1163
1164    async fn leave_channel(
1165        self: Arc<Self>,
1166        request: TypedEnvelope<proto::LeaveChannel>,
1167    ) -> Result<()> {
1168        let user_id = self
1169            .store()
1170            .await
1171            .user_id_for_connection(request.sender_id)?;
1172        let channel_id = ChannelId::from_proto(request.payload.channel_id);
1173        if !self
1174            .app_state
1175            .db
1176            .can_user_access_channel(user_id, channel_id)
1177            .await?
1178        {
1179            Err(anyhow!("access denied"))?;
1180        }
1181
1182        self.store_mut()
1183            .await
1184            .leave_channel(request.sender_id, channel_id);
1185
1186        Ok(())
1187    }
1188
1189    async fn send_channel_message(
1190        self: Arc<Self>,
1191        request: TypedEnvelope<proto::SendChannelMessage>,
1192        response: Response<proto::SendChannelMessage>,
1193    ) -> Result<()> {
1194        let channel_id = ChannelId::from_proto(request.payload.channel_id);
1195        let user_id;
1196        let connection_ids;
1197        {
1198            let state = self.store().await;
1199            user_id = state.user_id_for_connection(request.sender_id)?;
1200            connection_ids = state.channel_connection_ids(channel_id)?;
1201        }
1202
1203        // Validate the message body.
1204        let body = request.payload.body.trim().to_string();
1205        if body.len() > MAX_MESSAGE_LEN {
1206            return Err(anyhow!("message is too long"))?;
1207        }
1208        if body.is_empty() {
1209            return Err(anyhow!("message can't be blank"))?;
1210        }
1211
1212        let timestamp = OffsetDateTime::now_utc();
1213        let nonce = request
1214            .payload
1215            .nonce
1216            .ok_or_else(|| anyhow!("nonce can't be blank"))?;
1217
1218        let message_id = self
1219            .app_state
1220            .db
1221            .create_channel_message(channel_id, user_id, &body, timestamp, nonce.clone().into())
1222            .await?
1223            .to_proto();
1224        let message = proto::ChannelMessage {
1225            sender_id: user_id.to_proto(),
1226            id: message_id,
1227            body,
1228            timestamp: timestamp.unix_timestamp() as u64,
1229            nonce: Some(nonce),
1230        };
1231        broadcast(request.sender_id, connection_ids, |conn_id| {
1232            self.peer.send(
1233                conn_id,
1234                proto::ChannelMessageSent {
1235                    channel_id: channel_id.to_proto(),
1236                    message: Some(message.clone()),
1237                },
1238            )
1239        });
1240        response.send(proto::SendChannelMessageResponse {
1241            message: Some(message),
1242        })?;
1243        Ok(())
1244    }
1245
1246    async fn get_channel_messages(
1247        self: Arc<Self>,
1248        request: TypedEnvelope<proto::GetChannelMessages>,
1249        response: Response<proto::GetChannelMessages>,
1250    ) -> Result<()> {
1251        let user_id = self
1252            .store()
1253            .await
1254            .user_id_for_connection(request.sender_id)?;
1255        let channel_id = ChannelId::from_proto(request.payload.channel_id);
1256        if !self
1257            .app_state
1258            .db
1259            .can_user_access_channel(user_id, channel_id)
1260            .await?
1261        {
1262            Err(anyhow!("access denied"))?;
1263        }
1264
1265        let messages = self
1266            .app_state
1267            .db
1268            .get_channel_messages(
1269                channel_id,
1270                MESSAGE_COUNT_PER_PAGE,
1271                Some(MessageId::from_proto(request.payload.before_message_id)),
1272            )
1273            .await?
1274            .into_iter()
1275            .map(|msg| proto::ChannelMessage {
1276                id: msg.id.to_proto(),
1277                body: msg.body,
1278                timestamp: msg.sent_at.unix_timestamp() as u64,
1279                sender_id: msg.sender_id.to_proto(),
1280                nonce: Some(msg.nonce.as_u128().into()),
1281            })
1282            .collect::<Vec<_>>();
1283        response.send(proto::GetChannelMessagesResponse {
1284            done: messages.len() < MESSAGE_COUNT_PER_PAGE,
1285            messages,
1286        })?;
1287        Ok(())
1288    }
1289
1290    async fn store<'a>(self: &'a Arc<Self>) -> StoreReadGuard<'a> {
1291        #[cfg(test)]
1292        tokio::task::yield_now().await;
1293        let guard = self.store.read().await;
1294        #[cfg(test)]
1295        tokio::task::yield_now().await;
1296        StoreReadGuard {
1297            guard,
1298            _not_send: PhantomData,
1299        }
1300    }
1301
1302    async fn store_mut<'a>(self: &'a Arc<Self>) -> StoreWriteGuard<'a> {
1303        #[cfg(test)]
1304        tokio::task::yield_now().await;
1305        let guard = self.store.write().await;
1306        #[cfg(test)]
1307        tokio::task::yield_now().await;
1308        StoreWriteGuard {
1309            guard,
1310            _not_send: PhantomData,
1311        }
1312    }
1313}
1314
1315impl<'a> Deref for StoreReadGuard<'a> {
1316    type Target = Store;
1317
1318    fn deref(&self) -> &Self::Target {
1319        &*self.guard
1320    }
1321}
1322
1323impl<'a> Deref for StoreWriteGuard<'a> {
1324    type Target = Store;
1325
1326    fn deref(&self) -> &Self::Target {
1327        &*self.guard
1328    }
1329}
1330
1331impl<'a> DerefMut for StoreWriteGuard<'a> {
1332    fn deref_mut(&mut self) -> &mut Self::Target {
1333        &mut *self.guard
1334    }
1335}
1336
1337impl<'a> Drop for StoreWriteGuard<'a> {
1338    fn drop(&mut self) {
1339        #[cfg(test)]
1340        self.check_invariants();
1341
1342        let metrics = self.metrics();
1343        tracing::info!(
1344            connections = metrics.connections,
1345            registered_projects = metrics.registered_projects,
1346            shared_projects = metrics.shared_projects,
1347            "metrics"
1348        );
1349    }
1350}
1351
1352impl Executor for RealExecutor {
1353    type Sleep = Sleep;
1354
1355    fn spawn_detached<F: 'static + Send + Future<Output = ()>>(&self, future: F) {
1356        tokio::task::spawn(future);
1357    }
1358
1359    fn sleep(&self, duration: Duration) -> Self::Sleep {
1360        tokio::time::sleep(duration)
1361    }
1362}
1363
1364fn broadcast<F>(
1365    sender_id: ConnectionId,
1366    receiver_ids: impl IntoIterator<Item = ConnectionId>,
1367    mut f: F,
1368) where
1369    F: FnMut(ConnectionId) -> anyhow::Result<()>,
1370{
1371    for receiver_id in receiver_ids {
1372        if receiver_id != sender_id {
1373            f(receiver_id).trace_err();
1374        }
1375    }
1376}
1377
// Name of the HTTP header `x-zed-protocol-version`; returned by
// `ProtocolVersion::name()` in the `Header` impl in this file.
lazy_static! {
    static ref ZED_PROTOCOL_VERSION: HeaderName = HeaderName::from_static("x-zed-protocol-version");
}
1381
/// Numeric protocol version carried in the `x-zed-protocol-version` header.
///
/// `handle_websocket_request` compares the wrapped value against
/// `rpc::PROTOCOL_VERSION` before upgrading the connection.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub struct ProtocolVersion(u32);
1383
1384impl Header for ProtocolVersion {
1385    fn name() -> &'static HeaderName {
1386        &ZED_PROTOCOL_VERSION
1387    }
1388
1389    fn decode<'i, I>(values: &mut I) -> Result<Self, axum::headers::Error>
1390    where
1391        Self: Sized,
1392        I: Iterator<Item = &'i axum::http::HeaderValue>,
1393    {
1394        let version = values
1395            .next()
1396            .ok_or_else(|| axum::headers::Error::invalid())?
1397            .to_str()
1398            .map_err(|_| axum::headers::Error::invalid())?
1399            .parse()
1400            .map_err(|_| axum::headers::Error::invalid())?;
1401        Ok(Self(version))
1402    }
1403
1404    fn encode<E: Extend<axum::http::HeaderValue>>(&self, values: &mut E) {
1405        values.extend([self.0.to_string().parse().unwrap()]);
1406    }
1407}
1408
1409pub fn routes(app_state: Arc<AppState>) -> Router<Body> {
1410    let server = Server::new(app_state.clone(), None);
1411    Router::new()
1412        .route("/rpc", get(handle_websocket_request))
1413        .layer(
1414            ServiceBuilder::new()
1415                .layer(Extension(app_state))
1416                .layer(middleware::from_fn(auth::validate_header))
1417                .layer(Extension(server)),
1418        )
1419}
1420
1421pub async fn handle_websocket_request(
1422    TypedHeader(ProtocolVersion(protocol_version)): TypedHeader<ProtocolVersion>,
1423    ConnectInfo(socket_address): ConnectInfo<SocketAddr>,
1424    Extension(server): Extension<Arc<Server>>,
1425    Extension(user_id): Extension<UserId>,
1426    ws: WebSocketUpgrade,
1427) -> axum::response::Response {
1428    if protocol_version != rpc::PROTOCOL_VERSION {
1429        return (
1430            StatusCode::UPGRADE_REQUIRED,
1431            "client must be upgraded".to_string(),
1432        )
1433            .into_response();
1434    }
1435    let socket_address = socket_address.to_string();
1436    ws.on_upgrade(move |socket| {
1437        use util::ResultExt;
1438        let socket = socket
1439            .map_ok(to_tungstenite_message)
1440            .err_into()
1441            .with(|message| async move { Ok(to_axum_message(message)) });
1442        let connection = Connection::new(Box::pin(socket));
1443        async move {
1444            server
1445                .handle_connection(connection, socket_address, user_id, None, RealExecutor)
1446                .await
1447                .log_err();
1448        }
1449    })
1450}
1451
1452fn to_axum_message(message: TungsteniteMessage) -> AxumMessage {
1453    match message {
1454        TungsteniteMessage::Text(payload) => AxumMessage::Text(payload),
1455        TungsteniteMessage::Binary(payload) => AxumMessage::Binary(payload),
1456        TungsteniteMessage::Ping(payload) => AxumMessage::Ping(payload),
1457        TungsteniteMessage::Pong(payload) => AxumMessage::Pong(payload),
1458        TungsteniteMessage::Close(frame) => AxumMessage::Close(frame.map(|frame| AxumCloseFrame {
1459            code: frame.code.into(),
1460            reason: frame.reason,
1461        })),
1462    }
1463}
1464
1465fn to_tungstenite_message(message: AxumMessage) -> TungsteniteMessage {
1466    match message {
1467        AxumMessage::Text(payload) => TungsteniteMessage::Text(payload),
1468        AxumMessage::Binary(payload) => TungsteniteMessage::Binary(payload),
1469        AxumMessage::Ping(payload) => TungsteniteMessage::Ping(payload),
1470        AxumMessage::Pong(payload) => TungsteniteMessage::Pong(payload),
1471        AxumMessage::Close(frame) => {
1472            TungsteniteMessage::Close(frame.map(|frame| TungsteniteCloseFrame {
1473                code: frame.code.into(),
1474                reason: frame.reason,
1475            }))
1476        }
1477    }
1478}
1479
/// Extension trait for turning a fallible value into an `Option`.
///
/// The implementation for `Result` in this file logs the error through `tracing`
/// before discarding it.
pub trait ResultExt {
    /// The success type produced by `trace_err`.
    type Ok;

    /// Consumes `self` and returns the success value, if any.
    fn trace_err(self) -> Option<Self::Ok>;
}
1485
1486impl<T, E> ResultExt for Result<T, E>
1487where
1488    E: std::fmt::Debug,
1489{
1490    type Ok = T;
1491
1492    fn trace_err(self) -> Option<T> {
1493        match self {
1494            Ok(value) => Some(value),
1495            Err(error) => {
1496                tracing::error!("{:?}", error);
1497                None
1498            }
1499        }
1500    }
1501}
1502
1503#[cfg(test)]
1504mod tests {
1505    use super::*;
1506    use crate::{
1507        db::{tests::TestDb, UserId},
1508        AppState,
1509    };
1510    use ::rpc::Peer;
1511    use client::{
1512        self, test::FakeHttpClient, Channel, ChannelDetails, ChannelList, Client, Credentials,
1513        EstablishConnectionError, UserStore, RECEIVE_TIMEOUT,
1514    };
1515    use collections::{BTreeMap, HashSet};
1516    use editor::{
1517        self, ConfirmCodeAction, ConfirmCompletion, ConfirmRename, Editor, Input, Redo, Rename,
1518        ToOffset, ToggleCodeActions, Undo,
1519    };
1520    use gpui::{
1521        executor::{self, Deterministic},
1522        geometry::vector::vec2f,
1523        ModelHandle, TestAppContext, ViewHandle,
1524    };
1525    use language::{
1526        range_to_lsp, tree_sitter_rust, Diagnostic, DiagnosticEntry, FakeLspAdapter, Language,
1527        LanguageConfig, LanguageRegistry, OffsetRangeExt, Point, Rope,
1528    };
1529    use lsp::{self, FakeLanguageServer};
1530    use parking_lot::Mutex;
1531    use project::{
1532        fs::{FakeFs, Fs as _},
1533        search::SearchQuery,
1534        worktree::WorktreeHandle,
1535        DiagnosticSummary, Project, ProjectPath, WorktreeId,
1536    };
1537    use rand::prelude::*;
1538    use rpc::PeerId;
1539    use serde_json::json;
1540    use settings::Settings;
1541    use sqlx::types::time::OffsetDateTime;
1542    use std::{
1543        env,
1544        ops::Deref,
1545        path::{Path, PathBuf},
1546        rc::Rc,
1547        sync::{
1548            atomic::{AtomicBool, Ordering::SeqCst},
1549            Arc,
1550        },
1551        time::Duration,
1552    };
1553    use theme::ThemeRegistry;
1554    use workspace::{Item, SplitDirection, ToggleFollow, Workspace, WorkspaceParams};
1555
1556    #[cfg(test)]
1557    #[ctor::ctor]
1558    fn init_logger() {
1559        if std::env::var("RUST_LOG").is_ok() {
1560            env_logger::init();
1561        }
1562    }
1563
    /// End-to-end check of sharing a project between two clients.
    ///
    /// Client A shares a local project rooted at `/a`, client B joins it remotely,
    /// both sides see each other as collaborators, an edit made by B in `b.txt`
    /// reaches A's buffer, and dropping B's project removes B from A's
    /// collaborators.
    #[gpui::test(iterations = 10)]
    async fn test_share_project(cx_a: &mut TestAppContext, cx_b: &mut TestAppContext) {
        let (window_b, _) = cx_b.add_window(|_| EmptyView);
        let lang_registry = Arc::new(LanguageRegistry::test());
        let fs = FakeFs::new(cx_a.background());
        cx_a.foreground().forbid_parking();

        // Connect to a server as 2 clients.
        let mut server = TestServer::start(cx_a.foreground(), cx_a.background()).await;
        let client_a = server.create_client(cx_a, "user_a").await;
        let client_b = server.create_client(cx_b, "user_b").await;
        server
            .make_contacts(vec![(&client_a, cx_a), (&client_b, cx_b)])
            .await;

        // Share a project as client A
        fs.insert_tree(
            "/a",
            json!({
                "a.txt": "a-contents",
                "b.txt": "b-contents",
            }),
        )
        .await;
        let project_a = cx_a.update(|cx| {
            Project::local(
                client_a.clone(),
                client_a.user_store.clone(),
                lang_registry.clone(),
                fs.clone(),
                cx,
            )
        });
        let (worktree_a, _) = project_a
            .update(cx_a, |p, cx| {
                p.find_or_create_local_worktree("/a", true, cx)
            })
            .await
            .unwrap();
        let worktree_id = worktree_a.read_with(cx_a, |tree, _| tree.id());
        // Wait for the worktree scan to finish before sharing the project.
        worktree_a
            .read_with(cx_a, |tree, _| tree.as_local().unwrap().scan_complete())
            .await;
        let project_id = project_a.update(cx_a, |p, _| p.next_remote_id()).await;
        project_a.update(cx_a, |p, cx| p.share(cx)).await.unwrap();

        // Join that project as client B
        let project_b = Project::remote(
            project_id,
            client_b.clone(),
            client_b.user_store.clone(),
            lang_registry.clone(),
            fs.clone(),
            &mut cx_b.to_async(),
        )
        .await
        .unwrap();

        // Client B must already see client A as a collaborator after joining.
        let replica_id_b = project_b.read_with(cx_b, |project, _| {
            assert_eq!(
                project
                    .collaborators()
                    .get(&client_a.peer_id)
                    .unwrap()
                    .user
                    .github_login,
                "user_a"
            );
            project.replica_id()
        });
        // Client A learns about client B asynchronously, so wait for the condition.
        project_a
            .condition(&cx_a, |tree, _| {
                tree.collaborators()
                    .get(&client_b.peer_id)
                    .map_or(false, |collaborator| {
                        collaborator.replica_id == replica_id_b
                            && collaborator.user.github_login == "user_b"
                    })
            })
            .await;

        // Open the same file as client B and client A.
        let buffer_b = project_b
            .update(cx_b, |p, cx| p.open_buffer((worktree_id, "b.txt"), cx))
            .await
            .unwrap();
        buffer_b.read_with(cx_b, |buf, _| assert_eq!(buf.text(), "b-contents"));
        // B opening the buffer must have opened it on the host (client A) as well.
        project_a.read_with(cx_a, |project, cx| {
            assert!(project.has_open_buffer((worktree_id, "b.txt"), cx))
        });
        let buffer_a = project_a
            .update(cx_a, |p, cx| p.open_buffer((worktree_id, "b.txt"), cx))
            .await
            .unwrap();

        let editor_b = cx_b.add_view(window_b, |cx| Editor::for_buffer(buffer_b, None, cx));

        // TODO
        // // Create a selection set as client B and see that selection set as client A.
        // buffer_a
        //     .condition(&cx_a, |buffer, _| buffer.selection_sets().count() == 1)
        //     .await;

        // Edit the buffer as client B and see that edit as client A.
        editor_b.update(cx_b, |editor, cx| {
            editor.handle_input(&Input("ok, ".into()), cx)
        });
        buffer_a
            .condition(&cx_a, |buffer, _| buffer.text() == "ok, b-contents")
            .await;

        // TODO
        // // Remove the selection set as client B, see those selections disappear as client A.
        cx_b.update(move |_| drop(editor_b));
        // buffer_a
        //     .condition(&cx_a, |buffer, _| buffer.selection_sets().count() == 0)
        //     .await;

        // Dropping the client B's project removes client B from client A's collaborators.
        cx_b.update(move |_| drop(project_b));
        project_a
            .condition(&cx_a, |project, _| project.collaborators().is_empty())
            .await;
    }
1688
1689    #[gpui::test(iterations = 10)]
1690    async fn test_unshare_project(cx_a: &mut TestAppContext, cx_b: &mut TestAppContext) {
1691        let lang_registry = Arc::new(LanguageRegistry::test());
1692        let fs = FakeFs::new(cx_a.background());
1693        cx_a.foreground().forbid_parking();
1694
1695        // Connect to a server as 2 clients.
1696        let mut server = TestServer::start(cx_a.foreground(), cx_a.background()).await;
1697        let client_a = server.create_client(cx_a, "user_a").await;
1698        let client_b = server.create_client(cx_b, "user_b").await;
1699        server
1700            .make_contacts(vec![(&client_a, cx_a), (&client_b, cx_b)])
1701            .await;
1702
1703        // Share a project as client A
1704        fs.insert_tree(
1705            "/a",
1706            json!({
1707                "a.txt": "a-contents",
1708                "b.txt": "b-contents",
1709            }),
1710        )
1711        .await;
1712        let project_a = cx_a.update(|cx| {
1713            Project::local(
1714                client_a.clone(),
1715                client_a.user_store.clone(),
1716                lang_registry.clone(),
1717                fs.clone(),
1718                cx,
1719            )
1720        });
1721        let (worktree_a, _) = project_a
1722            .update(cx_a, |p, cx| {
1723                p.find_or_create_local_worktree("/a", true, cx)
1724            })
1725            .await
1726            .unwrap();
1727        worktree_a
1728            .read_with(cx_a, |tree, _| tree.as_local().unwrap().scan_complete())
1729            .await;
1730        let project_id = project_a.update(cx_a, |p, _| p.next_remote_id()).await;
1731        let worktree_id = worktree_a.read_with(cx_a, |tree, _| tree.id());
1732        project_a.update(cx_a, |p, cx| p.share(cx)).await.unwrap();
1733        assert!(worktree_a.read_with(cx_a, |tree, _| tree.as_local().unwrap().is_shared()));
1734
1735        // Join that project as client B
1736        let project_b = Project::remote(
1737            project_id,
1738            client_b.clone(),
1739            client_b.user_store.clone(),
1740            lang_registry.clone(),
1741            fs.clone(),
1742            &mut cx_b.to_async(),
1743        )
1744        .await
1745        .unwrap();
1746        project_b
1747            .update(cx_b, |p, cx| p.open_buffer((worktree_id, "a.txt"), cx))
1748            .await
1749            .unwrap();
1750
1751        // Unshare the project as client A
1752        project_a.update(cx_a, |project, cx| project.unshare(cx));
1753        project_b
1754            .condition(cx_b, |project, _| project.is_read_only())
1755            .await;
1756        assert!(worktree_a.read_with(cx_a, |tree, _| !tree.as_local().unwrap().is_shared()));
1757        cx_b.update(|_| {
1758            drop(project_b);
1759        });
1760
1761        // Share the project again and ensure guests can still join.
1762        project_a
1763            .update(cx_a, |project, cx| project.share(cx))
1764            .await
1765            .unwrap();
1766        assert!(worktree_a.read_with(cx_a, |tree, _| tree.as_local().unwrap().is_shared()));
1767
1768        let project_b2 = Project::remote(
1769            project_id,
1770            client_b.clone(),
1771            client_b.user_store.clone(),
1772            lang_registry.clone(),
1773            fs.clone(),
1774            &mut cx_b.to_async(),
1775        )
1776        .await
1777        .unwrap();
1778        project_b2
1779            .update(cx_b, |p, cx| p.open_buffer((worktree_id, "a.txt"), cx))
1780            .await
1781            .unwrap();
1782    }
1783
1784    #[gpui::test(iterations = 10)]
1785    async fn test_host_disconnect(cx_a: &mut TestAppContext, cx_b: &mut TestAppContext) {
1786        let lang_registry = Arc::new(LanguageRegistry::test());
1787        let fs = FakeFs::new(cx_a.background());
1788        cx_a.foreground().forbid_parking();
1789
1790        // Connect to a server as 2 clients.
1791        let mut server = TestServer::start(cx_a.foreground(), cx_a.background()).await;
1792        let client_a = server.create_client(cx_a, "user_a").await;
1793        let client_b = server.create_client(cx_b, "user_b").await;
1794        server
1795            .make_contacts(vec![(&client_a, cx_a), (&client_b, cx_b)])
1796            .await;
1797
1798        // Share a project as client A
1799        fs.insert_tree(
1800            "/a",
1801            json!({
1802                "a.txt": "a-contents",
1803                "b.txt": "b-contents",
1804            }),
1805        )
1806        .await;
1807        let project_a = cx_a.update(|cx| {
1808            Project::local(
1809                client_a.clone(),
1810                client_a.user_store.clone(),
1811                lang_registry.clone(),
1812                fs.clone(),
1813                cx,
1814            )
1815        });
1816        let (worktree_a, _) = project_a
1817            .update(cx_a, |p, cx| {
1818                p.find_or_create_local_worktree("/a", true, cx)
1819            })
1820            .await
1821            .unwrap();
1822        worktree_a
1823            .read_with(cx_a, |tree, _| tree.as_local().unwrap().scan_complete())
1824            .await;
1825        let project_id = project_a.update(cx_a, |p, _| p.next_remote_id()).await;
1826        let worktree_id = worktree_a.read_with(cx_a, |tree, _| tree.id());
1827        project_a.update(cx_a, |p, cx| p.share(cx)).await.unwrap();
1828        assert!(worktree_a.read_with(cx_a, |tree, _| tree.as_local().unwrap().is_shared()));
1829
1830        // Join that project as client B
1831        let project_b = Project::remote(
1832            project_id,
1833            client_b.clone(),
1834            client_b.user_store.clone(),
1835            lang_registry.clone(),
1836            fs.clone(),
1837            &mut cx_b.to_async(),
1838        )
1839        .await
1840        .unwrap();
1841        project_b
1842            .update(cx_b, |p, cx| p.open_buffer((worktree_id, "a.txt"), cx))
1843            .await
1844            .unwrap();
1845
1846        // Drop client A's connection. Collaborators should disappear and the project should not be shown as shared.
1847        server.disconnect_client(client_a.current_user_id(cx_a));
1848        cx_a.foreground().advance_clock(rpc::RECEIVE_TIMEOUT);
1849        project_a
1850            .condition(cx_a, |project, _| project.collaborators().is_empty())
1851            .await;
1852        project_a.read_with(cx_a, |project, _| assert!(!project.is_shared()));
1853        project_b
1854            .condition(cx_b, |project, _| project.is_read_only())
1855            .await;
1856        assert!(worktree_a.read_with(cx_a, |tree, _| !tree.as_local().unwrap().is_shared()));
1857        cx_b.update(|_| {
1858            drop(project_b);
1859        });
1860
1861        // Await reconnection
1862        let project_id = project_a.update(cx_a, |p, _| p.next_remote_id()).await;
1863
1864        // Share the project again and ensure guests can still join.
1865        project_a
1866            .update(cx_a, |project, cx| project.share(cx))
1867            .await
1868            .unwrap();
1869        assert!(worktree_a.read_with(cx_a, |tree, _| tree.as_local().unwrap().is_shared()));
1870
1871        let project_b2 = Project::remote(
1872            project_id,
1873            client_b.clone(),
1874            client_b.user_store.clone(),
1875            lang_registry.clone(),
1876            fs.clone(),
1877            &mut cx_b.to_async(),
1878        )
1879        .await
1880        .unwrap();
1881        project_b2
1882            .update(cx_b, |p, cx| p.open_buffer((worktree_id, "a.txt"), cx))
1883            .await
1884            .unwrap();
1885    }
1886
1887    #[gpui::test(iterations = 10)]
1888    async fn test_propagate_saves_and_fs_changes(
1889        cx_a: &mut TestAppContext,
1890        cx_b: &mut TestAppContext,
1891        cx_c: &mut TestAppContext,
1892    ) {
1893        let lang_registry = Arc::new(LanguageRegistry::test());
1894        let fs = FakeFs::new(cx_a.background());
1895        cx_a.foreground().forbid_parking();
1896
1897        // Connect to a server as 3 clients.
1898        let mut server = TestServer::start(cx_a.foreground(), cx_a.background()).await;
1899        let client_a = server.create_client(cx_a, "user_a").await;
1900        let client_b = server.create_client(cx_b, "user_b").await;
1901        let client_c = server.create_client(cx_c, "user_c").await;
1902        server
1903            .make_contacts(vec![
1904                (&client_a, cx_a),
1905                (&client_b, cx_b),
1906                (&client_c, cx_c),
1907            ])
1908            .await;
1909
1910        // Share a worktree as client A.
1911        fs.insert_tree(
1912            "/a",
1913            json!({
1914                "file1": "",
1915                "file2": ""
1916            }),
1917        )
1918        .await;
1919        let project_a = cx_a.update(|cx| {
1920            Project::local(
1921                client_a.clone(),
1922                client_a.user_store.clone(),
1923                lang_registry.clone(),
1924                fs.clone(),
1925                cx,
1926            )
1927        });
1928        let (worktree_a, _) = project_a
1929            .update(cx_a, |p, cx| {
1930                p.find_or_create_local_worktree("/a", true, cx)
1931            })
1932            .await
1933            .unwrap();
1934        worktree_a
1935            .read_with(cx_a, |tree, _| tree.as_local().unwrap().scan_complete())
1936            .await;
1937        let project_id = project_a.update(cx_a, |p, _| p.next_remote_id()).await;
1938        let worktree_id = worktree_a.read_with(cx_a, |tree, _| tree.id());
1939        project_a.update(cx_a, |p, cx| p.share(cx)).await.unwrap();
1940
1941        // Join that worktree as clients B and C.
1942        let project_b = Project::remote(
1943            project_id,
1944            client_b.clone(),
1945            client_b.user_store.clone(),
1946            lang_registry.clone(),
1947            fs.clone(),
1948            &mut cx_b.to_async(),
1949        )
1950        .await
1951        .unwrap();
1952        let project_c = Project::remote(
1953            project_id,
1954            client_c.clone(),
1955            client_c.user_store.clone(),
1956            lang_registry.clone(),
1957            fs.clone(),
1958            &mut cx_c.to_async(),
1959        )
1960        .await
1961        .unwrap();
1962        let worktree_b = project_b.read_with(cx_b, |p, cx| p.worktrees(cx).next().unwrap());
1963        let worktree_c = project_c.read_with(cx_c, |p, cx| p.worktrees(cx).next().unwrap());
1964
1965        // Open and edit a buffer as both guests B and C.
1966        let buffer_b = project_b
1967            .update(cx_b, |p, cx| p.open_buffer((worktree_id, "file1"), cx))
1968            .await
1969            .unwrap();
1970        let buffer_c = project_c
1971            .update(cx_c, |p, cx| p.open_buffer((worktree_id, "file1"), cx))
1972            .await
1973            .unwrap();
1974        buffer_b.update(cx_b, |buf, cx| buf.edit([(0..0, "i-am-b, ")], cx));
1975        buffer_c.update(cx_c, |buf, cx| buf.edit([(0..0, "i-am-c, ")], cx));
1976
1977        // Open and edit that buffer as the host.
1978        let buffer_a = project_a
1979            .update(cx_a, |p, cx| p.open_buffer((worktree_id, "file1"), cx))
1980            .await
1981            .unwrap();
1982
1983        buffer_a
1984            .condition(cx_a, |buf, _| buf.text() == "i-am-c, i-am-b, ")
1985            .await;
1986        buffer_a.update(cx_a, |buf, cx| {
1987            buf.edit([(buf.len()..buf.len(), "i-am-a")], cx)
1988        });
1989
1990        // Wait for edits to propagate
1991        buffer_a
1992            .condition(cx_a, |buf, _| buf.text() == "i-am-c, i-am-b, i-am-a")
1993            .await;
1994        buffer_b
1995            .condition(cx_b, |buf, _| buf.text() == "i-am-c, i-am-b, i-am-a")
1996            .await;
1997        buffer_c
1998            .condition(cx_c, |buf, _| buf.text() == "i-am-c, i-am-b, i-am-a")
1999            .await;
2000
2001        // Edit the buffer as the host and concurrently save as guest B.
2002        let save_b = buffer_b.update(cx_b, |buf, cx| buf.save(cx));
2003        buffer_a.update(cx_a, |buf, cx| buf.edit([(0..0, "hi-a, ")], cx));
2004        save_b.await.unwrap();
2005        assert_eq!(
2006            fs.load("/a/file1".as_ref()).await.unwrap(),
2007            "hi-a, i-am-c, i-am-b, i-am-a"
2008        );
2009        buffer_a.read_with(cx_a, |buf, _| assert!(!buf.is_dirty()));
2010        buffer_b.read_with(cx_b, |buf, _| assert!(!buf.is_dirty()));
2011        buffer_c.condition(cx_c, |buf, _| !buf.is_dirty()).await;
2012
2013        worktree_a.flush_fs_events(cx_a).await;
2014
2015        // Make changes on host's file system, see those changes on guest worktrees.
2016        fs.rename(
2017            "/a/file1".as_ref(),
2018            "/a/file1-renamed".as_ref(),
2019            Default::default(),
2020        )
2021        .await
2022        .unwrap();
2023
2024        fs.rename("/a/file2".as_ref(), "/a/file3".as_ref(), Default::default())
2025            .await
2026            .unwrap();
2027        fs.insert_file(Path::new("/a/file4"), "4".into()).await;
2028
2029        worktree_a
2030            .condition(&cx_a, |tree, _| {
2031                tree.paths()
2032                    .map(|p| p.to_string_lossy())
2033                    .collect::<Vec<_>>()
2034                    == ["file1-renamed", "file3", "file4"]
2035            })
2036            .await;
2037        worktree_b
2038            .condition(&cx_b, |tree, _| {
2039                tree.paths()
2040                    .map(|p| p.to_string_lossy())
2041                    .collect::<Vec<_>>()
2042                    == ["file1-renamed", "file3", "file4"]
2043            })
2044            .await;
2045        worktree_c
2046            .condition(&cx_c, |tree, _| {
2047                tree.paths()
2048                    .map(|p| p.to_string_lossy())
2049                    .collect::<Vec<_>>()
2050                    == ["file1-renamed", "file3", "file4"]
2051            })
2052            .await;
2053
2054        // Ensure buffer files are updated as well.
2055        buffer_a
2056            .condition(&cx_a, |buf, _| {
2057                buf.file().unwrap().path().to_str() == Some("file1-renamed")
2058            })
2059            .await;
2060        buffer_b
2061            .condition(&cx_b, |buf, _| {
2062                buf.file().unwrap().path().to_str() == Some("file1-renamed")
2063            })
2064            .await;
2065        buffer_c
2066            .condition(&cx_c, |buf, _| {
2067                buf.file().unwrap().path().to_str() == Some("file1-renamed")
2068            })
2069            .await;
2070    }
2071
2072    #[gpui::test(iterations = 10)]
2073    async fn test_fs_operations(
2074        executor: Arc<Deterministic>,
2075        cx_a: &mut TestAppContext,
2076        cx_b: &mut TestAppContext,
2077    ) {
2078        executor.forbid_parking();
2079        let fs = FakeFs::new(cx_a.background());
2080
2081        // Connect to a server as 2 clients.
2082        let mut server = TestServer::start(cx_a.foreground(), cx_a.background()).await;
2083        let mut client_a = server.create_client(cx_a, "user_a").await;
2084        let mut client_b = server.create_client(cx_b, "user_b").await;
2085        server
2086            .make_contacts(vec![(&client_a, cx_a), (&client_b, cx_b)])
2087            .await;
2088
2089        // Share a project as client A
2090        fs.insert_tree(
2091            "/dir",
2092            json!({
2093                "a.txt": "a-contents",
2094                "b.txt": "b-contents",
2095            }),
2096        )
2097        .await;
2098
2099        let (project_a, worktree_id) = client_a.build_local_project(fs, "/dir", cx_a).await;
2100        let project_id = project_a.read_with(cx_a, |project, _| project.remote_id().unwrap());
2101        project_a
2102            .update(cx_a, |project, cx| project.share(cx))
2103            .await
2104            .unwrap();
2105
2106        let project_b = client_b.build_remote_project(project_id, cx_b).await;
2107
2108        let worktree_a =
2109            project_a.read_with(cx_a, |project, cx| project.worktrees(cx).next().unwrap());
2110        let worktree_b =
2111            project_b.read_with(cx_b, |project, cx| project.worktrees(cx).next().unwrap());
2112
2113        let entry = project_b
2114            .update(cx_b, |project, cx| {
2115                project
2116                    .create_entry((worktree_id, "c.txt"), false, cx)
2117                    .unwrap()
2118            })
2119            .await
2120            .unwrap();
2121        worktree_a.read_with(cx_a, |worktree, _| {
2122            assert_eq!(
2123                worktree
2124                    .paths()
2125                    .map(|p| p.to_string_lossy())
2126                    .collect::<Vec<_>>(),
2127                ["a.txt", "b.txt", "c.txt"]
2128            );
2129        });
2130        worktree_b.read_with(cx_b, |worktree, _| {
2131            assert_eq!(
2132                worktree
2133                    .paths()
2134                    .map(|p| p.to_string_lossy())
2135                    .collect::<Vec<_>>(),
2136                ["a.txt", "b.txt", "c.txt"]
2137            );
2138        });
2139
2140        project_b
2141            .update(cx_b, |project, cx| {
2142                project.rename_entry(entry.id, Path::new("d.txt"), cx)
2143            })
2144            .unwrap()
2145            .await
2146            .unwrap();
2147        worktree_a.read_with(cx_a, |worktree, _| {
2148            assert_eq!(
2149                worktree
2150                    .paths()
2151                    .map(|p| p.to_string_lossy())
2152                    .collect::<Vec<_>>(),
2153                ["a.txt", "b.txt", "d.txt"]
2154            );
2155        });
2156        worktree_b.read_with(cx_b, |worktree, _| {
2157            assert_eq!(
2158                worktree
2159                    .paths()
2160                    .map(|p| p.to_string_lossy())
2161                    .collect::<Vec<_>>(),
2162                ["a.txt", "b.txt", "d.txt"]
2163            );
2164        });
2165
2166        let dir_entry = project_b
2167            .update(cx_b, |project, cx| {
2168                project
2169                    .create_entry((worktree_id, "DIR"), true, cx)
2170                    .unwrap()
2171            })
2172            .await
2173            .unwrap();
2174        worktree_a.read_with(cx_a, |worktree, _| {
2175            assert_eq!(
2176                worktree
2177                    .paths()
2178                    .map(|p| p.to_string_lossy())
2179                    .collect::<Vec<_>>(),
2180                ["DIR", "a.txt", "b.txt", "d.txt"]
2181            );
2182        });
2183        worktree_b.read_with(cx_b, |worktree, _| {
2184            assert_eq!(
2185                worktree
2186                    .paths()
2187                    .map(|p| p.to_string_lossy())
2188                    .collect::<Vec<_>>(),
2189                ["DIR", "a.txt", "b.txt", "d.txt"]
2190            );
2191        });
2192
2193        project_b
2194            .update(cx_b, |project, cx| {
2195                project.delete_entry(dir_entry.id, cx).unwrap()
2196            })
2197            .await
2198            .unwrap();
2199        worktree_a.read_with(cx_a, |worktree, _| {
2200            assert_eq!(
2201                worktree
2202                    .paths()
2203                    .map(|p| p.to_string_lossy())
2204                    .collect::<Vec<_>>(),
2205                ["a.txt", "b.txt", "d.txt"]
2206            );
2207        });
2208        worktree_b.read_with(cx_b, |worktree, _| {
2209            assert_eq!(
2210                worktree
2211                    .paths()
2212                    .map(|p| p.to_string_lossy())
2213                    .collect::<Vec<_>>(),
2214                ["a.txt", "b.txt", "d.txt"]
2215            );
2216        });
2217
2218        project_b
2219            .update(cx_b, |project, cx| {
2220                project.delete_entry(entry.id, cx).unwrap()
2221            })
2222            .await
2223            .unwrap();
2224        worktree_a.read_with(cx_a, |worktree, _| {
2225            assert_eq!(
2226                worktree
2227                    .paths()
2228                    .map(|p| p.to_string_lossy())
2229                    .collect::<Vec<_>>(),
2230                ["a.txt", "b.txt"]
2231            );
2232        });
2233        worktree_b.read_with(cx_b, |worktree, _| {
2234            assert_eq!(
2235                worktree
2236                    .paths()
2237                    .map(|p| p.to_string_lossy())
2238                    .collect::<Vec<_>>(),
2239                ["a.txt", "b.txt"]
2240            );
2241        });
2242    }
2243
2244    #[gpui::test(iterations = 10)]
2245    async fn test_buffer_conflict_after_save(cx_a: &mut TestAppContext, cx_b: &mut TestAppContext) {
2246        cx_a.foreground().forbid_parking();
2247        let lang_registry = Arc::new(LanguageRegistry::test());
2248        let fs = FakeFs::new(cx_a.background());
2249
2250        // Connect to a server as 2 clients.
2251        let mut server = TestServer::start(cx_a.foreground(), cx_a.background()).await;
2252        let client_a = server.create_client(cx_a, "user_a").await;
2253        let client_b = server.create_client(cx_b, "user_b").await;
2254        server
2255            .make_contacts(vec![(&client_a, cx_a), (&client_b, cx_b)])
2256            .await;
2257
2258        // Share a project as client A
2259        fs.insert_tree(
2260            "/dir",
2261            json!({
2262                "a.txt": "a-contents",
2263            }),
2264        )
2265        .await;
2266
2267        let project_a = cx_a.update(|cx| {
2268            Project::local(
2269                client_a.clone(),
2270                client_a.user_store.clone(),
2271                lang_registry.clone(),
2272                fs.clone(),
2273                cx,
2274            )
2275        });
2276        let (worktree_a, _) = project_a
2277            .update(cx_a, |p, cx| {
2278                p.find_or_create_local_worktree("/dir", true, cx)
2279            })
2280            .await
2281            .unwrap();
2282        worktree_a
2283            .read_with(cx_a, |tree, _| tree.as_local().unwrap().scan_complete())
2284            .await;
2285        let project_id = project_a.update(cx_a, |p, _| p.next_remote_id()).await;
2286        let worktree_id = worktree_a.read_with(cx_a, |tree, _| tree.id());
2287        project_a.update(cx_a, |p, cx| p.share(cx)).await.unwrap();
2288
2289        // Join that project as client B
2290        let project_b = Project::remote(
2291            project_id,
2292            client_b.clone(),
2293            client_b.user_store.clone(),
2294            lang_registry.clone(),
2295            fs.clone(),
2296            &mut cx_b.to_async(),
2297        )
2298        .await
2299        .unwrap();
2300
2301        // Open a buffer as client B
2302        let buffer_b = project_b
2303            .update(cx_b, |p, cx| p.open_buffer((worktree_id, "a.txt"), cx))
2304            .await
2305            .unwrap();
2306
2307        buffer_b.update(cx_b, |buf, cx| buf.edit([(0..0, "world ")], cx));
2308        buffer_b.read_with(cx_b, |buf, _| {
2309            assert!(buf.is_dirty());
2310            assert!(!buf.has_conflict());
2311        });
2312
2313        buffer_b.update(cx_b, |buf, cx| buf.save(cx)).await.unwrap();
2314        buffer_b
2315            .condition(&cx_b, |buffer_b, _| !buffer_b.is_dirty())
2316            .await;
2317        buffer_b.read_with(cx_b, |buf, _| {
2318            assert!(!buf.has_conflict());
2319        });
2320
2321        buffer_b.update(cx_b, |buf, cx| buf.edit([(0..0, "hello ")], cx));
2322        buffer_b.read_with(cx_b, |buf, _| {
2323            assert!(buf.is_dirty());
2324            assert!(!buf.has_conflict());
2325        });
2326    }
2327
2328    #[gpui::test(iterations = 10)]
2329    async fn test_buffer_reloading(cx_a: &mut TestAppContext, cx_b: &mut TestAppContext) {
2330        cx_a.foreground().forbid_parking();
2331        let lang_registry = Arc::new(LanguageRegistry::test());
2332        let fs = FakeFs::new(cx_a.background());
2333
2334        // Connect to a server as 2 clients.
2335        let mut server = TestServer::start(cx_a.foreground(), cx_a.background()).await;
2336        let client_a = server.create_client(cx_a, "user_a").await;
2337        let client_b = server.create_client(cx_b, "user_b").await;
2338        server
2339            .make_contacts(vec![(&client_a, cx_a), (&client_b, cx_b)])
2340            .await;
2341
2342        // Share a project as client A
2343        fs.insert_tree(
2344            "/dir",
2345            json!({
2346                "a.txt": "a-contents",
2347            }),
2348        )
2349        .await;
2350
2351        let project_a = cx_a.update(|cx| {
2352            Project::local(
2353                client_a.clone(),
2354                client_a.user_store.clone(),
2355                lang_registry.clone(),
2356                fs.clone(),
2357                cx,
2358            )
2359        });
2360        let (worktree_a, _) = project_a
2361            .update(cx_a, |p, cx| {
2362                p.find_or_create_local_worktree("/dir", true, cx)
2363            })
2364            .await
2365            .unwrap();
2366        worktree_a
2367            .read_with(cx_a, |tree, _| tree.as_local().unwrap().scan_complete())
2368            .await;
2369        let project_id = project_a.update(cx_a, |p, _| p.next_remote_id()).await;
2370        let worktree_id = worktree_a.read_with(cx_a, |tree, _| tree.id());
2371        project_a.update(cx_a, |p, cx| p.share(cx)).await.unwrap();
2372
2373        // Join that project as client B
2374        let project_b = Project::remote(
2375            project_id,
2376            client_b.clone(),
2377            client_b.user_store.clone(),
2378            lang_registry.clone(),
2379            fs.clone(),
2380            &mut cx_b.to_async(),
2381        )
2382        .await
2383        .unwrap();
2384        let _worktree_b = project_b.update(cx_b, |p, cx| p.worktrees(cx).next().unwrap());
2385
2386        // Open a buffer as client B
2387        let buffer_b = project_b
2388            .update(cx_b, |p, cx| p.open_buffer((worktree_id, "a.txt"), cx))
2389            .await
2390            .unwrap();
2391        buffer_b.read_with(cx_b, |buf, _| {
2392            assert!(!buf.is_dirty());
2393            assert!(!buf.has_conflict());
2394        });
2395
2396        fs.save(Path::new("/dir/a.txt"), &"new contents".into())
2397            .await
2398            .unwrap();
2399        buffer_b
2400            .condition(&cx_b, |buf, _| {
2401                buf.text() == "new contents" && !buf.is_dirty()
2402            })
2403            .await;
2404        buffer_b.read_with(cx_b, |buf, _| {
2405            assert!(!buf.has_conflict());
2406        });
2407    }
2408
2409    #[gpui::test(iterations = 10)]
2410    async fn test_editing_while_guest_opens_buffer(
2411        cx_a: &mut TestAppContext,
2412        cx_b: &mut TestAppContext,
2413    ) {
2414        cx_a.foreground().forbid_parking();
2415        let lang_registry = Arc::new(LanguageRegistry::test());
2416        let fs = FakeFs::new(cx_a.background());
2417
2418        // Connect to a server as 2 clients.
2419        let mut server = TestServer::start(cx_a.foreground(), cx_a.background()).await;
2420        let client_a = server.create_client(cx_a, "user_a").await;
2421        let client_b = server.create_client(cx_b, "user_b").await;
2422        server
2423            .make_contacts(vec![(&client_a, cx_a), (&client_b, cx_b)])
2424            .await;
2425
2426        // Share a project as client A
2427        fs.insert_tree(
2428            "/dir",
2429            json!({
2430                "a.txt": "a-contents",
2431            }),
2432        )
2433        .await;
2434        let project_a = cx_a.update(|cx| {
2435            Project::local(
2436                client_a.clone(),
2437                client_a.user_store.clone(),
2438                lang_registry.clone(),
2439                fs.clone(),
2440                cx,
2441            )
2442        });
2443        let (worktree_a, _) = project_a
2444            .update(cx_a, |p, cx| {
2445                p.find_or_create_local_worktree("/dir", true, cx)
2446            })
2447            .await
2448            .unwrap();
2449        worktree_a
2450            .read_with(cx_a, |tree, _| tree.as_local().unwrap().scan_complete())
2451            .await;
2452        let project_id = project_a.update(cx_a, |p, _| p.next_remote_id()).await;
2453        let worktree_id = worktree_a.read_with(cx_a, |tree, _| tree.id());
2454        project_a.update(cx_a, |p, cx| p.share(cx)).await.unwrap();
2455
2456        // Join that project as client B
2457        let project_b = Project::remote(
2458            project_id,
2459            client_b.clone(),
2460            client_b.user_store.clone(),
2461            lang_registry.clone(),
2462            fs.clone(),
2463            &mut cx_b.to_async(),
2464        )
2465        .await
2466        .unwrap();
2467
2468        // Open a buffer as client A
2469        let buffer_a = project_a
2470            .update(cx_a, |p, cx| p.open_buffer((worktree_id, "a.txt"), cx))
2471            .await
2472            .unwrap();
2473
2474        // Start opening the same buffer as client B
2475        let buffer_b = cx_b
2476            .background()
2477            .spawn(project_b.update(cx_b, |p, cx| p.open_buffer((worktree_id, "a.txt"), cx)));
2478
2479        // Edit the buffer as client A while client B is still opening it.
2480        cx_b.background().simulate_random_delay().await;
2481        buffer_a.update(cx_a, |buf, cx| buf.edit([(0..0, "X")], cx));
2482        cx_b.background().simulate_random_delay().await;
2483        buffer_a.update(cx_a, |buf, cx| buf.edit([(1..1, "Y")], cx));
2484
2485        let text = buffer_a.read_with(cx_a, |buf, _| buf.text());
2486        let buffer_b = buffer_b.await.unwrap();
2487        buffer_b.condition(&cx_b, |buf, _| buf.text() == text).await;
2488    }
2489
2490    #[gpui::test(iterations = 10)]
2491    async fn test_leaving_worktree_while_opening_buffer(
2492        cx_a: &mut TestAppContext,
2493        cx_b: &mut TestAppContext,
2494    ) {
2495        cx_a.foreground().forbid_parking();
2496        let lang_registry = Arc::new(LanguageRegistry::test());
2497        let fs = FakeFs::new(cx_a.background());
2498
2499        // Connect to a server as 2 clients.
2500        let mut server = TestServer::start(cx_a.foreground(), cx_a.background()).await;
2501        let client_a = server.create_client(cx_a, "user_a").await;
2502        let client_b = server.create_client(cx_b, "user_b").await;
2503        server
2504            .make_contacts(vec![(&client_a, cx_a), (&client_b, cx_b)])
2505            .await;
2506
2507        // Share a project as client A
2508        fs.insert_tree(
2509            "/dir",
2510            json!({
2511                "a.txt": "a-contents",
2512            }),
2513        )
2514        .await;
2515        let project_a = cx_a.update(|cx| {
2516            Project::local(
2517                client_a.clone(),
2518                client_a.user_store.clone(),
2519                lang_registry.clone(),
2520                fs.clone(),
2521                cx,
2522            )
2523        });
2524        let (worktree_a, _) = project_a
2525            .update(cx_a, |p, cx| {
2526                p.find_or_create_local_worktree("/dir", true, cx)
2527            })
2528            .await
2529            .unwrap();
2530        worktree_a
2531            .read_with(cx_a, |tree, _| tree.as_local().unwrap().scan_complete())
2532            .await;
2533        let project_id = project_a.update(cx_a, |p, _| p.next_remote_id()).await;
2534        let worktree_id = worktree_a.read_with(cx_a, |tree, _| tree.id());
2535        project_a.update(cx_a, |p, cx| p.share(cx)).await.unwrap();
2536
2537        // Join that project as client B
2538        let project_b = Project::remote(
2539            project_id,
2540            client_b.clone(),
2541            client_b.user_store.clone(),
2542            lang_registry.clone(),
2543            fs.clone(),
2544            &mut cx_b.to_async(),
2545        )
2546        .await
2547        .unwrap();
2548
2549        // See that a guest has joined as client A.
2550        project_a
2551            .condition(&cx_a, |p, _| p.collaborators().len() == 1)
2552            .await;
2553
2554        // Begin opening a buffer as client B, but leave the project before the open completes.
2555        let buffer_b = cx_b
2556            .background()
2557            .spawn(project_b.update(cx_b, |p, cx| p.open_buffer((worktree_id, "a.txt"), cx)));
2558        cx_b.update(|_| drop(project_b));
2559        drop(buffer_b);
2560
2561        // See that the guest has left.
2562        project_a
2563            .condition(&cx_a, |p, _| p.collaborators().len() == 0)
2564            .await;
2565    }
2566
2567    #[gpui::test(iterations = 10)]
2568    async fn test_leaving_project(cx_a: &mut TestAppContext, cx_b: &mut TestAppContext) {
2569        cx_a.foreground().forbid_parking();
2570        let lang_registry = Arc::new(LanguageRegistry::test());
2571        let fs = FakeFs::new(cx_a.background());
2572
2573        // Connect to a server as 2 clients.
2574        let mut server = TestServer::start(cx_a.foreground(), cx_a.background()).await;
2575        let client_a = server.create_client(cx_a, "user_a").await;
2576        let client_b = server.create_client(cx_b, "user_b").await;
2577        server
2578            .make_contacts(vec![(&client_a, cx_a), (&client_b, cx_b)])
2579            .await;
2580
2581        // Share a project as client A
2582        fs.insert_tree(
2583            "/a",
2584            json!({
2585                "a.txt": "a-contents",
2586                "b.txt": "b-contents",
2587            }),
2588        )
2589        .await;
2590        let project_a = cx_a.update(|cx| {
2591            Project::local(
2592                client_a.clone(),
2593                client_a.user_store.clone(),
2594                lang_registry.clone(),
2595                fs.clone(),
2596                cx,
2597            )
2598        });
2599        let (worktree_a, _) = project_a
2600            .update(cx_a, |p, cx| {
2601                p.find_or_create_local_worktree("/a", true, cx)
2602            })
2603            .await
2604            .unwrap();
2605        worktree_a
2606            .read_with(cx_a, |tree, _| tree.as_local().unwrap().scan_complete())
2607            .await;
2608        let project_id = project_a
2609            .update(cx_a, |project, _| project.next_remote_id())
2610            .await;
2611        project_a
2612            .update(cx_a, |project, cx| project.share(cx))
2613            .await
2614            .unwrap();
2615
2616        // Join that project as client B
2617        let _project_b = Project::remote(
2618            project_id,
2619            client_b.clone(),
2620            client_b.user_store.clone(),
2621            lang_registry.clone(),
2622            fs.clone(),
2623            &mut cx_b.to_async(),
2624        )
2625        .await
2626        .unwrap();
2627
2628        // Client A sees that a guest has joined.
2629        project_a
2630            .condition(cx_a, |p, _| p.collaborators().len() == 1)
2631            .await;
2632
2633        // Drop client B's connection and ensure client A observes client B leaving the project.
2634        client_b.disconnect(&cx_b.to_async()).unwrap();
2635        project_a
2636            .condition(cx_a, |p, _| p.collaborators().len() == 0)
2637            .await;
2638
2639        // Rejoin the project as client B
2640        let _project_b = Project::remote(
2641            project_id,
2642            client_b.clone(),
2643            client_b.user_store.clone(),
2644            lang_registry.clone(),
2645            fs.clone(),
2646            &mut cx_b.to_async(),
2647        )
2648        .await
2649        .unwrap();
2650
2651        // Client A sees that a guest has re-joined.
2652        project_a
2653            .condition(cx_a, |p, _| p.collaborators().len() == 1)
2654            .await;
2655
2656        // Simulate connection loss for client B and ensure client A observes client B leaving the project.
2657        client_b.wait_for_current_user(cx_b).await;
2658        server.disconnect_client(client_b.current_user_id(cx_b));
2659        cx_a.foreground().advance_clock(rpc::RECEIVE_TIMEOUT);
2660        project_a
2661            .condition(cx_a, |p, _| p.collaborators().len() == 0)
2662            .await;
2663    }
2664
2665    #[gpui::test(iterations = 10)]
2666    async fn test_collaborating_with_diagnostics(
2667        cx_a: &mut TestAppContext,
2668        cx_b: &mut TestAppContext,
2669    ) {
2670        cx_a.foreground().forbid_parking();
2671        let lang_registry = Arc::new(LanguageRegistry::test());
2672        let fs = FakeFs::new(cx_a.background());
2673
2674        // Set up a fake language server.
2675        let mut language = Language::new(
2676            LanguageConfig {
2677                name: "Rust".into(),
2678                path_suffixes: vec!["rs".to_string()],
2679                ..Default::default()
2680            },
2681            Some(tree_sitter_rust::language()),
2682        );
2683        let mut fake_language_servers = language.set_fake_lsp_adapter(Default::default());
2684        lang_registry.add(Arc::new(language));
2685
2686        // Connect to a server as 2 clients.
2687        let mut server = TestServer::start(cx_a.foreground(), cx_a.background()).await;
2688        let client_a = server.create_client(cx_a, "user_a").await;
2689        let client_b = server.create_client(cx_b, "user_b").await;
2690        server
2691            .make_contacts(vec![(&client_a, cx_a), (&client_b, cx_b)])
2692            .await;
2693
2694        // Share a project as client A
2695        fs.insert_tree(
2696            "/a",
2697            json!({
2698                "a.rs": "let one = two",
2699                "other.rs": "",
2700            }),
2701        )
2702        .await;
2703        let project_a = cx_a.update(|cx| {
2704            Project::local(
2705                client_a.clone(),
2706                client_a.user_store.clone(),
2707                lang_registry.clone(),
2708                fs.clone(),
2709                cx,
2710            )
2711        });
2712        let (worktree_a, _) = project_a
2713            .update(cx_a, |p, cx| {
2714                p.find_or_create_local_worktree("/a", true, cx)
2715            })
2716            .await
2717            .unwrap();
2718        worktree_a
2719            .read_with(cx_a, |tree, _| tree.as_local().unwrap().scan_complete())
2720            .await;
2721        let project_id = project_a.update(cx_a, |p, _| p.next_remote_id()).await;
2722        let worktree_id = worktree_a.read_with(cx_a, |tree, _| tree.id());
2723        project_a.update(cx_a, |p, cx| p.share(cx)).await.unwrap();
2724
2725        // Cause the language server to start.
2726        let _ = cx_a
2727            .background()
2728            .spawn(project_a.update(cx_a, |project, cx| {
2729                project.open_buffer(
2730                    ProjectPath {
2731                        worktree_id,
2732                        path: Path::new("other.rs").into(),
2733                    },
2734                    cx,
2735                )
2736            }))
2737            .await
2738            .unwrap();
2739
2740        // Simulate a language server reporting errors for a file.
2741        let mut fake_language_server = fake_language_servers.next().await.unwrap();
2742        fake_language_server
2743            .receive_notification::<lsp::notification::DidOpenTextDocument>()
2744            .await;
2745        fake_language_server.notify::<lsp::notification::PublishDiagnostics>(
2746            lsp::PublishDiagnosticsParams {
2747                uri: lsp::Url::from_file_path("/a/a.rs").unwrap(),
2748                version: None,
2749                diagnostics: vec![lsp::Diagnostic {
2750                    severity: Some(lsp::DiagnosticSeverity::ERROR),
2751                    range: lsp::Range::new(lsp::Position::new(0, 4), lsp::Position::new(0, 7)),
2752                    message: "message 1".to_string(),
2753                    ..Default::default()
2754                }],
2755            },
2756        );
2757
2758        // Wait for server to see the diagnostics update.
2759        server
2760            .condition(|store| {
2761                let worktree = store
2762                    .project(project_id)
2763                    .unwrap()
2764                    .share
2765                    .as_ref()
2766                    .unwrap()
2767                    .worktrees
2768                    .get(&worktree_id.to_proto())
2769                    .unwrap();
2770
2771                !worktree.diagnostic_summaries.is_empty()
2772            })
2773            .await;
2774
2775        // Join the worktree as client B.
2776        let project_b = Project::remote(
2777            project_id,
2778            client_b.clone(),
2779            client_b.user_store.clone(),
2780            lang_registry.clone(),
2781            fs.clone(),
2782            &mut cx_b.to_async(),
2783        )
2784        .await
2785        .unwrap();
2786
2787        project_b.read_with(cx_b, |project, cx| {
2788            assert_eq!(
2789                project.diagnostic_summaries(cx).collect::<Vec<_>>(),
2790                &[(
2791                    ProjectPath {
2792                        worktree_id,
2793                        path: Arc::from(Path::new("a.rs")),
2794                    },
2795                    DiagnosticSummary {
2796                        error_count: 1,
2797                        warning_count: 0,
2798                        ..Default::default()
2799                    },
2800                )]
2801            )
2802        });
2803
2804        // Simulate a language server reporting more errors for a file.
2805        fake_language_server.notify::<lsp::notification::PublishDiagnostics>(
2806            lsp::PublishDiagnosticsParams {
2807                uri: lsp::Url::from_file_path("/a/a.rs").unwrap(),
2808                version: None,
2809                diagnostics: vec![
2810                    lsp::Diagnostic {
2811                        severity: Some(lsp::DiagnosticSeverity::ERROR),
2812                        range: lsp::Range::new(lsp::Position::new(0, 4), lsp::Position::new(0, 7)),
2813                        message: "message 1".to_string(),
2814                        ..Default::default()
2815                    },
2816                    lsp::Diagnostic {
2817                        severity: Some(lsp::DiagnosticSeverity::WARNING),
2818                        range: lsp::Range::new(
2819                            lsp::Position::new(0, 10),
2820                            lsp::Position::new(0, 13),
2821                        ),
2822                        message: "message 2".to_string(),
2823                        ..Default::default()
2824                    },
2825                ],
2826            },
2827        );
2828
2829        // Client b gets the updated summaries
2830        project_b
2831            .condition(&cx_b, |project, cx| {
2832                project.diagnostic_summaries(cx).collect::<Vec<_>>()
2833                    == &[(
2834                        ProjectPath {
2835                            worktree_id,
2836                            path: Arc::from(Path::new("a.rs")),
2837                        },
2838                        DiagnosticSummary {
2839                            error_count: 1,
2840                            warning_count: 1,
2841                            ..Default::default()
2842                        },
2843                    )]
2844            })
2845            .await;
2846
2847        // Open the file with the errors on client B. They should be present.
2848        let buffer_b = cx_b
2849            .background()
2850            .spawn(project_b.update(cx_b, |p, cx| p.open_buffer((worktree_id, "a.rs"), cx)))
2851            .await
2852            .unwrap();
2853
2854        buffer_b.read_with(cx_b, |buffer, _| {
2855            assert_eq!(
2856                buffer
2857                    .snapshot()
2858                    .diagnostics_in_range::<_, Point>(0..buffer.len(), false)
2859                    .map(|entry| entry)
2860                    .collect::<Vec<_>>(),
2861                &[
2862                    DiagnosticEntry {
2863                        range: Point::new(0, 4)..Point::new(0, 7),
2864                        diagnostic: Diagnostic {
2865                            group_id: 0,
2866                            message: "message 1".to_string(),
2867                            severity: lsp::DiagnosticSeverity::ERROR,
2868                            is_primary: true,
2869                            ..Default::default()
2870                        }
2871                    },
2872                    DiagnosticEntry {
2873                        range: Point::new(0, 10)..Point::new(0, 13),
2874                        diagnostic: Diagnostic {
2875                            group_id: 1,
2876                            severity: lsp::DiagnosticSeverity::WARNING,
2877                            message: "message 2".to_string(),
2878                            is_primary: true,
2879                            ..Default::default()
2880                        }
2881                    }
2882                ]
2883            );
2884        });
2885
2886        // Simulate a language server reporting no errors for a file.
2887        fake_language_server.notify::<lsp::notification::PublishDiagnostics>(
2888            lsp::PublishDiagnosticsParams {
2889                uri: lsp::Url::from_file_path("/a/a.rs").unwrap(),
2890                version: None,
2891                diagnostics: vec![],
2892            },
2893        );
2894        project_a
2895            .condition(cx_a, |project, cx| {
2896                project.diagnostic_summaries(cx).collect::<Vec<_>>() == &[]
2897            })
2898            .await;
2899        project_b
2900            .condition(cx_b, |project, cx| {
2901                project.diagnostic_summaries(cx).collect::<Vec<_>>() == &[]
2902            })
2903            .await;
2904    }
2905
2906    #[gpui::test(iterations = 10)]
2907    async fn test_collaborating_with_completion(
2908        cx_a: &mut TestAppContext,
2909        cx_b: &mut TestAppContext,
2910    ) {
2911        cx_a.foreground().forbid_parking();
2912        let lang_registry = Arc::new(LanguageRegistry::test());
2913        let fs = FakeFs::new(cx_a.background());
2914
2915        // Set up a fake language server.
2916        let mut language = Language::new(
2917            LanguageConfig {
2918                name: "Rust".into(),
2919                path_suffixes: vec!["rs".to_string()],
2920                ..Default::default()
2921            },
2922            Some(tree_sitter_rust::language()),
2923        );
2924        let mut fake_language_servers = language.set_fake_lsp_adapter(FakeLspAdapter {
2925            capabilities: lsp::ServerCapabilities {
2926                completion_provider: Some(lsp::CompletionOptions {
2927                    trigger_characters: Some(vec![".".to_string()]),
2928                    ..Default::default()
2929                }),
2930                ..Default::default()
2931            },
2932            ..Default::default()
2933        });
2934        lang_registry.add(Arc::new(language));
2935
2936        // Connect to a server as 2 clients.
2937        let mut server = TestServer::start(cx_a.foreground(), cx_a.background()).await;
2938        let client_a = server.create_client(cx_a, "user_a").await;
2939        let client_b = server.create_client(cx_b, "user_b").await;
2940        server
2941            .make_contacts(vec![(&client_a, cx_a), (&client_b, cx_b)])
2942            .await;
2943
2944        // Share a project as client A
2945        fs.insert_tree(
2946            "/a",
2947            json!({
2948                "main.rs": "fn main() { a }",
2949                "other.rs": "",
2950            }),
2951        )
2952        .await;
2953        let project_a = cx_a.update(|cx| {
2954            Project::local(
2955                client_a.clone(),
2956                client_a.user_store.clone(),
2957                lang_registry.clone(),
2958                fs.clone(),
2959                cx,
2960            )
2961        });
2962        let (worktree_a, _) = project_a
2963            .update(cx_a, |p, cx| {
2964                p.find_or_create_local_worktree("/a", true, cx)
2965            })
2966            .await
2967            .unwrap();
2968        worktree_a
2969            .read_with(cx_a, |tree, _| tree.as_local().unwrap().scan_complete())
2970            .await;
2971        let project_id = project_a.update(cx_a, |p, _| p.next_remote_id()).await;
2972        let worktree_id = worktree_a.read_with(cx_a, |tree, _| tree.id());
2973        project_a.update(cx_a, |p, cx| p.share(cx)).await.unwrap();
2974
2975        // Join the worktree as client B.
2976        let project_b = Project::remote(
2977            project_id,
2978            client_b.clone(),
2979            client_b.user_store.clone(),
2980            lang_registry.clone(),
2981            fs.clone(),
2982            &mut cx_b.to_async(),
2983        )
2984        .await
2985        .unwrap();
2986
2987        // Open a file in an editor as the guest.
2988        let buffer_b = project_b
2989            .update(cx_b, |p, cx| p.open_buffer((worktree_id, "main.rs"), cx))
2990            .await
2991            .unwrap();
2992        let (window_b, _) = cx_b.add_window(|_| EmptyView);
2993        let editor_b = cx_b.add_view(window_b, |cx| {
2994            Editor::for_buffer(buffer_b.clone(), Some(project_b.clone()), cx)
2995        });
2996
2997        let fake_language_server = fake_language_servers.next().await.unwrap();
2998        buffer_b
2999            .condition(&cx_b, |buffer, _| !buffer.completion_triggers().is_empty())
3000            .await;
3001
3002        // Type a completion trigger character as the guest.
3003        editor_b.update(cx_b, |editor, cx| {
3004            editor.select_ranges([13..13], None, cx);
3005            editor.handle_input(&Input(".".into()), cx);
3006            cx.focus(&editor_b);
3007        });
3008
3009        // Receive a completion request as the host's language server.
3010        // Return some completions from the host's language server.
3011        cx_a.foreground().start_waiting();
3012        fake_language_server
3013            .handle_request::<lsp::request::Completion, _, _>(|params, _| async move {
3014                assert_eq!(
3015                    params.text_document_position.text_document.uri,
3016                    lsp::Url::from_file_path("/a/main.rs").unwrap(),
3017                );
3018                assert_eq!(
3019                    params.text_document_position.position,
3020                    lsp::Position::new(0, 14),
3021                );
3022
3023                Ok(Some(lsp::CompletionResponse::Array(vec![
3024                    lsp::CompletionItem {
3025                        label: "first_method(…)".into(),
3026                        detail: Some("fn(&mut self, B) -> C".into()),
3027                        text_edit: Some(lsp::CompletionTextEdit::Edit(lsp::TextEdit {
3028                            new_text: "first_method($1)".to_string(),
3029                            range: lsp::Range::new(
3030                                lsp::Position::new(0, 14),
3031                                lsp::Position::new(0, 14),
3032                            ),
3033                        })),
3034                        insert_text_format: Some(lsp::InsertTextFormat::SNIPPET),
3035                        ..Default::default()
3036                    },
3037                    lsp::CompletionItem {
3038                        label: "second_method(…)".into(),
3039                        detail: Some("fn(&mut self, C) -> D<E>".into()),
3040                        text_edit: Some(lsp::CompletionTextEdit::Edit(lsp::TextEdit {
3041                            new_text: "second_method()".to_string(),
3042                            range: lsp::Range::new(
3043                                lsp::Position::new(0, 14),
3044                                lsp::Position::new(0, 14),
3045                            ),
3046                        })),
3047                        insert_text_format: Some(lsp::InsertTextFormat::SNIPPET),
3048                        ..Default::default()
3049                    },
3050                ])))
3051            })
3052            .next()
3053            .await
3054            .unwrap();
3055        cx_a.foreground().finish_waiting();
3056
3057        // Open the buffer on the host.
3058        let buffer_a = project_a
3059            .update(cx_a, |p, cx| p.open_buffer((worktree_id, "main.rs"), cx))
3060            .await
3061            .unwrap();
3062        buffer_a
3063            .condition(&cx_a, |buffer, _| buffer.text() == "fn main() { a. }")
3064            .await;
3065
3066        // Confirm a completion on the guest.
3067        editor_b
3068            .condition(&cx_b, |editor, _| editor.context_menu_visible())
3069            .await;
3070        editor_b.update(cx_b, |editor, cx| {
3071            editor.confirm_completion(&ConfirmCompletion { item_ix: Some(0) }, cx);
3072            assert_eq!(editor.text(cx), "fn main() { a.first_method() }");
3073        });
3074
3075        // Return a resolved completion from the host's language server.
3076        // The resolved completion has an additional text edit.
3077        fake_language_server.handle_request::<lsp::request::ResolveCompletionItem, _, _>(
3078            |params, _| async move {
3079                assert_eq!(params.label, "first_method(…)");
3080                Ok(lsp::CompletionItem {
3081                    label: "first_method(…)".into(),
3082                    detail: Some("fn(&mut self, B) -> C".into()),
3083                    text_edit: Some(lsp::CompletionTextEdit::Edit(lsp::TextEdit {
3084                        new_text: "first_method($1)".to_string(),
3085                        range: lsp::Range::new(
3086                            lsp::Position::new(0, 14),
3087                            lsp::Position::new(0, 14),
3088                        ),
3089                    })),
3090                    additional_text_edits: Some(vec![lsp::TextEdit {
3091                        new_text: "use d::SomeTrait;\n".to_string(),
3092                        range: lsp::Range::new(lsp::Position::new(0, 0), lsp::Position::new(0, 0)),
3093                    }]),
3094                    insert_text_format: Some(lsp::InsertTextFormat::SNIPPET),
3095                    ..Default::default()
3096                })
3097            },
3098        );
3099
3100        // The additional edit is applied.
3101        buffer_a
3102            .condition(&cx_a, |buffer, _| {
3103                buffer.text() == "use d::SomeTrait;\nfn main() { a.first_method() }"
3104            })
3105            .await;
3106        buffer_b
3107            .condition(&cx_b, |buffer, _| {
3108                buffer.text() == "use d::SomeTrait;\nfn main() { a.first_method() }"
3109            })
3110            .await;
3111    }
3112
3113    #[gpui::test(iterations = 10)]
3114    async fn test_reloading_buffer_manually(cx_a: &mut TestAppContext, cx_b: &mut TestAppContext) {
3115        cx_a.foreground().forbid_parking();
3116        let lang_registry = Arc::new(LanguageRegistry::test());
3117        let fs = FakeFs::new(cx_a.background());
3118
3119        // Connect to a server as 2 clients.
3120        let mut server = TestServer::start(cx_a.foreground(), cx_a.background()).await;
3121        let client_a = server.create_client(cx_a, "user_a").await;
3122        let client_b = server.create_client(cx_b, "user_b").await;
3123        server
3124            .make_contacts(vec![(&client_a, cx_a), (&client_b, cx_b)])
3125            .await;
3126
3127        // Share a project as client A
3128        fs.insert_tree(
3129            "/a",
3130            json!({
3131                "a.rs": "let one = 1;",
3132            }),
3133        )
3134        .await;
3135        let project_a = cx_a.update(|cx| {
3136            Project::local(
3137                client_a.clone(),
3138                client_a.user_store.clone(),
3139                lang_registry.clone(),
3140                fs.clone(),
3141                cx,
3142            )
3143        });
3144        let (worktree_a, _) = project_a
3145            .update(cx_a, |p, cx| {
3146                p.find_or_create_local_worktree("/a", true, cx)
3147            })
3148            .await
3149            .unwrap();
3150        worktree_a
3151            .read_with(cx_a, |tree, _| tree.as_local().unwrap().scan_complete())
3152            .await;
3153        let project_id = project_a.update(cx_a, |p, _| p.next_remote_id()).await;
3154        let worktree_id = worktree_a.read_with(cx_a, |tree, _| tree.id());
3155        project_a.update(cx_a, |p, cx| p.share(cx)).await.unwrap();
3156        let buffer_a = project_a
3157            .update(cx_a, |p, cx| p.open_buffer((worktree_id, "a.rs"), cx))
3158            .await
3159            .unwrap();
3160
3161        // Join the worktree as client B.
3162        let project_b = Project::remote(
3163            project_id,
3164            client_b.clone(),
3165            client_b.user_store.clone(),
3166            lang_registry.clone(),
3167            fs.clone(),
3168            &mut cx_b.to_async(),
3169        )
3170        .await
3171        .unwrap();
3172
3173        let buffer_b = cx_b
3174            .background()
3175            .spawn(project_b.update(cx_b, |p, cx| p.open_buffer((worktree_id, "a.rs"), cx)))
3176            .await
3177            .unwrap();
3178        buffer_b.update(cx_b, |buffer, cx| {
3179            buffer.edit([(4..7, "six")], cx);
3180            buffer.edit([(10..11, "6")], cx);
3181            assert_eq!(buffer.text(), "let six = 6;");
3182            assert!(buffer.is_dirty());
3183            assert!(!buffer.has_conflict());
3184        });
3185        buffer_a
3186            .condition(cx_a, |buffer, _| buffer.text() == "let six = 6;")
3187            .await;
3188
3189        fs.save(Path::new("/a/a.rs"), &Rope::from("let seven = 7;"))
3190            .await
3191            .unwrap();
3192        buffer_a
3193            .condition(cx_a, |buffer, _| buffer.has_conflict())
3194            .await;
3195        buffer_b
3196            .condition(cx_b, |buffer, _| buffer.has_conflict())
3197            .await;
3198
3199        project_b
3200            .update(cx_b, |project, cx| {
3201                project.reload_buffers(HashSet::from_iter([buffer_b.clone()]), true, cx)
3202            })
3203            .await
3204            .unwrap();
3205        buffer_a.read_with(cx_a, |buffer, _| {
3206            assert_eq!(buffer.text(), "let seven = 7;");
3207            assert!(!buffer.is_dirty());
3208            assert!(!buffer.has_conflict());
3209        });
3210        buffer_b.read_with(cx_b, |buffer, _| {
3211            assert_eq!(buffer.text(), "let seven = 7;");
3212            assert!(!buffer.is_dirty());
3213            assert!(!buffer.has_conflict());
3214        });
3215
3216        buffer_a.update(cx_a, |buffer, cx| {
3217            // Undoing on the host is a no-op when the reload was initiated by the guest.
3218            buffer.undo(cx);
3219            assert_eq!(buffer.text(), "let seven = 7;");
3220            assert!(!buffer.is_dirty());
3221            assert!(!buffer.has_conflict());
3222        });
3223        buffer_b.update(cx_b, |buffer, cx| {
3224            // Undoing on the guest rolls back the buffer to before it was reloaded but the conflict gets cleared.
3225            buffer.undo(cx);
3226            assert_eq!(buffer.text(), "let six = 6;");
3227            assert!(buffer.is_dirty());
3228            assert!(!buffer.has_conflict());
3229        });
3230    }
3231
3232    #[gpui::test(iterations = 10)]
3233    async fn test_formatting_buffer(cx_a: &mut TestAppContext, cx_b: &mut TestAppContext) {
3234        cx_a.foreground().forbid_parking();
3235        let lang_registry = Arc::new(LanguageRegistry::test());
3236        let fs = FakeFs::new(cx_a.background());
3237
3238        // Set up a fake language server.
3239        let mut language = Language::new(
3240            LanguageConfig {
3241                name: "Rust".into(),
3242                path_suffixes: vec!["rs".to_string()],
3243                ..Default::default()
3244            },
3245            Some(tree_sitter_rust::language()),
3246        );
3247        let mut fake_language_servers = language.set_fake_lsp_adapter(Default::default());
3248        lang_registry.add(Arc::new(language));
3249
3250        // Connect to a server as 2 clients.
3251        let mut server = TestServer::start(cx_a.foreground(), cx_a.background()).await;
3252        let client_a = server.create_client(cx_a, "user_a").await;
3253        let client_b = server.create_client(cx_b, "user_b").await;
3254        server
3255            .make_contacts(vec![(&client_a, cx_a), (&client_b, cx_b)])
3256            .await;
3257
3258        // Share a project as client A
3259        fs.insert_tree(
3260            "/a",
3261            json!({
3262                "a.rs": "let one = two",
3263            }),
3264        )
3265        .await;
3266        let project_a = cx_a.update(|cx| {
3267            Project::local(
3268                client_a.clone(),
3269                client_a.user_store.clone(),
3270                lang_registry.clone(),
3271                fs.clone(),
3272                cx,
3273            )
3274        });
3275        let (worktree_a, _) = project_a
3276            .update(cx_a, |p, cx| {
3277                p.find_or_create_local_worktree("/a", true, cx)
3278            })
3279            .await
3280            .unwrap();
3281        worktree_a
3282            .read_with(cx_a, |tree, _| tree.as_local().unwrap().scan_complete())
3283            .await;
3284        let project_id = project_a.update(cx_a, |p, _| p.next_remote_id()).await;
3285        let worktree_id = worktree_a.read_with(cx_a, |tree, _| tree.id());
3286        project_a.update(cx_a, |p, cx| p.share(cx)).await.unwrap();
3287
3288        // Join the worktree as client B.
3289        let project_b = Project::remote(
3290            project_id,
3291            client_b.clone(),
3292            client_b.user_store.clone(),
3293            lang_registry.clone(),
3294            fs.clone(),
3295            &mut cx_b.to_async(),
3296        )
3297        .await
3298        .unwrap();
3299
3300        let buffer_b = cx_b
3301            .background()
3302            .spawn(project_b.update(cx_b, |p, cx| p.open_buffer((worktree_id, "a.rs"), cx)))
3303            .await
3304            .unwrap();
3305
3306        let fake_language_server = fake_language_servers.next().await.unwrap();
3307        fake_language_server.handle_request::<lsp::request::Formatting, _, _>(|_, _| async move {
3308            Ok(Some(vec![
3309                lsp::TextEdit {
3310                    range: lsp::Range::new(lsp::Position::new(0, 4), lsp::Position::new(0, 4)),
3311                    new_text: "h".to_string(),
3312                },
3313                lsp::TextEdit {
3314                    range: lsp::Range::new(lsp::Position::new(0, 7), lsp::Position::new(0, 7)),
3315                    new_text: "y".to_string(),
3316                },
3317            ]))
3318        });
3319
3320        project_b
3321            .update(cx_b, |project, cx| {
3322                project.format(HashSet::from_iter([buffer_b.clone()]), true, cx)
3323            })
3324            .await
3325            .unwrap();
3326        assert_eq!(
3327            buffer_b.read_with(cx_b, |buffer, _| buffer.text()),
3328            "let honey = two"
3329        );
3330    }
3331
3332    #[gpui::test(iterations = 10)]
3333    async fn test_definition(cx_a: &mut TestAppContext, cx_b: &mut TestAppContext) {
3334        cx_a.foreground().forbid_parking();
3335        let lang_registry = Arc::new(LanguageRegistry::test());
3336        let fs = FakeFs::new(cx_a.background());
3337        fs.insert_tree(
3338            "/root-1",
3339            json!({
3340                "a.rs": "const ONE: usize = b::TWO + b::THREE;",
3341            }),
3342        )
3343        .await;
3344        fs.insert_tree(
3345            "/root-2",
3346            json!({
3347                "b.rs": "const TWO: usize = 2;\nconst THREE: usize = 3;",
3348            }),
3349        )
3350        .await;
3351
3352        // Set up a fake language server.
3353        let mut language = Language::new(
3354            LanguageConfig {
3355                name: "Rust".into(),
3356                path_suffixes: vec!["rs".to_string()],
3357                ..Default::default()
3358            },
3359            Some(tree_sitter_rust::language()),
3360        );
3361        let mut fake_language_servers = language.set_fake_lsp_adapter(Default::default());
3362        lang_registry.add(Arc::new(language));
3363
3364        // Connect to a server as 2 clients.
3365        let mut server = TestServer::start(cx_a.foreground(), cx_a.background()).await;
3366        let client_a = server.create_client(cx_a, "user_a").await;
3367        let client_b = server.create_client(cx_b, "user_b").await;
3368        server
3369            .make_contacts(vec![(&client_a, cx_a), (&client_b, cx_b)])
3370            .await;
3371
3372        // Share a project as client A
3373        let project_a = cx_a.update(|cx| {
3374            Project::local(
3375                client_a.clone(),
3376                client_a.user_store.clone(),
3377                lang_registry.clone(),
3378                fs.clone(),
3379                cx,
3380            )
3381        });
3382        let (worktree_a, _) = project_a
3383            .update(cx_a, |p, cx| {
3384                p.find_or_create_local_worktree("/root-1", true, cx)
3385            })
3386            .await
3387            .unwrap();
3388        worktree_a
3389            .read_with(cx_a, |tree, _| tree.as_local().unwrap().scan_complete())
3390            .await;
3391        let project_id = project_a.update(cx_a, |p, _| p.next_remote_id()).await;
3392        let worktree_id = worktree_a.read_with(cx_a, |tree, _| tree.id());
3393        project_a.update(cx_a, |p, cx| p.share(cx)).await.unwrap();
3394
3395        // Join the worktree as client B.
3396        let project_b = Project::remote(
3397            project_id,
3398            client_b.clone(),
3399            client_b.user_store.clone(),
3400            lang_registry.clone(),
3401            fs.clone(),
3402            &mut cx_b.to_async(),
3403        )
3404        .await
3405        .unwrap();
3406
3407        // Open the file on client B.
3408        let buffer_b = cx_b
3409            .background()
3410            .spawn(project_b.update(cx_b, |p, cx| p.open_buffer((worktree_id, "a.rs"), cx)))
3411            .await
3412            .unwrap();
3413
3414        // Request the definition of a symbol as the guest.
3415        let fake_language_server = fake_language_servers.next().await.unwrap();
3416        fake_language_server.handle_request::<lsp::request::GotoDefinition, _, _>(
3417            |_, _| async move {
3418                Ok(Some(lsp::GotoDefinitionResponse::Scalar(
3419                    lsp::Location::new(
3420                        lsp::Url::from_file_path("/root-2/b.rs").unwrap(),
3421                        lsp::Range::new(lsp::Position::new(0, 6), lsp::Position::new(0, 9)),
3422                    ),
3423                )))
3424            },
3425        );
3426
3427        let definitions_1 = project_b
3428            .update(cx_b, |p, cx| p.definition(&buffer_b, 23, cx))
3429            .await
3430            .unwrap();
3431        cx_b.read(|cx| {
3432            assert_eq!(definitions_1.len(), 1);
3433            assert_eq!(project_b.read(cx).worktrees(cx).count(), 2);
3434            let target_buffer = definitions_1[0].buffer.read(cx);
3435            assert_eq!(
3436                target_buffer.text(),
3437                "const TWO: usize = 2;\nconst THREE: usize = 3;"
3438            );
3439            assert_eq!(
3440                definitions_1[0].range.to_point(target_buffer),
3441                Point::new(0, 6)..Point::new(0, 9)
3442            );
3443        });
3444
3445        // Try getting more definitions for the same buffer, ensuring the buffer gets reused from
3446        // the previous call to `definition`.
3447        fake_language_server.handle_request::<lsp::request::GotoDefinition, _, _>(
3448            |_, _| async move {
3449                Ok(Some(lsp::GotoDefinitionResponse::Scalar(
3450                    lsp::Location::new(
3451                        lsp::Url::from_file_path("/root-2/b.rs").unwrap(),
3452                        lsp::Range::new(lsp::Position::new(1, 6), lsp::Position::new(1, 11)),
3453                    ),
3454                )))
3455            },
3456        );
3457
3458        let definitions_2 = project_b
3459            .update(cx_b, |p, cx| p.definition(&buffer_b, 33, cx))
3460            .await
3461            .unwrap();
3462        cx_b.read(|cx| {
3463            assert_eq!(definitions_2.len(), 1);
3464            assert_eq!(project_b.read(cx).worktrees(cx).count(), 2);
3465            let target_buffer = definitions_2[0].buffer.read(cx);
3466            assert_eq!(
3467                target_buffer.text(),
3468                "const TWO: usize = 2;\nconst THREE: usize = 3;"
3469            );
3470            assert_eq!(
3471                definitions_2[0].range.to_point(target_buffer),
3472                Point::new(1, 6)..Point::new(1, 11)
3473            );
3474        });
3475        assert_eq!(definitions_1[0].buffer, definitions_2[0].buffer);
3476    }
3477
3478    #[gpui::test(iterations = 10)]
3479    async fn test_references(cx_a: &mut TestAppContext, cx_b: &mut TestAppContext) {
3480        cx_a.foreground().forbid_parking();
3481        let lang_registry = Arc::new(LanguageRegistry::test());
3482        let fs = FakeFs::new(cx_a.background());
3483        fs.insert_tree(
3484            "/root-1",
3485            json!({
3486                "one.rs": "const ONE: usize = 1;",
3487                "two.rs": "const TWO: usize = one::ONE + one::ONE;",
3488            }),
3489        )
3490        .await;
3491        fs.insert_tree(
3492            "/root-2",
3493            json!({
3494                "three.rs": "const THREE: usize = two::TWO + one::ONE;",
3495            }),
3496        )
3497        .await;
3498
3499        // Set up a fake language server.
3500        let mut language = Language::new(
3501            LanguageConfig {
3502                name: "Rust".into(),
3503                path_suffixes: vec!["rs".to_string()],
3504                ..Default::default()
3505            },
3506            Some(tree_sitter_rust::language()),
3507        );
3508        let mut fake_language_servers = language.set_fake_lsp_adapter(Default::default());
3509        lang_registry.add(Arc::new(language));
3510
3511        // Connect to a server as 2 clients.
3512        let mut server = TestServer::start(cx_a.foreground(), cx_a.background()).await;
3513        let client_a = server.create_client(cx_a, "user_a").await;
3514        let client_b = server.create_client(cx_b, "user_b").await;
3515        server
3516            .make_contacts(vec![(&client_a, cx_a), (&client_b, cx_b)])
3517            .await;
3518
3519        // Share a project as client A
3520        let project_a = cx_a.update(|cx| {
3521            Project::local(
3522                client_a.clone(),
3523                client_a.user_store.clone(),
3524                lang_registry.clone(),
3525                fs.clone(),
3526                cx,
3527            )
3528        });
3529        let (worktree_a, _) = project_a
3530            .update(cx_a, |p, cx| {
3531                p.find_or_create_local_worktree("/root-1", true, cx)
3532            })
3533            .await
3534            .unwrap();
3535        worktree_a
3536            .read_with(cx_a, |tree, _| tree.as_local().unwrap().scan_complete())
3537            .await;
3538        let project_id = project_a.update(cx_a, |p, _| p.next_remote_id()).await;
3539        let worktree_id = worktree_a.read_with(cx_a, |tree, _| tree.id());
3540        project_a.update(cx_a, |p, cx| p.share(cx)).await.unwrap();
3541
3542        // Join the worktree as client B.
3543        let project_b = Project::remote(
3544            project_id,
3545            client_b.clone(),
3546            client_b.user_store.clone(),
3547            lang_registry.clone(),
3548            fs.clone(),
3549            &mut cx_b.to_async(),
3550        )
3551        .await
3552        .unwrap();
3553
3554        // Open the file on client B.
3555        let buffer_b = cx_b
3556            .background()
3557            .spawn(project_b.update(cx_b, |p, cx| p.open_buffer((worktree_id, "one.rs"), cx)))
3558            .await
3559            .unwrap();
3560
3561        // Request references to a symbol as the guest.
3562        let fake_language_server = fake_language_servers.next().await.unwrap();
3563        fake_language_server.handle_request::<lsp::request::References, _, _>(
3564            |params, _| async move {
3565                assert_eq!(
3566                    params.text_document_position.text_document.uri.as_str(),
3567                    "file:///root-1/one.rs"
3568                );
3569                Ok(Some(vec![
3570                    lsp::Location {
3571                        uri: lsp::Url::from_file_path("/root-1/two.rs").unwrap(),
3572                        range: lsp::Range::new(
3573                            lsp::Position::new(0, 24),
3574                            lsp::Position::new(0, 27),
3575                        ),
3576                    },
3577                    lsp::Location {
3578                        uri: lsp::Url::from_file_path("/root-1/two.rs").unwrap(),
3579                        range: lsp::Range::new(
3580                            lsp::Position::new(0, 35),
3581                            lsp::Position::new(0, 38),
3582                        ),
3583                    },
3584                    lsp::Location {
3585                        uri: lsp::Url::from_file_path("/root-2/three.rs").unwrap(),
3586                        range: lsp::Range::new(
3587                            lsp::Position::new(0, 37),
3588                            lsp::Position::new(0, 40),
3589                        ),
3590                    },
3591                ]))
3592            },
3593        );
3594
3595        let references = project_b
3596            .update(cx_b, |p, cx| p.references(&buffer_b, 7, cx))
3597            .await
3598            .unwrap();
3599        cx_b.read(|cx| {
3600            assert_eq!(references.len(), 3);
3601            assert_eq!(project_b.read(cx).worktrees(cx).count(), 2);
3602
3603            let two_buffer = references[0].buffer.read(cx);
3604            let three_buffer = references[2].buffer.read(cx);
3605            assert_eq!(
3606                two_buffer.file().unwrap().path().as_ref(),
3607                Path::new("two.rs")
3608            );
3609            assert_eq!(references[1].buffer, references[0].buffer);
3610            assert_eq!(
3611                three_buffer.file().unwrap().full_path(cx),
3612                Path::new("three.rs")
3613            );
3614
3615            assert_eq!(references[0].range.to_offset(&two_buffer), 24..27);
3616            assert_eq!(references[1].range.to_offset(&two_buffer), 35..38);
3617            assert_eq!(references[2].range.to_offset(&three_buffer), 37..40);
3618        });
3619    }
3620
3621    #[gpui::test(iterations = 10)]
3622    async fn test_project_search(cx_a: &mut TestAppContext, cx_b: &mut TestAppContext) {
3623        cx_a.foreground().forbid_parking();
3624        let lang_registry = Arc::new(LanguageRegistry::test());
3625        let fs = FakeFs::new(cx_a.background());
3626        fs.insert_tree(
3627            "/root-1",
3628            json!({
3629                "a": "hello world",
3630                "b": "goodnight moon",
3631                "c": "a world of goo",
3632                "d": "world champion of clown world",
3633            }),
3634        )
3635        .await;
3636        fs.insert_tree(
3637            "/root-2",
3638            json!({
3639                "e": "disney world is fun",
3640            }),
3641        )
3642        .await;
3643
3644        // Connect to a server as 2 clients.
3645        let mut server = TestServer::start(cx_a.foreground(), cx_a.background()).await;
3646        let client_a = server.create_client(cx_a, "user_a").await;
3647        let client_b = server.create_client(cx_b, "user_b").await;
3648        server
3649            .make_contacts(vec![(&client_a, cx_a), (&client_b, cx_b)])
3650            .await;
3651
3652        // Share a project as client A
3653        let project_a = cx_a.update(|cx| {
3654            Project::local(
3655                client_a.clone(),
3656                client_a.user_store.clone(),
3657                lang_registry.clone(),
3658                fs.clone(),
3659                cx,
3660            )
3661        });
3662        let project_id = project_a.update(cx_a, |p, _| p.next_remote_id()).await;
3663
3664        let (worktree_1, _) = project_a
3665            .update(cx_a, |p, cx| {
3666                p.find_or_create_local_worktree("/root-1", true, cx)
3667            })
3668            .await
3669            .unwrap();
3670        worktree_1
3671            .read_with(cx_a, |tree, _| tree.as_local().unwrap().scan_complete())
3672            .await;
3673        let (worktree_2, _) = project_a
3674            .update(cx_a, |p, cx| {
3675                p.find_or_create_local_worktree("/root-2", true, cx)
3676            })
3677            .await
3678            .unwrap();
3679        worktree_2
3680            .read_with(cx_a, |tree, _| tree.as_local().unwrap().scan_complete())
3681            .await;
3682
3683        project_a.update(cx_a, |p, cx| p.share(cx)).await.unwrap();
3684
3685        // Join the worktree as client B.
3686        let project_b = Project::remote(
3687            project_id,
3688            client_b.clone(),
3689            client_b.user_store.clone(),
3690            lang_registry.clone(),
3691            fs.clone(),
3692            &mut cx_b.to_async(),
3693        )
3694        .await
3695        .unwrap();
3696
3697        let results = project_b
3698            .update(cx_b, |project, cx| {
3699                project.search(SearchQuery::text("world", false, false), cx)
3700            })
3701            .await
3702            .unwrap();
3703
3704        let mut ranges_by_path = results
3705            .into_iter()
3706            .map(|(buffer, ranges)| {
3707                buffer.read_with(cx_b, |buffer, cx| {
3708                    let path = buffer.file().unwrap().full_path(cx);
3709                    let offset_ranges = ranges
3710                        .into_iter()
3711                        .map(|range| range.to_offset(buffer))
3712                        .collect::<Vec<_>>();
3713                    (path, offset_ranges)
3714                })
3715            })
3716            .collect::<Vec<_>>();
3717        ranges_by_path.sort_by_key(|(path, _)| path.clone());
3718
3719        assert_eq!(
3720            ranges_by_path,
3721            &[
3722                (PathBuf::from("root-1/a"), vec![6..11]),
3723                (PathBuf::from("root-1/c"), vec![2..7]),
3724                (PathBuf::from("root-1/d"), vec![0..5, 24..29]),
3725                (PathBuf::from("root-2/e"), vec![7..12]),
3726            ]
3727        );
3728    }
3729
3730    #[gpui::test(iterations = 10)]
3731    async fn test_document_highlights(cx_a: &mut TestAppContext, cx_b: &mut TestAppContext) {
3732        cx_a.foreground().forbid_parking();
3733        let lang_registry = Arc::new(LanguageRegistry::test());
3734        let fs = FakeFs::new(cx_a.background());
3735        fs.insert_tree(
3736            "/root-1",
3737            json!({
3738                "main.rs": "fn double(number: i32) -> i32 { number + number }",
3739            }),
3740        )
3741        .await;
3742
3743        // Set up a fake language server.
3744        let mut language = Language::new(
3745            LanguageConfig {
3746                name: "Rust".into(),
3747                path_suffixes: vec!["rs".to_string()],
3748                ..Default::default()
3749            },
3750            Some(tree_sitter_rust::language()),
3751        );
3752        let mut fake_language_servers = language.set_fake_lsp_adapter(Default::default());
3753        lang_registry.add(Arc::new(language));
3754
3755        // Connect to a server as 2 clients.
3756        let mut server = TestServer::start(cx_a.foreground(), cx_a.background()).await;
3757        let client_a = server.create_client(cx_a, "user_a").await;
3758        let client_b = server.create_client(cx_b, "user_b").await;
3759        server
3760            .make_contacts(vec![(&client_a, cx_a), (&client_b, cx_b)])
3761            .await;
3762
3763        // Share a project as client A
3764        let project_a = cx_a.update(|cx| {
3765            Project::local(
3766                client_a.clone(),
3767                client_a.user_store.clone(),
3768                lang_registry.clone(),
3769                fs.clone(),
3770                cx,
3771            )
3772        });
3773        let (worktree_a, _) = project_a
3774            .update(cx_a, |p, cx| {
3775                p.find_or_create_local_worktree("/root-1", true, cx)
3776            })
3777            .await
3778            .unwrap();
3779        worktree_a
3780            .read_with(cx_a, |tree, _| tree.as_local().unwrap().scan_complete())
3781            .await;
3782        let project_id = project_a.update(cx_a, |p, _| p.next_remote_id()).await;
3783        let worktree_id = worktree_a.read_with(cx_a, |tree, _| tree.id());
3784        project_a.update(cx_a, |p, cx| p.share(cx)).await.unwrap();
3785
3786        // Join the worktree as client B.
3787        let project_b = Project::remote(
3788            project_id,
3789            client_b.clone(),
3790            client_b.user_store.clone(),
3791            lang_registry.clone(),
3792            fs.clone(),
3793            &mut cx_b.to_async(),
3794        )
3795        .await
3796        .unwrap();
3797
3798        // Open the file on client B.
3799        let buffer_b = cx_b
3800            .background()
3801            .spawn(project_b.update(cx_b, |p, cx| p.open_buffer((worktree_id, "main.rs"), cx)))
3802            .await
3803            .unwrap();
3804
3805        // Request document highlights as the guest.
3806        let fake_language_server = fake_language_servers.next().await.unwrap();
3807        fake_language_server.handle_request::<lsp::request::DocumentHighlightRequest, _, _>(
3808            |params, _| async move {
3809                assert_eq!(
3810                    params
3811                        .text_document_position_params
3812                        .text_document
3813                        .uri
3814                        .as_str(),
3815                    "file:///root-1/main.rs"
3816                );
3817                assert_eq!(
3818                    params.text_document_position_params.position,
3819                    lsp::Position::new(0, 34)
3820                );
3821                Ok(Some(vec![
3822                    lsp::DocumentHighlight {
3823                        kind: Some(lsp::DocumentHighlightKind::WRITE),
3824                        range: lsp::Range::new(
3825                            lsp::Position::new(0, 10),
3826                            lsp::Position::new(0, 16),
3827                        ),
3828                    },
3829                    lsp::DocumentHighlight {
3830                        kind: Some(lsp::DocumentHighlightKind::READ),
3831                        range: lsp::Range::new(
3832                            lsp::Position::new(0, 32),
3833                            lsp::Position::new(0, 38),
3834                        ),
3835                    },
3836                    lsp::DocumentHighlight {
3837                        kind: Some(lsp::DocumentHighlightKind::READ),
3838                        range: lsp::Range::new(
3839                            lsp::Position::new(0, 41),
3840                            lsp::Position::new(0, 47),
3841                        ),
3842                    },
3843                ]))
3844            },
3845        );
3846
3847        let highlights = project_b
3848            .update(cx_b, |p, cx| p.document_highlights(&buffer_b, 34, cx))
3849            .await
3850            .unwrap();
3851        buffer_b.read_with(cx_b, |buffer, _| {
3852            let snapshot = buffer.snapshot();
3853
3854            let highlights = highlights
3855                .into_iter()
3856                .map(|highlight| (highlight.kind, highlight.range.to_offset(&snapshot)))
3857                .collect::<Vec<_>>();
3858            assert_eq!(
3859                highlights,
3860                &[
3861                    (lsp::DocumentHighlightKind::WRITE, 10..16),
3862                    (lsp::DocumentHighlightKind::READ, 32..38),
3863                    (lsp::DocumentHighlightKind::READ, 41..47)
3864                ]
3865            )
3866        });
3867    }
3868
3869    #[gpui::test(iterations = 10)]
3870    async fn test_project_symbols(cx_a: &mut TestAppContext, cx_b: &mut TestAppContext) {
3871        cx_a.foreground().forbid_parking();
3872        let lang_registry = Arc::new(LanguageRegistry::test());
3873        let fs = FakeFs::new(cx_a.background());
3874        fs.insert_tree(
3875            "/code",
3876            json!({
3877                "crate-1": {
3878                    "one.rs": "const ONE: usize = 1;",
3879                },
3880                "crate-2": {
3881                    "two.rs": "const TWO: usize = 2; const THREE: usize = 3;",
3882                },
3883                "private": {
3884                    "passwords.txt": "the-password",
3885                }
3886            }),
3887        )
3888        .await;
3889
3890        // Set up a fake language server.
3891        let mut language = Language::new(
3892            LanguageConfig {
3893                name: "Rust".into(),
3894                path_suffixes: vec!["rs".to_string()],
3895                ..Default::default()
3896            },
3897            Some(tree_sitter_rust::language()),
3898        );
3899        let mut fake_language_servers = language.set_fake_lsp_adapter(Default::default());
3900        lang_registry.add(Arc::new(language));
3901
3902        // Connect to a server as 2 clients.
3903        let mut server = TestServer::start(cx_a.foreground(), cx_a.background()).await;
3904        let client_a = server.create_client(cx_a, "user_a").await;
3905        let client_b = server.create_client(cx_b, "user_b").await;
3906        server
3907            .make_contacts(vec![(&client_a, cx_a), (&client_b, cx_b)])
3908            .await;
3909
3910        // Share a project as client A
3911        let project_a = cx_a.update(|cx| {
3912            Project::local(
3913                client_a.clone(),
3914                client_a.user_store.clone(),
3915                lang_registry.clone(),
3916                fs.clone(),
3917                cx,
3918            )
3919        });
3920        let (worktree_a, _) = project_a
3921            .update(cx_a, |p, cx| {
3922                p.find_or_create_local_worktree("/code/crate-1", true, cx)
3923            })
3924            .await
3925            .unwrap();
3926        worktree_a
3927            .read_with(cx_a, |tree, _| tree.as_local().unwrap().scan_complete())
3928            .await;
3929        let project_id = project_a.update(cx_a, |p, _| p.next_remote_id()).await;
3930        let worktree_id = worktree_a.read_with(cx_a, |tree, _| tree.id());
3931        project_a.update(cx_a, |p, cx| p.share(cx)).await.unwrap();
3932
3933        // Join the worktree as client B.
3934        let project_b = Project::remote(
3935            project_id,
3936            client_b.clone(),
3937            client_b.user_store.clone(),
3938            lang_registry.clone(),
3939            fs.clone(),
3940            &mut cx_b.to_async(),
3941        )
3942        .await
3943        .unwrap();
3944
3945        // Cause the language server to start.
3946        let _buffer = cx_b
3947            .background()
3948            .spawn(project_b.update(cx_b, |p, cx| p.open_buffer((worktree_id, "one.rs"), cx)))
3949            .await
3950            .unwrap();
3951
3952        let fake_language_server = fake_language_servers.next().await.unwrap();
3953        fake_language_server.handle_request::<lsp::request::WorkspaceSymbol, _, _>(
3954            |_, _| async move {
3955                #[allow(deprecated)]
3956                Ok(Some(vec![lsp::SymbolInformation {
3957                    name: "TWO".into(),
3958                    location: lsp::Location {
3959                        uri: lsp::Url::from_file_path("/code/crate-2/two.rs").unwrap(),
3960                        range: lsp::Range::new(lsp::Position::new(0, 6), lsp::Position::new(0, 9)),
3961                    },
3962                    kind: lsp::SymbolKind::CONSTANT,
3963                    tags: None,
3964                    container_name: None,
3965                    deprecated: None,
3966                }]))
3967            },
3968        );
3969
3970        // Request the definition of a symbol as the guest.
3971        let symbols = project_b
3972            .update(cx_b, |p, cx| p.symbols("two", cx))
3973            .await
3974            .unwrap();
3975        assert_eq!(symbols.len(), 1);
3976        assert_eq!(symbols[0].name, "TWO");
3977
3978        // Open one of the returned symbols.
3979        let buffer_b_2 = project_b
3980            .update(cx_b, |project, cx| {
3981                project.open_buffer_for_symbol(&symbols[0], cx)
3982            })
3983            .await
3984            .unwrap();
3985        buffer_b_2.read_with(cx_b, |buffer, _| {
3986            assert_eq!(
3987                buffer.file().unwrap().path().as_ref(),
3988                Path::new("../crate-2/two.rs")
3989            );
3990        });
3991
3992        // Attempt to craft a symbol and violate host's privacy by opening an arbitrary file.
3993        let mut fake_symbol = symbols[0].clone();
3994        fake_symbol.path = Path::new("/code/secrets").into();
3995        let error = project_b
3996            .update(cx_b, |project, cx| {
3997                project.open_buffer_for_symbol(&fake_symbol, cx)
3998            })
3999            .await
4000            .unwrap_err();
4001        assert!(error.to_string().contains("invalid symbol signature"));
4002    }
4003
4004    #[gpui::test(iterations = 10)]
4005    async fn test_open_buffer_while_getting_definition_pointing_to_it(
4006        cx_a: &mut TestAppContext,
4007        cx_b: &mut TestAppContext,
4008        mut rng: StdRng,
4009    ) {
4010        cx_a.foreground().forbid_parking();
4011        let lang_registry = Arc::new(LanguageRegistry::test());
4012        let fs = FakeFs::new(cx_a.background());
4013        fs.insert_tree(
4014            "/root",
4015            json!({
4016                "a.rs": "const ONE: usize = b::TWO;",
4017                "b.rs": "const TWO: usize = 2",
4018            }),
4019        )
4020        .await;
4021
4022        // Set up a fake language server.
4023        let mut language = Language::new(
4024            LanguageConfig {
4025                name: "Rust".into(),
4026                path_suffixes: vec!["rs".to_string()],
4027                ..Default::default()
4028            },
4029            Some(tree_sitter_rust::language()),
4030        );
4031        let mut fake_language_servers = language.set_fake_lsp_adapter(Default::default());
4032        lang_registry.add(Arc::new(language));
4033
4034        // Connect to a server as 2 clients.
4035        let mut server = TestServer::start(cx_a.foreground(), cx_a.background()).await;
4036        let client_a = server.create_client(cx_a, "user_a").await;
4037        let client_b = server.create_client(cx_b, "user_b").await;
4038        server
4039            .make_contacts(vec![(&client_a, cx_a), (&client_b, cx_b)])
4040            .await;
4041
4042        // Share a project as client A
4043        let project_a = cx_a.update(|cx| {
4044            Project::local(
4045                client_a.clone(),
4046                client_a.user_store.clone(),
4047                lang_registry.clone(),
4048                fs.clone(),
4049                cx,
4050            )
4051        });
4052
4053        let (worktree_a, _) = project_a
4054            .update(cx_a, |p, cx| {
4055                p.find_or_create_local_worktree("/root", true, cx)
4056            })
4057            .await
4058            .unwrap();
4059        worktree_a
4060            .read_with(cx_a, |tree, _| tree.as_local().unwrap().scan_complete())
4061            .await;
4062        let project_id = project_a.update(cx_a, |p, _| p.next_remote_id()).await;
4063        let worktree_id = worktree_a.read_with(cx_a, |tree, _| tree.id());
4064        project_a.update(cx_a, |p, cx| p.share(cx)).await.unwrap();
4065
4066        // Join the worktree as client B.
4067        let project_b = Project::remote(
4068            project_id,
4069            client_b.clone(),
4070            client_b.user_store.clone(),
4071            lang_registry.clone(),
4072            fs.clone(),
4073            &mut cx_b.to_async(),
4074        )
4075        .await
4076        .unwrap();
4077
4078        let buffer_b1 = cx_b
4079            .background()
4080            .spawn(project_b.update(cx_b, |p, cx| p.open_buffer((worktree_id, "a.rs"), cx)))
4081            .await
4082            .unwrap();
4083
4084        let fake_language_server = fake_language_servers.next().await.unwrap();
4085        fake_language_server.handle_request::<lsp::request::GotoDefinition, _, _>(
4086            |_, _| async move {
4087                Ok(Some(lsp::GotoDefinitionResponse::Scalar(
4088                    lsp::Location::new(
4089                        lsp::Url::from_file_path("/root/b.rs").unwrap(),
4090                        lsp::Range::new(lsp::Position::new(0, 6), lsp::Position::new(0, 9)),
4091                    ),
4092                )))
4093            },
4094        );
4095
4096        let definitions;
4097        let buffer_b2;
4098        if rng.gen() {
4099            definitions = project_b.update(cx_b, |p, cx| p.definition(&buffer_b1, 23, cx));
4100            buffer_b2 = project_b.update(cx_b, |p, cx| p.open_buffer((worktree_id, "b.rs"), cx));
4101        } else {
4102            buffer_b2 = project_b.update(cx_b, |p, cx| p.open_buffer((worktree_id, "b.rs"), cx));
4103            definitions = project_b.update(cx_b, |p, cx| p.definition(&buffer_b1, 23, cx));
4104        }
4105
4106        let buffer_b2 = buffer_b2.await.unwrap();
4107        let definitions = definitions.await.unwrap();
4108        assert_eq!(definitions.len(), 1);
4109        assert_eq!(definitions[0].buffer, buffer_b2);
4110    }
4111
4112    #[gpui::test(iterations = 10)]
4113    async fn test_collaborating_with_code_actions(
4114        cx_a: &mut TestAppContext,
4115        cx_b: &mut TestAppContext,
4116    ) {
4117        cx_a.foreground().forbid_parking();
4118        let lang_registry = Arc::new(LanguageRegistry::test());
4119        let fs = FakeFs::new(cx_a.background());
4120        cx_b.update(|cx| editor::init(cx));
4121
4122        // Set up a fake language server.
4123        let mut language = Language::new(
4124            LanguageConfig {
4125                name: "Rust".into(),
4126                path_suffixes: vec!["rs".to_string()],
4127                ..Default::default()
4128            },
4129            Some(tree_sitter_rust::language()),
4130        );
4131        let mut fake_language_servers = language.set_fake_lsp_adapter(Default::default());
4132        lang_registry.add(Arc::new(language));
4133
4134        // Connect to a server as 2 clients.
4135        let mut server = TestServer::start(cx_a.foreground(), cx_a.background()).await;
4136        let client_a = server.create_client(cx_a, "user_a").await;
4137        let client_b = server.create_client(cx_b, "user_b").await;
4138        server
4139            .make_contacts(vec![(&client_a, cx_a), (&client_b, cx_b)])
4140            .await;
4141
4142        // Share a project as client A
4143        fs.insert_tree(
4144            "/a",
4145            json!({
4146                "main.rs": "mod other;\nfn main() { let foo = other::foo(); }",
4147                "other.rs": "pub fn foo() -> usize { 4 }",
4148            }),
4149        )
4150        .await;
4151        let project_a = cx_a.update(|cx| {
4152            Project::local(
4153                client_a.clone(),
4154                client_a.user_store.clone(),
4155                lang_registry.clone(),
4156                fs.clone(),
4157                cx,
4158            )
4159        });
4160        let (worktree_a, _) = project_a
4161            .update(cx_a, |p, cx| {
4162                p.find_or_create_local_worktree("/a", true, cx)
4163            })
4164            .await
4165            .unwrap();
4166        worktree_a
4167            .read_with(cx_a, |tree, _| tree.as_local().unwrap().scan_complete())
4168            .await;
4169        let project_id = project_a.update(cx_a, |p, _| p.next_remote_id()).await;
4170        let worktree_id = worktree_a.read_with(cx_a, |tree, _| tree.id());
4171        project_a.update(cx_a, |p, cx| p.share(cx)).await.unwrap();
4172
4173        // Join the worktree as client B.
4174        let project_b = Project::remote(
4175            project_id,
4176            client_b.clone(),
4177            client_b.user_store.clone(),
4178            lang_registry.clone(),
4179            fs.clone(),
4180            &mut cx_b.to_async(),
4181        )
4182        .await
4183        .unwrap();
4184        let mut params = cx_b.update(WorkspaceParams::test);
4185        params.languages = lang_registry.clone();
4186        params.client = client_b.client.clone();
4187        params.user_store = client_b.user_store.clone();
4188        params.project = project_b;
4189
4190        let (_window_b, workspace_b) = cx_b.add_window(|cx| Workspace::new(&params, cx));
4191        let editor_b = workspace_b
4192            .update(cx_b, |workspace, cx| {
4193                workspace.open_path((worktree_id, "main.rs"), true, cx)
4194            })
4195            .await
4196            .unwrap()
4197            .downcast::<Editor>()
4198            .unwrap();
4199
4200        let mut fake_language_server = fake_language_servers.next().await.unwrap();
4201        fake_language_server
4202            .handle_request::<lsp::request::CodeActionRequest, _, _>(|params, _| async move {
4203                assert_eq!(
4204                    params.text_document.uri,
4205                    lsp::Url::from_file_path("/a/main.rs").unwrap(),
4206                );
4207                assert_eq!(params.range.start, lsp::Position::new(0, 0));
4208                assert_eq!(params.range.end, lsp::Position::new(0, 0));
4209                Ok(None)
4210            })
4211            .next()
4212            .await;
4213
4214        // Move cursor to a location that contains code actions.
4215        editor_b.update(cx_b, |editor, cx| {
4216            editor.select_ranges([Point::new(1, 31)..Point::new(1, 31)], None, cx);
4217            cx.focus(&editor_b);
4218        });
4219
4220        fake_language_server
4221            .handle_request::<lsp::request::CodeActionRequest, _, _>(|params, _| async move {
4222                assert_eq!(
4223                    params.text_document.uri,
4224                    lsp::Url::from_file_path("/a/main.rs").unwrap(),
4225                );
4226                assert_eq!(params.range.start, lsp::Position::new(1, 31));
4227                assert_eq!(params.range.end, lsp::Position::new(1, 31));
4228
4229                Ok(Some(vec![lsp::CodeActionOrCommand::CodeAction(
4230                    lsp::CodeAction {
4231                        title: "Inline into all callers".to_string(),
4232                        edit: Some(lsp::WorkspaceEdit {
4233                            changes: Some(
4234                                [
4235                                    (
4236                                        lsp::Url::from_file_path("/a/main.rs").unwrap(),
4237                                        vec![lsp::TextEdit::new(
4238                                            lsp::Range::new(
4239                                                lsp::Position::new(1, 22),
4240                                                lsp::Position::new(1, 34),
4241                                            ),
4242                                            "4".to_string(),
4243                                        )],
4244                                    ),
4245                                    (
4246                                        lsp::Url::from_file_path("/a/other.rs").unwrap(),
4247                                        vec![lsp::TextEdit::new(
4248                                            lsp::Range::new(
4249                                                lsp::Position::new(0, 0),
4250                                                lsp::Position::new(0, 27),
4251                                            ),
4252                                            "".to_string(),
4253                                        )],
4254                                    ),
4255                                ]
4256                                .into_iter()
4257                                .collect(),
4258                            ),
4259                            ..Default::default()
4260                        }),
4261                        data: Some(json!({
4262                            "codeActionParams": {
4263                                "range": {
4264                                    "start": {"line": 1, "column": 31},
4265                                    "end": {"line": 1, "column": 31},
4266                                }
4267                            }
4268                        })),
4269                        ..Default::default()
4270                    },
4271                )]))
4272            })
4273            .next()
4274            .await;
4275
4276        // Toggle code actions and wait for them to display.
4277        editor_b.update(cx_b, |editor, cx| {
4278            editor.toggle_code_actions(
4279                &ToggleCodeActions {
4280                    deployed_from_indicator: false,
4281                },
4282                cx,
4283            );
4284        });
4285        editor_b
4286            .condition(&cx_b, |editor, _| editor.context_menu_visible())
4287            .await;
4288
4289        fake_language_server.remove_request_handler::<lsp::request::CodeActionRequest>();
4290
4291        // Confirming the code action will trigger a resolve request.
4292        let confirm_action = workspace_b
4293            .update(cx_b, |workspace, cx| {
4294                Editor::confirm_code_action(workspace, &ConfirmCodeAction { item_ix: Some(0) }, cx)
4295            })
4296            .unwrap();
4297        fake_language_server.handle_request::<lsp::request::CodeActionResolveRequest, _, _>(
4298            |_, _| async move {
4299                Ok(lsp::CodeAction {
4300                    title: "Inline into all callers".to_string(),
4301                    edit: Some(lsp::WorkspaceEdit {
4302                        changes: Some(
4303                            [
4304                                (
4305                                    lsp::Url::from_file_path("/a/main.rs").unwrap(),
4306                                    vec![lsp::TextEdit::new(
4307                                        lsp::Range::new(
4308                                            lsp::Position::new(1, 22),
4309                                            lsp::Position::new(1, 34),
4310                                        ),
4311                                        "4".to_string(),
4312                                    )],
4313                                ),
4314                                (
4315                                    lsp::Url::from_file_path("/a/other.rs").unwrap(),
4316                                    vec![lsp::TextEdit::new(
4317                                        lsp::Range::new(
4318                                            lsp::Position::new(0, 0),
4319                                            lsp::Position::new(0, 27),
4320                                        ),
4321                                        "".to_string(),
4322                                    )],
4323                                ),
4324                            ]
4325                            .into_iter()
4326                            .collect(),
4327                        ),
4328                        ..Default::default()
4329                    }),
4330                    ..Default::default()
4331                })
4332            },
4333        );
4334
4335        // After the action is confirmed, an editor containing both modified files is opened.
4336        confirm_action.await.unwrap();
4337        let code_action_editor = workspace_b.read_with(cx_b, |workspace, cx| {
4338            workspace
4339                .active_item(cx)
4340                .unwrap()
4341                .downcast::<Editor>()
4342                .unwrap()
4343        });
4344        code_action_editor.update(cx_b, |editor, cx| {
4345            assert_eq!(editor.text(cx), "mod other;\nfn main() { let foo = 4; }\n");
4346            editor.undo(&Undo, cx);
4347            assert_eq!(
4348                editor.text(cx),
4349                "mod other;\nfn main() { let foo = other::foo(); }\npub fn foo() -> usize { 4 }"
4350            );
4351            editor.redo(&Redo, cx);
4352            assert_eq!(editor.text(cx), "mod other;\nfn main() { let foo = 4; }\n");
4353        });
4354    }
4355
4356    #[gpui::test(iterations = 10)]
4357    async fn test_collaborating_with_renames(cx_a: &mut TestAppContext, cx_b: &mut TestAppContext) {
4358        cx_a.foreground().forbid_parking();
4359        let lang_registry = Arc::new(LanguageRegistry::test());
4360        let fs = FakeFs::new(cx_a.background());
4361        cx_b.update(|cx| editor::init(cx));
4362
4363        // Set up a fake language server.
4364        let mut language = Language::new(
4365            LanguageConfig {
4366                name: "Rust".into(),
4367                path_suffixes: vec!["rs".to_string()],
4368                ..Default::default()
4369            },
4370            Some(tree_sitter_rust::language()),
4371        );
4372        let mut fake_language_servers = language.set_fake_lsp_adapter(FakeLspAdapter {
4373            capabilities: lsp::ServerCapabilities {
4374                rename_provider: Some(lsp::OneOf::Right(lsp::RenameOptions {
4375                    prepare_provider: Some(true),
4376                    work_done_progress_options: Default::default(),
4377                })),
4378                ..Default::default()
4379            },
4380            ..Default::default()
4381        });
4382        lang_registry.add(Arc::new(language));
4383
4384        // Connect to a server as 2 clients.
4385        let mut server = TestServer::start(cx_a.foreground(), cx_a.background()).await;
4386        let client_a = server.create_client(cx_a, "user_a").await;
4387        let client_b = server.create_client(cx_b, "user_b").await;
4388        server
4389            .make_contacts(vec![(&client_a, cx_a), (&client_b, cx_b)])
4390            .await;
4391
4392        // Share a project as client A
4393        fs.insert_tree(
4394            "/dir",
4395            json!({
4396                "one.rs": "const ONE: usize = 1;",
4397                "two.rs": "const TWO: usize = one::ONE + one::ONE;"
4398            }),
4399        )
4400        .await;
4401        let project_a = cx_a.update(|cx| {
4402            Project::local(
4403                client_a.clone(),
4404                client_a.user_store.clone(),
4405                lang_registry.clone(),
4406                fs.clone(),
4407                cx,
4408            )
4409        });
4410        let (worktree_a, _) = project_a
4411            .update(cx_a, |p, cx| {
4412                p.find_or_create_local_worktree("/dir", true, cx)
4413            })
4414            .await
4415            .unwrap();
4416        worktree_a
4417            .read_with(cx_a, |tree, _| tree.as_local().unwrap().scan_complete())
4418            .await;
4419        let project_id = project_a.update(cx_a, |p, _| p.next_remote_id()).await;
4420        let worktree_id = worktree_a.read_with(cx_a, |tree, _| tree.id());
4421        project_a.update(cx_a, |p, cx| p.share(cx)).await.unwrap();
4422
4423        // Join the worktree as client B.
4424        let project_b = Project::remote(
4425            project_id,
4426            client_b.clone(),
4427            client_b.user_store.clone(),
4428            lang_registry.clone(),
4429            fs.clone(),
4430            &mut cx_b.to_async(),
4431        )
4432        .await
4433        .unwrap();
4434        let mut params = cx_b.update(WorkspaceParams::test);
4435        params.languages = lang_registry.clone();
4436        params.client = client_b.client.clone();
4437        params.user_store = client_b.user_store.clone();
4438        params.project = project_b;
4439
4440        let (_window_b, workspace_b) = cx_b.add_window(|cx| Workspace::new(&params, cx));
4441        let editor_b = workspace_b
4442            .update(cx_b, |workspace, cx| {
4443                workspace.open_path((worktree_id, "one.rs"), true, cx)
4444            })
4445            .await
4446            .unwrap()
4447            .downcast::<Editor>()
4448            .unwrap();
4449        let fake_language_server = fake_language_servers.next().await.unwrap();
4450
4451        // Move cursor to a location that can be renamed.
4452        let prepare_rename = editor_b.update(cx_b, |editor, cx| {
4453            editor.select_ranges([7..7], None, cx);
4454            editor.rename(&Rename, cx).unwrap()
4455        });
4456
4457        fake_language_server
4458            .handle_request::<lsp::request::PrepareRenameRequest, _, _>(|params, _| async move {
4459                assert_eq!(params.text_document.uri.as_str(), "file:///dir/one.rs");
4460                assert_eq!(params.position, lsp::Position::new(0, 7));
4461                Ok(Some(lsp::PrepareRenameResponse::Range(lsp::Range::new(
4462                    lsp::Position::new(0, 6),
4463                    lsp::Position::new(0, 9),
4464                ))))
4465            })
4466            .next()
4467            .await
4468            .unwrap();
4469        prepare_rename.await.unwrap();
4470        editor_b.update(cx_b, |editor, cx| {
4471            let rename = editor.pending_rename().unwrap();
4472            let buffer = editor.buffer().read(cx).snapshot(cx);
4473            assert_eq!(
4474                rename.range.start.to_offset(&buffer)..rename.range.end.to_offset(&buffer),
4475                6..9
4476            );
4477            rename.editor.update(cx, |rename_editor, cx| {
4478                rename_editor.buffer().update(cx, |rename_buffer, cx| {
4479                    rename_buffer.edit([(0..3, "THREE")], cx);
4480                });
4481            });
4482        });
4483
4484        let confirm_rename = workspace_b.update(cx_b, |workspace, cx| {
4485            Editor::confirm_rename(workspace, &ConfirmRename, cx).unwrap()
4486        });
4487        fake_language_server
4488            .handle_request::<lsp::request::Rename, _, _>(|params, _| async move {
4489                assert_eq!(
4490                    params.text_document_position.text_document.uri.as_str(),
4491                    "file:///dir/one.rs"
4492                );
4493                assert_eq!(
4494                    params.text_document_position.position,
4495                    lsp::Position::new(0, 6)
4496                );
4497                assert_eq!(params.new_name, "THREE");
4498                Ok(Some(lsp::WorkspaceEdit {
4499                    changes: Some(
4500                        [
4501                            (
4502                                lsp::Url::from_file_path("/dir/one.rs").unwrap(),
4503                                vec![lsp::TextEdit::new(
4504                                    lsp::Range::new(
4505                                        lsp::Position::new(0, 6),
4506                                        lsp::Position::new(0, 9),
4507                                    ),
4508                                    "THREE".to_string(),
4509                                )],
4510                            ),
4511                            (
4512                                lsp::Url::from_file_path("/dir/two.rs").unwrap(),
4513                                vec![
4514                                    lsp::TextEdit::new(
4515                                        lsp::Range::new(
4516                                            lsp::Position::new(0, 24),
4517                                            lsp::Position::new(0, 27),
4518                                        ),
4519                                        "THREE".to_string(),
4520                                    ),
4521                                    lsp::TextEdit::new(
4522                                        lsp::Range::new(
4523                                            lsp::Position::new(0, 35),
4524                                            lsp::Position::new(0, 38),
4525                                        ),
4526                                        "THREE".to_string(),
4527                                    ),
4528                                ],
4529                            ),
4530                        ]
4531                        .into_iter()
4532                        .collect(),
4533                    ),
4534                    ..Default::default()
4535                }))
4536            })
4537            .next()
4538            .await
4539            .unwrap();
4540        confirm_rename.await.unwrap();
4541
4542        let rename_editor = workspace_b.read_with(cx_b, |workspace, cx| {
4543            workspace
4544                .active_item(cx)
4545                .unwrap()
4546                .downcast::<Editor>()
4547                .unwrap()
4548        });
4549        rename_editor.update(cx_b, |editor, cx| {
4550            assert_eq!(
4551                editor.text(cx),
4552                "const THREE: usize = 1;\nconst TWO: usize = one::THREE + one::THREE;"
4553            );
4554            editor.undo(&Undo, cx);
4555            assert_eq!(
4556                editor.text(cx),
4557                "const ONE: usize = 1;\nconst TWO: usize = one::ONE + one::ONE;"
4558            );
4559            editor.redo(&Redo, cx);
4560            assert_eq!(
4561                editor.text(cx),
4562                "const THREE: usize = 1;\nconst TWO: usize = one::THREE + one::THREE;"
4563            );
4564        });
4565
4566        // Ensure temporary rename edits cannot be undone/redone.
4567        editor_b.update(cx_b, |editor, cx| {
4568            editor.undo(&Undo, cx);
4569            assert_eq!(editor.text(cx), "const ONE: usize = 1;");
4570            editor.undo(&Undo, cx);
4571            assert_eq!(editor.text(cx), "const ONE: usize = 1;");
4572            editor.redo(&Redo, cx);
4573            assert_eq!(editor.text(cx), "const THREE: usize = 1;");
4574        })
4575    }
4576
4577    #[gpui::test(iterations = 10)]
4578    async fn test_basic_chat(cx_a: &mut TestAppContext, cx_b: &mut TestAppContext) {
4579        cx_a.foreground().forbid_parking();
4580
4581        // Connect to a server as 2 clients.
4582        let mut server = TestServer::start(cx_a.foreground(), cx_a.background()).await;
4583        let client_a = server.create_client(cx_a, "user_a").await;
4584        let client_b = server.create_client(cx_b, "user_b").await;
4585
4586        // Create an org that includes these 2 users.
4587        let db = &server.app_state.db;
4588        let org_id = db.create_org("Test Org", "test-org").await.unwrap();
4589        db.add_org_member(org_id, client_a.current_user_id(&cx_a), false)
4590            .await
4591            .unwrap();
4592        db.add_org_member(org_id, client_b.current_user_id(&cx_b), false)
4593            .await
4594            .unwrap();
4595
4596        // Create a channel that includes all the users.
4597        let channel_id = db.create_org_channel(org_id, "test-channel").await.unwrap();
4598        db.add_channel_member(channel_id, client_a.current_user_id(&cx_a), false)
4599            .await
4600            .unwrap();
4601        db.add_channel_member(channel_id, client_b.current_user_id(&cx_b), false)
4602            .await
4603            .unwrap();
4604        db.create_channel_message(
4605            channel_id,
4606            client_b.current_user_id(&cx_b),
4607            "hello A, it's B.",
4608            OffsetDateTime::now_utc(),
4609            1,
4610        )
4611        .await
4612        .unwrap();
4613
4614        let channels_a = cx_a
4615            .add_model(|cx| ChannelList::new(client_a.user_store.clone(), client_a.clone(), cx));
4616        channels_a
4617            .condition(cx_a, |list, _| list.available_channels().is_some())
4618            .await;
4619        channels_a.read_with(cx_a, |list, _| {
4620            assert_eq!(
4621                list.available_channels().unwrap(),
4622                &[ChannelDetails {
4623                    id: channel_id.to_proto(),
4624                    name: "test-channel".to_string()
4625                }]
4626            )
4627        });
4628        let channel_a = channels_a.update(cx_a, |this, cx| {
4629            this.get_channel(channel_id.to_proto(), cx).unwrap()
4630        });
4631        channel_a.read_with(cx_a, |channel, _| assert!(channel.messages().is_empty()));
4632        channel_a
4633            .condition(&cx_a, |channel, _| {
4634                channel_messages(channel)
4635                    == [("user_b".to_string(), "hello A, it's B.".to_string(), false)]
4636            })
4637            .await;
4638
4639        let channels_b = cx_b
4640            .add_model(|cx| ChannelList::new(client_b.user_store.clone(), client_b.clone(), cx));
4641        channels_b
4642            .condition(cx_b, |list, _| list.available_channels().is_some())
4643            .await;
4644        channels_b.read_with(cx_b, |list, _| {
4645            assert_eq!(
4646                list.available_channels().unwrap(),
4647                &[ChannelDetails {
4648                    id: channel_id.to_proto(),
4649                    name: "test-channel".to_string()
4650                }]
4651            )
4652        });
4653
4654        let channel_b = channels_b.update(cx_b, |this, cx| {
4655            this.get_channel(channel_id.to_proto(), cx).unwrap()
4656        });
4657        channel_b.read_with(cx_b, |channel, _| assert!(channel.messages().is_empty()));
4658        channel_b
4659            .condition(&cx_b, |channel, _| {
4660                channel_messages(channel)
4661                    == [("user_b".to_string(), "hello A, it's B.".to_string(), false)]
4662            })
4663            .await;
4664
4665        channel_a
4666            .update(cx_a, |channel, cx| {
4667                channel
4668                    .send_message("oh, hi B.".to_string(), cx)
4669                    .unwrap()
4670                    .detach();
4671                let task = channel.send_message("sup".to_string(), cx).unwrap();
4672                assert_eq!(
4673                    channel_messages(channel),
4674                    &[
4675                        ("user_b".to_string(), "hello A, it's B.".to_string(), false),
4676                        ("user_a".to_string(), "oh, hi B.".to_string(), true),
4677                        ("user_a".to_string(), "sup".to_string(), true)
4678                    ]
4679                );
4680                task
4681            })
4682            .await
4683            .unwrap();
4684
4685        channel_b
4686            .condition(&cx_b, |channel, _| {
4687                channel_messages(channel)
4688                    == [
4689                        ("user_b".to_string(), "hello A, it's B.".to_string(), false),
4690                        ("user_a".to_string(), "oh, hi B.".to_string(), false),
4691                        ("user_a".to_string(), "sup".to_string(), false),
4692                    ]
4693            })
4694            .await;
4695
4696        assert_eq!(
4697            server
4698                .state()
4699                .await
4700                .channel(channel_id)
4701                .unwrap()
4702                .connection_ids
4703                .len(),
4704            2
4705        );
4706        cx_b.update(|_| drop(channel_b));
4707        server
4708            .condition(|state| state.channel(channel_id).unwrap().connection_ids.len() == 1)
4709            .await;
4710
4711        cx_a.update(|_| drop(channel_a));
4712        server
4713            .condition(|state| state.channel(channel_id).is_none())
4714            .await;
4715    }
4716
4717    #[gpui::test(iterations = 10)]
4718    async fn test_chat_message_validation(cx_a: &mut TestAppContext) {
4719        cx_a.foreground().forbid_parking();
4720
4721        let mut server = TestServer::start(cx_a.foreground(), cx_a.background()).await;
4722        let client_a = server.create_client(cx_a, "user_a").await;
4723
4724        let db = &server.app_state.db;
4725        let org_id = db.create_org("Test Org", "test-org").await.unwrap();
4726        let channel_id = db.create_org_channel(org_id, "test-channel").await.unwrap();
4727        db.add_org_member(org_id, client_a.current_user_id(&cx_a), false)
4728            .await
4729            .unwrap();
4730        db.add_channel_member(channel_id, client_a.current_user_id(&cx_a), false)
4731            .await
4732            .unwrap();
4733
4734        let channels_a = cx_a
4735            .add_model(|cx| ChannelList::new(client_a.user_store.clone(), client_a.clone(), cx));
4736        channels_a
4737            .condition(cx_a, |list, _| list.available_channels().is_some())
4738            .await;
4739        let channel_a = channels_a.update(cx_a, |this, cx| {
4740            this.get_channel(channel_id.to_proto(), cx).unwrap()
4741        });
4742
4743        // Messages aren't allowed to be too long.
4744        channel_a
4745            .update(cx_a, |channel, cx| {
4746                let long_body = "this is long.\n".repeat(1024);
4747                channel.send_message(long_body, cx).unwrap()
4748            })
4749            .await
4750            .unwrap_err();
4751
4752        // Messages aren't allowed to be blank.
4753        channel_a.update(cx_a, |channel, cx| {
4754            channel.send_message(String::new(), cx).unwrap_err()
4755        });
4756
4757        // Leading and trailing whitespace are trimmed.
4758        channel_a
4759            .update(cx_a, |channel, cx| {
4760                channel
4761                    .send_message("\n surrounded by whitespace  \n".to_string(), cx)
4762                    .unwrap()
4763            })
4764            .await
4765            .unwrap();
4766        assert_eq!(
4767            db.get_channel_messages(channel_id, 10, None)
4768                .await
4769                .unwrap()
4770                .iter()
4771                .map(|m| &m.body)
4772                .collect::<Vec<_>>(),
4773            &["surrounded by whitespace"]
4774        );
4775    }
4776
4777    #[gpui::test(iterations = 10)]
        // Checks chat-channel behavior across a connection loss: client B keeps its cached
        // channel list and messages while offline, a message B sends while offline fails to
        // send, and after reconnecting B ends up with both A's messages and its own offline
        // message. Finally verifies that both clients can exchange messages normally again.
4778    async fn test_chat_reconnection(cx_a: &mut TestAppContext, cx_b: &mut TestAppContext) {
4779        cx_a.foreground().forbid_parking();
4780
4781        // Connect to a server as 2 clients.
4782        let mut server = TestServer::start(cx_a.foreground(), cx_a.background()).await;
4783        let client_a = server.create_client(cx_a, "user_a").await;
4784        let client_b = server.create_client(cx_b, "user_b").await;
            // Status updates for client B; polled below to detect the failed reconnection attempt.
4785        let mut status_b = client_b.status();
4786
4787        // Create an org that includes these 2 users.
4788        let db = &server.app_state.db;
4789        let org_id = db.create_org("Test Org", "test-org").await.unwrap();
4790        db.add_org_member(org_id, client_a.current_user_id(&cx_a), false)
4791            .await
4792            .unwrap();
4793        db.add_org_member(org_id, client_b.current_user_id(&cx_b), false)
4794            .await
4795            .unwrap();
4796
4797        // Create a channel that includes all the users.
4798        let channel_id = db.create_org_channel(org_id, "test-channel").await.unwrap();
4799        db.add_channel_member(channel_id, client_a.current_user_id(&cx_a), false)
4800            .await
4801            .unwrap();
4802        db.add_channel_member(channel_id, client_b.current_user_id(&cx_b), false)
4803            .await
4804            .unwrap();
            // Seed the channel with one message from user B, written directly to the database.
            // NOTE(review): the trailing `2` is presumably the message nonce - confirm against
            // `create_channel_message`.
4805        db.create_channel_message(
4806            channel_id,
4807            client_b.current_user_id(&cx_b),
4808            "hello A, it's B.",
4809            OffsetDateTime::now_utc(),
4810            2,
4811        )
4812        .await
4813        .unwrap();
4814
            // Client A: wait for the channel list to load, then open the channel.
4815        let channels_a = cx_a
4816            .add_model(|cx| ChannelList::new(client_a.user_store.clone(), client_a.clone(), cx));
4817        channels_a
4818            .condition(cx_a, |list, _| list.available_channels().is_some())
4819            .await;
4820
4821        channels_a.read_with(cx_a, |list, _| {
4822            assert_eq!(
4823                list.available_channels().unwrap(),
4824                &[ChannelDetails {
4825                    id: channel_id.to_proto(),
4826                    name: "test-channel".to_string()
4827                }]
4828            )
4829        });
4830        let channel_a = channels_a.update(cx_a, |this, cx| {
4831            this.get_channel(channel_id.to_proto(), cx).unwrap()
4832        });
            // The freshly opened channel has no messages yet; the seeded message shows up
            // asynchronously. The third tuple field is `false` for it here and `true` for
            // messages that were just sent below.
            // NOTE(review): that field is presumably a "pending" flag - confirm against the
            // `channel_messages` helper.
4833        channel_a.read_with(cx_a, |channel, _| assert!(channel.messages().is_empty()));
4834        channel_a
4835            .condition(&cx_a, |channel, _| {
4836                channel_messages(channel)
4837                    == [("user_b".to_string(), "hello A, it's B.".to_string(), false)]
4838            })
4839            .await;
4840
            // Client B: same sequence as client A above.
4841        let channels_b = cx_b
4842            .add_model(|cx| ChannelList::new(client_b.user_store.clone(), client_b.clone(), cx));
4843        channels_b
4844            .condition(cx_b, |list, _| list.available_channels().is_some())
4845            .await;
4846        channels_b.read_with(cx_b, |list, _| {
4847            assert_eq!(
4848                list.available_channels().unwrap(),
4849                &[ChannelDetails {
4850                    id: channel_id.to_proto(),
4851                    name: "test-channel".to_string()
4852                }]
4853            )
4854        });
4855
4856        let channel_b = channels_b.update(cx_b, |this, cx| {
4857            this.get_channel(channel_id.to_proto(), cx).unwrap()
4858        });
4859        channel_b.read_with(cx_b, |channel, _| assert!(channel.messages().is_empty()));
4860        channel_b
4861            .condition(&cx_b, |channel, _| {
4862                channel_messages(channel)
4863                    == [("user_b".to_string(), "hello A, it's B.".to_string(), false)]
4864            })
4865            .await;
4866
4867        // Disconnect client B, ensuring we can still access its cached channel data.
4868        server.forbid_connections();
4869        server.disconnect_client(client_b.current_user_id(&cx_b));
4870        cx_b.foreground().advance_clock(rpc::RECEIVE_TIMEOUT);
            // Drain status updates until client B reports a reconnection error, which it does
            // because new connections are currently forbidden.
4871        while !matches!(
4872            status_b.next().await,
4873            Some(client::Status::ReconnectionError { .. })
4874        ) {}
4875
4876        channels_b.read_with(cx_b, |channels, _| {
4877            assert_eq!(
4878                channels.available_channels().unwrap(),
4879                [ChannelDetails {
4880                    id: channel_id.to_proto(),
4881                    name: "test-channel".to_string()
4882                }]
4883            )
4884        });
4885        channel_b.read_with(cx_b, |channel, _| {
4886            assert_eq!(
4887                channel_messages(channel),
4888                [("user_b".to_string(), "hello A, it's B.".to_string(), false)]
4889            )
4890        });
4891
4892        // Send a message from client B while it is disconnected.
            // The message is listed locally right away (flag `true`), but the send task itself
            // resolves to an error.
4893        channel_b
4894            .update(cx_b, |channel, cx| {
4895                let task = channel
4896                    .send_message("can you see this?".to_string(), cx)
4897                    .unwrap();
4898                assert_eq!(
4899                    channel_messages(channel),
4900                    &[
4901                        ("user_b".to_string(), "hello A, it's B.".to_string(), false),
4902                        ("user_b".to_string(), "can you see this?".to_string(), true)
4903                    ]
4904                );
4905                task
4906            })
4907            .await
4908            .unwrap_err();
4909
4910        // Send a message from client A while B is disconnected.
            // Two messages are sent back to back: the first task is detached, and only the
            // second one is awaited.
4911        channel_a
4912            .update(cx_a, |channel, cx| {
4913                channel
4914                    .send_message("oh, hi B.".to_string(), cx)
4915                    .unwrap()
4916                    .detach();
4917                let task = channel.send_message("sup".to_string(), cx).unwrap();
4918                assert_eq!(
4919                    channel_messages(channel),
4920                    &[
4921                        ("user_b".to_string(), "hello A, it's B.".to_string(), false),
4922                        ("user_a".to_string(), "oh, hi B.".to_string(), true),
4923                        ("user_a".to_string(), "sup".to_string(), true)
4924                    ]
4925                );
4926                task
4927            })
4928            .await
4929            .unwrap();
4930
4931        // Give client B a chance to reconnect.
4932        server.allow_connections();
4933        cx_b.foreground().advance_clock(Duration::from_secs(10));
4934
4935        // Verify that B sees the new messages upon reconnection, as well as the message client B
4936        // sent while offline.
            // B's offline message is expected after A's two messages, and no entry is still
            // flagged `true`.
4937        channel_b
4938            .condition(&cx_b, |channel, _| {
4939                channel_messages(channel)
4940                    == [
4941                        ("user_b".to_string(), "hello A, it's B.".to_string(), false),
4942                        ("user_a".to_string(), "oh, hi B.".to_string(), false),
4943                        ("user_a".to_string(), "sup".to_string(), false),
4944                        ("user_b".to_string(), "can you see this?".to_string(), false),
4945                    ]
4946            })
4947            .await;
4948
4949        // Ensure client A and B can communicate normally after reconnection.
4950        channel_a
4951            .update(cx_a, |channel, cx| {
4952                channel.send_message("you online?".to_string(), cx).unwrap()
4953            })
4954            .await
4955            .unwrap();
4956        channel_b
4957            .condition(&cx_b, |channel, _| {
4958                channel_messages(channel)
4959                    == [
4960                        ("user_b".to_string(), "hello A, it's B.".to_string(), false),
4961                        ("user_a".to_string(), "oh, hi B.".to_string(), false),
4962                        ("user_a".to_string(), "sup".to_string(), false),
4963                        ("user_b".to_string(), "can you see this?".to_string(), false),
4964                        ("user_a".to_string(), "you online?".to_string(), false),
4965                    ]
4966            })
4967            .await;
4968
            // And in the other direction: a message from B reaches A.
4969        channel_b
4970            .update(cx_b, |channel, cx| {
4971                channel.send_message("yep".to_string(), cx).unwrap()
4972            })
4973            .await
4974            .unwrap();
4975        channel_a
4976            .condition(&cx_a, |channel, _| {
4977                channel_messages(channel)
4978                    == [
4979                        ("user_b".to_string(), "hello A, it's B.".to_string(), false),
4980                        ("user_a".to_string(), "oh, hi B.".to_string(), false),
4981                        ("user_a".to_string(), "sup".to_string(), false),
4982                        ("user_b".to_string(), "can you see this?".to_string(), false),
4983                        ("user_a".to_string(), "you online?".to_string(), false),
4984                        ("user_b".to_string(), "yep".to_string(), false),
4985                    ]
4986            })
4987            .await;
4988    }
4989
4990    #[gpui::test(iterations = 10)]
        // Checks that every client's contact list stays in sync as client A creates a project,
        // shares it, gets a guest (client B), and drops the project, and as client C goes
        // offline and comes back. Contact lists are compared through the nested `contacts`
        // helper defined at the bottom of this function.
4991    async fn test_contacts(
4992        deterministic: Arc<Deterministic>,
4993        cx_a: &mut TestAppContext,
4994        cx_b: &mut TestAppContext,
4995        cx_c: &mut TestAppContext,
4996    ) {
4997        cx_a.foreground().forbid_parking();
4998        let lang_registry = Arc::new(LanguageRegistry::test());
4999        let fs = FakeFs::new(cx_a.background());
5000
5001        // Connect to a server as 3 clients.
5002        let mut server = TestServer::start(cx_a.foreground(), cx_a.background()).await;
5003        let client_a = server.create_client(cx_a, "user_a").await;
5004        let client_b = server.create_client(cx_b, "user_b").await;
5005        let client_c = server.create_client(cx_c, "user_c").await;
5006        server
5007            .make_contacts(vec![
5008                (&client_a, cx_a),
5009                (&client_b, cx_b),
5010                (&client_c, cx_c),
5011            ])
5012            .await;
5013
            // Initially every client sees all three users online, with no projects.
5014        deterministic.run_until_parked();
5015        for (client, cx) in [(&client_a, &cx_a), (&client_b, &cx_b), (&client_c, &cx_c)] {
5016            client.user_store.read_with(*cx, |store, _| {
5017                assert_eq!(
5018                    contacts(store),
5019                    [
5020                        ("user_a", true, vec![]),
5021                        ("user_b", true, vec![]),
5022                        ("user_c", true, vec![])
5023                    ]
5024                )
5025            });
5026        }
5027
5028        // Create a local project with a worktree at "/a" as client A (it is not shared yet).
5029        fs.create_dir(Path::new("/a")).await.unwrap();
5030
5031        let project_a = cx_a.update(|cx| {
5032            Project::local(
5033                client_a.clone(),
5034                client_a.user_store.clone(),
5035                lang_registry.clone(),
5036                fs.clone(),
5037                cx,
5038            )
5039        });
5040        let (worktree_a, _) = project_a
5041            .update(cx_a, |p, cx| {
5042                p.find_or_create_local_worktree("/a", true, cx)
5043            })
5044            .await
5045            .unwrap();
5046        worktree_a
5047            .read_with(cx_a, |tree, _| tree.as_local().unwrap().scan_complete())
5048            .await;
5049
            // The project now appears under user_a for everyone, with `is_shared == false`.
5050        deterministic.run_until_parked();
5051        for (client, cx) in [(&client_a, &cx_a), (&client_b, &cx_b), (&client_c, &cx_c)] {
5052            client.user_store.read_with(*cx, |store, _| {
5053                assert_eq!(
5054                    contacts(store),
5055                    [
5056                        ("user_a", true, vec![("a", false, vec![])]),
5057                        ("user_b", true, vec![]),
5058                        ("user_c", true, vec![])
5059                    ]
5060                )
5061            });
5062        }
5063
            // Obtain the project's remote id (needed for client B to join), then share it.
5064        let project_id = project_a
5065            .update(cx_a, |project, _| project.next_remote_id())
5066            .await;
5067        project_a
5068            .update(cx_a, |project, cx| project.share(cx))
5069            .await
5070            .unwrap();
5071
            // After sharing, the project is reported with `is_shared == true` and no guests.
5072        deterministic.run_until_parked();
5073        for (client, cx) in [(&client_a, &cx_a), (&client_b, &cx_b), (&client_c, &cx_c)] {
5074            client.user_store.read_with(*cx, |store, _| {
5075                assert_eq!(
5076                    contacts(store),
5077                    [
5078                        ("user_a", true, vec![("a", true, vec![])]),
5079                        ("user_b", true, vec![]),
5080                        ("user_c", true, vec![])
5081                    ]
5082                )
5083            });
5084        }
5085
            // Client B joins the shared project. The named binding keeps the remote project
            // alive until the end of the test.
5086        let _project_b = Project::remote(
5087            project_id,
5088            client_b.clone(),
5089            client_b.user_store.clone(),
5090            lang_registry.clone(),
5091            fs.clone(),
5092            &mut cx_b.to_async(),
5093        )
5094        .await
5095        .unwrap();
5096
            // user_b is now listed as a guest of user_a's project on every client.
5097        deterministic.run_until_parked();
5098        for (client, cx) in [(&client_a, &cx_a), (&client_b, &cx_b), (&client_c, &cx_c)] {
5099            client.user_store.read_with(*cx, |store, _| {
5100                assert_eq!(
5101                    contacts(store),
5102                    [
5103                        ("user_a", true, vec![("a", true, vec!["user_b"])]),
5104                        ("user_b", true, vec![]),
5105                        ("user_c", true, vec![])
5106                    ]
5107                )
5108            });
5109        }
5110
            // Wait until the host also sees client B as a collaborator before dropping the project.
5111        project_a
5112            .condition(&cx_a, |project, _| {
5113                project.collaborators().contains_key(&client_b.peer_id)
5114            })
5115            .await;
5116
            // Dropping client A's project removes it from everyone's contact list.
5117        cx_a.update(move |_| drop(project_a));
5118        deterministic.run_until_parked();
5119        for (client, cx) in [(&client_a, &cx_a), (&client_b, &cx_b), (&client_c, &cx_c)] {
5120            client.user_store.read_with(*cx, |store, _| {
5121                assert_eq!(
5122                    contacts(store),
5123                    [
5124                        ("user_a", true, vec![]),
5125                        ("user_b", true, vec![]),
5126                        ("user_c", true, vec![])
5127                    ]
5128                )
5129            });
5130        }
5131
            // Disconnect client C and block reconnection. Once the clock has advanced by the RPC
            // receive timeout, A and B see user_c as offline, while C's own contact list is empty.
5132        server.disconnect_client(client_c.current_user_id(cx_c));
5133        server.forbid_connections();
5134        deterministic.advance_clock(rpc::RECEIVE_TIMEOUT);
5135        for (client, cx) in [(&client_a, &cx_a), (&client_b, &cx_b)] {
5136            client.user_store.read_with(*cx, |store, _| {
5137                assert_eq!(
5138                    contacts(store),
5139                    [
5140                        ("user_a", true, vec![]),
5141                        ("user_b", true, vec![]),
5142                        ("user_c", false, vec![])
5143                    ]
5144                )
5145            });
5146        }
5147        client_c
5148            .user_store
5149            .read_with(cx_c, |store, _| assert_eq!(contacts(store), []));
5150
            // Let client C reconnect; all three clients then agree that everyone is online again.
5151        server.allow_connections();
5152        client_c
5153            .authenticate_and_connect(false, &cx_c.to_async())
5154            .await
5155            .unwrap();
5156
5157        deterministic.run_until_parked();
5158        for (client, cx) in [(&client_a, &cx_a), (&client_b, &cx_b), (&client_c, &cx_c)] {
5159            client.user_store.read_with(*cx, |store, _| {
5160                assert_eq!(
5161                    contacts(store),
5162                    [
5163                        ("user_a", true, vec![]),
5164                        ("user_b", true, vec![]),
5165                        ("user_c", true, vec![])
5166                    ]
5167                )
5168            });
5169        }
5170
            // Flattens a user store's contacts into comparable tuples:
            // `(github_login, online, projects)`, where each project is
            // `(first worktree root name, is_shared, guest github logins)`.
            // Only the first worktree root name of each project is reported; indexing with `[0]`
            // panics if a project has no worktree root names.
5171        fn contacts(user_store: &UserStore) -> Vec<(&str, bool, Vec<(&str, bool, Vec<&str>)>)> {
5172            user_store
5173                .contacts()
5174                .iter()
5175                .map(|contact| {
5176                    let worktrees = contact
5177                        .projects
5178                        .iter()
5179                        .map(|p| {
5180                            (
5181                                p.worktree_root_names[0].as_str(),
5182                                p.is_shared,
5183                                p.guests.iter().map(|p| p.github_login.as_str()).collect(),
5184                            )
5185                        })
5186                        .collect();
5187                    (
5188                        contact.user.github_login.as_str(),
5189                        contact.online,
5190                        worktrees,
5191                    )
5192                })
5193                .collect()
5194        }
5195    }
5196
5197    #[gpui::test(iterations = 10)]
        // Walks through the contact-request lifecycle: users A and C each request user B as a
        // contact, B accepts A and rejects C. Each user is logged in from two clients so the
        // test can check that changes reach all of a user's clients, and the primary clients
        // are disconnected and reconnected after each step to check that the state is also
        // delivered on a fresh connection.
5198    async fn test_contact_requests(
5199        executor: Arc<Deterministic>,
5200        cx_a: &mut TestAppContext,
5201        cx_a2: &mut TestAppContext,
5202        cx_b: &mut TestAppContext,
5203        cx_b2: &mut TestAppContext,
5204        cx_c: &mut TestAppContext,
5205        cx_c2: &mut TestAppContext,
5206    ) {
5207        cx_a.foreground().forbid_parking();
5208
5209        // Connect to a server as 6 clients: 2 clients for each of 3 users.
5210        let mut server = TestServer::start(cx_a.foreground(), cx_a.background()).await;
5211        let client_a = server.create_client(cx_a, "user_a").await;
5212        let client_a2 = server.create_client(cx_a2, "user_a").await;
5213        let client_b = server.create_client(cx_b, "user_b").await;
5214        let client_b2 = server.create_client(cx_b2, "user_b").await;
5215        let client_c = server.create_client(cx_c, "user_c").await;
5216        let client_c2 = server.create_client(cx_c2, "user_c").await;
5217
            // Sanity check: both clients created with the same login share one user id.
5218        assert_eq!(client_a.user_id().unwrap(), client_a2.user_id().unwrap());
5219        assert_eq!(client_b.user_id().unwrap(), client_b2.user_id().unwrap());
5220        assert_eq!(client_c.user_id().unwrap(), client_c2.user_id().unwrap());
5221
5222        // User A and User C request that user B become their contact.
5223        client_a
5224            .user_store
5225            .update(cx_a, |store, cx| {
5226                store.request_contact(client_b.user_id().unwrap(), cx)
5227            })
5228            .await
5229            .unwrap();
5230        client_c
5231            .user_store
5232            .update(cx_c, |store, cx| {
5233                store.request_contact(client_b.user_id().unwrap(), cx)
5234            })
5235            .await
5236            .unwrap();
5237        executor.run_until_parked();
5238
5239        // All users see the pending request appear in all their clients.
5240        assert_eq!(
5241            client_a.summarize_contacts(&cx_a).outgoing_requests,
5242            &["user_b"]
5243        );
5244        assert_eq!(
5245            client_a2.summarize_contacts(&cx_a2).outgoing_requests,
5246            &["user_b"]
5247        );
5248        assert_eq!(
5249            client_b.summarize_contacts(&cx_b).incoming_requests,
5250            &["user_a", "user_c"]
5251        );
5252        assert_eq!(
5253            client_b2.summarize_contacts(&cx_b2).incoming_requests,
5254            &["user_a", "user_c"]
5255        );
5256        assert_eq!(
5257            client_c.summarize_contacts(&cx_c).outgoing_requests,
5258            &["user_b"]
5259        );
5260        assert_eq!(
5261            client_c2.summarize_contacts(&cx_c2).outgoing_requests,
5262            &["user_b"]
5263        );
5264
5265        // Contact requests are present upon connecting (tested here via disconnect/reconnect)
            // Only the primary client of each user is reconnected and re-checked.
5266        disconnect_and_reconnect(&client_a, cx_a).await;
5267        disconnect_and_reconnect(&client_b, cx_b).await;
5268        disconnect_and_reconnect(&client_c, cx_c).await;
5269        executor.run_until_parked();
5270        assert_eq!(
5271            client_a.summarize_contacts(&cx_a).outgoing_requests,
5272            &["user_b"]
5273        );
5274        assert_eq!(
5275            client_b.summarize_contacts(&cx_b).incoming_requests,
5276            &["user_a", "user_c"]
5277        );
5278        assert_eq!(
5279            client_c.summarize_contacts(&cx_c).outgoing_requests,
5280            &["user_b"]
5281        );
5282
5283        // User B accepts the request from user A.
5284        client_b
5285            .user_store
5286            .update(cx_b, |store, cx| {
5287                store.respond_to_contact_request(client_a.user_id().unwrap(), true, cx)
5288            })
5289            .await
5290            .unwrap();
5291
5292        executor.run_until_parked();
5293
5294        // User B sees user A as their contact now in all clients, and the incoming request from them is removed.
            // The `current` list also contains the user themselves ("user_b" here).
5295        let contacts_b = client_b.summarize_contacts(&cx_b);
5296        assert_eq!(contacts_b.current, &["user_a", "user_b"]);
5297        assert_eq!(contacts_b.incoming_requests, &["user_c"]);
5298        let contacts_b2 = client_b2.summarize_contacts(&cx_b2);
5299        assert_eq!(contacts_b2.current, &["user_a", "user_b"]);
5300        assert_eq!(contacts_b2.incoming_requests, &["user_c"]);
5301
5302        // User A sees user B as their contact now in all clients, and the outgoing request to them is removed.
5303        let contacts_a = client_a.summarize_contacts(&cx_a);
5304        assert_eq!(contacts_a.current, &["user_a", "user_b"]);
5305        assert!(contacts_a.outgoing_requests.is_empty());
5306        let contacts_a2 = client_a2.summarize_contacts(&cx_a2);
5307        assert_eq!(contacts_a2.current, &["user_a", "user_b"]);
5308        assert!(contacts_a2.outgoing_requests.is_empty());
5309
5310        // Contacts are present upon connecting (tested here via disconnect/reconnect)
            // User C's request to user B is still pending at this point, so it must survive too.
5311        disconnect_and_reconnect(&client_a, cx_a).await;
5312        disconnect_and_reconnect(&client_b, cx_b).await;
5313        disconnect_and_reconnect(&client_c, cx_c).await;
5314        executor.run_until_parked();
5315        assert_eq!(
5316            client_a.summarize_contacts(&cx_a).current,
5317            &["user_a", "user_b"]
5318        );
5319        assert_eq!(
5320            client_b.summarize_contacts(&cx_b).current,
5321            &["user_a", "user_b"]
5322        );
5323        assert_eq!(
5324            client_b.summarize_contacts(&cx_b).incoming_requests,
5325            &["user_c"]
5326        );
5327        assert_eq!(client_c.summarize_contacts(&cx_c).current, &["user_c"]);
5328        assert_eq!(
5329            client_c.summarize_contacts(&cx_c).outgoing_requests,
5330            &["user_b"]
5331        );
5332
5333        // User B rejects the request from user C.
5334        client_b
5335            .user_store
5336            .update(cx_b, |store, cx| {
5337                store.respond_to_contact_request(client_c.user_id().unwrap(), false, cx)
5338            })
5339            .await
5340            .unwrap();
5341
5342        executor.run_until_parked();
5343
5344        // User B doesn't see user C as their contact, and the incoming request from them is removed.
5345        let contacts_b = client_b.summarize_contacts(&cx_b);
5346        assert_eq!(contacts_b.current, &["user_a", "user_b"]);
5347        assert!(contacts_b.incoming_requests.is_empty());
5348        let contacts_b2 = client_b2.summarize_contacts(&cx_b2);
5349        assert_eq!(contacts_b2.current, &["user_a", "user_b"]);
5350        assert!(contacts_b2.incoming_requests.is_empty());
5351
5352        // User C doesn't see user B as their contact, and the outgoing request to them is removed.
5353        let contacts_c = client_c.summarize_contacts(&cx_c);
5354        assert_eq!(contacts_c.current, &["user_c"]);
5355        assert!(contacts_c.outgoing_requests.is_empty());
5356        let contacts_c2 = client_c2.summarize_contacts(&cx_c2);
5357        assert_eq!(contacts_c2.current, &["user_c"]);
5358        assert!(contacts_c2.outgoing_requests.is_empty());
5359
5360        // Incoming/outgoing requests are not present upon connecting (tested here via disconnect/reconnect)
5361        disconnect_and_reconnect(&client_a, cx_a).await;
5362        disconnect_and_reconnect(&client_b, cx_b).await;
5363        disconnect_and_reconnect(&client_c, cx_c).await;
5364        executor.run_until_parked();
5365        assert_eq!(
5366            client_a.summarize_contacts(&cx_a).current,
5367            &["user_a", "user_b"]
5368        );
5369        assert_eq!(
5370            client_b.summarize_contacts(&cx_b).current,
5371            &["user_a", "user_b"]
5372        );
5373        assert!(client_b
5374            .summarize_contacts(&cx_b)
5375            .incoming_requests
5376            .is_empty());
5377        assert_eq!(client_c.summarize_contacts(&cx_c).current, &["user_c"]);
5378        assert!(client_c
5379            .summarize_contacts(&cx_c)
5380            .outgoing_requests
5381            .is_empty());
5382
            // Disconnects `client`, clears its locally held contacts, then authenticates and
            // connects again. Clearing first means any contact data seen afterwards must have
            // arrived over the new connection.
5383        async fn disconnect_and_reconnect(client: &TestClient, cx: &mut TestAppContext) {
5384            client.disconnect(&cx.to_async()).unwrap();
5385            client.clear_contacts(cx).await;
5386            client
5387                .authenticate_and_connect(false, &cx.to_async())
5388                .await
5389                .unwrap();
5390        }
5391    }
5392
5393    #[gpui::test(iterations = 10)]
        // Checks the workspace "follow" feature between two clients on a shared project:
        // starting to follow replicates the leader's open editors and selections, later
        // activations/navigation/edits on the leader are mirrored on the follower, unfollowing
        // stops the mirroring, and following ends when the leader disconnects.
5394    async fn test_following(cx_a: &mut TestAppContext, cx_b: &mut TestAppContext) {
5395        cx_a.foreground().forbid_parking();
5396        let fs = FakeFs::new(cx_a.background());
5397
5398        // 2 clients connect to a server.
5399        let mut server = TestServer::start(cx_a.foreground(), cx_a.background()).await;
5400        let mut client_a = server.create_client(cx_a, "user_a").await;
5401        let mut client_b = server.create_client(cx_b, "user_b").await;
5402        server
5403            .make_contacts(vec![(&client_a, cx_a), (&client_b, cx_b)])
5404            .await;
5405        cx_a.update(editor::init);
5406        cx_b.update(editor::init);
5407
5408        // Client A shares a project.
5409        fs.insert_tree(
5410            "/a",
5411            json!({
5412                "1.txt": "one",
5413                "2.txt": "two",
5414                "3.txt": "three",
5415            }),
5416        )
5417        .await;
5418        let (project_a, worktree_id) = client_a.build_local_project(fs.clone(), "/a", cx_a).await;
5419        project_a
5420            .update(cx_a, |project, cx| project.share(cx))
5421            .await
5422            .unwrap();
5423
5424        // Client B joins the project.
5425        let project_b = client_b
5426            .build_remote_project(
5427                project_a
5428                    .read_with(cx_a, |project, _| project.remote_id())
5429                    .unwrap(),
5430                cx_b,
5431            )
5432            .await;
5433
5434        // Client A opens some editors.
            // "2.txt" is opened last; the assertions after B starts following expect it to be
            // the item that becomes active on B.
5435        let workspace_a = client_a.build_workspace(&project_a, cx_a);
5436        let pane_a = workspace_a.read_with(cx_a, |workspace, _| workspace.active_pane().clone());
5437        let editor_a1 = workspace_a
5438            .update(cx_a, |workspace, cx| {
5439                workspace.open_path((worktree_id, "1.txt"), true, cx)
5440            })
5441            .await
5442            .unwrap()
5443            .downcast::<Editor>()
5444            .unwrap();
5445        let editor_a2 = workspace_a
5446            .update(cx_a, |workspace, cx| {
5447                workspace.open_path((worktree_id, "2.txt"), true, cx)
5448            })
5449            .await
5450            .unwrap()
5451            .downcast::<Editor>()
5452            .unwrap();
5453
5454        // Client B opens an editor.
5455        let workspace_b = client_b.build_workspace(&project_b, cx_b);
5456        let editor_b1 = workspace_b
5457            .update(cx_b, |workspace, cx| {
5458                workspace.open_path((worktree_id, "1.txt"), true, cx)
5459            })
5460            .await
5461            .unwrap()
5462            .downcast::<Editor>()
5463            .unwrap();
5464
            // Each side looks up the other's peer id by taking the first collaborator of its
            // own project; with only two clients in the project that is the other client.
5465        let client_a_id = project_b.read_with(cx_b, |project, _| {
5466            project.collaborators().values().next().unwrap().peer_id
5467        });
5468        let client_b_id = project_a.read_with(cx_a, |project, _| {
5469            project.collaborators().values().next().unwrap().peer_id
5470        });
5471
5472        // When client B starts following client A, all visible view states are replicated to client B.
5473        editor_a1.update(cx_a, |editor, cx| editor.select_ranges([0..1], None, cx));
5474        editor_a2.update(cx_a, |editor, cx| editor.select_ranges([2..3], None, cx));
5475        workspace_b
5476            .update(cx_b, |workspace, cx| {
5477                workspace
5478                    .toggle_follow(&ToggleFollow(client_a_id), cx)
5479                    .unwrap()
5480            })
5481            .await
5482            .unwrap();
            // B's active item is now an editor for "2.txt", focused, with A's selection in that
            // file. B's existing "1.txt" editor has received A's selection for "1.txt" as well.
5483        let editor_b2 = workspace_b.read_with(cx_b, |workspace, cx| {
5484            workspace
5485                .active_item(cx)
5486                .unwrap()
5487                .downcast::<Editor>()
5488                .unwrap()
5489        });
5490        assert!(cx_b.read(|cx| editor_b2.is_focused(cx)));
5491        assert_eq!(
5492            editor_b2.read_with(cx_b, |editor, cx| editor.project_path(cx)),
5493            Some((worktree_id, "2.txt").into())
5494        );
5495        assert_eq!(
5496            editor_b2.read_with(cx_b, |editor, cx| editor.selected_ranges(cx)),
5497            vec![2..3]
5498        );
5499        assert_eq!(
5500            editor_b1.read_with(cx_b, |editor, cx| editor.selected_ranges(cx)),
5501            vec![0..1]
5502        );
5503
5504        // When client A activates a different editor, client B does so as well.
5505        workspace_a.update(cx_a, |workspace, cx| {
5506            workspace.activate_item(&editor_a1, cx)
5507        });
5508        workspace_b
5509            .condition(cx_b, |workspace, cx| {
5510                workspace.active_item(cx).unwrap().id() == editor_b1.id()
5511            })
5512            .await;
5513
5514        // When client A navigates back and forth, client B does so as well.
5515        workspace_a
5516            .update(cx_a, |workspace, cx| {
5517                workspace::Pane::go_back(workspace, None, cx)
5518            })
5519            .await;
5520        workspace_b
5521            .condition(cx_b, |workspace, cx| {
5522                workspace.active_item(cx).unwrap().id() == editor_b2.id()
5523            })
5524            .await;
5525
5526        workspace_a
5527            .update(cx_a, |workspace, cx| {
5528                workspace::Pane::go_forward(workspace, None, cx)
5529            })
5530            .await;
5531        workspace_b
5532            .condition(cx_b, |workspace, cx| {
5533                workspace.active_item(cx).unwrap().id() == editor_b1.id()
5534            })
5535            .await;
5536
5537        // Changes to client A's editor are reflected on client B.
5538        editor_a1.update(cx_a, |editor, cx| {
5539            editor.select_ranges([1..1, 2..2], None, cx);
5540        });
5541        editor_b1
5542            .condition(cx_b, |editor, cx| {
5543                editor.selected_ranges(cx) == vec![1..1, 2..2]
5544            })
5545            .await;
5546
5547        editor_a1.update(cx_a, |editor, cx| editor.set_text("TWO", cx));
5548        editor_b1
5549            .condition(cx_b, |editor, cx| editor.text(cx) == "TWO")
5550            .await;
5551
            // Both the selection and the scroll position are changed on A, but only the
            // selection change is awaited on B; the scroll position is not asserted.
5552        editor_a1.update(cx_a, |editor, cx| {
5553            editor.select_ranges([3..3], None, cx);
5554            editor.set_scroll_position(vec2f(0., 100.), cx);
5555        });
5556        editor_b1
5557            .condition(cx_b, |editor, cx| editor.selected_ranges(cx) == vec![3..3])
5558            .await;
5559
5560        // After unfollowing, client B stops receiving updates from client A.
            // A switches to its "2.txt" editor, yet B's active item remains its "1.txt" editor.
5561        workspace_b.update(cx_b, |workspace, cx| {
5562            workspace.unfollow(&workspace.active_pane().clone(), cx)
5563        });
5564        workspace_a.update(cx_a, |workspace, cx| {
5565            workspace.activate_item(&editor_a2, cx)
5566        });
5567        cx_a.foreground().run_until_parked();
5568        assert_eq!(
5569            workspace_b.read_with(cx_b, |workspace, cx| workspace
5570                .active_item(cx)
5571                .unwrap()
5572                .id()),
5573            editor_b1.id()
5574        );
5575
5576        // Client A starts following client B.
            // A's pane records B as its leader, and A's active item switches back to its
            // "1.txt" editor.
5577        workspace_a
5578            .update(cx_a, |workspace, cx| {
5579                workspace
5580                    .toggle_follow(&ToggleFollow(client_b_id), cx)
5581                    .unwrap()
5582            })
5583            .await
5584            .unwrap();
5585        assert_eq!(
5586            workspace_a.read_with(cx_a, |workspace, _| workspace.leader_for_pane(&pane_a)),
5587            Some(client_b_id)
5588        );
5589        assert_eq!(
5590            workspace_a.read_with(cx_a, |workspace, cx| workspace
5591                .active_item(cx)
5592                .unwrap()
5593                .id()),
5594            editor_a1.id()
5595        );
5596
5597        // Following interrupts when client B disconnects.
5598        client_b.disconnect(&cx_b.to_async()).unwrap();
5599        cx_a.foreground().run_until_parked();
5600        assert_eq!(
5601            workspace_a.read_with(cx_a, |workspace, _| workspace.leader_for_pane(&pane_a)),
5602            None
5603        );
5604    }
5605
5606    #[gpui::test(iterations = 10)]
        // Two peers follow each other at the same time.
        //
        // Each client opens a file, splits its pane, and follows the other client
        // from the new pane. Each then switches back to its original pane and opens
        // a second file there. The test asserts that:
        //   * the active pane of each client is still its original pane, and
        //   * each client's other pane shows the file the other peer has open
        //     ("4.txt" on A, "3.txt" on B).
        //
        // NOTE(review): `iterations = 10` presumably re-runs the test under
        // different scheduler seeds — confirm against the `gpui::test` macro.
5607    async fn test_peers_following_each_other(cx_a: &mut TestAppContext, cx_b: &mut TestAppContext) {
            // NOTE(review): presumably turns a would-be hang (executor parking with
            // nothing runnable) into a test failure — confirm in gpui.
5608        cx_a.foreground().forbid_parking();
5609        let fs = FakeFs::new(cx_a.background());
5610
5611        // 2 clients connect to a server.
5612        let mut server = TestServer::start(cx_a.foreground(), cx_a.background()).await;
5613        let mut client_a = server.create_client(cx_a, "user_a").await;
5614        let mut client_b = server.create_client(cx_b, "user_b").await;
5615        server
5616            .make_contacts(vec![(&client_a, cx_a), (&client_b, cx_b)])
5617            .await;
5618        cx_a.update(editor::init);
5619        cx_b.update(editor::init);
5620
5621        // Client A shares a project.
5622        fs.insert_tree(
5623            "/a",
5624            json!({
5625                "1.txt": "one",
5626                "2.txt": "two",
5627                "3.txt": "three",
5628                "4.txt": "four",
5629            }),
5630        )
5631        .await;
5632        let (project_a, worktree_id) = client_a.build_local_project(fs.clone(), "/a", cx_a).await;
5633        project_a
5634            .update(cx_a, |project, cx| project.share(cx))
5635            .await
5636            .unwrap();
5637
5638        // Client B joins the project.
5639        let project_b = client_b
5640            .build_remote_project(
5641                project_a
5642                    .read_with(cx_a, |project, _| project.remote_id())
5643                    .unwrap(),
5644                cx_b,
5645            )
5646            .await;
5647
5648        // Client A opens an editor on "1.txt" in its initial pane (`pane_a1`).
5649        let workspace_a = client_a.build_workspace(&project_a, cx_a);
5650        let pane_a1 = workspace_a.read_with(cx_a, |workspace, _| workspace.active_pane().clone());
5651        let _editor_a1 = workspace_a
5652            .update(cx_a, |workspace, cx| {
5653                workspace.open_path((worktree_id, "1.txt"), true, cx)
5654            })
5655            .await
5656            .unwrap()
5657            .downcast::<Editor>()
5658            .unwrap();
5659
5660        // Client B opens an editor.
5661        let workspace_b = client_b.build_workspace(&project_b, cx_b);
5662        let pane_b1 = workspace_b.read_with(cx_b, |workspace, _| workspace.active_pane().clone());
5663        let _editor_b1 = workspace_b
5664            .update(cx_b, |workspace, cx| {
5665                workspace.open_path((worktree_id, "2.txt"), true, cx)
5666            })
5667            .await
5668            .unwrap()
5669            .downcast::<Editor>()
5670            .unwrap();
5671
5672        // Clients A and B follow each other in split panes
5673        workspace_a
5674            .update(cx_a, |workspace, cx| {
5675                workspace.split_pane(workspace.active_pane().clone(), SplitDirection::Right, cx);
                    // The split is expected to leave the newly created pane active, so
                    // the follow below is started from the new pane, not `pane_a1`.
5676                assert_ne!(*workspace.active_pane(), pane_a1);
                    // Only client B has joined, so the first collaborator key is used
                    // as the leader to follow.
5677                let leader_id = *project_a.read(cx).collaborators().keys().next().unwrap();
5678                workspace
5679                    .toggle_follow(&workspace::ToggleFollow(leader_id), cx)
5680                    .unwrap()
5681            })
5682            .await
5683            .unwrap();
5684        workspace_b
5685            .update(cx_b, |workspace, cx| {
5686                workspace.split_pane(workspace.active_pane().clone(), SplitDirection::Right, cx);
5687                assert_ne!(*workspace.active_pane(), pane_b1);
5688                let leader_id = *project_b.read(cx).collaborators().keys().next().unwrap();
5689                workspace
5690                    .toggle_follow(&workspace::ToggleFollow(leader_id), cx)
5691                    .unwrap()
5692            })
5693            .await
5694            .unwrap();
5695
            // Each client returns to its original pane and opens a new file there,
            // while its other pane keeps following the peer.
5696        workspace_a
5697            .update(cx_a, |workspace, cx| {
5698                workspace.activate_next_pane(cx);
5699                assert_eq!(*workspace.active_pane(), pane_a1);
5700                workspace.open_path((worktree_id, "3.txt"), true, cx)
5701            })
5702            .await
5703            .unwrap();
5704        workspace_b
5705            .update(cx_b, |workspace, cx| {
5706                workspace.activate_next_pane(cx);
5707                assert_eq!(*workspace.active_pane(), pane_b1);
5708                workspace.open_path((worktree_id, "4.txt"), true, cx)
5709            })
5710            .await
5711            .unwrap();
            // Let all queued work settle before asserting.
            // NOTE(review): presumably this also drains client B's follow updates
            // because the test contexts share one deterministic executor — confirm.
5712        cx_a.foreground().run_until_parked();
5713
5714        // Ensure leader updates don't change the active pane of followers
5715        workspace_a.read_with(cx_a, |workspace, _| {
5716            assert_eq!(*workspace.active_pane(), pane_a1);
5717        });
5718        workspace_b.read_with(cx_b, |workspace, _| {
5719            assert_eq!(*workspace.active_pane(), pane_b1);
5720        });
5721
5722        // Ensure peers following each other doesn't cause an infinite loop.
5723        assert_eq!(
5724            workspace_a.read_with(cx_a, |workspace, cx| workspace
5725                .active_item(cx)
5726                .unwrap()
5727                .project_path(cx)),
5728            Some((worktree_id, "3.txt").into())
5729        );
            // On A: the original pane shows A's own "3.txt"; after switching to the
            // other pane, the active item is "4.txt", the file B opened.
5730        workspace_a.update(cx_a, |workspace, cx| {
5731            assert_eq!(
5732                workspace.active_item(cx).unwrap().project_path(cx),
5733                Some((worktree_id, "3.txt").into())
5734            );
5735            workspace.activate_next_pane(cx);
5736            assert_eq!(
5737                workspace.active_item(cx).unwrap().project_path(cx),
5738                Some((worktree_id, "4.txt").into())
5739            );
5740        });
            // On B: the mirror image — own "4.txt" first, then "3.txt" (opened by A)
            // in the other pane.
5741        workspace_b.update(cx_b, |workspace, cx| {
5742            assert_eq!(
5743                workspace.active_item(cx).unwrap().project_path(cx),
5744                Some((worktree_id, "4.txt").into())
5745            );
5746            workspace.activate_next_pane(cx);
5747            assert_eq!(
5748                workspace.active_item(cx).unwrap().project_path(cx),
5749                Some((worktree_id, "3.txt").into())
5750            );
5751        });
5752    }
5753
5754    #[gpui::test(iterations = 10)]
        // Checks which local actions on the follower end a follow session.
        //
        // Client B follows client A, and the test then asserts on
        // `leader_for_pane(&pane_b)` after each of B's actions:
        //   * moving the cursor, editing, scrolling, and opening a different item
        //     in the pane each reset it to `None`;
        //   * splitting the pane and activating the other pane leave it at
        //     `Some(leader_id)`.
        // After each action that ends the session, B follows A again so the next
        // action starts from a following state.
        //
        // NOTE(review): `iterations = 10` presumably re-runs the test under
        // different scheduler seeds — confirm against the `gpui::test` macro.
5755    async fn test_auto_unfollowing(cx_a: &mut TestAppContext, cx_b: &mut TestAppContext) {
            // NOTE(review): presumably turns a would-be hang (executor parking with
            // nothing runnable) into a test failure — confirm in gpui.
5756        cx_a.foreground().forbid_parking();
5757        let fs = FakeFs::new(cx_a.background());
5758
5759        // 2 clients connect to a server.
5760        let mut server = TestServer::start(cx_a.foreground(), cx_a.background()).await;
5761        let mut client_a = server.create_client(cx_a, "user_a").await;
5762        let mut client_b = server.create_client(cx_b, "user_b").await;
5763        server
5764            .make_contacts(vec![(&client_a, cx_a), (&client_b, cx_b)])
5765            .await;
5766        cx_a.update(editor::init);
5767        cx_b.update(editor::init);
5768
5769        // Client A shares a project.
5770        fs.insert_tree(
5771            "/a",
5772            json!({
5773                "1.txt": "one",
5774                "2.txt": "two",
5775                "3.txt": "three",
5776            }),
5777        )
5778        .await;
5779        let (project_a, worktree_id) = client_a.build_local_project(fs.clone(), "/a", cx_a).await;
5780        project_a
5781            .update(cx_a, |project, cx| project.share(cx))
5782            .await
5783            .unwrap();
5784
5785        // Client B joins the project.
5786        let project_b = client_b
5787            .build_remote_project(
5788                project_a
5789                    .read_with(cx_a, |project, _| project.remote_id())
5790                    .unwrap(),
5791                cx_b,
5792            )
5793            .await;
5794
5795        // Client A opens an editor on "1.txt".
5796        let workspace_a = client_a.build_workspace(&project_a, cx_a);
5797        let _editor_a1 = workspace_a
5798            .update(cx_a, |workspace, cx| {
5799                workspace.open_path((worktree_id, "1.txt"), true, cx)
5800            })
5801            .await
5802            .unwrap()
5803            .downcast::<Editor>()
5804            .unwrap();
5805
5806        // Client B starts following client A.
5807        let workspace_b = client_b.build_workspace(&project_b, cx_b);
5808        let pane_b = workspace_b.read_with(cx_b, |workspace, _| workspace.active_pane().clone());
            // Client A is the only other participant, so the first collaborator's
            // peer id is used as the leader.
5809        let leader_id = project_b.read_with(cx_b, |project, _| {
5810            project.collaborators().values().next().unwrap().peer_id
5811        });
5812        workspace_b
5813            .update(cx_b, |workspace, cx| {
5814                workspace
5815                    .toggle_follow(&ToggleFollow(leader_id), cx)
5816                    .unwrap()
5817            })
5818            .await
5819            .unwrap();
5820        assert_eq!(
5821            workspace_b.read_with(cx_b, |workspace, _| workspace.leader_for_pane(&pane_b)),
5822            Some(leader_id)
5823        );
            // Client B never opened a file itself, so the active editor here exists
            // only as a result of following.
            // NOTE(review): presumably B's view of A's "1.txt" — confirm.
5824        let editor_b2 = workspace_b.read_with(cx_b, |workspace, cx| {
5825            workspace
5826                .active_item(cx)
5827                .unwrap()
5828                .downcast::<Editor>()
5829                .unwrap()
5830        });
5831
5832        // When client B moves, it automatically stops following client A.
            // The assertion runs right after the local action, with no await in
            // between; the same holds for the edit and scroll cases below.
5833        editor_b2.update(cx_b, |editor, cx| editor.move_right(&editor::MoveRight, cx));
5834        assert_eq!(
5835            workspace_b.read_with(cx_b, |workspace, _| workspace.leader_for_pane(&pane_b)),
5836            None
5837        );
5838
            // Follow again so the next case starts from a following state.
5839        workspace_b
5840            .update(cx_b, |workspace, cx| {
5841                workspace
5842                    .toggle_follow(&ToggleFollow(leader_id), cx)
5843                    .unwrap()
5844            })
5845            .await
5846            .unwrap();
5847        assert_eq!(
5848            workspace_b.read_with(cx_b, |workspace, _| workspace.leader_for_pane(&pane_b)),
5849            Some(leader_id)
5850        );
5851
5852        // When client B edits, it automatically stops following client A.
5853        editor_b2.update(cx_b, |editor, cx| editor.insert("X", cx));
5854        assert_eq!(
5855            workspace_b.read_with(cx_b, |workspace, _| workspace.leader_for_pane(&pane_b)),
5856            None
5857        );
5858
            // Follow again so the next case starts from a following state.
5859        workspace_b
5860            .update(cx_b, |workspace, cx| {
5861                workspace
5862                    .toggle_follow(&ToggleFollow(leader_id), cx)
5863                    .unwrap()
5864            })
5865            .await
5866            .unwrap();
5867        assert_eq!(
5868            workspace_b.read_with(cx_b, |workspace, _| workspace.leader_for_pane(&pane_b)),
5869            Some(leader_id)
5870        );
5871
5872        // When client B scrolls, it automatically stops following client A.
5873        editor_b2.update(cx_b, |editor, cx| {
5874            editor.set_scroll_position(vec2f(0., 3.), cx)
5875        });
5876        assert_eq!(
5877            workspace_b.read_with(cx_b, |workspace, _| workspace.leader_for_pane(&pane_b)),
5878            None
5879        );
5880
            // Follow again so the pane-level cases start from a following state.
5881        workspace_b
5882            .update(cx_b, |workspace, cx| {
5883                workspace
5884                    .toggle_follow(&ToggleFollow(leader_id), cx)
5885                    .unwrap()
5886            })
5887            .await
5888            .unwrap();
5889        assert_eq!(
5890            workspace_b.read_with(cx_b, |workspace, _| workspace.leader_for_pane(&pane_b)),
5891            Some(leader_id)
5892        );
5893
5894        // When client B activates a different pane, it continues following client A in the original pane.
5895        workspace_b.update(cx_b, |workspace, cx| {
5896            workspace.split_pane(pane_b.clone(), SplitDirection::Right, cx)
5897        });
5898        assert_eq!(
5899            workspace_b.read_with(cx_b, |workspace, _| workspace.leader_for_pane(&pane_b)),
5900            Some(leader_id)
5901        );
5902
5903        workspace_b.update(cx_b, |workspace, cx| workspace.activate_next_pane(cx));
5904        assert_eq!(
5905            workspace_b.read_with(cx_b, |workspace, _| workspace.leader_for_pane(&pane_b)),
5906            Some(leader_id)
5907        );
5908
5909        // When client B activates a different item in the original pane, it automatically stops following client A.
            // NOTE(review): this relies on the `activate_next_pane` call above having
            // made `pane_b` the active pane again, so that `open_path` targets it —
            // confirm.
5910        workspace_b
5911            .update(cx_b, |workspace, cx| {
5912                workspace.open_path((worktree_id, "2.txt"), true, cx)
5913            })
5914            .await
5915            .unwrap();
5916        assert_eq!(
5917            workspace_b.read_with(cx_b, |workspace, _| workspace.leader_for_pane(&pane_b)),
5918            None
5919        );
5920    }
5921
5922    #[gpui::test(iterations = 100)]
5923    async fn test_random_collaboration(
5924        cx: &mut TestAppContext,
5925        deterministic: Arc<Deterministic>,
5926        rng: StdRng,
5927    ) {
5928        cx.foreground().forbid_parking();
5929        let max_peers = env::var("MAX_PEERS")
5930            .map(|i| i.parse().expect("invalid `MAX_PEERS` variable"))
5931            .unwrap_or(5);
5932        assert!(max_peers <= 5);
5933
5934        let max_operations = env::var("OPERATIONS")
5935            .map(|i| i.parse().expect("invalid `OPERATIONS` variable"))
5936            .unwrap_or(10);
5937
5938        let rng = Arc::new(Mutex::new(rng));
5939
5940        let guest_lang_registry = Arc::new(LanguageRegistry::test());
5941        let host_language_registry = Arc::new(LanguageRegistry::test());
5942
5943        let fs = FakeFs::new(cx.background());
5944        fs.insert_tree("/_collab", json!({"init": ""})).await;
5945
5946        let mut server = TestServer::start(cx.foreground(), cx.background()).await;
5947        let db = server.app_state.db.clone();
5948        let host_user_id = db.create_user("host", false).await.unwrap();
5949        for username in ["guest-1", "guest-2", "guest-3", "guest-4"] {
5950            let guest_user_id = db.create_user(username, false).await.unwrap();
5951            server
5952                .app_state
5953                .db
5954                .send_contact_request(guest_user_id, host_user_id)
5955                .await
5956                .unwrap();
5957            server
5958                .app_state
5959                .db
5960                .respond_to_contact_request(host_user_id, guest_user_id, true)
5961                .await
5962                .unwrap();
5963        }
5964
5965        let mut clients = Vec::new();
5966        let mut user_ids = Vec::new();
5967        let mut op_start_signals = Vec::new();
5968
5969        let mut next_entity_id = 100000;
5970        let mut host_cx = TestAppContext::new(
5971            cx.foreground_platform(),
5972            cx.platform(),
5973            deterministic.build_foreground(next_entity_id),
5974            deterministic.build_background(),
5975            cx.font_cache(),
5976            cx.leak_detector(),
5977            next_entity_id,
5978        );
5979        let host = server.create_client(&mut host_cx, "host").await;
5980        let host_project = host_cx.update(|cx| {
5981            Project::local(
5982                host.client.clone(),
5983                host.user_store.clone(),
5984                host_language_registry.clone(),
5985                fs.clone(),
5986                cx,
5987            )
5988        });
5989        let host_project_id = host_project
5990            .update(&mut host_cx, |p, _| p.next_remote_id())
5991            .await;
5992
5993        let (collab_worktree, _) = host_project
5994            .update(&mut host_cx, |project, cx| {
5995                project.find_or_create_local_worktree("/_collab", true, cx)
5996            })
5997            .await
5998            .unwrap();
5999        collab_worktree
6000            .read_with(&host_cx, |tree, _| tree.as_local().unwrap().scan_complete())
6001            .await;
6002        host_project
6003            .update(&mut host_cx, |project, cx| project.share(cx))
6004            .await
6005            .unwrap();
6006
6007        // Set up fake language servers.
6008        let mut language = Language::new(
6009            LanguageConfig {
6010                name: "Rust".into(),
6011                path_suffixes: vec!["rs".to_string()],
6012                ..Default::default()
6013            },
6014            None,
6015        );
6016        let _fake_servers = language.set_fake_lsp_adapter(FakeLspAdapter {
6017            name: "the-fake-language-server",
6018            capabilities: lsp::LanguageServer::full_capabilities(),
6019            initializer: Some(Box::new({
6020                let rng = rng.clone();
6021                let fs = fs.clone();
6022                let project = host_project.downgrade();
6023                move |fake_server: &mut FakeLanguageServer| {
6024                    fake_server.handle_request::<lsp::request::Completion, _, _>(
6025                        |_, _| async move {
6026                            Ok(Some(lsp::CompletionResponse::Array(vec![
6027                                lsp::CompletionItem {
6028                                    text_edit: Some(lsp::CompletionTextEdit::Edit(lsp::TextEdit {
6029                                        range: lsp::Range::new(
6030                                            lsp::Position::new(0, 0),
6031                                            lsp::Position::new(0, 0),
6032                                        ),
6033                                        new_text: "the-new-text".to_string(),
6034                                    })),
6035                                    ..Default::default()
6036                                },
6037                            ])))
6038                        },
6039                    );
6040
6041                    fake_server.handle_request::<lsp::request::CodeActionRequest, _, _>(
6042                        |_, _| async move {
6043                            Ok(Some(vec![lsp::CodeActionOrCommand::CodeAction(
6044                                lsp::CodeAction {
6045                                    title: "the-code-action".to_string(),
6046                                    ..Default::default()
6047                                },
6048                            )]))
6049                        },
6050                    );
6051
6052                    fake_server.handle_request::<lsp::request::PrepareRenameRequest, _, _>(
6053                        |params, _| async move {
6054                            Ok(Some(lsp::PrepareRenameResponse::Range(lsp::Range::new(
6055                                params.position,
6056                                params.position,
6057                            ))))
6058                        },
6059                    );
6060
6061                    fake_server.handle_request::<lsp::request::GotoDefinition, _, _>({
6062                        let fs = fs.clone();
6063                        let rng = rng.clone();
6064                        move |_, _| {
6065                            let fs = fs.clone();
6066                            let rng = rng.clone();
6067                            async move {
6068                                let files = fs.files().await;
6069                                let mut rng = rng.lock();
6070                                let count = rng.gen_range::<usize, _>(1..3);
6071                                let files = (0..count)
6072                                    .map(|_| files.choose(&mut *rng).unwrap())
6073                                    .collect::<Vec<_>>();
6074                                log::info!("LSP: Returning definitions in files {:?}", &files);
6075                                Ok(Some(lsp::GotoDefinitionResponse::Array(
6076                                    files
6077                                        .into_iter()
6078                                        .map(|file| lsp::Location {
6079                                            uri: lsp::Url::from_file_path(file).unwrap(),
6080                                            range: Default::default(),
6081                                        })
6082                                        .collect(),
6083                                )))
6084                            }
6085                        }
6086                    });
6087
6088                    fake_server.handle_request::<lsp::request::DocumentHighlightRequest, _, _>({
6089                        let rng = rng.clone();
6090                        let project = project.clone();
6091                        move |params, mut cx| {
6092                            let highlights = if let Some(project) = project.upgrade(&cx) {
6093                                project.update(&mut cx, |project, cx| {
6094                                    let path = params
6095                                        .text_document_position_params
6096                                        .text_document
6097                                        .uri
6098                                        .to_file_path()
6099                                        .unwrap();
6100                                    let (worktree, relative_path) =
6101                                        project.find_local_worktree(&path, cx)?;
6102                                    let project_path =
6103                                        ProjectPath::from((worktree.read(cx).id(), relative_path));
6104                                    let buffer =
6105                                        project.get_open_buffer(&project_path, cx)?.read(cx);
6106
6107                                    let mut highlights = Vec::new();
6108                                    let highlight_count = rng.lock().gen_range(1..=5);
6109                                    let mut prev_end = 0;
6110                                    for _ in 0..highlight_count {
6111                                        let range =
6112                                            buffer.random_byte_range(prev_end, &mut *rng.lock());
6113
6114                                        highlights.push(lsp::DocumentHighlight {
6115                                            range: range_to_lsp(range.to_point_utf16(buffer)),
6116                                            kind: Some(lsp::DocumentHighlightKind::READ),
6117                                        });
6118                                        prev_end = range.end;
6119                                    }
6120                                    Some(highlights)
6121                                })
6122                            } else {
6123                                None
6124                            };
6125                            async move { Ok(highlights) }
6126                        }
6127                    });
6128                }
6129            })),
6130            ..Default::default()
6131        });
6132        host_language_registry.add(Arc::new(language));
6133
6134        let op_start_signal = futures::channel::mpsc::unbounded();
6135        user_ids.push(host.current_user_id(&host_cx));
6136        op_start_signals.push(op_start_signal.0);
6137        clients.push(host_cx.foreground().spawn(host.simulate_host(
6138            host_project,
6139            op_start_signal.1,
6140            rng.clone(),
6141            host_cx,
6142        )));
6143
6144        let disconnect_host_at = if rng.lock().gen_bool(0.2) {
6145            rng.lock().gen_range(0..max_operations)
6146        } else {
6147            max_operations
6148        };
6149        let mut available_guests = vec![
6150            "guest-1".to_string(),
6151            "guest-2".to_string(),
6152            "guest-3".to_string(),
6153            "guest-4".to_string(),
6154        ];
6155        let mut operations = 0;
6156        while operations < max_operations {
6157            if operations == disconnect_host_at {
6158                server.disconnect_client(user_ids[0]);
6159                cx.foreground().advance_clock(RECEIVE_TIMEOUT);
6160                drop(op_start_signals);
6161                let mut clients = futures::future::join_all(clients).await;
6162                cx.foreground().run_until_parked();
6163
6164                let (host, mut host_cx, host_err) = clients.remove(0);
6165                if let Some(host_err) = host_err {
6166                    log::error!("host error - {}", host_err);
6167                }
6168                host.project
6169                    .as_ref()
6170                    .unwrap()
6171                    .read_with(&host_cx, |project, _| assert!(!project.is_shared()));
6172                for (guest, mut guest_cx, guest_err) in clients {
6173                    if let Some(guest_err) = guest_err {
6174                        log::error!("{} error - {}", guest.username, guest_err);
6175                    }
6176                    // TODO
6177                    // let contacts = server
6178                    //     .store
6179                    //     .read()
6180                    //     .await
6181                    //     .contacts_for_user(guest.current_user_id(&guest_cx));
6182                    // assert!(!contacts
6183                    //     .iter()
6184                    //     .flat_map(|contact| &contact.projects)
6185                    //     .any(|project| project.id == host_project_id));
6186                    guest
6187                        .project
6188                        .as_ref()
6189                        .unwrap()
6190                        .read_with(&guest_cx, |project, _| assert!(project.is_read_only()));
6191                    guest_cx.update(|_| drop(guest));
6192                }
6193                host_cx.update(|_| drop(host));
6194
6195                return;
6196            }
6197
6198            let distribution = rng.lock().gen_range(0..100);
6199            match distribution {
6200                0..=19 if !available_guests.is_empty() => {
6201                    let guest_ix = rng.lock().gen_range(0..available_guests.len());
6202                    let guest_username = available_guests.remove(guest_ix);
6203                    log::info!("Adding new connection for {}", guest_username);
6204                    next_entity_id += 100000;
6205                    let mut guest_cx = TestAppContext::new(
6206                        cx.foreground_platform(),
6207                        cx.platform(),
6208                        deterministic.build_foreground(next_entity_id),
6209                        deterministic.build_background(),
6210                        cx.font_cache(),
6211                        cx.leak_detector(),
6212                        next_entity_id,
6213                    );
6214                    let guest = server.create_client(&mut guest_cx, &guest_username).await;
6215                    let guest_project = Project::remote(
6216                        host_project_id,
6217                        guest.client.clone(),
6218                        guest.user_store.clone(),
6219                        guest_lang_registry.clone(),
6220                        FakeFs::new(cx.background()),
6221                        &mut guest_cx.to_async(),
6222                    )
6223                    .await
6224                    .unwrap();
6225                    let op_start_signal = futures::channel::mpsc::unbounded();
6226                    user_ids.push(guest.current_user_id(&guest_cx));
6227                    op_start_signals.push(op_start_signal.0);
6228                    clients.push(guest_cx.foreground().spawn(guest.simulate_guest(
6229                        guest_username.clone(),
6230                        guest_project,
6231                        op_start_signal.1,
6232                        rng.clone(),
6233                        guest_cx,
6234                    )));
6235
6236                    log::info!("Added connection for {}", guest_username);
6237                    operations += 1;
6238                }
6239                20..=29 if clients.len() > 1 => {
6240                    let guest_ix = rng.lock().gen_range(1..clients.len());
6241                    log::info!("Removing guest {}", user_ids[guest_ix]);
6242                    let removed_guest_id = user_ids.remove(guest_ix);
6243                    let guest = clients.remove(guest_ix);
6244                    op_start_signals.remove(guest_ix);
6245                    server.disconnect_client(removed_guest_id);
6246                    cx.foreground().advance_clock(RECEIVE_TIMEOUT);
6247                    let (guest, mut guest_cx, guest_err) = guest.await;
6248                    if let Some(guest_err) = guest_err {
6249                        log::error!("{} error - {}", guest.username, guest_err);
6250                    }
6251                    guest
6252                        .project
6253                        .as_ref()
6254                        .unwrap()
6255                        .read_with(&guest_cx, |project, _| assert!(project.is_read_only()));
6256                    // TODO
6257                    // for user_id in &user_ids {
6258                    //     for contact in server.store.read().await.contacts_for_user(*user_id) {
6259                    //         assert_ne!(
6260                    //             contact.user_id, removed_guest_id.0 as u64,
6261                    //             "removed guest is still a contact of another peer"
6262                    //         );
6263                    //         for project in contact.projects {
6264                    //             for project_guest_id in project.guests {
6265                    //                 assert_ne!(
6266                    //                     project_guest_id, removed_guest_id.0 as u64,
6267                    //                     "removed guest appears as still participating on a project"
6268                    //                 );
6269                    //             }
6270                    //         }
6271                    //     }
6272                    // }
6273
6274                    log::info!("{} removed", guest.username);
6275                    available_guests.push(guest.username.clone());
6276                    guest_cx.update(|_| drop(guest));
6277
6278                    operations += 1;
6279                }
6280                _ => {
6281                    while operations < max_operations && rng.lock().gen_bool(0.7) {
6282                        op_start_signals
6283                            .choose(&mut *rng.lock())
6284                            .unwrap()
6285                            .unbounded_send(())
6286                            .unwrap();
6287                        operations += 1;
6288                    }
6289
6290                    if rng.lock().gen_bool(0.8) {
6291                        cx.foreground().run_until_parked();
6292                    }
6293                }
6294            }
6295        }
6296
6297        drop(op_start_signals);
6298        let mut clients = futures::future::join_all(clients).await;
6299        cx.foreground().run_until_parked();
6300
6301        let (host_client, mut host_cx, host_err) = clients.remove(0);
6302        if let Some(host_err) = host_err {
6303            panic!("host error - {}", host_err);
6304        }
6305        let host_project = host_client.project.as_ref().unwrap();
6306        let host_worktree_snapshots = host_project.read_with(&host_cx, |project, cx| {
6307            project
6308                .worktrees(cx)
6309                .map(|worktree| {
6310                    let snapshot = worktree.read(cx).snapshot();
6311                    (snapshot.id(), snapshot)
6312                })
6313                .collect::<BTreeMap<_, _>>()
6314        });
6315
6316        host_client
6317            .project
6318            .as_ref()
6319            .unwrap()
6320            .read_with(&host_cx, |project, cx| project.check_invariants(cx));
6321
6322        for (guest_client, mut guest_cx, guest_err) in clients.into_iter() {
6323            if let Some(guest_err) = guest_err {
6324                panic!("{} error - {}", guest_client.username, guest_err);
6325            }
6326            let worktree_snapshots =
6327                guest_client
6328                    .project
6329                    .as_ref()
6330                    .unwrap()
6331                    .read_with(&guest_cx, |project, cx| {
6332                        project
6333                            .worktrees(cx)
6334                            .map(|worktree| {
6335                                let worktree = worktree.read(cx);
6336                                (worktree.id(), worktree.snapshot())
6337                            })
6338                            .collect::<BTreeMap<_, _>>()
6339                    });
6340
6341            assert_eq!(
6342                worktree_snapshots.keys().collect::<Vec<_>>(),
6343                host_worktree_snapshots.keys().collect::<Vec<_>>(),
6344                "{} has different worktrees than the host",
6345                guest_client.username
6346            );
6347            for (id, host_snapshot) in &host_worktree_snapshots {
6348                let guest_snapshot = &worktree_snapshots[id];
6349                assert_eq!(
6350                    guest_snapshot.root_name(),
6351                    host_snapshot.root_name(),
6352                    "{} has different root name than the host for worktree {}",
6353                    guest_client.username,
6354                    id
6355                );
6356                assert_eq!(
6357                    guest_snapshot.entries(false).collect::<Vec<_>>(),
6358                    host_snapshot.entries(false).collect::<Vec<_>>(),
6359                    "{} has different snapshot than the host for worktree {}",
6360                    guest_client.username,
6361                    id
6362                );
6363                assert_eq!(guest_snapshot.scan_id(), host_snapshot.scan_id());
6364            }
6365
6366            guest_client
6367                .project
6368                .as_ref()
6369                .unwrap()
6370                .read_with(&guest_cx, |project, cx| project.check_invariants(cx));
6371
6372            for guest_buffer in &guest_client.buffers {
6373                let buffer_id = guest_buffer.read_with(&guest_cx, |buffer, _| buffer.remote_id());
6374                let host_buffer = host_project.read_with(&host_cx, |project, cx| {
6375                    project.buffer_for_id(buffer_id, cx).expect(&format!(
6376                        "host does not have buffer for guest:{}, peer:{}, id:{}",
6377                        guest_client.username, guest_client.peer_id, buffer_id
6378                    ))
6379                });
6380                let path = host_buffer
6381                    .read_with(&host_cx, |buffer, cx| buffer.file().unwrap().full_path(cx));
6382
6383                assert_eq!(
6384                    guest_buffer.read_with(&guest_cx, |buffer, _| buffer.deferred_ops_len()),
6385                    0,
6386                    "{}, buffer {}, path {:?} has deferred operations",
6387                    guest_client.username,
6388                    buffer_id,
6389                    path,
6390                );
6391                assert_eq!(
6392                    guest_buffer.read_with(&guest_cx, |buffer, _| buffer.text()),
6393                    host_buffer.read_with(&host_cx, |buffer, _| buffer.text()),
6394                    "{}, buffer {}, path {:?}, differs from the host's buffer",
6395                    guest_client.username,
6396                    buffer_id,
6397                    path
6398                );
6399            }
6400
6401            guest_cx.update(|_| drop(guest_client));
6402        }
6403
6404        host_cx.update(|_| drop(host_client));
6405    }
6406
    /// In-process server fixture used by the tests in this module.
    ///
    /// Built by `TestServer::start`; dereferences to the wrapped `Server`.
    struct TestServer {
        /// Peer created next to the server in `start`; `reset` is called on it when the
        /// `TestServer` is dropped.
        peer: Arc<Peer>,
        /// Application state produced by `build_app_state` from the test database.
        app_state: Arc<AppState>,
        /// The server under test, constructed from `app_state` in `start`.
        server: Arc<Server>,
        /// Foreground executor; `condition` checks `parking_forbidden` on it and brackets
        /// each wait with `start_waiting` / `finish_waiting`.
        foreground: Rc<executor::Foreground>,
        /// Receiving half of the channel whose sending half is passed to `Server::new`;
        /// `condition` awaits it between predicate checks.
        notifications: mpsc::UnboundedReceiver<()>,
        /// Per-user flag obtained from `Connection::in_memory` when a client connects;
        /// `disconnect_client` removes the entry and stores `true` into the flag.
        connection_killers: Arc<Mutex<HashMap<UserId, Arc<AtomicBool>>>>,
        /// While `true`, connection attempts made by clients from `create_client` fail.
        forbid_connections: Arc<AtomicBool>,
        /// Test database that `app_state.db` was cloned from. Never read after `start`
        /// (presumably held so the database outlives the server; confirm against `TestDb`).
        _test_db: TestDb,
    }
6417
6418    impl TestServer {
6419        async fn start(
6420            foreground: Rc<executor::Foreground>,
6421            background: Arc<executor::Background>,
6422        ) -> Self {
6423            let test_db = TestDb::fake(background);
6424            let app_state = Self::build_app_state(&test_db).await;
6425            let peer = Peer::new();
6426            let notifications = mpsc::unbounded();
6427            let server = Server::new(app_state.clone(), Some(notifications.0));
6428            Self {
6429                peer,
6430                app_state,
6431                server,
6432                foreground,
6433                notifications: notifications.1,
6434                connection_killers: Default::default(),
6435                forbid_connections: Default::default(),
6436                _test_db: test_db,
6437            }
6438        }
6439
6440        async fn create_client(&mut self, cx: &mut TestAppContext, name: &str) -> TestClient {
6441            cx.update(|cx| {
6442                let settings = Settings::test(cx);
6443                cx.set_global(settings);
6444            });
6445
6446            let http = FakeHttpClient::with_404_response();
6447            let user_id =
6448                if let Ok(Some(user)) = self.app_state.db.get_user_by_github_login(name).await {
6449                    user.id
6450                } else {
6451                    self.app_state.db.create_user(name, false).await.unwrap()
6452                };
6453            let client_name = name.to_string();
6454            let mut client = Client::new(http.clone());
6455            let server = self.server.clone();
6456            let connection_killers = self.connection_killers.clone();
6457            let forbid_connections = self.forbid_connections.clone();
6458            let (connection_id_tx, mut connection_id_rx) = mpsc::channel(16);
6459
6460            Arc::get_mut(&mut client)
6461                .unwrap()
6462                .override_authenticate(move |cx| {
6463                    cx.spawn(|_| async move {
6464                        let access_token = "the-token".to_string();
6465                        Ok(Credentials {
6466                            user_id: user_id.0 as u64,
6467                            access_token,
6468                        })
6469                    })
6470                })
6471                .override_establish_connection(move |credentials, cx| {
6472                    assert_eq!(credentials.user_id, user_id.0 as u64);
6473                    assert_eq!(credentials.access_token, "the-token");
6474
6475                    let server = server.clone();
6476                    let connection_killers = connection_killers.clone();
6477                    let forbid_connections = forbid_connections.clone();
6478                    let client_name = client_name.clone();
6479                    let connection_id_tx = connection_id_tx.clone();
6480                    cx.spawn(move |cx| async move {
6481                        if forbid_connections.load(SeqCst) {
6482                            Err(EstablishConnectionError::other(anyhow!(
6483                                "server is forbidding connections"
6484                            )))
6485                        } else {
6486                            let (client_conn, server_conn, killed) =
6487                                Connection::in_memory(cx.background());
6488                            connection_killers.lock().insert(user_id, killed);
6489                            cx.background()
6490                                .spawn(server.handle_connection(
6491                                    server_conn,
6492                                    client_name,
6493                                    user_id,
6494                                    Some(connection_id_tx),
6495                                    cx.background(),
6496                                ))
6497                                .detach();
6498                            Ok(client_conn)
6499                        }
6500                    })
6501                });
6502
6503            client
6504                .authenticate_and_connect(false, &cx.to_async())
6505                .await
6506                .unwrap();
6507
6508            Channel::init(&client);
6509            Project::init(&client);
6510            cx.update(|cx| {
6511                workspace::init(&client, cx);
6512            });
6513
6514            let peer_id = PeerId(connection_id_rx.next().await.unwrap().0);
6515            let user_store = cx.add_model(|cx| UserStore::new(client.clone(), http, cx));
6516
6517            let client = TestClient {
6518                client,
6519                peer_id,
6520                username: name.to_string(),
6521                user_store,
6522                language_registry: Arc::new(LanguageRegistry::test()),
6523                project: Default::default(),
6524                buffers: Default::default(),
6525            };
6526            client.wait_for_current_user(cx).await;
6527            client
6528        }
6529
6530        fn disconnect_client(&self, user_id: UserId) {
6531            self.connection_killers
6532                .lock()
6533                .remove(&user_id)
6534                .unwrap()
6535                .store(true, SeqCst);
6536        }
6537
6538        fn forbid_connections(&self) {
6539            self.forbid_connections.store(true, SeqCst);
6540        }
6541
6542        fn allow_connections(&self) {
6543            self.forbid_connections.store(false, SeqCst);
6544        }
6545
6546        async fn make_contacts(&self, mut clients: Vec<(&TestClient, &mut TestAppContext)>) {
6547            while let Some((client_a, cx_a)) = clients.pop() {
6548                for (client_b, cx_b) in &mut clients {
6549                    client_a
6550                        .user_store
6551                        .update(cx_a, |store, cx| {
6552                            store.request_contact(client_b.user_id().unwrap(), cx)
6553                        })
6554                        .await
6555                        .unwrap();
6556                    cx_a.foreground().run_until_parked();
6557                    client_b
6558                        .user_store
6559                        .update(*cx_b, |store, cx| {
6560                            store.respond_to_contact_request(client_a.user_id().unwrap(), true, cx)
6561                        })
6562                        .await
6563                        .unwrap();
6564                }
6565            }
6566        }
6567
6568        async fn build_app_state(test_db: &TestDb) -> Arc<AppState> {
6569            Arc::new(AppState {
6570                db: test_db.db().clone(),
6571                api_token: Default::default(),
6572            })
6573        }
6574
6575        async fn state<'a>(&'a self) -> RwLockReadGuard<'a, Store> {
6576            self.server.store.read().await
6577        }
6578
6579        async fn condition<F>(&mut self, mut predicate: F)
6580        where
6581            F: FnMut(&Store) -> bool,
6582        {
6583            assert!(
6584                self.foreground.parking_forbidden(),
6585                "you must call forbid_parking to use server conditions so we don't block indefinitely"
6586            );
6587            while !(predicate)(&*self.server.store.read().await) {
6588                self.foreground.start_waiting();
6589                self.notifications.next().await;
6590                self.foreground.finish_waiting();
6591            }
6592        }
6593    }
6594
6595    impl Deref for TestServer {
6596        type Target = Server;
6597
6598        fn deref(&self) -> &Self::Target {
6599            &self.server
6600        }
6601    }
6602
6603    impl Drop for TestServer {
6604        fn drop(&mut self) {
6605            self.peer.reset();
6606        }
6607    }
6608
    /// A client connected to a `TestServer`, plus the state the tests track for it.
    ///
    /// Dereferences to the underlying `Arc<Client>`.
    struct TestClient {
        /// The wrapped client; also the `Deref` target.
        client: Arc<Client>,
        /// The name passed to `TestServer::create_client`.
        username: String,
        /// Built in `create_client` from the first connection id received for this client.
        pub peer_id: PeerId,
        /// User store model created for this client in `create_client`.
        pub user_store: ModelHandle<UserStore>,
        /// Language registry created with `LanguageRegistry::test()`.
        language_registry: Arc<LanguageRegistry>,
        /// The project this client most recently built or simulated, if any.
        project: Option<ModelHandle<Project>>,
        /// Buffers the client currently keeps open; the simulation code inserts and removes
        /// entries as it opens and drops buffers.
        buffers: HashSet<ModelHandle<language::Buffer>>,
    }
6618
6619    impl Deref for TestClient {
6620        type Target = Arc<Client>;
6621
6622        fn deref(&self) -> &Self::Target {
6623            &self.client
6624        }
6625    }
6626
    /// GitHub logins describing a client's contact state, as produced by
    /// `TestClient::summarize_contacts`.
    struct ContactsSummary {
        /// Logins of the users returned by the store's `contacts()`.
        pub current: Vec<String>,
        /// Logins of the users returned by the store's `outgoing_contact_requests()`.
        pub outgoing_requests: Vec<String>,
        /// Logins of the users returned by the store's `incoming_contact_requests()`.
        pub incoming_requests: Vec<String>,
    }
6632
6633    impl TestClient {
6634        pub fn current_user_id(&self, cx: &TestAppContext) -> UserId {
6635            UserId::from_proto(
6636                self.user_store
6637                    .read_with(cx, |user_store, _| user_store.current_user().unwrap().id),
6638            )
6639        }
6640
6641        async fn wait_for_current_user(&self, cx: &TestAppContext) {
6642            let mut authed_user = self
6643                .user_store
6644                .read_with(cx, |user_store, _| user_store.watch_current_user());
6645            while authed_user.next().await.unwrap().is_none() {}
6646        }
6647
6648        async fn clear_contacts(&self, cx: &mut TestAppContext) {
6649            self.user_store
6650                .update(cx, |store, _| store.clear_contacts())
6651                .await;
6652        }
6653
6654        fn summarize_contacts(&self, cx: &TestAppContext) -> ContactsSummary {
6655            self.user_store.read_with(cx, |store, _| ContactsSummary {
6656                current: store
6657                    .contacts()
6658                    .iter()
6659                    .map(|contact| contact.user.github_login.clone())
6660                    .collect(),
6661                outgoing_requests: store
6662                    .outgoing_contact_requests()
6663                    .iter()
6664                    .map(|user| user.github_login.clone())
6665                    .collect(),
6666                incoming_requests: store
6667                    .incoming_contact_requests()
6668                    .iter()
6669                    .map(|user| user.github_login.clone())
6670                    .collect(),
6671            })
6672        }
6673
6674        async fn build_local_project(
6675            &mut self,
6676            fs: Arc<FakeFs>,
6677            root_path: impl AsRef<Path>,
6678            cx: &mut TestAppContext,
6679        ) -> (ModelHandle<Project>, WorktreeId) {
6680            let project = cx.update(|cx| {
6681                Project::local(
6682                    self.client.clone(),
6683                    self.user_store.clone(),
6684                    self.language_registry.clone(),
6685                    fs,
6686                    cx,
6687                )
6688            });
6689            self.project = Some(project.clone());
6690            let (worktree, _) = project
6691                .update(cx, |p, cx| {
6692                    p.find_or_create_local_worktree(root_path, true, cx)
6693                })
6694                .await
6695                .unwrap();
6696            worktree
6697                .read_with(cx, |tree, _| tree.as_local().unwrap().scan_complete())
6698                .await;
6699            project
6700                .update(cx, |project, _| project.next_remote_id())
6701                .await;
6702            (project, worktree.read_with(cx, |tree, _| tree.id()))
6703        }
6704
6705        async fn build_remote_project(
6706            &mut self,
6707            project_id: u64,
6708            cx: &mut TestAppContext,
6709        ) -> ModelHandle<Project> {
6710            let project = Project::remote(
6711                project_id,
6712                self.client.clone(),
6713                self.user_store.clone(),
6714                self.language_registry.clone(),
6715                FakeFs::new(cx.background()),
6716                &mut cx.to_async(),
6717            )
6718            .await
6719            .unwrap();
6720            self.project = Some(project.clone());
6721            project
6722        }
6723
6724        fn build_workspace(
6725            &self,
6726            project: &ModelHandle<Project>,
6727            cx: &mut TestAppContext,
6728        ) -> ViewHandle<Workspace> {
6729            let (window_id, _) = cx.add_window(|_| EmptyView);
6730            cx.add_view(window_id, |cx| {
6731                let fs = project.read(cx).fs().clone();
6732                Workspace::new(
6733                    &WorkspaceParams {
6734                        fs,
6735                        project: project.clone(),
6736                        user_store: self.user_store.clone(),
6737                        languages: self.language_registry.clone(),
6738                        themes: ThemeRegistry::new((), cx.font_cache().clone()),
6739                        channel_list: cx.add_model(|cx| {
6740                            ChannelList::new(self.user_store.clone(), self.client.clone(), cx)
6741                        }),
6742                        client: self.client.clone(),
6743                    },
6744                    cx,
6745                )
6746            })
6747        }
6748
        /// Runs the host side of the randomized simulation.
        ///
        /// Each item received on `op_start_signal` triggers one randomly chosen operation
        /// against `project`: finding or creating a local worktree, opening, editing or
        /// dropping a buffer, or creating a new file on the fake filesystem. The loop ends
        /// when the signal stream ends or an operation returns an error.
        ///
        /// Returns the client (with `project` stored on it), the app context, and the error
        /// that stopped the simulation, if any.
        async fn simulate_host(
            mut self,
            project: ModelHandle<Project>,
            op_start_signal: futures::channel::mpsc::UnboundedReceiver<()>,
            rng: Arc<Mutex<StdRng>>,
            mut cx: TestAppContext,
        ) -> (Self, TestAppContext, Option<anyhow::Error>) {
            // The fallible body lives in an inner function so it can use `?` while the outer
            // function still hands the client and context back to the caller.
            async fn simulate_host_internal(
                client: &mut TestClient,
                project: ModelHandle<Project>,
                mut op_start_signal: futures::channel::mpsc::UnboundedReceiver<()>,
                rng: Arc<Mutex<StdRng>>,
                cx: &mut TestAppContext,
            ) -> anyhow::Result<()> {
                let fs = project.read_with(cx, |project, _| project.fs().clone());

                // One operation per received signal.
                while op_start_signal.next().await.is_some() {
                    let distribution = rng.lock().gen_range::<usize, _>(0..100);
                    let files = fs.as_fake().files().await;
                    match distribution {
                        // Find or create a worktree at a randomly chosen ancestor of a
                        // randomly chosen existing file.
                        0..=20 if !files.is_empty() => {
                            let path = files.choose(&mut *rng.lock()).unwrap();
                            let mut path = path.as_path();
                            // Walk up at least one level, then stop at each further level
                            // depending on a random draw.
                            while let Some(parent_path) = path.parent() {
                                path = parent_path;
                                if rng.lock().gen() {
                                    break;
                                }
                            }

                            log::info!("Host: find/create local worktree {:?}", path);
                            let find_or_create_worktree = project.update(cx, |project, cx| {
                                project.find_or_create_local_worktree(path, true, cx)
                            });
                            // Randomly either let the operation run detached or wait for it.
                            if rng.lock().gen() {
                                cx.background().spawn(find_or_create_worktree).detach();
                            } else {
                                find_or_create_worktree.await?;
                            }
                        }
                        // Open a new buffer or pick one already open, then drop or mutate it.
                        // NOTE(review): this range overlaps the arm above for 10..=20; arms
                        // are tried in order, so with a non-empty `files` this arm is only
                        // reached for 21..=80. Confirm the intended distribution.
                        10..=80 if !files.is_empty() => {
                            let buffer = if client.buffers.is_empty() || rng.lock().gen() {
                                let file = files.choose(&mut *rng.lock()).unwrap();
                                let (worktree, path) = project
                                    .update(cx, |project, cx| {
                                        project.find_or_create_local_worktree(
                                            file.clone(),
                                            true,
                                            cx,
                                        )
                                    })
                                    .await?;
                                let project_path =
                                    worktree.read_with(cx, |worktree, _| (worktree.id(), path));
                                log::info!(
                                    "Host: opening path {:?}, worktree {}, relative_path {:?}",
                                    file,
                                    project_path.0,
                                    project_path.1
                                );
                                let buffer = project
                                    .update(cx, |project, cx| project.open_buffer(project_path, cx))
                                    .await
                                    .unwrap();
                                client.buffers.insert(buffer.clone());
                                buffer
                            } else {
                                client
                                    .buffers
                                    .iter()
                                    .choose(&mut *rng.lock())
                                    .unwrap()
                                    .clone()
                            };

                            // 10% of the time drop the buffer; otherwise edit or undo/redo.
                            if rng.lock().gen_bool(0.1) {
                                cx.update(|cx| {
                                    log::info!(
                                        "Host: dropping buffer {:?}",
                                        buffer.read(cx).file().unwrap().full_path(cx)
                                    );
                                    client.buffers.remove(&buffer);
                                    drop(buffer);
                                });
                            } else {
                                buffer.update(cx, |buffer, cx| {
                                    log::info!(
                                        "Host: updating buffer {:?} ({})",
                                        buffer.file().unwrap().full_path(cx),
                                        buffer.remote_id()
                                    );

                                    if rng.lock().gen_bool(0.7) {
                                        buffer.randomly_edit(&mut *rng.lock(), 5, cx);
                                    } else {
                                        buffer.randomly_undo_redo(&mut *rng.lock(), cx);
                                    }
                                });
                            }
                        }
                        // Create a new `.rs` file at a random path of 1 to 5 single-letter
                        // components, retrying with a fresh path until creation succeeds.
                        // This arm also handles every draw while `files` is empty.
                        _ => loop {
                            let path_component_count = rng.lock().gen_range::<usize, _>(1..=5);
                            let mut path = PathBuf::new();
                            path.push("/");
                            for _ in 0..path_component_count {
                                let letter = rng.lock().gen_range(b'a'..=b'z');
                                path.push(std::str::from_utf8(&[letter]).unwrap());
                            }
                            path.set_extension("rs");
                            let parent_path = path.parent().unwrap();

                            log::info!("Host: creating file {:?}", path,);

                            if fs.create_dir(&parent_path).await.is_ok()
                                && fs.create_file(&path, Default::default()).await.is_ok()
                            {
                                break;
                            } else {
                                log::info!("Host: cannot create file");
                            }
                        },
                    }

                    cx.background().simulate_random_delay().await;
                }

                Ok(())
            }

            let result =
                simulate_host_internal(&mut self, project.clone(), op_start_signal, rng, &mut cx)
                    .await;
            log::info!("Host done");
            // Store the project on the client so the caller can reach it afterwards.
            self.project = Some(project);
            (self, cx, result.err())
        }
6885
6886        pub async fn simulate_guest(
6887            mut self,
6888            guest_username: String,
6889            project: ModelHandle<Project>,
6890            op_start_signal: futures::channel::mpsc::UnboundedReceiver<()>,
6891            rng: Arc<Mutex<StdRng>>,
6892            mut cx: TestAppContext,
6893        ) -> (Self, TestAppContext, Option<anyhow::Error>) {
6894            async fn simulate_guest_internal(
6895                client: &mut TestClient,
6896                guest_username: &str,
6897                project: ModelHandle<Project>,
6898                mut op_start_signal: futures::channel::mpsc::UnboundedReceiver<()>,
6899                rng: Arc<Mutex<StdRng>>,
6900                cx: &mut TestAppContext,
6901            ) -> anyhow::Result<()> {
6902                while op_start_signal.next().await.is_some() {
6903                    let buffer = if client.buffers.is_empty() || rng.lock().gen() {
6904                        let worktree = if let Some(worktree) =
6905                            project.read_with(cx, |project, cx| {
6906                                project
6907                                    .worktrees(&cx)
6908                                    .filter(|worktree| {
6909                                        let worktree = worktree.read(cx);
6910                                        worktree.is_visible()
6911                                            && worktree.entries(false).any(|e| e.is_file())
6912                                    })
6913                                    .choose(&mut *rng.lock())
6914                            }) {
6915                            worktree
6916                        } else {
6917                            cx.background().simulate_random_delay().await;
6918                            continue;
6919                        };
6920
6921                        let (worktree_root_name, project_path) =
6922                            worktree.read_with(cx, |worktree, _| {
6923                                let entry = worktree
6924                                    .entries(false)
6925                                    .filter(|e| e.is_file())
6926                                    .choose(&mut *rng.lock())
6927                                    .unwrap();
6928                                (
6929                                    worktree.root_name().to_string(),
6930                                    (worktree.id(), entry.path.clone()),
6931                                )
6932                            });
6933                        log::info!(
6934                            "{}: opening path {:?} in worktree {} ({})",
6935                            guest_username,
6936                            project_path.1,
6937                            project_path.0,
6938                            worktree_root_name,
6939                        );
6940                        let buffer = project
6941                            .update(cx, |project, cx| {
6942                                project.open_buffer(project_path.clone(), cx)
6943                            })
6944                            .await?;
6945                        log::info!(
6946                            "{}: opened path {:?} in worktree {} ({}) with buffer id {}",
6947                            guest_username,
6948                            project_path.1,
6949                            project_path.0,
6950                            worktree_root_name,
6951                            buffer.read_with(cx, |buffer, _| buffer.remote_id())
6952                        );
6953                        client.buffers.insert(buffer.clone());
6954                        buffer
6955                    } else {
6956                        client
6957                            .buffers
6958                            .iter()
6959                            .choose(&mut *rng.lock())
6960                            .unwrap()
6961                            .clone()
6962                    };
6963
6964                    let choice = rng.lock().gen_range(0..100);
6965                    match choice {
6966                        0..=9 => {
6967                            cx.update(|cx| {
6968                                log::info!(
6969                                    "{}: dropping buffer {:?}",
6970                                    guest_username,
6971                                    buffer.read(cx).file().unwrap().full_path(cx)
6972                                );
6973                                client.buffers.remove(&buffer);
6974                                drop(buffer);
6975                            });
6976                        }
6977                        10..=19 => {
6978                            let completions = project.update(cx, |project, cx| {
6979                                log::info!(
6980                                    "{}: requesting completions for buffer {} ({:?})",
6981                                    guest_username,
6982                                    buffer.read(cx).remote_id(),
6983                                    buffer.read(cx).file().unwrap().full_path(cx)
6984                                );
6985                                let offset = rng.lock().gen_range(0..=buffer.read(cx).len());
6986                                project.completions(&buffer, offset, cx)
6987                            });
6988                            let completions = cx.background().spawn(async move {
6989                                completions
6990                                    .await
6991                                    .map_err(|err| anyhow!("completions request failed: {:?}", err))
6992                            });
6993                            if rng.lock().gen_bool(0.3) {
6994                                log::info!("{}: detaching completions request", guest_username);
6995                                cx.update(|cx| completions.detach_and_log_err(cx));
6996                            } else {
6997                                completions.await?;
6998                            }
6999                        }
7000                        20..=29 => {
7001                            let code_actions = project.update(cx, |project, cx| {
7002                                log::info!(
7003                                    "{}: requesting code actions for buffer {} ({:?})",
7004                                    guest_username,
7005                                    buffer.read(cx).remote_id(),
7006                                    buffer.read(cx).file().unwrap().full_path(cx)
7007                                );
7008                                let range = buffer.read(cx).random_byte_range(0, &mut *rng.lock());
7009                                project.code_actions(&buffer, range, cx)
7010                            });
7011                            let code_actions = cx.background().spawn(async move {
7012                                code_actions.await.map_err(|err| {
7013                                    anyhow!("code actions request failed: {:?}", err)
7014                                })
7015                            });
7016                            if rng.lock().gen_bool(0.3) {
7017                                log::info!("{}: detaching code actions request", guest_username);
7018                                cx.update(|cx| code_actions.detach_and_log_err(cx));
7019                            } else {
7020                                code_actions.await?;
7021                            }
7022                        }
7023                        30..=39 if buffer.read_with(cx, |buffer, _| buffer.is_dirty()) => {
7024                            let (requested_version, save) = buffer.update(cx, |buffer, cx| {
7025                                log::info!(
7026                                    "{}: saving buffer {} ({:?})",
7027                                    guest_username,
7028                                    buffer.remote_id(),
7029                                    buffer.file().unwrap().full_path(cx)
7030                                );
7031                                (buffer.version(), buffer.save(cx))
7032                            });
7033                            let save = cx.background().spawn(async move {
7034                                let (saved_version, _) = save
7035                                    .await
7036                                    .map_err(|err| anyhow!("save request failed: {:?}", err))?;
7037                                assert!(saved_version.observed_all(&requested_version));
7038                                Ok::<_, anyhow::Error>(())
7039                            });
7040                            if rng.lock().gen_bool(0.3) {
7041                                log::info!("{}: detaching save request", guest_username);
7042                                cx.update(|cx| save.detach_and_log_err(cx));
7043                            } else {
7044                                save.await?;
7045                            }
7046                        }
7047                        40..=44 => {
7048                            let prepare_rename = project.update(cx, |project, cx| {
7049                                log::info!(
7050                                    "{}: preparing rename for buffer {} ({:?})",
7051                                    guest_username,
7052                                    buffer.read(cx).remote_id(),
7053                                    buffer.read(cx).file().unwrap().full_path(cx)
7054                                );
7055                                let offset = rng.lock().gen_range(0..=buffer.read(cx).len());
7056                                project.prepare_rename(buffer, offset, cx)
7057                            });
7058                            let prepare_rename = cx.background().spawn(async move {
7059                                prepare_rename.await.map_err(|err| {
7060                                    anyhow!("prepare rename request failed: {:?}", err)
7061                                })
7062                            });
7063                            if rng.lock().gen_bool(0.3) {
7064                                log::info!("{}: detaching prepare rename request", guest_username);
7065                                cx.update(|cx| prepare_rename.detach_and_log_err(cx));
7066                            } else {
7067                                prepare_rename.await?;
7068                            }
7069                        }
7070                        45..=49 => {
7071                            let definitions = project.update(cx, |project, cx| {
7072                                log::info!(
7073                                    "{}: requesting definitions for buffer {} ({:?})",
7074                                    guest_username,
7075                                    buffer.read(cx).remote_id(),
7076                                    buffer.read(cx).file().unwrap().full_path(cx)
7077                                );
7078                                let offset = rng.lock().gen_range(0..=buffer.read(cx).len());
7079                                project.definition(&buffer, offset, cx)
7080                            });
7081                            let definitions = cx.background().spawn(async move {
7082                                definitions
7083                                    .await
7084                                    .map_err(|err| anyhow!("definitions request failed: {:?}", err))
7085                            });
7086                            if rng.lock().gen_bool(0.3) {
7087                                log::info!("{}: detaching definitions request", guest_username);
7088                                cx.update(|cx| definitions.detach_and_log_err(cx));
7089                            } else {
7090                                client
7091                                    .buffers
7092                                    .extend(definitions.await?.into_iter().map(|loc| loc.buffer));
7093                            }
7094                        }
7095                        50..=54 => {
7096                            let highlights = project.update(cx, |project, cx| {
7097                                log::info!(
7098                                    "{}: requesting highlights for buffer {} ({:?})",
7099                                    guest_username,
7100                                    buffer.read(cx).remote_id(),
7101                                    buffer.read(cx).file().unwrap().full_path(cx)
7102                                );
7103                                let offset = rng.lock().gen_range(0..=buffer.read(cx).len());
7104                                project.document_highlights(&buffer, offset, cx)
7105                            });
7106                            let highlights = cx.background().spawn(async move {
7107                                highlights
7108                                    .await
7109                                    .map_err(|err| anyhow!("highlights request failed: {:?}", err))
7110                            });
7111                            if rng.lock().gen_bool(0.3) {
7112                                log::info!("{}: detaching highlights request", guest_username);
7113                                cx.update(|cx| highlights.detach_and_log_err(cx));
7114                            } else {
7115                                highlights.await?;
7116                            }
7117                        }
7118                        55..=59 => {
7119                            let search = project.update(cx, |project, cx| {
7120                                let query = rng.lock().gen_range('a'..='z');
7121                                log::info!("{}: project-wide search {:?}", guest_username, query);
7122                                project.search(SearchQuery::text(query, false, false), cx)
7123                            });
7124                            let search = cx.background().spawn(async move {
7125                                search
7126                                    .await
7127                                    .map_err(|err| anyhow!("search request failed: {:?}", err))
7128                            });
7129                            if rng.lock().gen_bool(0.3) {
7130                                log::info!("{}: detaching search request", guest_username);
7131                                cx.update(|cx| search.detach_and_log_err(cx));
7132                            } else {
7133                                client.buffers.extend(search.await?.into_keys());
7134                            }
7135                        }
7136                        60..=69 => {
7137                            let worktree = project
7138                                .read_with(cx, |project, cx| {
7139                                    project
7140                                        .worktrees(&cx)
7141                                        .filter(|worktree| {
7142                                            let worktree = worktree.read(cx);
7143                                            worktree.is_visible()
7144                                                && worktree.entries(false).any(|e| e.is_file())
7145                                                && worktree
7146                                                    .root_entry()
7147                                                    .map_or(false, |e| e.is_dir())
7148                                        })
7149                                        .choose(&mut *rng.lock())
7150                                })
7151                                .unwrap();
7152                            let (worktree_id, worktree_root_name) = worktree
7153                                .read_with(cx, |worktree, _| {
7154                                    (worktree.id(), worktree.root_name().to_string())
7155                                });
7156
7157                            let mut new_name = String::new();
7158                            for _ in 0..10 {
7159                                let letter = rng.lock().gen_range('a'..='z');
7160                                new_name.push(letter);
7161                            }
7162                            let mut new_path = PathBuf::new();
7163                            new_path.push(new_name);
7164                            new_path.set_extension("rs");
7165                            log::info!(
7166                                "{}: creating {:?} in worktree {} ({})",
7167                                guest_username,
7168                                new_path,
7169                                worktree_id,
7170                                worktree_root_name,
7171                            );
7172                            project
7173                                .update(cx, |project, cx| {
7174                                    project.create_entry((worktree_id, new_path), false, cx)
7175                                })
7176                                .unwrap()
7177                                .await?;
7178                        }
7179                        _ => {
7180                            buffer.update(cx, |buffer, cx| {
7181                                log::info!(
7182                                    "{}: updating buffer {} ({:?})",
7183                                    guest_username,
7184                                    buffer.remote_id(),
7185                                    buffer.file().unwrap().full_path(cx)
7186                                );
7187                                if rng.lock().gen_bool(0.7) {
7188                                    buffer.randomly_edit(&mut *rng.lock(), 5, cx);
7189                                } else {
7190                                    buffer.randomly_undo_redo(&mut *rng.lock(), cx);
7191                                }
7192                            });
7193                        }
7194                    }
7195                    cx.background().simulate_random_delay().await;
7196                }
7197                Ok(())
7198            }
7199
7200            let result = simulate_guest_internal(
7201                &mut self,
7202                &guest_username,
7203                project.clone(),
7204                op_start_signal,
7205                rng,
7206                &mut cx,
7207            )
7208            .await;
7209            log::info!("{}: done", guest_username);
7210
7211            self.project = Some(project);
7212            (self, cx, result.err())
7213        }
7214    }
7215
7216    impl Drop for TestClient {
7217        fn drop(&mut self) {
7218            self.client.tear_down();
7219        }
7220    }
7221
7222    impl Executor for Arc<gpui::executor::Background> {
7223        type Sleep = gpui::executor::Timer;
7224
7225        fn spawn_detached<F: 'static + Send + Future<Output = ()>>(&self, future: F) {
7226            self.spawn(future).detach();
7227        }
7228
7229        fn sleep(&self, duration: Duration) -> Self::Sleep {
7230            self.as_ref().timer(duration)
7231        }
7232    }
7233
7234    fn channel_messages(channel: &Channel) -> Vec<(String, String, bool)> {
7235        channel
7236            .messages()
7237            .cursor::<()>()
7238            .map(|m| {
7239                (
7240                    m.sender.github_login.clone(),
7241                    m.body.clone(),
7242                    m.is_pending(),
7243                )
7244            })
7245            .collect()
7246    }
7247
7248    struct EmptyView;
7249
7250    impl gpui::Entity for EmptyView {
7251        type Event = ();
7252    }
7253
7254    impl gpui::View for EmptyView {
7255        fn ui_name() -> &'static str {
7256            "empty view"
7257        }
7258
7259        fn render(&mut self, _: &mut gpui::RenderContext<Self>) -> gpui::ElementBox {
7260            gpui::Element::boxed(gpui::elements::Empty)
7261        }
7262    }
7263}