rpc.rs

   1mod store;
   2
   3use crate::{
   4    auth,
   5    db::{self, ChannelId, MessageId, User, UserId},
   6    AppState, Result,
   7};
   8use anyhow::anyhow;
   9use async_tungstenite::tungstenite::{
  10    protocol::CloseFrame as TungsteniteCloseFrame, Message as TungsteniteMessage,
  11};
  12use axum::{
  13    body::Body,
  14    extract::{
  15        ws::{CloseFrame as AxumCloseFrame, Message as AxumMessage},
  16        ConnectInfo, WebSocketUpgrade,
  17    },
  18    headers::{Header, HeaderName},
  19    http::StatusCode,
  20    middleware,
  21    response::IntoResponse,
  22    routing::get,
  23    Extension, Router, TypedHeader,
  24};
  25use collections::HashMap;
  26use futures::{channel::mpsc, future::BoxFuture, FutureExt, SinkExt, StreamExt, TryStreamExt};
  27use lazy_static::lazy_static;
  28use rpc::{
  29    proto::{self, AnyTypedEnvelope, EntityMessage, EnvelopedMessage, RequestMessage},
  30    Connection, ConnectionId, Peer, Receipt, TypedEnvelope,
  31};
  32use std::{
  33    any::TypeId,
  34    future::Future,
  35    marker::PhantomData,
  36    net::SocketAddr,
  37    ops::{Deref, DerefMut},
  38    rc::Rc,
  39    sync::{
  40        atomic::{AtomicBool, Ordering::SeqCst},
  41        Arc,
  42    },
  43    time::Duration,
  44};
  45use store::{Store, Worktree};
  46use time::OffsetDateTime;
  47use tokio::{
  48    sync::{RwLock, RwLockReadGuard, RwLockWriteGuard},
  49    time::Sleep,
  50};
  51use tower::ServiceBuilder;
  52use tracing::{info_span, instrument, Instrument};
  53
  54type MessageHandler =
  55    Box<dyn Send + Sync + Fn(Arc<Server>, Box<dyn AnyTypedEnvelope>) -> BoxFuture<'static, ()>>;
  56
  57struct Response<R> {
  58    server: Arc<Server>,
  59    receipt: Receipt<R>,
  60    responded: Arc<AtomicBool>,
  61}
  62
  63impl<R: RequestMessage> Response<R> {
  64    fn send(self, payload: R::Response) -> Result<()> {
  65        self.responded.store(true, SeqCst);
  66        self.server.peer.respond(self.receipt, payload)?;
  67        Ok(())
  68    }
  69}
  70
  71pub struct Server {
  72    peer: Arc<Peer>,
  73    store: RwLock<Store>,
  74    app_state: Arc<AppState>,
  75    handlers: HashMap<TypeId, MessageHandler>,
  76    notifications: Option<mpsc::UnboundedSender<()>>,
  77}
  78
  79pub trait Executor: Send + Clone {
  80    type Sleep: Send + Future;
  81    fn spawn_detached<F: 'static + Send + Future<Output = ()>>(&self, future: F);
  82    fn sleep(&self, duration: Duration) -> Self::Sleep;
  83}
  84
  85#[derive(Clone)]
  86pub struct RealExecutor;
  87
  88const MESSAGE_COUNT_PER_PAGE: usize = 100;
  89const MAX_MESSAGE_LEN: usize = 1024;
  90
  91struct StoreReadGuard<'a> {
  92    guard: RwLockReadGuard<'a, Store>,
  93    _not_send: PhantomData<Rc<()>>,
  94}
  95
  96struct StoreWriteGuard<'a> {
  97    guard: RwLockWriteGuard<'a, Store>,
  98    _not_send: PhantomData<Rc<()>>,
  99}
 100
 101impl Server {
 102    pub fn new(
 103        app_state: Arc<AppState>,
 104        notifications: Option<mpsc::UnboundedSender<()>>,
 105    ) -> Arc<Self> {
 106        let mut server = Self {
 107            peer: Peer::new(),
 108            app_state,
 109            store: Default::default(),
 110            handlers: Default::default(),
 111            notifications,
 112        };
 113
 114        server
 115            .add_request_handler(Server::ping)
 116            .add_request_handler(Server::register_project)
 117            .add_message_handler(Server::unregister_project)
 118            .add_request_handler(Server::share_project)
 119            .add_message_handler(Server::unshare_project)
 120            .add_request_handler(Server::join_project)
 121            .add_message_handler(Server::leave_project)
 122            .add_request_handler(Server::register_worktree)
 123            .add_message_handler(Server::unregister_worktree)
 124            .add_request_handler(Server::update_worktree)
 125            .add_message_handler(Server::start_language_server)
 126            .add_message_handler(Server::update_language_server)
 127            .add_message_handler(Server::update_diagnostic_summary)
 128            .add_request_handler(Server::forward_project_request::<proto::GetDefinition>)
 129            .add_request_handler(Server::forward_project_request::<proto::GetReferences>)
 130            .add_request_handler(Server::forward_project_request::<proto::SearchProject>)
 131            .add_request_handler(Server::forward_project_request::<proto::GetDocumentHighlights>)
 132            .add_request_handler(Server::forward_project_request::<proto::GetProjectSymbols>)
 133            .add_request_handler(Server::forward_project_request::<proto::OpenBufferForSymbol>)
 134            .add_request_handler(Server::forward_project_request::<proto::OpenBufferById>)
 135            .add_request_handler(Server::forward_project_request::<proto::OpenBufferByPath>)
 136            .add_request_handler(Server::forward_project_request::<proto::GetCompletions>)
 137            .add_request_handler(
 138                Server::forward_project_request::<proto::ApplyCompletionAdditionalEdits>,
 139            )
 140            .add_request_handler(Server::forward_project_request::<proto::GetCodeActions>)
 141            .add_request_handler(Server::forward_project_request::<proto::ApplyCodeAction>)
 142            .add_request_handler(Server::forward_project_request::<proto::PrepareRename>)
 143            .add_request_handler(Server::forward_project_request::<proto::PerformRename>)
 144            .add_request_handler(Server::forward_project_request::<proto::ReloadBuffers>)
 145            .add_request_handler(Server::forward_project_request::<proto::FormatBuffers>)
 146            .add_request_handler(Server::forward_project_request::<proto::CreateProjectEntry>)
 147            .add_request_handler(Server::forward_project_request::<proto::RenameProjectEntry>)
 148            .add_request_handler(Server::forward_project_request::<proto::DeleteProjectEntry>)
 149            .add_request_handler(Server::update_buffer)
 150            .add_message_handler(Server::update_buffer_file)
 151            .add_message_handler(Server::buffer_reloaded)
 152            .add_message_handler(Server::buffer_saved)
 153            .add_request_handler(Server::save_buffer)
 154            .add_request_handler(Server::get_channels)
 155            .add_request_handler(Server::get_users)
 156            .add_request_handler(Server::fuzzy_search_users)
 157            .add_request_handler(Server::request_contact)
 158            .add_request_handler(Server::remove_contact)
 159            .add_request_handler(Server::respond_to_contact_request)
 160            .add_request_handler(Server::join_channel)
 161            .add_message_handler(Server::leave_channel)
 162            .add_request_handler(Server::send_channel_message)
 163            .add_request_handler(Server::follow)
 164            .add_message_handler(Server::unfollow)
 165            .add_message_handler(Server::update_followers)
 166            .add_request_handler(Server::get_channel_messages);
 167
 168        Arc::new(server)
 169    }
 170
 171    fn add_message_handler<F, Fut, M>(&mut self, handler: F) -> &mut Self
 172    where
 173        F: 'static + Send + Sync + Fn(Arc<Self>, TypedEnvelope<M>) -> Fut,
 174        Fut: 'static + Send + Future<Output = Result<()>>,
 175        M: EnvelopedMessage,
 176    {
 177        let prev_handler = self.handlers.insert(
 178            TypeId::of::<M>(),
 179            Box::new(move |server, envelope| {
 180                let envelope = envelope.into_any().downcast::<TypedEnvelope<M>>().unwrap();
 181                let span = info_span!(
 182                    "handle message",
 183                    payload_type = envelope.payload_type_name(),
 184                    payload = format!("{:?}", envelope.payload).as_str(),
 185                );
 186                let future = (handler)(server, *envelope);
 187                async move {
 188                    if let Err(error) = future.await {
 189                        tracing::error!(%error, "error handling message");
 190                    }
 191                }
 192                .instrument(span)
 193                .boxed()
 194            }),
 195        );
 196        if prev_handler.is_some() {
 197            panic!("registered a handler for the same message twice");
 198        }
 199        self
 200    }
 201
 202    /// Handle a request while holding a lock to the store. This is useful when we're registering
 203    /// a connection but we want to respond on the connection before anybody else can send on it.
 204    fn add_request_handler<F, Fut, M>(&mut self, handler: F) -> &mut Self
 205    where
 206        F: 'static + Send + Sync + Fn(Arc<Self>, TypedEnvelope<M>, Response<M>) -> Fut,
 207        Fut: Send + Future<Output = Result<()>>,
 208        M: RequestMessage,
 209    {
 210        let handler = Arc::new(handler);
 211        self.add_message_handler(move |server, envelope| {
 212            let receipt = envelope.receipt();
 213            let handler = handler.clone();
 214            async move {
 215                let responded = Arc::new(AtomicBool::default());
 216                let response = Response {
 217                    server: server.clone(),
 218                    responded: responded.clone(),
 219                    receipt: envelope.receipt(),
 220                };
 221                match (handler)(server.clone(), envelope, response).await {
 222                    Ok(()) => {
 223                        if responded.load(std::sync::atomic::Ordering::SeqCst) {
 224                            Ok(())
 225                        } else {
 226                            Err(anyhow!("handler did not send a response"))?
 227                        }
 228                    }
 229                    Err(error) => {
 230                        server.peer.respond_with_error(
 231                            receipt,
 232                            proto::Error {
 233                                message: error.to_string(),
 234                            },
 235                        )?;
 236                        Err(error)
 237                    }
 238                }
 239            }
 240        })
 241    }
 242
 243    pub fn handle_connection<E: Executor>(
 244        self: &Arc<Self>,
 245        connection: Connection,
 246        address: String,
 247        user: User,
 248        mut send_connection_id: Option<mpsc::Sender<ConnectionId>>,
 249        executor: E,
 250    ) -> impl Future<Output = Result<()>> {
 251        let mut this = self.clone();
 252        let user_id = user.id;
 253        let login = user.github_login;
 254        let span = info_span!("handle connection", %user_id, %login, %address);
 255        async move {
 256            let (connection_id, handle_io, mut incoming_rx) = this
 257                .peer
 258                .add_connection(connection, {
 259                    let executor = executor.clone();
 260                    move |duration| {
 261                        let timer = executor.sleep(duration);
 262                        async move {
 263                            timer.await;
 264                        }
 265                    }
 266                })
 267                .await;
 268
 269            tracing::info!(%user_id, %login, %connection_id, %address, "connection opened");
 270
 271            if let Some(send_connection_id) = send_connection_id.as_mut() {
 272                let _ = send_connection_id.send(connection_id).await;
 273            }
 274
 275            let contacts = this.app_state.db.get_contacts(user_id).await?;
 276
 277            {
 278                let mut store = this.store_mut().await;
 279                store.add_connection(connection_id, user_id);
 280                this.peer.send(connection_id, store.build_initial_contacts_update(contacts))?;
 281            }
 282            this.update_user_contacts(user_id).await?;
 283
 284            let handle_io = handle_io.fuse();
 285            futures::pin_mut!(handle_io);
 286            loop {
 287                let next_message = incoming_rx.next().fuse();
 288                futures::pin_mut!(next_message);
 289                futures::select_biased! {
 290                    result = handle_io => {
 291                        if let Err(error) = result {
 292                            tracing::error!(?error, %user_id, %login, %connection_id, %address, "error handling I/O");
 293                        }
 294                        break;
 295                    }
 296                    message = next_message => {
 297                        if let Some(message) = message {
 298                            let type_name = message.payload_type_name();
 299                            let span = tracing::info_span!("receive message", %user_id, %login, %connection_id, %address, type_name);
 300                            async {
 301                                if let Some(handler) = this.handlers.get(&message.payload_type_id()) {
 302                                    let notifications = this.notifications.clone();
 303                                    let is_background = message.is_background();
 304                                    let handle_message = (handler)(this.clone(), message);
 305                                    let handle_message = async move {
 306                                        handle_message.await;
 307                                        if let Some(mut notifications) = notifications {
 308                                            let _ = notifications.send(()).await;
 309                                        }
 310                                    };
 311                                    if is_background {
 312                                        executor.spawn_detached(handle_message);
 313                                    } else {
 314                                        handle_message.await;
 315                                    }
 316                                } else {
 317                                    tracing::error!(%user_id, %login, %connection_id, %address, "no message handler");
 318                                }
 319                            }.instrument(span).await;
 320                        } else {
 321                            tracing::info!(%user_id, %login, %connection_id, %address, "connection closed");
 322                            break;
 323                        }
 324                    }
 325                }
 326            }
 327
 328            tracing::info!(%user_id, %login, %connection_id, %address, "signing out");
 329            if let Err(error) = this.sign_out(connection_id).await {
 330                tracing::error!(%user_id, %login, %connection_id, %address, ?error, "error signing out");
 331            }
 332
 333            Ok(())
 334        }.instrument(span)
 335    }
 336
 337    #[instrument(skip(self), err)]
 338    async fn sign_out(self: &mut Arc<Self>, connection_id: ConnectionId) -> Result<()> {
 339        self.peer.disconnect(connection_id);
 340        let removed_connection = self.store_mut().await.remove_connection(connection_id)?;
 341
 342        for (project_id, project) in removed_connection.hosted_projects {
 343            if let Some(share) = project.share {
 344                broadcast(connection_id, share.guests.keys().copied(), |conn_id| {
 345                    self.peer
 346                        .send(conn_id, proto::UnshareProject { project_id })
 347                });
 348            }
 349        }
 350
 351        for (project_id, peer_ids) in removed_connection.guest_project_ids {
 352            broadcast(connection_id, peer_ids, |conn_id| {
 353                self.peer.send(
 354                    conn_id,
 355                    proto::RemoveProjectCollaborator {
 356                        project_id,
 357                        peer_id: connection_id.0,
 358                    },
 359                )
 360            });
 361        }
 362
 363        self.update_user_contacts(removed_connection.user_id)
 364            .await?;
 365
 366        Ok(())
 367    }
 368
 369    async fn ping(
 370        self: Arc<Server>,
 371        _: TypedEnvelope<proto::Ping>,
 372        response: Response<proto::Ping>,
 373    ) -> Result<()> {
 374        response.send(proto::Ack {})?;
 375        Ok(())
 376    }
 377
 378    async fn register_project(
 379        self: Arc<Server>,
 380        request: TypedEnvelope<proto::RegisterProject>,
 381        response: Response<proto::RegisterProject>,
 382    ) -> Result<()> {
 383        let user_id;
 384        let project_id;
 385        {
 386            let mut state = self.store_mut().await;
 387            user_id = state.user_id_for_connection(request.sender_id)?;
 388            project_id = state.register_project(request.sender_id, user_id);
 389        };
 390        self.update_user_contacts(user_id).await?;
 391        response.send(proto::RegisterProjectResponse { project_id })?;
 392        Ok(())
 393    }
 394
 395    async fn unregister_project(
 396        self: Arc<Server>,
 397        request: TypedEnvelope<proto::UnregisterProject>,
 398    ) -> Result<()> {
 399        let user_id = {
 400            let mut state = self.store_mut().await;
 401            state.unregister_project(request.payload.project_id, request.sender_id)?;
 402            state.user_id_for_connection(request.sender_id)?
 403        };
 404
 405        self.update_user_contacts(user_id).await?;
 406        Ok(())
 407    }
 408
 409    async fn share_project(
 410        self: Arc<Server>,
 411        request: TypedEnvelope<proto::ShareProject>,
 412        response: Response<proto::ShareProject>,
 413    ) -> Result<()> {
 414        let user_id = {
 415            let mut state = self.store_mut().await;
 416            state.share_project(request.payload.project_id, request.sender_id)?;
 417            state.user_id_for_connection(request.sender_id)?
 418        };
 419        self.update_user_contacts(user_id).await?;
 420        response.send(proto::Ack {})?;
 421        Ok(())
 422    }
 423
 424    async fn update_user_contacts(self: &Arc<Server>, user_id: UserId) -> Result<()> {
 425        let contacts = self.app_state.db.get_contacts(user_id).await?;
 426        let store = self.store().await;
 427        let updated_contact = store.contact_for_user(user_id, false);
 428        for contact in contacts {
 429            if let db::Contact::Accepted {
 430                user_id: contact_user_id,
 431                ..
 432            } = contact
 433            {
 434                for contact_conn_id in store.connection_ids_for_user(contact_user_id) {
 435                    self.peer
 436                        .send(
 437                            contact_conn_id,
 438                            proto::UpdateContacts {
 439                                contacts: vec![updated_contact.clone()],
 440                                remove_contacts: Default::default(),
 441                                incoming_requests: Default::default(),
 442                                remove_incoming_requests: Default::default(),
 443                                outgoing_requests: Default::default(),
 444                                remove_outgoing_requests: Default::default(),
 445                            },
 446                        )
 447                        .trace_err();
 448                }
 449            }
 450        }
 451        Ok(())
 452    }
 453
 454    async fn unshare_project(
 455        self: Arc<Server>,
 456        request: TypedEnvelope<proto::UnshareProject>,
 457    ) -> Result<()> {
 458        let project_id = request.payload.project_id;
 459        let project;
 460        {
 461            let mut state = self.store_mut().await;
 462            project = state.unshare_project(project_id, request.sender_id)?;
 463            broadcast(request.sender_id, project.connection_ids, |conn_id| {
 464                self.peer
 465                    .send(conn_id, proto::UnshareProject { project_id })
 466            });
 467        }
 468        self.update_user_contacts(project.host_user_id).await?;
 469        Ok(())
 470    }
 471
 472    async fn join_project(
 473        self: Arc<Server>,
 474        request: TypedEnvelope<proto::JoinProject>,
 475        response: Response<proto::JoinProject>,
 476    ) -> Result<()> {
 477        let project_id = request.payload.project_id;
 478        let host_user_id;
 479        let guest_user_id;
 480        {
 481            let state = self.store().await;
 482            host_user_id = state.project(project_id)?.host_user_id;
 483            guest_user_id = state.user_id_for_connection(request.sender_id)?;
 484        };
 485
 486        let has_contact = self
 487            .app_state
 488            .db
 489            .has_contact(guest_user_id, host_user_id)
 490            .await?;
 491        if !has_contact {
 492            return Err(anyhow!("no such project"))?;
 493        }
 494
 495        {
 496            let state = &mut *self.store_mut().await;
 497            let joined = state.join_project(request.sender_id, guest_user_id, project_id)?;
 498            let share = joined.project.share()?;
 499            let peer_count = share.guests.len();
 500            let mut collaborators = Vec::with_capacity(peer_count);
 501            collaborators.push(proto::Collaborator {
 502                peer_id: joined.project.host_connection_id.0,
 503                replica_id: 0,
 504                user_id: joined.project.host_user_id.to_proto(),
 505            });
 506            let worktrees = share
 507                .worktrees
 508                .iter()
 509                .filter_map(|(id, shared_worktree)| {
 510                    let worktree = joined.project.worktrees.get(&id)?;
 511                    Some(proto::Worktree {
 512                        id: *id,
 513                        root_name: worktree.root_name.clone(),
 514                        entries: shared_worktree.entries.values().cloned().collect(),
 515                        diagnostic_summaries: shared_worktree
 516                            .diagnostic_summaries
 517                            .values()
 518                            .cloned()
 519                            .collect(),
 520                        visible: worktree.visible,
 521                        scan_id: shared_worktree.scan_id,
 522                    })
 523                })
 524                .collect();
 525            for (peer_conn_id, (peer_replica_id, peer_user_id)) in &share.guests {
 526                if *peer_conn_id != request.sender_id {
 527                    collaborators.push(proto::Collaborator {
 528                        peer_id: peer_conn_id.0,
 529                        replica_id: *peer_replica_id as u32,
 530                        user_id: peer_user_id.to_proto(),
 531                    });
 532                }
 533            }
 534            broadcast(
 535                request.sender_id,
 536                joined.project.connection_ids(),
 537                |conn_id| {
 538                    self.peer.send(
 539                        conn_id,
 540                        proto::AddProjectCollaborator {
 541                            project_id,
 542                            collaborator: Some(proto::Collaborator {
 543                                peer_id: request.sender_id.0,
 544                                replica_id: joined.replica_id as u32,
 545                                user_id: guest_user_id.to_proto(),
 546                            }),
 547                        },
 548                    )
 549                },
 550            );
 551            response.send(proto::JoinProjectResponse {
 552                worktrees,
 553                replica_id: joined.replica_id as u32,
 554                collaborators,
 555                language_servers: joined.project.language_servers.clone(),
 556            })?;
 557        }
 558        self.update_user_contacts(host_user_id).await?;
 559        Ok(())
 560    }
 561
 562    async fn leave_project(
 563        self: Arc<Server>,
 564        request: TypedEnvelope<proto::LeaveProject>,
 565    ) -> Result<()> {
 566        let sender_id = request.sender_id;
 567        let project_id = request.payload.project_id;
 568        let project;
 569        {
 570            let mut state = self.store_mut().await;
 571            project = state.leave_project(sender_id, project_id)?;
 572            broadcast(sender_id, project.connection_ids, |conn_id| {
 573                self.peer.send(
 574                    conn_id,
 575                    proto::RemoveProjectCollaborator {
 576                        project_id,
 577                        peer_id: sender_id.0,
 578                    },
 579                )
 580            });
 581        }
 582        self.update_user_contacts(project.host_user_id).await?;
 583        Ok(())
 584    }
 585
 586    async fn register_worktree(
 587        self: Arc<Server>,
 588        request: TypedEnvelope<proto::RegisterWorktree>,
 589        response: Response<proto::RegisterWorktree>,
 590    ) -> Result<()> {
 591        let host_user_id;
 592        {
 593            let mut state = self.store_mut().await;
 594            host_user_id = state.user_id_for_connection(request.sender_id)?;
 595
 596            let guest_connection_ids = state
 597                .read_project(request.payload.project_id, request.sender_id)?
 598                .guest_connection_ids();
 599            state.register_worktree(
 600                request.payload.project_id,
 601                request.payload.worktree_id,
 602                request.sender_id,
 603                Worktree {
 604                    root_name: request.payload.root_name.clone(),
 605                    visible: request.payload.visible,
 606                },
 607            )?;
 608
 609            broadcast(request.sender_id, guest_connection_ids, |connection_id| {
 610                self.peer
 611                    .forward_send(request.sender_id, connection_id, request.payload.clone())
 612            });
 613        }
 614        self.update_user_contacts(host_user_id).await?;
 615        response.send(proto::Ack {})?;
 616        Ok(())
 617    }
 618
 619    async fn unregister_worktree(
 620        self: Arc<Server>,
 621        request: TypedEnvelope<proto::UnregisterWorktree>,
 622    ) -> Result<()> {
 623        let host_user_id;
 624        let project_id = request.payload.project_id;
 625        let worktree_id = request.payload.worktree_id;
 626        {
 627            let mut state = self.store_mut().await;
 628            let (_, guest_connection_ids) =
 629                state.unregister_worktree(project_id, worktree_id, request.sender_id)?;
 630            host_user_id = state.user_id_for_connection(request.sender_id)?;
 631            broadcast(request.sender_id, guest_connection_ids, |conn_id| {
 632                self.peer.send(
 633                    conn_id,
 634                    proto::UnregisterWorktree {
 635                        project_id,
 636                        worktree_id,
 637                    },
 638                )
 639            });
 640        }
 641        self.update_user_contacts(host_user_id).await?;
 642        Ok(())
 643    }
 644
 645    async fn update_worktree(
 646        self: Arc<Server>,
 647        request: TypedEnvelope<proto::UpdateWorktree>,
 648        response: Response<proto::UpdateWorktree>,
 649    ) -> Result<()> {
 650        let connection_ids = self.store_mut().await.update_worktree(
 651            request.sender_id,
 652            request.payload.project_id,
 653            request.payload.worktree_id,
 654            &request.payload.removed_entries,
 655            &request.payload.updated_entries,
 656            request.payload.scan_id,
 657        )?;
 658
 659        broadcast(request.sender_id, connection_ids, |connection_id| {
 660            self.peer
 661                .forward_send(request.sender_id, connection_id, request.payload.clone())
 662        });
 663        response.send(proto::Ack {})?;
 664        Ok(())
 665    }
 666
 667    async fn update_diagnostic_summary(
 668        self: Arc<Server>,
 669        request: TypedEnvelope<proto::UpdateDiagnosticSummary>,
 670    ) -> Result<()> {
 671        let summary = request
 672            .payload
 673            .summary
 674            .clone()
 675            .ok_or_else(|| anyhow!("invalid summary"))?;
 676        let receiver_ids = self.store_mut().await.update_diagnostic_summary(
 677            request.payload.project_id,
 678            request.payload.worktree_id,
 679            request.sender_id,
 680            summary,
 681        )?;
 682
 683        broadcast(request.sender_id, receiver_ids, |connection_id| {
 684            self.peer
 685                .forward_send(request.sender_id, connection_id, request.payload.clone())
 686        });
 687        Ok(())
 688    }
 689
 690    async fn start_language_server(
 691        self: Arc<Server>,
 692        request: TypedEnvelope<proto::StartLanguageServer>,
 693    ) -> Result<()> {
 694        let receiver_ids = self.store_mut().await.start_language_server(
 695            request.payload.project_id,
 696            request.sender_id,
 697            request
 698                .payload
 699                .server
 700                .clone()
 701                .ok_or_else(|| anyhow!("invalid language server"))?,
 702        )?;
 703        broadcast(request.sender_id, receiver_ids, |connection_id| {
 704            self.peer
 705                .forward_send(request.sender_id, connection_id, request.payload.clone())
 706        });
 707        Ok(())
 708    }
 709
 710    async fn update_language_server(
 711        self: Arc<Server>,
 712        request: TypedEnvelope<proto::UpdateLanguageServer>,
 713    ) -> Result<()> {
 714        let receiver_ids = self
 715            .store()
 716            .await
 717            .project_connection_ids(request.payload.project_id, request.sender_id)?;
 718        broadcast(request.sender_id, receiver_ids, |connection_id| {
 719            self.peer
 720                .forward_send(request.sender_id, connection_id, request.payload.clone())
 721        });
 722        Ok(())
 723    }
 724
 725    async fn forward_project_request<T>(
 726        self: Arc<Server>,
 727        request: TypedEnvelope<T>,
 728        response: Response<T>,
 729    ) -> Result<()>
 730    where
 731        T: EntityMessage + RequestMessage,
 732    {
 733        let host_connection_id = self
 734            .store()
 735            .await
 736            .read_project(request.payload.remote_entity_id(), request.sender_id)?
 737            .host_connection_id;
 738
 739        response.send(
 740            self.peer
 741                .forward_request(request.sender_id, host_connection_id, request.payload)
 742                .await?,
 743        )?;
 744        Ok(())
 745    }
 746
 747    async fn save_buffer(
 748        self: Arc<Server>,
 749        request: TypedEnvelope<proto::SaveBuffer>,
 750        response: Response<proto::SaveBuffer>,
 751    ) -> Result<()> {
 752        let host = self
 753            .store()
 754            .await
 755            .read_project(request.payload.project_id, request.sender_id)?
 756            .host_connection_id;
 757        let response_payload = self
 758            .peer
 759            .forward_request(request.sender_id, host, request.payload.clone())
 760            .await?;
 761
 762        let mut guests = self
 763            .store()
 764            .await
 765            .read_project(request.payload.project_id, request.sender_id)?
 766            .connection_ids();
 767        guests.retain(|guest_connection_id| *guest_connection_id != request.sender_id);
 768        broadcast(host, guests, |conn_id| {
 769            self.peer
 770                .forward_send(host, conn_id, response_payload.clone())
 771        });
 772        response.send(response_payload)?;
 773        Ok(())
 774    }
 775
 776    async fn update_buffer(
 777        self: Arc<Server>,
 778        request: TypedEnvelope<proto::UpdateBuffer>,
 779        response: Response<proto::UpdateBuffer>,
 780    ) -> Result<()> {
 781        let receiver_ids = self
 782            .store()
 783            .await
 784            .project_connection_ids(request.payload.project_id, request.sender_id)?;
 785        broadcast(request.sender_id, receiver_ids, |connection_id| {
 786            self.peer
 787                .forward_send(request.sender_id, connection_id, request.payload.clone())
 788        });
 789        response.send(proto::Ack {})?;
 790        Ok(())
 791    }
 792
 793    async fn update_buffer_file(
 794        self: Arc<Server>,
 795        request: TypedEnvelope<proto::UpdateBufferFile>,
 796    ) -> Result<()> {
 797        let receiver_ids = self
 798            .store()
 799            .await
 800            .project_connection_ids(request.payload.project_id, request.sender_id)?;
 801        broadcast(request.sender_id, receiver_ids, |connection_id| {
 802            self.peer
 803                .forward_send(request.sender_id, connection_id, request.payload.clone())
 804        });
 805        Ok(())
 806    }
 807
 808    async fn buffer_reloaded(
 809        self: Arc<Server>,
 810        request: TypedEnvelope<proto::BufferReloaded>,
 811    ) -> Result<()> {
 812        let receiver_ids = self
 813            .store()
 814            .await
 815            .project_connection_ids(request.payload.project_id, request.sender_id)?;
 816        broadcast(request.sender_id, receiver_ids, |connection_id| {
 817            self.peer
 818                .forward_send(request.sender_id, connection_id, request.payload.clone())
 819        });
 820        Ok(())
 821    }
 822
 823    async fn buffer_saved(
 824        self: Arc<Server>,
 825        request: TypedEnvelope<proto::BufferSaved>,
 826    ) -> Result<()> {
 827        let receiver_ids = self
 828            .store()
 829            .await
 830            .project_connection_ids(request.payload.project_id, request.sender_id)?;
 831        broadcast(request.sender_id, receiver_ids, |connection_id| {
 832            self.peer
 833                .forward_send(request.sender_id, connection_id, request.payload.clone())
 834        });
 835        Ok(())
 836    }
 837
 838    async fn follow(
 839        self: Arc<Self>,
 840        request: TypedEnvelope<proto::Follow>,
 841        response: Response<proto::Follow>,
 842    ) -> Result<()> {
 843        let leader_id = ConnectionId(request.payload.leader_id);
 844        let follower_id = request.sender_id;
 845        if !self
 846            .store()
 847            .await
 848            .project_connection_ids(request.payload.project_id, follower_id)?
 849            .contains(&leader_id)
 850        {
 851            Err(anyhow!("no such peer"))?;
 852        }
 853        let mut response_payload = self
 854            .peer
 855            .forward_request(request.sender_id, leader_id, request.payload)
 856            .await?;
 857        response_payload
 858            .views
 859            .retain(|view| view.leader_id != Some(follower_id.0));
 860        response.send(response_payload)?;
 861        Ok(())
 862    }
 863
 864    async fn unfollow(self: Arc<Self>, request: TypedEnvelope<proto::Unfollow>) -> Result<()> {
 865        let leader_id = ConnectionId(request.payload.leader_id);
 866        if !self
 867            .store()
 868            .await
 869            .project_connection_ids(request.payload.project_id, request.sender_id)?
 870            .contains(&leader_id)
 871        {
 872            Err(anyhow!("no such peer"))?;
 873        }
 874        self.peer
 875            .forward_send(request.sender_id, leader_id, request.payload)?;
 876        Ok(())
 877    }
 878
 879    async fn update_followers(
 880        self: Arc<Self>,
 881        request: TypedEnvelope<proto::UpdateFollowers>,
 882    ) -> Result<()> {
 883        let connection_ids = self
 884            .store()
 885            .await
 886            .project_connection_ids(request.payload.project_id, request.sender_id)?;
 887        let leader_id = request
 888            .payload
 889            .variant
 890            .as_ref()
 891            .and_then(|variant| match variant {
 892                proto::update_followers::Variant::CreateView(payload) => payload.leader_id,
 893                proto::update_followers::Variant::UpdateView(payload) => payload.leader_id,
 894                proto::update_followers::Variant::UpdateActiveView(payload) => payload.leader_id,
 895            });
 896        for follower_id in &request.payload.follower_ids {
 897            let follower_id = ConnectionId(*follower_id);
 898            if connection_ids.contains(&follower_id) && Some(follower_id.0) != leader_id {
 899                self.peer
 900                    .forward_send(request.sender_id, follower_id, request.payload.clone())?;
 901            }
 902        }
 903        Ok(())
 904    }
 905
 906    async fn get_channels(
 907        self: Arc<Server>,
 908        request: TypedEnvelope<proto::GetChannels>,
 909        response: Response<proto::GetChannels>,
 910    ) -> Result<()> {
 911        let user_id = self
 912            .store()
 913            .await
 914            .user_id_for_connection(request.sender_id)?;
 915        let channels = self.app_state.db.get_accessible_channels(user_id).await?;
 916        response.send(proto::GetChannelsResponse {
 917            channels: channels
 918                .into_iter()
 919                .map(|chan| proto::Channel {
 920                    id: chan.id.to_proto(),
 921                    name: chan.name,
 922                })
 923                .collect(),
 924        })?;
 925        Ok(())
 926    }
 927
 928    async fn get_users(
 929        self: Arc<Server>,
 930        request: TypedEnvelope<proto::GetUsers>,
 931        response: Response<proto::GetUsers>,
 932    ) -> Result<()> {
 933        let user_ids = request
 934            .payload
 935            .user_ids
 936            .into_iter()
 937            .map(UserId::from_proto)
 938            .collect();
 939        let users = self
 940            .app_state
 941            .db
 942            .get_users_by_ids(user_ids)
 943            .await?
 944            .into_iter()
 945            .map(|user| proto::User {
 946                id: user.id.to_proto(),
 947                avatar_url: format!("https://github.com/{}.png?size=128", user.github_login),
 948                github_login: user.github_login,
 949            })
 950            .collect();
 951        response.send(proto::UsersResponse { users })?;
 952        Ok(())
 953    }
 954
 955    async fn fuzzy_search_users(
 956        self: Arc<Server>,
 957        request: TypedEnvelope<proto::FuzzySearchUsers>,
 958        response: Response<proto::FuzzySearchUsers>,
 959    ) -> Result<()> {
 960        let user_id = self
 961            .store()
 962            .await
 963            .user_id_for_connection(request.sender_id)?;
 964        let query = request.payload.query;
 965        let db = &self.app_state.db;
 966        let users = match query.len() {
 967            0 => vec![],
 968            1 | 2 => db
 969                .get_user_by_github_login(&query)
 970                .await?
 971                .into_iter()
 972                .collect(),
 973            _ => db.fuzzy_search_users(&query, 10).await?,
 974        };
 975        let users = users
 976            .into_iter()
 977            .filter(|user| user.id != user_id)
 978            .map(|user| proto::User {
 979                id: user.id.to_proto(),
 980                avatar_url: format!("https://github.com/{}.png?size=128", user.github_login),
 981                github_login: user.github_login,
 982            })
 983            .collect();
 984        response.send(proto::UsersResponse { users })?;
 985        Ok(())
 986    }
 987
 988    async fn request_contact(
 989        self: Arc<Server>,
 990        request: TypedEnvelope<proto::RequestContact>,
 991        response: Response<proto::RequestContact>,
 992    ) -> Result<()> {
 993        let requester_id = self
 994            .store()
 995            .await
 996            .user_id_for_connection(request.sender_id)?;
 997        let responder_id = UserId::from_proto(request.payload.responder_id);
 998        if requester_id == responder_id {
 999            return Err(anyhow!("cannot add yourself as a contact"))?;
1000        }
1001
1002        self.app_state
1003            .db
1004            .send_contact_request(requester_id, responder_id)
1005            .await?;
1006
1007        // Update outgoing contact requests of requester
1008        let mut update = proto::UpdateContacts::default();
1009        update.outgoing_requests.push(responder_id.to_proto());
1010        for connection_id in self.store().await.connection_ids_for_user(requester_id) {
1011            self.peer.send(connection_id, update.clone())?;
1012        }
1013
1014        // Update incoming contact requests of responder
1015        let mut update = proto::UpdateContacts::default();
1016        update
1017            .incoming_requests
1018            .push(proto::IncomingContactRequest {
1019                requester_id: requester_id.to_proto(),
1020                should_notify: true,
1021            });
1022        for connection_id in self.store().await.connection_ids_for_user(responder_id) {
1023            self.peer.send(connection_id, update.clone())?;
1024        }
1025
1026        response.send(proto::Ack {})?;
1027        Ok(())
1028    }
1029
1030    async fn respond_to_contact_request(
1031        self: Arc<Server>,
1032        request: TypedEnvelope<proto::RespondToContactRequest>,
1033        response: Response<proto::RespondToContactRequest>,
1034    ) -> Result<()> {
1035        let responder_id = self
1036            .store()
1037            .await
1038            .user_id_for_connection(request.sender_id)?;
1039        let requester_id = UserId::from_proto(request.payload.requester_id);
1040        if request.payload.response == proto::ContactRequestResponse::Dismiss as i32 {
1041            self.app_state
1042                .db
1043                .dismiss_contact_notification(responder_id, requester_id)
1044                .await?;
1045        } else {
1046            let accept = request.payload.response == proto::ContactRequestResponse::Accept as i32;
1047            self.app_state
1048                .db
1049                .respond_to_contact_request(responder_id, requester_id, accept)
1050                .await?;
1051
1052            let store = self.store().await;
1053            // Update responder with new contact
1054            let mut update = proto::UpdateContacts::default();
1055            if accept {
1056                update
1057                    .contacts
1058                    .push(store.contact_for_user(requester_id, false));
1059            }
1060            update
1061                .remove_incoming_requests
1062                .push(requester_id.to_proto());
1063            for connection_id in store.connection_ids_for_user(responder_id) {
1064                self.peer.send(connection_id, update.clone())?;
1065            }
1066
1067            // Update requester with new contact
1068            let mut update = proto::UpdateContacts::default();
1069            if accept {
1070                update
1071                    .contacts
1072                    .push(store.contact_for_user(responder_id, true));
1073            }
1074            update
1075                .remove_outgoing_requests
1076                .push(responder_id.to_proto());
1077            for connection_id in store.connection_ids_for_user(requester_id) {
1078                self.peer.send(connection_id, update.clone())?;
1079            }
1080        }
1081
1082        response.send(proto::Ack {})?;
1083        Ok(())
1084    }
1085
1086    async fn remove_contact(
1087        self: Arc<Server>,
1088        request: TypedEnvelope<proto::RemoveContact>,
1089        response: Response<proto::RemoveContact>,
1090    ) -> Result<()> {
1091        let requester_id = self
1092            .store()
1093            .await
1094            .user_id_for_connection(request.sender_id)?;
1095        let responder_id = UserId::from_proto(request.payload.user_id);
1096        self.app_state
1097            .db
1098            .remove_contact(requester_id, responder_id)
1099            .await?;
1100
1101        // Update outgoing contact requests of requester
1102        let mut update = proto::UpdateContacts::default();
1103        update
1104            .remove_outgoing_requests
1105            .push(responder_id.to_proto());
1106        for connection_id in self.store().await.connection_ids_for_user(requester_id) {
1107            self.peer.send(connection_id, update.clone())?;
1108        }
1109
1110        // Update incoming contact requests of responder
1111        let mut update = proto::UpdateContacts::default();
1112        update
1113            .remove_incoming_requests
1114            .push(requester_id.to_proto());
1115        for connection_id in self.store().await.connection_ids_for_user(responder_id) {
1116            self.peer.send(connection_id, update.clone())?;
1117        }
1118
1119        response.send(proto::Ack {})?;
1120        Ok(())
1121    }
1122
1123    async fn join_channel(
1124        self: Arc<Self>,
1125        request: TypedEnvelope<proto::JoinChannel>,
1126        response: Response<proto::JoinChannel>,
1127    ) -> Result<()> {
1128        let user_id = self
1129            .store()
1130            .await
1131            .user_id_for_connection(request.sender_id)?;
1132        let channel_id = ChannelId::from_proto(request.payload.channel_id);
1133        if !self
1134            .app_state
1135            .db
1136            .can_user_access_channel(user_id, channel_id)
1137            .await?
1138        {
1139            Err(anyhow!("access denied"))?;
1140        }
1141
1142        self.store_mut()
1143            .await
1144            .join_channel(request.sender_id, channel_id);
1145        let messages = self
1146            .app_state
1147            .db
1148            .get_channel_messages(channel_id, MESSAGE_COUNT_PER_PAGE, None)
1149            .await?
1150            .into_iter()
1151            .map(|msg| proto::ChannelMessage {
1152                id: msg.id.to_proto(),
1153                body: msg.body,
1154                timestamp: msg.sent_at.unix_timestamp() as u64,
1155                sender_id: msg.sender_id.to_proto(),
1156                nonce: Some(msg.nonce.as_u128().into()),
1157            })
1158            .collect::<Vec<_>>();
1159        response.send(proto::JoinChannelResponse {
1160            done: messages.len() < MESSAGE_COUNT_PER_PAGE,
1161            messages,
1162        })?;
1163        Ok(())
1164    }
1165
1166    async fn leave_channel(
1167        self: Arc<Self>,
1168        request: TypedEnvelope<proto::LeaveChannel>,
1169    ) -> Result<()> {
1170        let user_id = self
1171            .store()
1172            .await
1173            .user_id_for_connection(request.sender_id)?;
1174        let channel_id = ChannelId::from_proto(request.payload.channel_id);
1175        if !self
1176            .app_state
1177            .db
1178            .can_user_access_channel(user_id, channel_id)
1179            .await?
1180        {
1181            Err(anyhow!("access denied"))?;
1182        }
1183
1184        self.store_mut()
1185            .await
1186            .leave_channel(request.sender_id, channel_id);
1187
1188        Ok(())
1189    }
1190
1191    async fn send_channel_message(
1192        self: Arc<Self>,
1193        request: TypedEnvelope<proto::SendChannelMessage>,
1194        response: Response<proto::SendChannelMessage>,
1195    ) -> Result<()> {
1196        let channel_id = ChannelId::from_proto(request.payload.channel_id);
1197        let user_id;
1198        let connection_ids;
1199        {
1200            let state = self.store().await;
1201            user_id = state.user_id_for_connection(request.sender_id)?;
1202            connection_ids = state.channel_connection_ids(channel_id)?;
1203        }
1204
1205        // Validate the message body.
1206        let body = request.payload.body.trim().to_string();
1207        if body.len() > MAX_MESSAGE_LEN {
1208            return Err(anyhow!("message is too long"))?;
1209        }
1210        if body.is_empty() {
1211            return Err(anyhow!("message can't be blank"))?;
1212        }
1213
1214        let timestamp = OffsetDateTime::now_utc();
1215        let nonce = request
1216            .payload
1217            .nonce
1218            .ok_or_else(|| anyhow!("nonce can't be blank"))?;
1219
1220        let message_id = self
1221            .app_state
1222            .db
1223            .create_channel_message(channel_id, user_id, &body, timestamp, nonce.clone().into())
1224            .await?
1225            .to_proto();
1226        let message = proto::ChannelMessage {
1227            sender_id: user_id.to_proto(),
1228            id: message_id,
1229            body,
1230            timestamp: timestamp.unix_timestamp() as u64,
1231            nonce: Some(nonce),
1232        };
1233        broadcast(request.sender_id, connection_ids, |conn_id| {
1234            self.peer.send(
1235                conn_id,
1236                proto::ChannelMessageSent {
1237                    channel_id: channel_id.to_proto(),
1238                    message: Some(message.clone()),
1239                },
1240            )
1241        });
1242        response.send(proto::SendChannelMessageResponse {
1243            message: Some(message),
1244        })?;
1245        Ok(())
1246    }
1247
1248    async fn get_channel_messages(
1249        self: Arc<Self>,
1250        request: TypedEnvelope<proto::GetChannelMessages>,
1251        response: Response<proto::GetChannelMessages>,
1252    ) -> Result<()> {
1253        let user_id = self
1254            .store()
1255            .await
1256            .user_id_for_connection(request.sender_id)?;
1257        let channel_id = ChannelId::from_proto(request.payload.channel_id);
1258        if !self
1259            .app_state
1260            .db
1261            .can_user_access_channel(user_id, channel_id)
1262            .await?
1263        {
1264            Err(anyhow!("access denied"))?;
1265        }
1266
1267        let messages = self
1268            .app_state
1269            .db
1270            .get_channel_messages(
1271                channel_id,
1272                MESSAGE_COUNT_PER_PAGE,
1273                Some(MessageId::from_proto(request.payload.before_message_id)),
1274            )
1275            .await?
1276            .into_iter()
1277            .map(|msg| proto::ChannelMessage {
1278                id: msg.id.to_proto(),
1279                body: msg.body,
1280                timestamp: msg.sent_at.unix_timestamp() as u64,
1281                sender_id: msg.sender_id.to_proto(),
1282                nonce: Some(msg.nonce.as_u128().into()),
1283            })
1284            .collect::<Vec<_>>();
1285        response.send(proto::GetChannelMessagesResponse {
1286            done: messages.len() < MESSAGE_COUNT_PER_PAGE,
1287            messages,
1288        })?;
1289        Ok(())
1290    }
1291
1292    async fn store<'a>(self: &'a Arc<Self>) -> StoreReadGuard<'a> {
1293        #[cfg(test)]
1294        tokio::task::yield_now().await;
1295        let guard = self.store.read().await;
1296        #[cfg(test)]
1297        tokio::task::yield_now().await;
1298        StoreReadGuard {
1299            guard,
1300            _not_send: PhantomData,
1301        }
1302    }
1303
1304    async fn store_mut<'a>(self: &'a Arc<Self>) -> StoreWriteGuard<'a> {
1305        #[cfg(test)]
1306        tokio::task::yield_now().await;
1307        let guard = self.store.write().await;
1308        #[cfg(test)]
1309        tokio::task::yield_now().await;
1310        StoreWriteGuard {
1311            guard,
1312            _not_send: PhantomData,
1313        }
1314    }
1315}
1316
1317impl<'a> Deref for StoreReadGuard<'a> {
1318    type Target = Store;
1319
1320    fn deref(&self) -> &Self::Target {
1321        &*self.guard
1322    }
1323}
1324
1325impl<'a> Deref for StoreWriteGuard<'a> {
1326    type Target = Store;
1327
1328    fn deref(&self) -> &Self::Target {
1329        &*self.guard
1330    }
1331}
1332
1333impl<'a> DerefMut for StoreWriteGuard<'a> {
1334    fn deref_mut(&mut self) -> &mut Self::Target {
1335        &mut *self.guard
1336    }
1337}
1338
1339impl<'a> Drop for StoreWriteGuard<'a> {
1340    fn drop(&mut self) {
1341        #[cfg(test)]
1342        self.check_invariants();
1343
1344        let metrics = self.metrics();
1345        tracing::info!(
1346            connections = metrics.connections,
1347            registered_projects = metrics.registered_projects,
1348            shared_projects = metrics.shared_projects,
1349            "metrics"
1350        );
1351    }
1352}
1353
1354impl Executor for RealExecutor {
1355    type Sleep = Sleep;
1356
1357    fn spawn_detached<F: 'static + Send + Future<Output = ()>>(&self, future: F) {
1358        tokio::task::spawn(future);
1359    }
1360
1361    fn sleep(&self, duration: Duration) -> Self::Sleep {
1362        tokio::time::sleep(duration)
1363    }
1364}
1365
1366fn broadcast<F>(
1367    sender_id: ConnectionId,
1368    receiver_ids: impl IntoIterator<Item = ConnectionId>,
1369    mut f: F,
1370) where
1371    F: FnMut(ConnectionId) -> anyhow::Result<()>,
1372{
1373    for receiver_id in receiver_ids {
1374        if receiver_id != sender_id {
1375            f(receiver_id).trace_err();
1376        }
1377    }
1378}
1379
1380lazy_static! {
1381    static ref ZED_PROTOCOL_VERSION: HeaderName = HeaderName::from_static("x-zed-protocol-version");
1382}
1383
1384pub struct ProtocolVersion(u32);
1385
1386impl Header for ProtocolVersion {
1387    fn name() -> &'static HeaderName {
1388        &ZED_PROTOCOL_VERSION
1389    }
1390
1391    fn decode<'i, I>(values: &mut I) -> Result<Self, axum::headers::Error>
1392    where
1393        Self: Sized,
1394        I: Iterator<Item = &'i axum::http::HeaderValue>,
1395    {
1396        let version = values
1397            .next()
1398            .ok_or_else(|| axum::headers::Error::invalid())?
1399            .to_str()
1400            .map_err(|_| axum::headers::Error::invalid())?
1401            .parse()
1402            .map_err(|_| axum::headers::Error::invalid())?;
1403        Ok(Self(version))
1404    }
1405
1406    fn encode<E: Extend<axum::http::HeaderValue>>(&self, values: &mut E) {
1407        values.extend([self.0.to_string().parse().unwrap()]);
1408    }
1409}
1410
1411pub fn routes(app_state: Arc<AppState>) -> Router<Body> {
1412    let server = Server::new(app_state.clone(), None);
1413    Router::new()
1414        .route("/rpc", get(handle_websocket_request))
1415        .layer(
1416            ServiceBuilder::new()
1417                .layer(Extension(app_state))
1418                .layer(middleware::from_fn(auth::validate_header))
1419                .layer(Extension(server)),
1420        )
1421}
1422
1423pub async fn handle_websocket_request(
1424    TypedHeader(ProtocolVersion(protocol_version)): TypedHeader<ProtocolVersion>,
1425    ConnectInfo(socket_address): ConnectInfo<SocketAddr>,
1426    Extension(server): Extension<Arc<Server>>,
1427    Extension(user): Extension<User>,
1428    ws: WebSocketUpgrade,
1429) -> axum::response::Response {
1430    if protocol_version != rpc::PROTOCOL_VERSION {
1431        return (
1432            StatusCode::UPGRADE_REQUIRED,
1433            "client must be upgraded".to_string(),
1434        )
1435            .into_response();
1436    }
1437    let socket_address = socket_address.to_string();
1438    ws.on_upgrade(move |socket| {
1439        use util::ResultExt;
1440        let socket = socket
1441            .map_ok(to_tungstenite_message)
1442            .err_into()
1443            .with(|message| async move { Ok(to_axum_message(message)) });
1444        let connection = Connection::new(Box::pin(socket));
1445        async move {
1446            server
1447                .handle_connection(connection, socket_address, user, None, RealExecutor)
1448                .await
1449                .log_err();
1450        }
1451    })
1452}
1453
1454fn to_axum_message(message: TungsteniteMessage) -> AxumMessage {
1455    match message {
1456        TungsteniteMessage::Text(payload) => AxumMessage::Text(payload),
1457        TungsteniteMessage::Binary(payload) => AxumMessage::Binary(payload),
1458        TungsteniteMessage::Ping(payload) => AxumMessage::Ping(payload),
1459        TungsteniteMessage::Pong(payload) => AxumMessage::Pong(payload),
1460        TungsteniteMessage::Close(frame) => AxumMessage::Close(frame.map(|frame| AxumCloseFrame {
1461            code: frame.code.into(),
1462            reason: frame.reason,
1463        })),
1464    }
1465}
1466
1467fn to_tungstenite_message(message: AxumMessage) -> TungsteniteMessage {
1468    match message {
1469        AxumMessage::Text(payload) => TungsteniteMessage::Text(payload),
1470        AxumMessage::Binary(payload) => TungsteniteMessage::Binary(payload),
1471        AxumMessage::Ping(payload) => TungsteniteMessage::Ping(payload),
1472        AxumMessage::Pong(payload) => TungsteniteMessage::Pong(payload),
1473        AxumMessage::Close(frame) => {
1474            TungsteniteMessage::Close(frame.map(|frame| TungsteniteCloseFrame {
1475                code: frame.code.into(),
1476                reason: frame.reason,
1477            }))
1478        }
1479    }
1480}
1481
1482pub trait ResultExt {
1483    type Ok;
1484
1485    fn trace_err(self) -> Option<Self::Ok>;
1486}
1487
1488impl<T, E> ResultExt for Result<T, E>
1489where
1490    E: std::fmt::Debug,
1491{
1492    type Ok = T;
1493
1494    fn trace_err(self) -> Option<T> {
1495        match self {
1496            Ok(value) => Some(value),
1497            Err(error) => {
1498                tracing::error!("{:?}", error);
1499                None
1500            }
1501        }
1502    }
1503}
1504
1505#[cfg(test)]
1506mod tests {
1507    use super::*;
1508    use crate::{
1509        db::{tests::TestDb, UserId},
1510        AppState,
1511    };
1512    use ::rpc::Peer;
1513    use client::{
1514        self, test::FakeHttpClient, Channel, ChannelDetails, ChannelList, Client, Credentials,
1515        EstablishConnectionError, UserStore, RECEIVE_TIMEOUT,
1516    };
1517    use collections::{BTreeMap, HashSet};
1518    use editor::{
1519        self, ConfirmCodeAction, ConfirmCompletion, ConfirmRename, Editor, Input, Redo, Rename,
1520        ToOffset, ToggleCodeActions, Undo,
1521    };
1522    use gpui::{
1523        executor::{self, Deterministic},
1524        geometry::vector::vec2f,
1525        ModelHandle, TestAppContext, ViewHandle,
1526    };
1527    use language::{
1528        range_to_lsp, tree_sitter_rust, Diagnostic, DiagnosticEntry, FakeLspAdapter, Language,
1529        LanguageConfig, LanguageRegistry, OffsetRangeExt, Point, Rope,
1530    };
1531    use lsp::{self, FakeLanguageServer};
1532    use parking_lot::Mutex;
1533    use project::{
1534        fs::{FakeFs, Fs as _},
1535        search::SearchQuery,
1536        worktree::WorktreeHandle,
1537        DiagnosticSummary, Project, ProjectPath, WorktreeId,
1538    };
1539    use rand::prelude::*;
1540    use rpc::PeerId;
1541    use serde_json::json;
1542    use settings::Settings;
1543    use sqlx::types::time::OffsetDateTime;
1544    use std::{
1545        env,
1546        ops::Deref,
1547        path::{Path, PathBuf},
1548        rc::Rc,
1549        sync::{
1550            atomic::{AtomicBool, Ordering::SeqCst},
1551            Arc,
1552        },
1553        time::Duration,
1554    };
1555    use theme::ThemeRegistry;
1556    use workspace::{Item, SplitDirection, ToggleFollow, Workspace, WorkspaceParams};
1557
1558    #[cfg(test)]
1559    #[ctor::ctor]
1560    fn init_logger() {
1561        if std::env::var("RUST_LOG").is_ok() {
1562            env_logger::init();
1563        }
1564    }
1565
1566    #[gpui::test(iterations = 10)]
1567    async fn test_share_project(cx_a: &mut TestAppContext, cx_b: &mut TestAppContext) {
1568        let (window_b, _) = cx_b.add_window(|_| EmptyView);
1569        let lang_registry = Arc::new(LanguageRegistry::test());
1570        let fs = FakeFs::new(cx_a.background());
1571        cx_a.foreground().forbid_parking();
1572
1573        // Connect to a server as 2 clients.
1574        let mut server = TestServer::start(cx_a.foreground(), cx_a.background()).await;
1575        let client_a = server.create_client(cx_a, "user_a").await;
1576        let client_b = server.create_client(cx_b, "user_b").await;
1577        server
1578            .make_contacts(vec![(&client_a, cx_a), (&client_b, cx_b)])
1579            .await;
1580
1581        // Share a project as client A
1582        fs.insert_tree(
1583            "/a",
1584            json!({
1585                "a.txt": "a-contents",
1586                "b.txt": "b-contents",
1587            }),
1588        )
1589        .await;
1590        let project_a = cx_a.update(|cx| {
1591            Project::local(
1592                client_a.clone(),
1593                client_a.user_store.clone(),
1594                lang_registry.clone(),
1595                fs.clone(),
1596                cx,
1597            )
1598        });
1599        let (worktree_a, _) = project_a
1600            .update(cx_a, |p, cx| {
1601                p.find_or_create_local_worktree("/a", true, cx)
1602            })
1603            .await
1604            .unwrap();
1605        let worktree_id = worktree_a.read_with(cx_a, |tree, _| tree.id());
1606        worktree_a
1607            .read_with(cx_a, |tree, _| tree.as_local().unwrap().scan_complete())
1608            .await;
1609        let project_id = project_a.update(cx_a, |p, _| p.next_remote_id()).await;
1610        project_a.update(cx_a, |p, cx| p.share(cx)).await.unwrap();
1611
1612        // Join that project as client B
1613        let project_b = Project::remote(
1614            project_id,
1615            client_b.clone(),
1616            client_b.user_store.clone(),
1617            lang_registry.clone(),
1618            fs.clone(),
1619            &mut cx_b.to_async(),
1620        )
1621        .await
1622        .unwrap();
1623
1624        let replica_id_b = project_b.read_with(cx_b, |project, _| {
1625            assert_eq!(
1626                project
1627                    .collaborators()
1628                    .get(&client_a.peer_id)
1629                    .unwrap()
1630                    .user
1631                    .github_login,
1632                "user_a"
1633            );
1634            project.replica_id()
1635        });
1636        project_a
1637            .condition(&cx_a, |tree, _| {
1638                tree.collaborators()
1639                    .get(&client_b.peer_id)
1640                    .map_or(false, |collaborator| {
1641                        collaborator.replica_id == replica_id_b
1642                            && collaborator.user.github_login == "user_b"
1643                    })
1644            })
1645            .await;
1646
1647        // Open the same file as client B and client A.
1648        let buffer_b = project_b
1649            .update(cx_b, |p, cx| p.open_buffer((worktree_id, "b.txt"), cx))
1650            .await
1651            .unwrap();
1652        buffer_b.read_with(cx_b, |buf, _| assert_eq!(buf.text(), "b-contents"));
1653        project_a.read_with(cx_a, |project, cx| {
1654            assert!(project.has_open_buffer((worktree_id, "b.txt"), cx))
1655        });
1656        let buffer_a = project_a
1657            .update(cx_a, |p, cx| p.open_buffer((worktree_id, "b.txt"), cx))
1658            .await
1659            .unwrap();
1660
1661        let editor_b = cx_b.add_view(window_b, |cx| Editor::for_buffer(buffer_b, None, cx));
1662
1663        // TODO
1664        // // Create a selection set as client B and see that selection set as client A.
1665        // buffer_a
1666        //     .condition(&cx_a, |buffer, _| buffer.selection_sets().count() == 1)
1667        //     .await;
1668
1669        // Edit the buffer as client B and see that edit as client A.
1670        editor_b.update(cx_b, |editor, cx| {
1671            editor.handle_input(&Input("ok, ".into()), cx)
1672        });
1673        buffer_a
1674            .condition(&cx_a, |buffer, _| buffer.text() == "ok, b-contents")
1675            .await;
1676
1677        // TODO
1678        // // Remove the selection set as client B, see those selections disappear as client A.
1679        cx_b.update(move |_| drop(editor_b));
1680        // buffer_a
1681        //     .condition(&cx_a, |buffer, _| buffer.selection_sets().count() == 0)
1682        //     .await;
1683
1684        // Dropping the client B's project removes client B from client A's collaborators.
1685        cx_b.update(move |_| drop(project_b));
1686        project_a
1687            .condition(&cx_a, |project, _| project.collaborators().is_empty())
1688            .await;
1689    }
1690
1691    #[gpui::test(iterations = 10)]
1692    async fn test_unshare_project(cx_a: &mut TestAppContext, cx_b: &mut TestAppContext) {
1693        let lang_registry = Arc::new(LanguageRegistry::test());
1694        let fs = FakeFs::new(cx_a.background());
1695        cx_a.foreground().forbid_parking();
1696
1697        // Connect to a server as 2 clients.
1698        let mut server = TestServer::start(cx_a.foreground(), cx_a.background()).await;
1699        let client_a = server.create_client(cx_a, "user_a").await;
1700        let client_b = server.create_client(cx_b, "user_b").await;
1701        server
1702            .make_contacts(vec![(&client_a, cx_a), (&client_b, cx_b)])
1703            .await;
1704
1705        // Share a project as client A
1706        fs.insert_tree(
1707            "/a",
1708            json!({
1709                "a.txt": "a-contents",
1710                "b.txt": "b-contents",
1711            }),
1712        )
1713        .await;
1714        let project_a = cx_a.update(|cx| {
1715            Project::local(
1716                client_a.clone(),
1717                client_a.user_store.clone(),
1718                lang_registry.clone(),
1719                fs.clone(),
1720                cx,
1721            )
1722        });
1723        let (worktree_a, _) = project_a
1724            .update(cx_a, |p, cx| {
1725                p.find_or_create_local_worktree("/a", true, cx)
1726            })
1727            .await
1728            .unwrap();
1729        worktree_a
1730            .read_with(cx_a, |tree, _| tree.as_local().unwrap().scan_complete())
1731            .await;
1732        let project_id = project_a.update(cx_a, |p, _| p.next_remote_id()).await;
1733        let worktree_id = worktree_a.read_with(cx_a, |tree, _| tree.id());
1734        project_a.update(cx_a, |p, cx| p.share(cx)).await.unwrap();
1735        assert!(worktree_a.read_with(cx_a, |tree, _| tree.as_local().unwrap().is_shared()));
1736
1737        // Join that project as client B
1738        let project_b = Project::remote(
1739            project_id,
1740            client_b.clone(),
1741            client_b.user_store.clone(),
1742            lang_registry.clone(),
1743            fs.clone(),
1744            &mut cx_b.to_async(),
1745        )
1746        .await
1747        .unwrap();
1748        project_b
1749            .update(cx_b, |p, cx| p.open_buffer((worktree_id, "a.txt"), cx))
1750            .await
1751            .unwrap();
1752
1753        // Unshare the project as client A
1754        project_a.update(cx_a, |project, cx| project.unshare(cx));
1755        project_b
1756            .condition(cx_b, |project, _| project.is_read_only())
1757            .await;
1758        assert!(worktree_a.read_with(cx_a, |tree, _| !tree.as_local().unwrap().is_shared()));
1759        cx_b.update(|_| {
1760            drop(project_b);
1761        });
1762
1763        // Share the project again and ensure guests can still join.
1764        project_a
1765            .update(cx_a, |project, cx| project.share(cx))
1766            .await
1767            .unwrap();
1768        assert!(worktree_a.read_with(cx_a, |tree, _| tree.as_local().unwrap().is_shared()));
1769
1770        let project_b2 = Project::remote(
1771            project_id,
1772            client_b.clone(),
1773            client_b.user_store.clone(),
1774            lang_registry.clone(),
1775            fs.clone(),
1776            &mut cx_b.to_async(),
1777        )
1778        .await
1779        .unwrap();
1780        project_b2
1781            .update(cx_b, |p, cx| p.open_buffer((worktree_id, "a.txt"), cx))
1782            .await
1783            .unwrap();
1784    }
1785
1786    #[gpui::test(iterations = 10)]
1787    async fn test_host_disconnect(cx_a: &mut TestAppContext, cx_b: &mut TestAppContext) {
1788        let lang_registry = Arc::new(LanguageRegistry::test());
1789        let fs = FakeFs::new(cx_a.background());
1790        cx_a.foreground().forbid_parking();
1791
1792        // Connect to a server as 2 clients.
1793        let mut server = TestServer::start(cx_a.foreground(), cx_a.background()).await;
1794        let client_a = server.create_client(cx_a, "user_a").await;
1795        let client_b = server.create_client(cx_b, "user_b").await;
1796        server
1797            .make_contacts(vec![(&client_a, cx_a), (&client_b, cx_b)])
1798            .await;
1799
1800        // Share a project as client A
1801        fs.insert_tree(
1802            "/a",
1803            json!({
1804                "a.txt": "a-contents",
1805                "b.txt": "b-contents",
1806            }),
1807        )
1808        .await;
1809        let project_a = cx_a.update(|cx| {
1810            Project::local(
1811                client_a.clone(),
1812                client_a.user_store.clone(),
1813                lang_registry.clone(),
1814                fs.clone(),
1815                cx,
1816            )
1817        });
1818        let (worktree_a, _) = project_a
1819            .update(cx_a, |p, cx| {
1820                p.find_or_create_local_worktree("/a", true, cx)
1821            })
1822            .await
1823            .unwrap();
1824        worktree_a
1825            .read_with(cx_a, |tree, _| tree.as_local().unwrap().scan_complete())
1826            .await;
1827        let project_id = project_a.update(cx_a, |p, _| p.next_remote_id()).await;
1828        let worktree_id = worktree_a.read_with(cx_a, |tree, _| tree.id());
1829        project_a.update(cx_a, |p, cx| p.share(cx)).await.unwrap();
1830        assert!(worktree_a.read_with(cx_a, |tree, _| tree.as_local().unwrap().is_shared()));
1831
1832        // Join that project as client B
1833        let project_b = Project::remote(
1834            project_id,
1835            client_b.clone(),
1836            client_b.user_store.clone(),
1837            lang_registry.clone(),
1838            fs.clone(),
1839            &mut cx_b.to_async(),
1840        )
1841        .await
1842        .unwrap();
1843        project_b
1844            .update(cx_b, |p, cx| p.open_buffer((worktree_id, "a.txt"), cx))
1845            .await
1846            .unwrap();
1847
1848        // Drop client A's connection. Collaborators should disappear and the project should not be shown as shared.
1849        server.disconnect_client(client_a.current_user_id(cx_a));
1850        cx_a.foreground().advance_clock(rpc::RECEIVE_TIMEOUT);
1851        project_a
1852            .condition(cx_a, |project, _| project.collaborators().is_empty())
1853            .await;
1854        project_a.read_with(cx_a, |project, _| assert!(!project.is_shared()));
1855        project_b
1856            .condition(cx_b, |project, _| project.is_read_only())
1857            .await;
1858        assert!(worktree_a.read_with(cx_a, |tree, _| !tree.as_local().unwrap().is_shared()));
1859        cx_b.update(|_| {
1860            drop(project_b);
1861        });
1862
1863        // Await reconnection
1864        let project_id = project_a.update(cx_a, |p, _| p.next_remote_id()).await;
1865
1866        // Share the project again and ensure guests can still join.
1867        project_a
1868            .update(cx_a, |project, cx| project.share(cx))
1869            .await
1870            .unwrap();
1871        assert!(worktree_a.read_with(cx_a, |tree, _| tree.as_local().unwrap().is_shared()));
1872
1873        let project_b2 = Project::remote(
1874            project_id,
1875            client_b.clone(),
1876            client_b.user_store.clone(),
1877            lang_registry.clone(),
1878            fs.clone(),
1879            &mut cx_b.to_async(),
1880        )
1881        .await
1882        .unwrap();
1883        project_b2
1884            .update(cx_b, |p, cx| p.open_buffer((worktree_id, "a.txt"), cx))
1885            .await
1886            .unwrap();
1887    }
1888
1889    #[gpui::test(iterations = 10)]
1890    async fn test_propagate_saves_and_fs_changes(
1891        cx_a: &mut TestAppContext,
1892        cx_b: &mut TestAppContext,
1893        cx_c: &mut TestAppContext,
1894    ) {
1895        let lang_registry = Arc::new(LanguageRegistry::test());
1896        let fs = FakeFs::new(cx_a.background());
1897        cx_a.foreground().forbid_parking();
1898
1899        // Connect to a server as 3 clients.
1900        let mut server = TestServer::start(cx_a.foreground(), cx_a.background()).await;
1901        let client_a = server.create_client(cx_a, "user_a").await;
1902        let client_b = server.create_client(cx_b, "user_b").await;
1903        let client_c = server.create_client(cx_c, "user_c").await;
1904        server
1905            .make_contacts(vec![
1906                (&client_a, cx_a),
1907                (&client_b, cx_b),
1908                (&client_c, cx_c),
1909            ])
1910            .await;
1911
1912        // Share a worktree as client A.
1913        fs.insert_tree(
1914            "/a",
1915            json!({
1916                "file1": "",
1917                "file2": ""
1918            }),
1919        )
1920        .await;
1921        let project_a = cx_a.update(|cx| {
1922            Project::local(
1923                client_a.clone(),
1924                client_a.user_store.clone(),
1925                lang_registry.clone(),
1926                fs.clone(),
1927                cx,
1928            )
1929        });
1930        let (worktree_a, _) = project_a
1931            .update(cx_a, |p, cx| {
1932                p.find_or_create_local_worktree("/a", true, cx)
1933            })
1934            .await
1935            .unwrap();
1936        worktree_a
1937            .read_with(cx_a, |tree, _| tree.as_local().unwrap().scan_complete())
1938            .await;
1939        let project_id = project_a.update(cx_a, |p, _| p.next_remote_id()).await;
1940        let worktree_id = worktree_a.read_with(cx_a, |tree, _| tree.id());
1941        project_a.update(cx_a, |p, cx| p.share(cx)).await.unwrap();
1942
1943        // Join that worktree as clients B and C.
1944        let project_b = Project::remote(
1945            project_id,
1946            client_b.clone(),
1947            client_b.user_store.clone(),
1948            lang_registry.clone(),
1949            fs.clone(),
1950            &mut cx_b.to_async(),
1951        )
1952        .await
1953        .unwrap();
1954        let project_c = Project::remote(
1955            project_id,
1956            client_c.clone(),
1957            client_c.user_store.clone(),
1958            lang_registry.clone(),
1959            fs.clone(),
1960            &mut cx_c.to_async(),
1961        )
1962        .await
1963        .unwrap();
1964        let worktree_b = project_b.read_with(cx_b, |p, cx| p.worktrees(cx).next().unwrap());
1965        let worktree_c = project_c.read_with(cx_c, |p, cx| p.worktrees(cx).next().unwrap());
1966
1967        // Open and edit a buffer as both guests B and C.
1968        let buffer_b = project_b
1969            .update(cx_b, |p, cx| p.open_buffer((worktree_id, "file1"), cx))
1970            .await
1971            .unwrap();
1972        let buffer_c = project_c
1973            .update(cx_c, |p, cx| p.open_buffer((worktree_id, "file1"), cx))
1974            .await
1975            .unwrap();
1976        buffer_b.update(cx_b, |buf, cx| buf.edit([(0..0, "i-am-b, ")], cx));
1977        buffer_c.update(cx_c, |buf, cx| buf.edit([(0..0, "i-am-c, ")], cx));
1978
1979        // Open and edit that buffer as the host.
1980        let buffer_a = project_a
1981            .update(cx_a, |p, cx| p.open_buffer((worktree_id, "file1"), cx))
1982            .await
1983            .unwrap();
1984
1985        buffer_a
1986            .condition(cx_a, |buf, _| buf.text() == "i-am-c, i-am-b, ")
1987            .await;
1988        buffer_a.update(cx_a, |buf, cx| {
1989            buf.edit([(buf.len()..buf.len(), "i-am-a")], cx)
1990        });
1991
1992        // Wait for edits to propagate
1993        buffer_a
1994            .condition(cx_a, |buf, _| buf.text() == "i-am-c, i-am-b, i-am-a")
1995            .await;
1996        buffer_b
1997            .condition(cx_b, |buf, _| buf.text() == "i-am-c, i-am-b, i-am-a")
1998            .await;
1999        buffer_c
2000            .condition(cx_c, |buf, _| buf.text() == "i-am-c, i-am-b, i-am-a")
2001            .await;
2002
2003        // Edit the buffer as the host and concurrently save as guest B.
2004        let save_b = buffer_b.update(cx_b, |buf, cx| buf.save(cx));
2005        buffer_a.update(cx_a, |buf, cx| buf.edit([(0..0, "hi-a, ")], cx));
2006        save_b.await.unwrap();
2007        assert_eq!(
2008            fs.load("/a/file1".as_ref()).await.unwrap(),
2009            "hi-a, i-am-c, i-am-b, i-am-a"
2010        );
2011        buffer_a.read_with(cx_a, |buf, _| assert!(!buf.is_dirty()));
2012        buffer_b.read_with(cx_b, |buf, _| assert!(!buf.is_dirty()));
2013        buffer_c.condition(cx_c, |buf, _| !buf.is_dirty()).await;
2014
2015        worktree_a.flush_fs_events(cx_a).await;
2016
2017        // Make changes on host's file system, see those changes on guest worktrees.
2018        fs.rename(
2019            "/a/file1".as_ref(),
2020            "/a/file1-renamed".as_ref(),
2021            Default::default(),
2022        )
2023        .await
2024        .unwrap();
2025
2026        fs.rename("/a/file2".as_ref(), "/a/file3".as_ref(), Default::default())
2027            .await
2028            .unwrap();
2029        fs.insert_file(Path::new("/a/file4"), "4".into()).await;
2030
2031        worktree_a
2032            .condition(&cx_a, |tree, _| {
2033                tree.paths()
2034                    .map(|p| p.to_string_lossy())
2035                    .collect::<Vec<_>>()
2036                    == ["file1-renamed", "file3", "file4"]
2037            })
2038            .await;
2039        worktree_b
2040            .condition(&cx_b, |tree, _| {
2041                tree.paths()
2042                    .map(|p| p.to_string_lossy())
2043                    .collect::<Vec<_>>()
2044                    == ["file1-renamed", "file3", "file4"]
2045            })
2046            .await;
2047        worktree_c
2048            .condition(&cx_c, |tree, _| {
2049                tree.paths()
2050                    .map(|p| p.to_string_lossy())
2051                    .collect::<Vec<_>>()
2052                    == ["file1-renamed", "file3", "file4"]
2053            })
2054            .await;
2055
2056        // Ensure buffer files are updated as well.
2057        buffer_a
2058            .condition(&cx_a, |buf, _| {
2059                buf.file().unwrap().path().to_str() == Some("file1-renamed")
2060            })
2061            .await;
2062        buffer_b
2063            .condition(&cx_b, |buf, _| {
2064                buf.file().unwrap().path().to_str() == Some("file1-renamed")
2065            })
2066            .await;
2067        buffer_c
2068            .condition(&cx_c, |buf, _| {
2069                buf.file().unwrap().path().to_str() == Some("file1-renamed")
2070            })
2071            .await;
2072    }
2073
2074    #[gpui::test(iterations = 10)]
2075    async fn test_fs_operations(
2076        executor: Arc<Deterministic>,
2077        cx_a: &mut TestAppContext,
2078        cx_b: &mut TestAppContext,
2079    ) {
2080        executor.forbid_parking();
2081        let fs = FakeFs::new(cx_a.background());
2082
2083        // Connect to a server as 2 clients.
2084        let mut server = TestServer::start(cx_a.foreground(), cx_a.background()).await;
2085        let mut client_a = server.create_client(cx_a, "user_a").await;
2086        let mut client_b = server.create_client(cx_b, "user_b").await;
2087        server
2088            .make_contacts(vec![(&client_a, cx_a), (&client_b, cx_b)])
2089            .await;
2090
2091        // Share a project as client A
2092        fs.insert_tree(
2093            "/dir",
2094            json!({
2095                "a.txt": "a-contents",
2096                "b.txt": "b-contents",
2097            }),
2098        )
2099        .await;
2100
2101        let (project_a, worktree_id) = client_a.build_local_project(fs, "/dir", cx_a).await;
2102        let project_id = project_a.read_with(cx_a, |project, _| project.remote_id().unwrap());
2103        project_a
2104            .update(cx_a, |project, cx| project.share(cx))
2105            .await
2106            .unwrap();
2107
2108        let project_b = client_b.build_remote_project(project_id, cx_b).await;
2109
2110        let worktree_a =
2111            project_a.read_with(cx_a, |project, cx| project.worktrees(cx).next().unwrap());
2112        let worktree_b =
2113            project_b.read_with(cx_b, |project, cx| project.worktrees(cx).next().unwrap());
2114
2115        let entry = project_b
2116            .update(cx_b, |project, cx| {
2117                project
2118                    .create_entry((worktree_id, "c.txt"), false, cx)
2119                    .unwrap()
2120            })
2121            .await
2122            .unwrap();
2123        worktree_a.read_with(cx_a, |worktree, _| {
2124            assert_eq!(
2125                worktree
2126                    .paths()
2127                    .map(|p| p.to_string_lossy())
2128                    .collect::<Vec<_>>(),
2129                ["a.txt", "b.txt", "c.txt"]
2130            );
2131        });
2132        worktree_b.read_with(cx_b, |worktree, _| {
2133            assert_eq!(
2134                worktree
2135                    .paths()
2136                    .map(|p| p.to_string_lossy())
2137                    .collect::<Vec<_>>(),
2138                ["a.txt", "b.txt", "c.txt"]
2139            );
2140        });
2141
2142        project_b
2143            .update(cx_b, |project, cx| {
2144                project.rename_entry(entry.id, Path::new("d.txt"), cx)
2145            })
2146            .unwrap()
2147            .await
2148            .unwrap();
2149        worktree_a.read_with(cx_a, |worktree, _| {
2150            assert_eq!(
2151                worktree
2152                    .paths()
2153                    .map(|p| p.to_string_lossy())
2154                    .collect::<Vec<_>>(),
2155                ["a.txt", "b.txt", "d.txt"]
2156            );
2157        });
2158        worktree_b.read_with(cx_b, |worktree, _| {
2159            assert_eq!(
2160                worktree
2161                    .paths()
2162                    .map(|p| p.to_string_lossy())
2163                    .collect::<Vec<_>>(),
2164                ["a.txt", "b.txt", "d.txt"]
2165            );
2166        });
2167
2168        let dir_entry = project_b
2169            .update(cx_b, |project, cx| {
2170                project
2171                    .create_entry((worktree_id, "DIR"), true, cx)
2172                    .unwrap()
2173            })
2174            .await
2175            .unwrap();
2176        worktree_a.read_with(cx_a, |worktree, _| {
2177            assert_eq!(
2178                worktree
2179                    .paths()
2180                    .map(|p| p.to_string_lossy())
2181                    .collect::<Vec<_>>(),
2182                ["DIR", "a.txt", "b.txt", "d.txt"]
2183            );
2184        });
2185        worktree_b.read_with(cx_b, |worktree, _| {
2186            assert_eq!(
2187                worktree
2188                    .paths()
2189                    .map(|p| p.to_string_lossy())
2190                    .collect::<Vec<_>>(),
2191                ["DIR", "a.txt", "b.txt", "d.txt"]
2192            );
2193        });
2194
2195        project_b
2196            .update(cx_b, |project, cx| {
2197                project.delete_entry(dir_entry.id, cx).unwrap()
2198            })
2199            .await
2200            .unwrap();
2201        worktree_a.read_with(cx_a, |worktree, _| {
2202            assert_eq!(
2203                worktree
2204                    .paths()
2205                    .map(|p| p.to_string_lossy())
2206                    .collect::<Vec<_>>(),
2207                ["a.txt", "b.txt", "d.txt"]
2208            );
2209        });
2210        worktree_b.read_with(cx_b, |worktree, _| {
2211            assert_eq!(
2212                worktree
2213                    .paths()
2214                    .map(|p| p.to_string_lossy())
2215                    .collect::<Vec<_>>(),
2216                ["a.txt", "b.txt", "d.txt"]
2217            );
2218        });
2219
2220        project_b
2221            .update(cx_b, |project, cx| {
2222                project.delete_entry(entry.id, cx).unwrap()
2223            })
2224            .await
2225            .unwrap();
2226        worktree_a.read_with(cx_a, |worktree, _| {
2227            assert_eq!(
2228                worktree
2229                    .paths()
2230                    .map(|p| p.to_string_lossy())
2231                    .collect::<Vec<_>>(),
2232                ["a.txt", "b.txt"]
2233            );
2234        });
2235        worktree_b.read_with(cx_b, |worktree, _| {
2236            assert_eq!(
2237                worktree
2238                    .paths()
2239                    .map(|p| p.to_string_lossy())
2240                    .collect::<Vec<_>>(),
2241                ["a.txt", "b.txt"]
2242            );
2243        });
2244    }
2245
2246    #[gpui::test(iterations = 10)]
2247    async fn test_buffer_conflict_after_save(cx_a: &mut TestAppContext, cx_b: &mut TestAppContext) {
2248        cx_a.foreground().forbid_parking();
2249        let lang_registry = Arc::new(LanguageRegistry::test());
2250        let fs = FakeFs::new(cx_a.background());
2251
2252        // Connect to a server as 2 clients.
2253        let mut server = TestServer::start(cx_a.foreground(), cx_a.background()).await;
2254        let client_a = server.create_client(cx_a, "user_a").await;
2255        let client_b = server.create_client(cx_b, "user_b").await;
2256        server
2257            .make_contacts(vec![(&client_a, cx_a), (&client_b, cx_b)])
2258            .await;
2259
2260        // Share a project as client A
2261        fs.insert_tree(
2262            "/dir",
2263            json!({
2264                "a.txt": "a-contents",
2265            }),
2266        )
2267        .await;
2268
2269        let project_a = cx_a.update(|cx| {
2270            Project::local(
2271                client_a.clone(),
2272                client_a.user_store.clone(),
2273                lang_registry.clone(),
2274                fs.clone(),
2275                cx,
2276            )
2277        });
2278        let (worktree_a, _) = project_a
2279            .update(cx_a, |p, cx| {
2280                p.find_or_create_local_worktree("/dir", true, cx)
2281            })
2282            .await
2283            .unwrap();
2284        worktree_a
2285            .read_with(cx_a, |tree, _| tree.as_local().unwrap().scan_complete())
2286            .await;
2287        let project_id = project_a.update(cx_a, |p, _| p.next_remote_id()).await;
2288        let worktree_id = worktree_a.read_with(cx_a, |tree, _| tree.id());
2289        project_a.update(cx_a, |p, cx| p.share(cx)).await.unwrap();
2290
2291        // Join that project as client B
2292        let project_b = Project::remote(
2293            project_id,
2294            client_b.clone(),
2295            client_b.user_store.clone(),
2296            lang_registry.clone(),
2297            fs.clone(),
2298            &mut cx_b.to_async(),
2299        )
2300        .await
2301        .unwrap();
2302
2303        // Open a buffer as client B
2304        let buffer_b = project_b
2305            .update(cx_b, |p, cx| p.open_buffer((worktree_id, "a.txt"), cx))
2306            .await
2307            .unwrap();
2308
2309        buffer_b.update(cx_b, |buf, cx| buf.edit([(0..0, "world ")], cx));
2310        buffer_b.read_with(cx_b, |buf, _| {
2311            assert!(buf.is_dirty());
2312            assert!(!buf.has_conflict());
2313        });
2314
2315        buffer_b.update(cx_b, |buf, cx| buf.save(cx)).await.unwrap();
2316        buffer_b
2317            .condition(&cx_b, |buffer_b, _| !buffer_b.is_dirty())
2318            .await;
2319        buffer_b.read_with(cx_b, |buf, _| {
2320            assert!(!buf.has_conflict());
2321        });
2322
2323        buffer_b.update(cx_b, |buf, cx| buf.edit([(0..0, "hello ")], cx));
2324        buffer_b.read_with(cx_b, |buf, _| {
2325            assert!(buf.is_dirty());
2326            assert!(!buf.has_conflict());
2327        });
2328    }
2329
2330    #[gpui::test(iterations = 10)]
2331    async fn test_buffer_reloading(cx_a: &mut TestAppContext, cx_b: &mut TestAppContext) {
2332        cx_a.foreground().forbid_parking();
2333        let lang_registry = Arc::new(LanguageRegistry::test());
2334        let fs = FakeFs::new(cx_a.background());
2335
2336        // Connect to a server as 2 clients.
2337        let mut server = TestServer::start(cx_a.foreground(), cx_a.background()).await;
2338        let client_a = server.create_client(cx_a, "user_a").await;
2339        let client_b = server.create_client(cx_b, "user_b").await;
2340        server
2341            .make_contacts(vec![(&client_a, cx_a), (&client_b, cx_b)])
2342            .await;
2343
2344        // Share a project as client A
2345        fs.insert_tree(
2346            "/dir",
2347            json!({
2348                "a.txt": "a-contents",
2349            }),
2350        )
2351        .await;
2352
2353        let project_a = cx_a.update(|cx| {
2354            Project::local(
2355                client_a.clone(),
2356                client_a.user_store.clone(),
2357                lang_registry.clone(),
2358                fs.clone(),
2359                cx,
2360            )
2361        });
2362        let (worktree_a, _) = project_a
2363            .update(cx_a, |p, cx| {
2364                p.find_or_create_local_worktree("/dir", true, cx)
2365            })
2366            .await
2367            .unwrap();
2368        worktree_a
2369            .read_with(cx_a, |tree, _| tree.as_local().unwrap().scan_complete())
2370            .await;
2371        let project_id = project_a.update(cx_a, |p, _| p.next_remote_id()).await;
2372        let worktree_id = worktree_a.read_with(cx_a, |tree, _| tree.id());
2373        project_a.update(cx_a, |p, cx| p.share(cx)).await.unwrap();
2374
2375        // Join that project as client B
2376        let project_b = Project::remote(
2377            project_id,
2378            client_b.clone(),
2379            client_b.user_store.clone(),
2380            lang_registry.clone(),
2381            fs.clone(),
2382            &mut cx_b.to_async(),
2383        )
2384        .await
2385        .unwrap();
2386        let _worktree_b = project_b.update(cx_b, |p, cx| p.worktrees(cx).next().unwrap());
2387
2388        // Open a buffer as client B
2389        let buffer_b = project_b
2390            .update(cx_b, |p, cx| p.open_buffer((worktree_id, "a.txt"), cx))
2391            .await
2392            .unwrap();
2393        buffer_b.read_with(cx_b, |buf, _| {
2394            assert!(!buf.is_dirty());
2395            assert!(!buf.has_conflict());
2396        });
2397
2398        fs.save(Path::new("/dir/a.txt"), &"new contents".into())
2399            .await
2400            .unwrap();
2401        buffer_b
2402            .condition(&cx_b, |buf, _| {
2403                buf.text() == "new contents" && !buf.is_dirty()
2404            })
2405            .await;
2406        buffer_b.read_with(cx_b, |buf, _| {
2407            assert!(!buf.has_conflict());
2408        });
2409    }
2410
2411    #[gpui::test(iterations = 10)]
2412    async fn test_editing_while_guest_opens_buffer(
2413        cx_a: &mut TestAppContext,
2414        cx_b: &mut TestAppContext,
2415    ) {
2416        cx_a.foreground().forbid_parking();
2417        let lang_registry = Arc::new(LanguageRegistry::test());
2418        let fs = FakeFs::new(cx_a.background());
2419
2420        // Connect to a server as 2 clients.
2421        let mut server = TestServer::start(cx_a.foreground(), cx_a.background()).await;
2422        let client_a = server.create_client(cx_a, "user_a").await;
2423        let client_b = server.create_client(cx_b, "user_b").await;
2424        server
2425            .make_contacts(vec![(&client_a, cx_a), (&client_b, cx_b)])
2426            .await;
2427
2428        // Share a project as client A
2429        fs.insert_tree(
2430            "/dir",
2431            json!({
2432                "a.txt": "a-contents",
2433            }),
2434        )
2435        .await;
2436        let project_a = cx_a.update(|cx| {
2437            Project::local(
2438                client_a.clone(),
2439                client_a.user_store.clone(),
2440                lang_registry.clone(),
2441                fs.clone(),
2442                cx,
2443            )
2444        });
2445        let (worktree_a, _) = project_a
2446            .update(cx_a, |p, cx| {
2447                p.find_or_create_local_worktree("/dir", true, cx)
2448            })
2449            .await
2450            .unwrap();
2451        worktree_a
2452            .read_with(cx_a, |tree, _| tree.as_local().unwrap().scan_complete())
2453            .await;
2454        let project_id = project_a.update(cx_a, |p, _| p.next_remote_id()).await;
2455        let worktree_id = worktree_a.read_with(cx_a, |tree, _| tree.id());
2456        project_a.update(cx_a, |p, cx| p.share(cx)).await.unwrap();
2457
2458        // Join that project as client B
2459        let project_b = Project::remote(
2460            project_id,
2461            client_b.clone(),
2462            client_b.user_store.clone(),
2463            lang_registry.clone(),
2464            fs.clone(),
2465            &mut cx_b.to_async(),
2466        )
2467        .await
2468        .unwrap();
2469
2470        // Open a buffer as client A
2471        let buffer_a = project_a
2472            .update(cx_a, |p, cx| p.open_buffer((worktree_id, "a.txt"), cx))
2473            .await
2474            .unwrap();
2475
2476        // Start opening the same buffer as client B
2477        let buffer_b = cx_b
2478            .background()
2479            .spawn(project_b.update(cx_b, |p, cx| p.open_buffer((worktree_id, "a.txt"), cx)));
2480
2481        // Edit the buffer as client A while client B is still opening it.
2482        cx_b.background().simulate_random_delay().await;
2483        buffer_a.update(cx_a, |buf, cx| buf.edit([(0..0, "X")], cx));
2484        cx_b.background().simulate_random_delay().await;
2485        buffer_a.update(cx_a, |buf, cx| buf.edit([(1..1, "Y")], cx));
2486
2487        let text = buffer_a.read_with(cx_a, |buf, _| buf.text());
2488        let buffer_b = buffer_b.await.unwrap();
2489        buffer_b.condition(&cx_b, |buf, _| buf.text() == text).await;
2490    }
2491
2492    #[gpui::test(iterations = 10)]
2493    async fn test_leaving_worktree_while_opening_buffer(
2494        cx_a: &mut TestAppContext,
2495        cx_b: &mut TestAppContext,
2496    ) {
2497        cx_a.foreground().forbid_parking();
2498        let lang_registry = Arc::new(LanguageRegistry::test());
2499        let fs = FakeFs::new(cx_a.background());
2500
2501        // Connect to a server as 2 clients.
2502        let mut server = TestServer::start(cx_a.foreground(), cx_a.background()).await;
2503        let client_a = server.create_client(cx_a, "user_a").await;
2504        let client_b = server.create_client(cx_b, "user_b").await;
2505        server
2506            .make_contacts(vec![(&client_a, cx_a), (&client_b, cx_b)])
2507            .await;
2508
2509        // Share a project as client A
2510        fs.insert_tree(
2511            "/dir",
2512            json!({
2513                "a.txt": "a-contents",
2514            }),
2515        )
2516        .await;
2517        let project_a = cx_a.update(|cx| {
2518            Project::local(
2519                client_a.clone(),
2520                client_a.user_store.clone(),
2521                lang_registry.clone(),
2522                fs.clone(),
2523                cx,
2524            )
2525        });
2526        let (worktree_a, _) = project_a
2527            .update(cx_a, |p, cx| {
2528                p.find_or_create_local_worktree("/dir", true, cx)
2529            })
2530            .await
2531            .unwrap();
2532        worktree_a
2533            .read_with(cx_a, |tree, _| tree.as_local().unwrap().scan_complete())
2534            .await;
2535        let project_id = project_a.update(cx_a, |p, _| p.next_remote_id()).await;
2536        let worktree_id = worktree_a.read_with(cx_a, |tree, _| tree.id());
2537        project_a.update(cx_a, |p, cx| p.share(cx)).await.unwrap();
2538
2539        // Join that project as client B
2540        let project_b = Project::remote(
2541            project_id,
2542            client_b.clone(),
2543            client_b.user_store.clone(),
2544            lang_registry.clone(),
2545            fs.clone(),
2546            &mut cx_b.to_async(),
2547        )
2548        .await
2549        .unwrap();
2550
2551        // See that a guest has joined as client A.
2552        project_a
2553            .condition(&cx_a, |p, _| p.collaborators().len() == 1)
2554            .await;
2555
2556        // Begin opening a buffer as client B, but leave the project before the open completes.
2557        let buffer_b = cx_b
2558            .background()
2559            .spawn(project_b.update(cx_b, |p, cx| p.open_buffer((worktree_id, "a.txt"), cx)));
2560        cx_b.update(|_| drop(project_b));
2561        drop(buffer_b);
2562
2563        // See that the guest has left.
2564        project_a
2565            .condition(&cx_a, |p, _| p.collaborators().len() == 0)
2566            .await;
2567    }
2568
2569    #[gpui::test(iterations = 10)]
2570    async fn test_leaving_project(cx_a: &mut TestAppContext, cx_b: &mut TestAppContext) {
2571        cx_a.foreground().forbid_parking();
2572        let lang_registry = Arc::new(LanguageRegistry::test());
2573        let fs = FakeFs::new(cx_a.background());
2574
2575        // Connect to a server as 2 clients.
2576        let mut server = TestServer::start(cx_a.foreground(), cx_a.background()).await;
2577        let client_a = server.create_client(cx_a, "user_a").await;
2578        let client_b = server.create_client(cx_b, "user_b").await;
2579        server
2580            .make_contacts(vec![(&client_a, cx_a), (&client_b, cx_b)])
2581            .await;
2582
2583        // Share a project as client A
2584        fs.insert_tree(
2585            "/a",
2586            json!({
2587                "a.txt": "a-contents",
2588                "b.txt": "b-contents",
2589            }),
2590        )
2591        .await;
2592        let project_a = cx_a.update(|cx| {
2593            Project::local(
2594                client_a.clone(),
2595                client_a.user_store.clone(),
2596                lang_registry.clone(),
2597                fs.clone(),
2598                cx,
2599            )
2600        });
2601        let (worktree_a, _) = project_a
2602            .update(cx_a, |p, cx| {
2603                p.find_or_create_local_worktree("/a", true, cx)
2604            })
2605            .await
2606            .unwrap();
2607        worktree_a
2608            .read_with(cx_a, |tree, _| tree.as_local().unwrap().scan_complete())
2609            .await;
2610        let project_id = project_a
2611            .update(cx_a, |project, _| project.next_remote_id())
2612            .await;
2613        project_a
2614            .update(cx_a, |project, cx| project.share(cx))
2615            .await
2616            .unwrap();
2617
2618        // Join that project as client B
2619        let _project_b = Project::remote(
2620            project_id,
2621            client_b.clone(),
2622            client_b.user_store.clone(),
2623            lang_registry.clone(),
2624            fs.clone(),
2625            &mut cx_b.to_async(),
2626        )
2627        .await
2628        .unwrap();
2629
2630        // Client A sees that a guest has joined.
2631        project_a
2632            .condition(cx_a, |p, _| p.collaborators().len() == 1)
2633            .await;
2634
2635        // Drop client B's connection and ensure client A observes client B leaving the project.
2636        client_b.disconnect(&cx_b.to_async()).unwrap();
2637        project_a
2638            .condition(cx_a, |p, _| p.collaborators().len() == 0)
2639            .await;
2640
2641        // Rejoin the project as client B
2642        let _project_b = Project::remote(
2643            project_id,
2644            client_b.clone(),
2645            client_b.user_store.clone(),
2646            lang_registry.clone(),
2647            fs.clone(),
2648            &mut cx_b.to_async(),
2649        )
2650        .await
2651        .unwrap();
2652
2653        // Client A sees that a guest has re-joined.
2654        project_a
2655            .condition(cx_a, |p, _| p.collaborators().len() == 1)
2656            .await;
2657
2658        // Simulate connection loss for client B and ensure client A observes client B leaving the project.
2659        client_b.wait_for_current_user(cx_b).await;
2660        server.disconnect_client(client_b.current_user_id(cx_b));
2661        cx_a.foreground().advance_clock(rpc::RECEIVE_TIMEOUT);
2662        project_a
2663            .condition(cx_a, |p, _| p.collaborators().len() == 0)
2664            .await;
2665    }
2666
2667    #[gpui::test(iterations = 10)]
2668    async fn test_collaborating_with_diagnostics(
2669        cx_a: &mut TestAppContext,
2670        cx_b: &mut TestAppContext,
2671    ) {
2672        cx_a.foreground().forbid_parking();
2673        let lang_registry = Arc::new(LanguageRegistry::test());
2674        let fs = FakeFs::new(cx_a.background());
2675
2676        // Set up a fake language server.
2677        let mut language = Language::new(
2678            LanguageConfig {
2679                name: "Rust".into(),
2680                path_suffixes: vec!["rs".to_string()],
2681                ..Default::default()
2682            },
2683            Some(tree_sitter_rust::language()),
2684        );
2685        let mut fake_language_servers = language.set_fake_lsp_adapter(Default::default());
2686        lang_registry.add(Arc::new(language));
2687
2688        // Connect to a server as 2 clients.
2689        let mut server = TestServer::start(cx_a.foreground(), cx_a.background()).await;
2690        let client_a = server.create_client(cx_a, "user_a").await;
2691        let client_b = server.create_client(cx_b, "user_b").await;
2692        server
2693            .make_contacts(vec![(&client_a, cx_a), (&client_b, cx_b)])
2694            .await;
2695
2696        // Share a project as client A
2697        fs.insert_tree(
2698            "/a",
2699            json!({
2700                "a.rs": "let one = two",
2701                "other.rs": "",
2702            }),
2703        )
2704        .await;
2705        let project_a = cx_a.update(|cx| {
2706            Project::local(
2707                client_a.clone(),
2708                client_a.user_store.clone(),
2709                lang_registry.clone(),
2710                fs.clone(),
2711                cx,
2712            )
2713        });
2714        let (worktree_a, _) = project_a
2715            .update(cx_a, |p, cx| {
2716                p.find_or_create_local_worktree("/a", true, cx)
2717            })
2718            .await
2719            .unwrap();
2720        worktree_a
2721            .read_with(cx_a, |tree, _| tree.as_local().unwrap().scan_complete())
2722            .await;
2723        let project_id = project_a.update(cx_a, |p, _| p.next_remote_id()).await;
2724        let worktree_id = worktree_a.read_with(cx_a, |tree, _| tree.id());
2725        project_a.update(cx_a, |p, cx| p.share(cx)).await.unwrap();
2726
2727        // Cause the language server to start.
2728        let _ = cx_a
2729            .background()
2730            .spawn(project_a.update(cx_a, |project, cx| {
2731                project.open_buffer(
2732                    ProjectPath {
2733                        worktree_id,
2734                        path: Path::new("other.rs").into(),
2735                    },
2736                    cx,
2737                )
2738            }))
2739            .await
2740            .unwrap();
2741
2742        // Simulate a language server reporting errors for a file.
2743        let mut fake_language_server = fake_language_servers.next().await.unwrap();
2744        fake_language_server
2745            .receive_notification::<lsp::notification::DidOpenTextDocument>()
2746            .await;
2747        fake_language_server.notify::<lsp::notification::PublishDiagnostics>(
2748            lsp::PublishDiagnosticsParams {
2749                uri: lsp::Url::from_file_path("/a/a.rs").unwrap(),
2750                version: None,
2751                diagnostics: vec![lsp::Diagnostic {
2752                    severity: Some(lsp::DiagnosticSeverity::ERROR),
2753                    range: lsp::Range::new(lsp::Position::new(0, 4), lsp::Position::new(0, 7)),
2754                    message: "message 1".to_string(),
2755                    ..Default::default()
2756                }],
2757            },
2758        );
2759
2760        // Wait for server to see the diagnostics update.
2761        server
2762            .condition(|store| {
2763                let worktree = store
2764                    .project(project_id)
2765                    .unwrap()
2766                    .share
2767                    .as_ref()
2768                    .unwrap()
2769                    .worktrees
2770                    .get(&worktree_id.to_proto())
2771                    .unwrap();
2772
2773                !worktree.diagnostic_summaries.is_empty()
2774            })
2775            .await;
2776
2777        // Join the worktree as client B.
2778        let project_b = Project::remote(
2779            project_id,
2780            client_b.clone(),
2781            client_b.user_store.clone(),
2782            lang_registry.clone(),
2783            fs.clone(),
2784            &mut cx_b.to_async(),
2785        )
2786        .await
2787        .unwrap();
2788
2789        project_b.read_with(cx_b, |project, cx| {
2790            assert_eq!(
2791                project.diagnostic_summaries(cx).collect::<Vec<_>>(),
2792                &[(
2793                    ProjectPath {
2794                        worktree_id,
2795                        path: Arc::from(Path::new("a.rs")),
2796                    },
2797                    DiagnosticSummary {
2798                        error_count: 1,
2799                        warning_count: 0,
2800                        ..Default::default()
2801                    },
2802                )]
2803            )
2804        });
2805
2806        // Simulate a language server reporting more errors for a file.
2807        fake_language_server.notify::<lsp::notification::PublishDiagnostics>(
2808            lsp::PublishDiagnosticsParams {
2809                uri: lsp::Url::from_file_path("/a/a.rs").unwrap(),
2810                version: None,
2811                diagnostics: vec![
2812                    lsp::Diagnostic {
2813                        severity: Some(lsp::DiagnosticSeverity::ERROR),
2814                        range: lsp::Range::new(lsp::Position::new(0, 4), lsp::Position::new(0, 7)),
2815                        message: "message 1".to_string(),
2816                        ..Default::default()
2817                    },
2818                    lsp::Diagnostic {
2819                        severity: Some(lsp::DiagnosticSeverity::WARNING),
2820                        range: lsp::Range::new(
2821                            lsp::Position::new(0, 10),
2822                            lsp::Position::new(0, 13),
2823                        ),
2824                        message: "message 2".to_string(),
2825                        ..Default::default()
2826                    },
2827                ],
2828            },
2829        );
2830
2831        // Client b gets the updated summaries
2832        project_b
2833            .condition(&cx_b, |project, cx| {
2834                project.diagnostic_summaries(cx).collect::<Vec<_>>()
2835                    == &[(
2836                        ProjectPath {
2837                            worktree_id,
2838                            path: Arc::from(Path::new("a.rs")),
2839                        },
2840                        DiagnosticSummary {
2841                            error_count: 1,
2842                            warning_count: 1,
2843                            ..Default::default()
2844                        },
2845                    )]
2846            })
2847            .await;
2848
2849        // Open the file with the errors on client B. They should be present.
2850        let buffer_b = cx_b
2851            .background()
2852            .spawn(project_b.update(cx_b, |p, cx| p.open_buffer((worktree_id, "a.rs"), cx)))
2853            .await
2854            .unwrap();
2855
2856        buffer_b.read_with(cx_b, |buffer, _| {
2857            assert_eq!(
2858                buffer
2859                    .snapshot()
2860                    .diagnostics_in_range::<_, Point>(0..buffer.len(), false)
2861                    .map(|entry| entry)
2862                    .collect::<Vec<_>>(),
2863                &[
2864                    DiagnosticEntry {
2865                        range: Point::new(0, 4)..Point::new(0, 7),
2866                        diagnostic: Diagnostic {
2867                            group_id: 0,
2868                            message: "message 1".to_string(),
2869                            severity: lsp::DiagnosticSeverity::ERROR,
2870                            is_primary: true,
2871                            ..Default::default()
2872                        }
2873                    },
2874                    DiagnosticEntry {
2875                        range: Point::new(0, 10)..Point::new(0, 13),
2876                        diagnostic: Diagnostic {
2877                            group_id: 1,
2878                            severity: lsp::DiagnosticSeverity::WARNING,
2879                            message: "message 2".to_string(),
2880                            is_primary: true,
2881                            ..Default::default()
2882                        }
2883                    }
2884                ]
2885            );
2886        });
2887
2888        // Simulate a language server reporting no errors for a file.
2889        fake_language_server.notify::<lsp::notification::PublishDiagnostics>(
2890            lsp::PublishDiagnosticsParams {
2891                uri: lsp::Url::from_file_path("/a/a.rs").unwrap(),
2892                version: None,
2893                diagnostics: vec![],
2894            },
2895        );
2896        project_a
2897            .condition(cx_a, |project, cx| {
2898                project.diagnostic_summaries(cx).collect::<Vec<_>>() == &[]
2899            })
2900            .await;
2901        project_b
2902            .condition(cx_b, |project, cx| {
2903                project.diagnostic_summaries(cx).collect::<Vec<_>>() == &[]
2904            })
2905            .await;
2906    }
2907
2908    #[gpui::test(iterations = 10)]
2909    async fn test_collaborating_with_completion(
2910        cx_a: &mut TestAppContext,
2911        cx_b: &mut TestAppContext,
2912    ) {
2913        cx_a.foreground().forbid_parking();
2914        let lang_registry = Arc::new(LanguageRegistry::test());
2915        let fs = FakeFs::new(cx_a.background());
2916
2917        // Set up a fake language server.
2918        let mut language = Language::new(
2919            LanguageConfig {
2920                name: "Rust".into(),
2921                path_suffixes: vec!["rs".to_string()],
2922                ..Default::default()
2923            },
2924            Some(tree_sitter_rust::language()),
2925        );
2926        let mut fake_language_servers = language.set_fake_lsp_adapter(FakeLspAdapter {
2927            capabilities: lsp::ServerCapabilities {
2928                completion_provider: Some(lsp::CompletionOptions {
2929                    trigger_characters: Some(vec![".".to_string()]),
2930                    ..Default::default()
2931                }),
2932                ..Default::default()
2933            },
2934            ..Default::default()
2935        });
2936        lang_registry.add(Arc::new(language));
2937
2938        // Connect to a server as 2 clients.
2939        let mut server = TestServer::start(cx_a.foreground(), cx_a.background()).await;
2940        let client_a = server.create_client(cx_a, "user_a").await;
2941        let client_b = server.create_client(cx_b, "user_b").await;
2942        server
2943            .make_contacts(vec![(&client_a, cx_a), (&client_b, cx_b)])
2944            .await;
2945
2946        // Share a project as client A
2947        fs.insert_tree(
2948            "/a",
2949            json!({
2950                "main.rs": "fn main() { a }",
2951                "other.rs": "",
2952            }),
2953        )
2954        .await;
2955        let project_a = cx_a.update(|cx| {
2956            Project::local(
2957                client_a.clone(),
2958                client_a.user_store.clone(),
2959                lang_registry.clone(),
2960                fs.clone(),
2961                cx,
2962            )
2963        });
2964        let (worktree_a, _) = project_a
2965            .update(cx_a, |p, cx| {
2966                p.find_or_create_local_worktree("/a", true, cx)
2967            })
2968            .await
2969            .unwrap();
2970        worktree_a
2971            .read_with(cx_a, |tree, _| tree.as_local().unwrap().scan_complete())
2972            .await;
2973        let project_id = project_a.update(cx_a, |p, _| p.next_remote_id()).await;
2974        let worktree_id = worktree_a.read_with(cx_a, |tree, _| tree.id());
2975        project_a.update(cx_a, |p, cx| p.share(cx)).await.unwrap();
2976
2977        // Join the worktree as client B.
2978        let project_b = Project::remote(
2979            project_id,
2980            client_b.clone(),
2981            client_b.user_store.clone(),
2982            lang_registry.clone(),
2983            fs.clone(),
2984            &mut cx_b.to_async(),
2985        )
2986        .await
2987        .unwrap();
2988
2989        // Open a file in an editor as the guest.
2990        let buffer_b = project_b
2991            .update(cx_b, |p, cx| p.open_buffer((worktree_id, "main.rs"), cx))
2992            .await
2993            .unwrap();
2994        let (window_b, _) = cx_b.add_window(|_| EmptyView);
2995        let editor_b = cx_b.add_view(window_b, |cx| {
2996            Editor::for_buffer(buffer_b.clone(), Some(project_b.clone()), cx)
2997        });
2998
2999        let fake_language_server = fake_language_servers.next().await.unwrap();
3000        buffer_b
3001            .condition(&cx_b, |buffer, _| !buffer.completion_triggers().is_empty())
3002            .await;
3003
3004        // Type a completion trigger character as the guest.
3005        editor_b.update(cx_b, |editor, cx| {
3006            editor.change_selections(None, cx, |s| s.select_ranges([13..13]));
3007            editor.handle_input(&Input(".".into()), cx);
3008            cx.focus(&editor_b);
3009        });
3010
3011        // Receive a completion request as the host's language server.
3012        // Return some completions from the host's language server.
3013        cx_a.foreground().start_waiting();
3014        fake_language_server
3015            .handle_request::<lsp::request::Completion, _, _>(|params, _| async move {
3016                assert_eq!(
3017                    params.text_document_position.text_document.uri,
3018                    lsp::Url::from_file_path("/a/main.rs").unwrap(),
3019                );
3020                assert_eq!(
3021                    params.text_document_position.position,
3022                    lsp::Position::new(0, 14),
3023                );
3024
3025                Ok(Some(lsp::CompletionResponse::Array(vec![
3026                    lsp::CompletionItem {
3027                        label: "first_method(…)".into(),
3028                        detail: Some("fn(&mut self, B) -> C".into()),
3029                        text_edit: Some(lsp::CompletionTextEdit::Edit(lsp::TextEdit {
3030                            new_text: "first_method($1)".to_string(),
3031                            range: lsp::Range::new(
3032                                lsp::Position::new(0, 14),
3033                                lsp::Position::new(0, 14),
3034                            ),
3035                        })),
3036                        insert_text_format: Some(lsp::InsertTextFormat::SNIPPET),
3037                        ..Default::default()
3038                    },
3039                    lsp::CompletionItem {
3040                        label: "second_method(…)".into(),
3041                        detail: Some("fn(&mut self, C) -> D<E>".into()),
3042                        text_edit: Some(lsp::CompletionTextEdit::Edit(lsp::TextEdit {
3043                            new_text: "second_method()".to_string(),
3044                            range: lsp::Range::new(
3045                                lsp::Position::new(0, 14),
3046                                lsp::Position::new(0, 14),
3047                            ),
3048                        })),
3049                        insert_text_format: Some(lsp::InsertTextFormat::SNIPPET),
3050                        ..Default::default()
3051                    },
3052                ])))
3053            })
3054            .next()
3055            .await
3056            .unwrap();
3057        cx_a.foreground().finish_waiting();
3058
3059        // Open the buffer on the host.
3060        let buffer_a = project_a
3061            .update(cx_a, |p, cx| p.open_buffer((worktree_id, "main.rs"), cx))
3062            .await
3063            .unwrap();
3064        buffer_a
3065            .condition(&cx_a, |buffer, _| buffer.text() == "fn main() { a. }")
3066            .await;
3067
3068        // Confirm a completion on the guest.
3069        editor_b
3070            .condition(&cx_b, |editor, _| editor.context_menu_visible())
3071            .await;
3072        editor_b.update(cx_b, |editor, cx| {
3073            editor.confirm_completion(&ConfirmCompletion { item_ix: Some(0) }, cx);
3074            assert_eq!(editor.text(cx), "fn main() { a.first_method() }");
3075        });
3076
3077        // Return a resolved completion from the host's language server.
3078        // The resolved completion has an additional text edit.
3079        fake_language_server.handle_request::<lsp::request::ResolveCompletionItem, _, _>(
3080            |params, _| async move {
3081                assert_eq!(params.label, "first_method(…)");
3082                Ok(lsp::CompletionItem {
3083                    label: "first_method(…)".into(),
3084                    detail: Some("fn(&mut self, B) -> C".into()),
3085                    text_edit: Some(lsp::CompletionTextEdit::Edit(lsp::TextEdit {
3086                        new_text: "first_method($1)".to_string(),
3087                        range: lsp::Range::new(
3088                            lsp::Position::new(0, 14),
3089                            lsp::Position::new(0, 14),
3090                        ),
3091                    })),
3092                    additional_text_edits: Some(vec![lsp::TextEdit {
3093                        new_text: "use d::SomeTrait;\n".to_string(),
3094                        range: lsp::Range::new(lsp::Position::new(0, 0), lsp::Position::new(0, 0)),
3095                    }]),
3096                    insert_text_format: Some(lsp::InsertTextFormat::SNIPPET),
3097                    ..Default::default()
3098                })
3099            },
3100        );
3101
3102        // The additional edit is applied.
3103        buffer_a
3104            .condition(&cx_a, |buffer, _| {
3105                buffer.text() == "use d::SomeTrait;\nfn main() { a.first_method() }"
3106            })
3107            .await;
3108        buffer_b
3109            .condition(&cx_b, |buffer, _| {
3110                buffer.text() == "use d::SomeTrait;\nfn main() { a.first_method() }"
3111            })
3112            .await;
3113    }
3114
3115    #[gpui::test(iterations = 10)]
3116    async fn test_reloading_buffer_manually(cx_a: &mut TestAppContext, cx_b: &mut TestAppContext) {
3117        cx_a.foreground().forbid_parking();
3118        let lang_registry = Arc::new(LanguageRegistry::test());
3119        let fs = FakeFs::new(cx_a.background());
3120
3121        // Connect to a server as 2 clients.
3122        let mut server = TestServer::start(cx_a.foreground(), cx_a.background()).await;
3123        let client_a = server.create_client(cx_a, "user_a").await;
3124        let client_b = server.create_client(cx_b, "user_b").await;
3125        server
3126            .make_contacts(vec![(&client_a, cx_a), (&client_b, cx_b)])
3127            .await;
3128
3129        // Share a project as client A
3130        fs.insert_tree(
3131            "/a",
3132            json!({
3133                "a.rs": "let one = 1;",
3134            }),
3135        )
3136        .await;
3137        let project_a = cx_a.update(|cx| {
3138            Project::local(
3139                client_a.clone(),
3140                client_a.user_store.clone(),
3141                lang_registry.clone(),
3142                fs.clone(),
3143                cx,
3144            )
3145        });
3146        let (worktree_a, _) = project_a
3147            .update(cx_a, |p, cx| {
3148                p.find_or_create_local_worktree("/a", true, cx)
3149            })
3150            .await
3151            .unwrap();
3152        worktree_a
3153            .read_with(cx_a, |tree, _| tree.as_local().unwrap().scan_complete())
3154            .await;
3155        let project_id = project_a.update(cx_a, |p, _| p.next_remote_id()).await;
3156        let worktree_id = worktree_a.read_with(cx_a, |tree, _| tree.id());
3157        project_a.update(cx_a, |p, cx| p.share(cx)).await.unwrap();
3158        let buffer_a = project_a
3159            .update(cx_a, |p, cx| p.open_buffer((worktree_id, "a.rs"), cx))
3160            .await
3161            .unwrap();
3162
3163        // Join the worktree as client B.
3164        let project_b = Project::remote(
3165            project_id,
3166            client_b.clone(),
3167            client_b.user_store.clone(),
3168            lang_registry.clone(),
3169            fs.clone(),
3170            &mut cx_b.to_async(),
3171        )
3172        .await
3173        .unwrap();
3174
3175        let buffer_b = cx_b
3176            .background()
3177            .spawn(project_b.update(cx_b, |p, cx| p.open_buffer((worktree_id, "a.rs"), cx)))
3178            .await
3179            .unwrap();
3180        buffer_b.update(cx_b, |buffer, cx| {
3181            buffer.edit([(4..7, "six")], cx);
3182            buffer.edit([(10..11, "6")], cx);
3183            assert_eq!(buffer.text(), "let six = 6;");
3184            assert!(buffer.is_dirty());
3185            assert!(!buffer.has_conflict());
3186        });
3187        buffer_a
3188            .condition(cx_a, |buffer, _| buffer.text() == "let six = 6;")
3189            .await;
3190
3191        fs.save(Path::new("/a/a.rs"), &Rope::from("let seven = 7;"))
3192            .await
3193            .unwrap();
3194        buffer_a
3195            .condition(cx_a, |buffer, _| buffer.has_conflict())
3196            .await;
3197        buffer_b
3198            .condition(cx_b, |buffer, _| buffer.has_conflict())
3199            .await;
3200
3201        project_b
3202            .update(cx_b, |project, cx| {
3203                project.reload_buffers(HashSet::from_iter([buffer_b.clone()]), true, cx)
3204            })
3205            .await
3206            .unwrap();
3207        buffer_a.read_with(cx_a, |buffer, _| {
3208            assert_eq!(buffer.text(), "let seven = 7;");
3209            assert!(!buffer.is_dirty());
3210            assert!(!buffer.has_conflict());
3211        });
3212        buffer_b.read_with(cx_b, |buffer, _| {
3213            assert_eq!(buffer.text(), "let seven = 7;");
3214            assert!(!buffer.is_dirty());
3215            assert!(!buffer.has_conflict());
3216        });
3217
3218        buffer_a.update(cx_a, |buffer, cx| {
3219            // Undoing on the host is a no-op when the reload was initiated by the guest.
3220            buffer.undo(cx);
3221            assert_eq!(buffer.text(), "let seven = 7;");
3222            assert!(!buffer.is_dirty());
3223            assert!(!buffer.has_conflict());
3224        });
3225        buffer_b.update(cx_b, |buffer, cx| {
3226            // Undoing on the guest rolls back the buffer to before it was reloaded but the conflict gets cleared.
3227            buffer.undo(cx);
3228            assert_eq!(buffer.text(), "let six = 6;");
3229            assert!(buffer.is_dirty());
3230            assert!(!buffer.has_conflict());
3231        });
3232    }
3233
3234    #[gpui::test(iterations = 10)]
3235    async fn test_formatting_buffer(cx_a: &mut TestAppContext, cx_b: &mut TestAppContext) {
3236        cx_a.foreground().forbid_parking();
3237        let lang_registry = Arc::new(LanguageRegistry::test());
3238        let fs = FakeFs::new(cx_a.background());
3239
3240        // Set up a fake language server.
3241        let mut language = Language::new(
3242            LanguageConfig {
3243                name: "Rust".into(),
3244                path_suffixes: vec!["rs".to_string()],
3245                ..Default::default()
3246            },
3247            Some(tree_sitter_rust::language()),
3248        );
3249        let mut fake_language_servers = language.set_fake_lsp_adapter(Default::default());
3250        lang_registry.add(Arc::new(language));
3251
3252        // Connect to a server as 2 clients.
3253        let mut server = TestServer::start(cx_a.foreground(), cx_a.background()).await;
3254        let client_a = server.create_client(cx_a, "user_a").await;
3255        let client_b = server.create_client(cx_b, "user_b").await;
3256        server
3257            .make_contacts(vec![(&client_a, cx_a), (&client_b, cx_b)])
3258            .await;
3259
3260        // Share a project as client A
3261        fs.insert_tree(
3262            "/a",
3263            json!({
3264                "a.rs": "let one = two",
3265            }),
3266        )
3267        .await;
3268        let project_a = cx_a.update(|cx| {
3269            Project::local(
3270                client_a.clone(),
3271                client_a.user_store.clone(),
3272                lang_registry.clone(),
3273                fs.clone(),
3274                cx,
3275            )
3276        });
3277        let (worktree_a, _) = project_a
3278            .update(cx_a, |p, cx| {
3279                p.find_or_create_local_worktree("/a", true, cx)
3280            })
3281            .await
3282            .unwrap();
3283        worktree_a
3284            .read_with(cx_a, |tree, _| tree.as_local().unwrap().scan_complete())
3285            .await;
3286        let project_id = project_a.update(cx_a, |p, _| p.next_remote_id()).await;
3287        let worktree_id = worktree_a.read_with(cx_a, |tree, _| tree.id());
3288        project_a.update(cx_a, |p, cx| p.share(cx)).await.unwrap();
3289
3290        // Join the worktree as client B.
3291        let project_b = Project::remote(
3292            project_id,
3293            client_b.clone(),
3294            client_b.user_store.clone(),
3295            lang_registry.clone(),
3296            fs.clone(),
3297            &mut cx_b.to_async(),
3298        )
3299        .await
3300        .unwrap();
3301
3302        let buffer_b = cx_b
3303            .background()
3304            .spawn(project_b.update(cx_b, |p, cx| p.open_buffer((worktree_id, "a.rs"), cx)))
3305            .await
3306            .unwrap();
3307
3308        let fake_language_server = fake_language_servers.next().await.unwrap();
3309        fake_language_server.handle_request::<lsp::request::Formatting, _, _>(|_, _| async move {
3310            Ok(Some(vec![
3311                lsp::TextEdit {
3312                    range: lsp::Range::new(lsp::Position::new(0, 4), lsp::Position::new(0, 4)),
3313                    new_text: "h".to_string(),
3314                },
3315                lsp::TextEdit {
3316                    range: lsp::Range::new(lsp::Position::new(0, 7), lsp::Position::new(0, 7)),
3317                    new_text: "y".to_string(),
3318                },
3319            ]))
3320        });
3321
3322        project_b
3323            .update(cx_b, |project, cx| {
3324                project.format(HashSet::from_iter([buffer_b.clone()]), true, cx)
3325            })
3326            .await
3327            .unwrap();
3328        assert_eq!(
3329            buffer_b.read_with(cx_b, |buffer, _| buffer.text()),
3330            "let honey = two"
3331        );
3332    }
3333
3334    #[gpui::test(iterations = 10)]
3335    async fn test_definition(cx_a: &mut TestAppContext, cx_b: &mut TestAppContext) {
3336        cx_a.foreground().forbid_parking();
3337        let lang_registry = Arc::new(LanguageRegistry::test());
3338        let fs = FakeFs::new(cx_a.background());
3339        fs.insert_tree(
3340            "/root-1",
3341            json!({
3342                "a.rs": "const ONE: usize = b::TWO + b::THREE;",
3343            }),
3344        )
3345        .await;
3346        fs.insert_tree(
3347            "/root-2",
3348            json!({
3349                "b.rs": "const TWO: usize = 2;\nconst THREE: usize = 3;",
3350            }),
3351        )
3352        .await;
3353
3354        // Set up a fake language server.
3355        let mut language = Language::new(
3356            LanguageConfig {
3357                name: "Rust".into(),
3358                path_suffixes: vec!["rs".to_string()],
3359                ..Default::default()
3360            },
3361            Some(tree_sitter_rust::language()),
3362        );
3363        let mut fake_language_servers = language.set_fake_lsp_adapter(Default::default());
3364        lang_registry.add(Arc::new(language));
3365
3366        // Connect to a server as 2 clients.
3367        let mut server = TestServer::start(cx_a.foreground(), cx_a.background()).await;
3368        let client_a = server.create_client(cx_a, "user_a").await;
3369        let client_b = server.create_client(cx_b, "user_b").await;
3370        server
3371            .make_contacts(vec![(&client_a, cx_a), (&client_b, cx_b)])
3372            .await;
3373
3374        // Share a project as client A
3375        let project_a = cx_a.update(|cx| {
3376            Project::local(
3377                client_a.clone(),
3378                client_a.user_store.clone(),
3379                lang_registry.clone(),
3380                fs.clone(),
3381                cx,
3382            )
3383        });
3384        let (worktree_a, _) = project_a
3385            .update(cx_a, |p, cx| {
3386                p.find_or_create_local_worktree("/root-1", true, cx)
3387            })
3388            .await
3389            .unwrap();
3390        worktree_a
3391            .read_with(cx_a, |tree, _| tree.as_local().unwrap().scan_complete())
3392            .await;
3393        let project_id = project_a.update(cx_a, |p, _| p.next_remote_id()).await;
3394        let worktree_id = worktree_a.read_with(cx_a, |tree, _| tree.id());
3395        project_a.update(cx_a, |p, cx| p.share(cx)).await.unwrap();
3396
3397        // Join the worktree as client B.
3398        let project_b = Project::remote(
3399            project_id,
3400            client_b.clone(),
3401            client_b.user_store.clone(),
3402            lang_registry.clone(),
3403            fs.clone(),
3404            &mut cx_b.to_async(),
3405        )
3406        .await
3407        .unwrap();
3408
3409        // Open the file on client B.
3410        let buffer_b = cx_b
3411            .background()
3412            .spawn(project_b.update(cx_b, |p, cx| p.open_buffer((worktree_id, "a.rs"), cx)))
3413            .await
3414            .unwrap();
3415
3416        // Request the definition of a symbol as the guest.
3417        let fake_language_server = fake_language_servers.next().await.unwrap();
3418        fake_language_server.handle_request::<lsp::request::GotoDefinition, _, _>(
3419            |_, _| async move {
3420                Ok(Some(lsp::GotoDefinitionResponse::Scalar(
3421                    lsp::Location::new(
3422                        lsp::Url::from_file_path("/root-2/b.rs").unwrap(),
3423                        lsp::Range::new(lsp::Position::new(0, 6), lsp::Position::new(0, 9)),
3424                    ),
3425                )))
3426            },
3427        );
3428
3429        let definitions_1 = project_b
3430            .update(cx_b, |p, cx| p.definition(&buffer_b, 23, cx))
3431            .await
3432            .unwrap();
3433        cx_b.read(|cx| {
3434            assert_eq!(definitions_1.len(), 1);
3435            assert_eq!(project_b.read(cx).worktrees(cx).count(), 2);
3436            let target_buffer = definitions_1[0].buffer.read(cx);
3437            assert_eq!(
3438                target_buffer.text(),
3439                "const TWO: usize = 2;\nconst THREE: usize = 3;"
3440            );
3441            assert_eq!(
3442                definitions_1[0].range.to_point(target_buffer),
3443                Point::new(0, 6)..Point::new(0, 9)
3444            );
3445        });
3446
3447        // Try getting more definitions for the same buffer, ensuring the buffer gets reused from
3448        // the previous call to `definition`.
3449        fake_language_server.handle_request::<lsp::request::GotoDefinition, _, _>(
3450            |_, _| async move {
3451                Ok(Some(lsp::GotoDefinitionResponse::Scalar(
3452                    lsp::Location::new(
3453                        lsp::Url::from_file_path("/root-2/b.rs").unwrap(),
3454                        lsp::Range::new(lsp::Position::new(1, 6), lsp::Position::new(1, 11)),
3455                    ),
3456                )))
3457            },
3458        );
3459
3460        let definitions_2 = project_b
3461            .update(cx_b, |p, cx| p.definition(&buffer_b, 33, cx))
3462            .await
3463            .unwrap();
3464        cx_b.read(|cx| {
3465            assert_eq!(definitions_2.len(), 1);
3466            assert_eq!(project_b.read(cx).worktrees(cx).count(), 2);
3467            let target_buffer = definitions_2[0].buffer.read(cx);
3468            assert_eq!(
3469                target_buffer.text(),
3470                "const TWO: usize = 2;\nconst THREE: usize = 3;"
3471            );
3472            assert_eq!(
3473                definitions_2[0].range.to_point(target_buffer),
3474                Point::new(1, 6)..Point::new(1, 11)
3475            );
3476        });
3477        assert_eq!(definitions_1[0].buffer, definitions_2[0].buffer);
3478    }
3479
3480    #[gpui::test(iterations = 10)]
3481    async fn test_references(cx_a: &mut TestAppContext, cx_b: &mut TestAppContext) {
3482        cx_a.foreground().forbid_parking();
3483        let lang_registry = Arc::new(LanguageRegistry::test());
3484        let fs = FakeFs::new(cx_a.background());
3485        fs.insert_tree(
3486            "/root-1",
3487            json!({
3488                "one.rs": "const ONE: usize = 1;",
3489                "two.rs": "const TWO: usize = one::ONE + one::ONE;",
3490            }),
3491        )
3492        .await;
3493        fs.insert_tree(
3494            "/root-2",
3495            json!({
3496                "three.rs": "const THREE: usize = two::TWO + one::ONE;",
3497            }),
3498        )
3499        .await;
3500
3501        // Set up a fake language server.
3502        let mut language = Language::new(
3503            LanguageConfig {
3504                name: "Rust".into(),
3505                path_suffixes: vec!["rs".to_string()],
3506                ..Default::default()
3507            },
3508            Some(tree_sitter_rust::language()),
3509        );
3510        let mut fake_language_servers = language.set_fake_lsp_adapter(Default::default());
3511        lang_registry.add(Arc::new(language));
3512
3513        // Connect to a server as 2 clients.
3514        let mut server = TestServer::start(cx_a.foreground(), cx_a.background()).await;
3515        let client_a = server.create_client(cx_a, "user_a").await;
3516        let client_b = server.create_client(cx_b, "user_b").await;
3517        server
3518            .make_contacts(vec![(&client_a, cx_a), (&client_b, cx_b)])
3519            .await;
3520
3521        // Share a project as client A
3522        let project_a = cx_a.update(|cx| {
3523            Project::local(
3524                client_a.clone(),
3525                client_a.user_store.clone(),
3526                lang_registry.clone(),
3527                fs.clone(),
3528                cx,
3529            )
3530        });
3531        let (worktree_a, _) = project_a
3532            .update(cx_a, |p, cx| {
3533                p.find_or_create_local_worktree("/root-1", true, cx)
3534            })
3535            .await
3536            .unwrap();
3537        worktree_a
3538            .read_with(cx_a, |tree, _| tree.as_local().unwrap().scan_complete())
3539            .await;
3540        let project_id = project_a.update(cx_a, |p, _| p.next_remote_id()).await;
3541        let worktree_id = worktree_a.read_with(cx_a, |tree, _| tree.id());
3542        project_a.update(cx_a, |p, cx| p.share(cx)).await.unwrap();
3543
3544        // Join the worktree as client B.
3545        let project_b = Project::remote(
3546            project_id,
3547            client_b.clone(),
3548            client_b.user_store.clone(),
3549            lang_registry.clone(),
3550            fs.clone(),
3551            &mut cx_b.to_async(),
3552        )
3553        .await
3554        .unwrap();
3555
3556        // Open the file on client B.
3557        let buffer_b = cx_b
3558            .background()
3559            .spawn(project_b.update(cx_b, |p, cx| p.open_buffer((worktree_id, "one.rs"), cx)))
3560            .await
3561            .unwrap();
3562
3563        // Request references to a symbol as the guest.
3564        let fake_language_server = fake_language_servers.next().await.unwrap();
3565        fake_language_server.handle_request::<lsp::request::References, _, _>(
3566            |params, _| async move {
3567                assert_eq!(
3568                    params.text_document_position.text_document.uri.as_str(),
3569                    "file:///root-1/one.rs"
3570                );
3571                Ok(Some(vec![
3572                    lsp::Location {
3573                        uri: lsp::Url::from_file_path("/root-1/two.rs").unwrap(),
3574                        range: lsp::Range::new(
3575                            lsp::Position::new(0, 24),
3576                            lsp::Position::new(0, 27),
3577                        ),
3578                    },
3579                    lsp::Location {
3580                        uri: lsp::Url::from_file_path("/root-1/two.rs").unwrap(),
3581                        range: lsp::Range::new(
3582                            lsp::Position::new(0, 35),
3583                            lsp::Position::new(0, 38),
3584                        ),
3585                    },
3586                    lsp::Location {
3587                        uri: lsp::Url::from_file_path("/root-2/three.rs").unwrap(),
3588                        range: lsp::Range::new(
3589                            lsp::Position::new(0, 37),
3590                            lsp::Position::new(0, 40),
3591                        ),
3592                    },
3593                ]))
3594            },
3595        );
3596
3597        let references = project_b
3598            .update(cx_b, |p, cx| p.references(&buffer_b, 7, cx))
3599            .await
3600            .unwrap();
3601        cx_b.read(|cx| {
3602            assert_eq!(references.len(), 3);
3603            assert_eq!(project_b.read(cx).worktrees(cx).count(), 2);
3604
3605            let two_buffer = references[0].buffer.read(cx);
3606            let three_buffer = references[2].buffer.read(cx);
3607            assert_eq!(
3608                two_buffer.file().unwrap().path().as_ref(),
3609                Path::new("two.rs")
3610            );
3611            assert_eq!(references[1].buffer, references[0].buffer);
3612            assert_eq!(
3613                three_buffer.file().unwrap().full_path(cx),
3614                Path::new("three.rs")
3615            );
3616
3617            assert_eq!(references[0].range.to_offset(&two_buffer), 24..27);
3618            assert_eq!(references[1].range.to_offset(&two_buffer), 35..38);
3619            assert_eq!(references[2].range.to_offset(&three_buffer), 37..40);
3620        });
3621    }
3622
3623    #[gpui::test(iterations = 10)]
3624    async fn test_project_search(cx_a: &mut TestAppContext, cx_b: &mut TestAppContext) {
3625        cx_a.foreground().forbid_parking();
3626        let lang_registry = Arc::new(LanguageRegistry::test());
3627        let fs = FakeFs::new(cx_a.background());
3628        fs.insert_tree(
3629            "/root-1",
3630            json!({
3631                "a": "hello world",
3632                "b": "goodnight moon",
3633                "c": "a world of goo",
3634                "d": "world champion of clown world",
3635            }),
3636        )
3637        .await;
3638        fs.insert_tree(
3639            "/root-2",
3640            json!({
3641                "e": "disney world is fun",
3642            }),
3643        )
3644        .await;
3645
3646        // Connect to a server as 2 clients.
3647        let mut server = TestServer::start(cx_a.foreground(), cx_a.background()).await;
3648        let client_a = server.create_client(cx_a, "user_a").await;
3649        let client_b = server.create_client(cx_b, "user_b").await;
3650        server
3651            .make_contacts(vec![(&client_a, cx_a), (&client_b, cx_b)])
3652            .await;
3653
3654        // Share a project as client A
3655        let project_a = cx_a.update(|cx| {
3656            Project::local(
3657                client_a.clone(),
3658                client_a.user_store.clone(),
3659                lang_registry.clone(),
3660                fs.clone(),
3661                cx,
3662            )
3663        });
3664        let project_id = project_a.update(cx_a, |p, _| p.next_remote_id()).await;
3665
3666        let (worktree_1, _) = project_a
3667            .update(cx_a, |p, cx| {
3668                p.find_or_create_local_worktree("/root-1", true, cx)
3669            })
3670            .await
3671            .unwrap();
3672        worktree_1
3673            .read_with(cx_a, |tree, _| tree.as_local().unwrap().scan_complete())
3674            .await;
3675        let (worktree_2, _) = project_a
3676            .update(cx_a, |p, cx| {
3677                p.find_or_create_local_worktree("/root-2", true, cx)
3678            })
3679            .await
3680            .unwrap();
3681        worktree_2
3682            .read_with(cx_a, |tree, _| tree.as_local().unwrap().scan_complete())
3683            .await;
3684
3685        project_a.update(cx_a, |p, cx| p.share(cx)).await.unwrap();
3686
3687        // Join the worktree as client B.
3688        let project_b = Project::remote(
3689            project_id,
3690            client_b.clone(),
3691            client_b.user_store.clone(),
3692            lang_registry.clone(),
3693            fs.clone(),
3694            &mut cx_b.to_async(),
3695        )
3696        .await
3697        .unwrap();
3698
3699        let results = project_b
3700            .update(cx_b, |project, cx| {
3701                project.search(SearchQuery::text("world", false, false), cx)
3702            })
3703            .await
3704            .unwrap();
3705
3706        let mut ranges_by_path = results
3707            .into_iter()
3708            .map(|(buffer, ranges)| {
3709                buffer.read_with(cx_b, |buffer, cx| {
3710                    let path = buffer.file().unwrap().full_path(cx);
3711                    let offset_ranges = ranges
3712                        .into_iter()
3713                        .map(|range| range.to_offset(buffer))
3714                        .collect::<Vec<_>>();
3715                    (path, offset_ranges)
3716                })
3717            })
3718            .collect::<Vec<_>>();
3719        ranges_by_path.sort_by_key(|(path, _)| path.clone());
3720
3721        assert_eq!(
3722            ranges_by_path,
3723            &[
3724                (PathBuf::from("root-1/a"), vec![6..11]),
3725                (PathBuf::from("root-1/c"), vec![2..7]),
3726                (PathBuf::from("root-1/d"), vec![0..5, 24..29]),
3727                (PathBuf::from("root-2/e"), vec![7..12]),
3728            ]
3729        );
3730    }
3731
3732    #[gpui::test(iterations = 10)]
3733    async fn test_document_highlights(cx_a: &mut TestAppContext, cx_b: &mut TestAppContext) {
3734        cx_a.foreground().forbid_parking();
3735        let lang_registry = Arc::new(LanguageRegistry::test());
3736        let fs = FakeFs::new(cx_a.background());
3737        fs.insert_tree(
3738            "/root-1",
3739            json!({
3740                "main.rs": "fn double(number: i32) -> i32 { number + number }",
3741            }),
3742        )
3743        .await;
3744
3745        // Set up a fake language server.
3746        let mut language = Language::new(
3747            LanguageConfig {
3748                name: "Rust".into(),
3749                path_suffixes: vec!["rs".to_string()],
3750                ..Default::default()
3751            },
3752            Some(tree_sitter_rust::language()),
3753        );
3754        let mut fake_language_servers = language.set_fake_lsp_adapter(Default::default());
3755        lang_registry.add(Arc::new(language));
3756
3757        // Connect to a server as 2 clients.
3758        let mut server = TestServer::start(cx_a.foreground(), cx_a.background()).await;
3759        let client_a = server.create_client(cx_a, "user_a").await;
3760        let client_b = server.create_client(cx_b, "user_b").await;
3761        server
3762            .make_contacts(vec![(&client_a, cx_a), (&client_b, cx_b)])
3763            .await;
3764
3765        // Share a project as client A
3766        let project_a = cx_a.update(|cx| {
3767            Project::local(
3768                client_a.clone(),
3769                client_a.user_store.clone(),
3770                lang_registry.clone(),
3771                fs.clone(),
3772                cx,
3773            )
3774        });
3775        let (worktree_a, _) = project_a
3776            .update(cx_a, |p, cx| {
3777                p.find_or_create_local_worktree("/root-1", true, cx)
3778            })
3779            .await
3780            .unwrap();
3781        worktree_a
3782            .read_with(cx_a, |tree, _| tree.as_local().unwrap().scan_complete())
3783            .await;
3784        let project_id = project_a.update(cx_a, |p, _| p.next_remote_id()).await;
3785        let worktree_id = worktree_a.read_with(cx_a, |tree, _| tree.id());
3786        project_a.update(cx_a, |p, cx| p.share(cx)).await.unwrap();
3787
3788        // Join the worktree as client B.
3789        let project_b = Project::remote(
3790            project_id,
3791            client_b.clone(),
3792            client_b.user_store.clone(),
3793            lang_registry.clone(),
3794            fs.clone(),
3795            &mut cx_b.to_async(),
3796        )
3797        .await
3798        .unwrap();
3799
3800        // Open the file on client B.
3801        let buffer_b = cx_b
3802            .background()
3803            .spawn(project_b.update(cx_b, |p, cx| p.open_buffer((worktree_id, "main.rs"), cx)))
3804            .await
3805            .unwrap();
3806
3807        // Request document highlights as the guest.
3808        let fake_language_server = fake_language_servers.next().await.unwrap();
3809        fake_language_server.handle_request::<lsp::request::DocumentHighlightRequest, _, _>(
3810            |params, _| async move {
3811                assert_eq!(
3812                    params
3813                        .text_document_position_params
3814                        .text_document
3815                        .uri
3816                        .as_str(),
3817                    "file:///root-1/main.rs"
3818                );
3819                assert_eq!(
3820                    params.text_document_position_params.position,
3821                    lsp::Position::new(0, 34)
3822                );
3823                Ok(Some(vec![
3824                    lsp::DocumentHighlight {
3825                        kind: Some(lsp::DocumentHighlightKind::WRITE),
3826                        range: lsp::Range::new(
3827                            lsp::Position::new(0, 10),
3828                            lsp::Position::new(0, 16),
3829                        ),
3830                    },
3831                    lsp::DocumentHighlight {
3832                        kind: Some(lsp::DocumentHighlightKind::READ),
3833                        range: lsp::Range::new(
3834                            lsp::Position::new(0, 32),
3835                            lsp::Position::new(0, 38),
3836                        ),
3837                    },
3838                    lsp::DocumentHighlight {
3839                        kind: Some(lsp::DocumentHighlightKind::READ),
3840                        range: lsp::Range::new(
3841                            lsp::Position::new(0, 41),
3842                            lsp::Position::new(0, 47),
3843                        ),
3844                    },
3845                ]))
3846            },
3847        );
3848
3849        let highlights = project_b
3850            .update(cx_b, |p, cx| p.document_highlights(&buffer_b, 34, cx))
3851            .await
3852            .unwrap();
3853        buffer_b.read_with(cx_b, |buffer, _| {
3854            let snapshot = buffer.snapshot();
3855
3856            let highlights = highlights
3857                .into_iter()
3858                .map(|highlight| (highlight.kind, highlight.range.to_offset(&snapshot)))
3859                .collect::<Vec<_>>();
3860            assert_eq!(
3861                highlights,
3862                &[
3863                    (lsp::DocumentHighlightKind::WRITE, 10..16),
3864                    (lsp::DocumentHighlightKind::READ, 32..38),
3865                    (lsp::DocumentHighlightKind::READ, 41..47)
3866                ]
3867            )
3868        });
3869    }
3870
3871    #[gpui::test(iterations = 10)]
3872    async fn test_project_symbols(cx_a: &mut TestAppContext, cx_b: &mut TestAppContext) {
3873        cx_a.foreground().forbid_parking();
3874        let lang_registry = Arc::new(LanguageRegistry::test());
3875        let fs = FakeFs::new(cx_a.background());
3876        fs.insert_tree(
3877            "/code",
3878            json!({
3879                "crate-1": {
3880                    "one.rs": "const ONE: usize = 1;",
3881                },
3882                "crate-2": {
3883                    "two.rs": "const TWO: usize = 2; const THREE: usize = 3;",
3884                },
3885                "private": {
3886                    "passwords.txt": "the-password",
3887                }
3888            }),
3889        )
3890        .await;
3891
3892        // Set up a fake language server.
3893        let mut language = Language::new(
3894            LanguageConfig {
3895                name: "Rust".into(),
3896                path_suffixes: vec!["rs".to_string()],
3897                ..Default::default()
3898            },
3899            Some(tree_sitter_rust::language()),
3900        );
3901        let mut fake_language_servers = language.set_fake_lsp_adapter(Default::default());
3902        lang_registry.add(Arc::new(language));
3903
3904        // Connect to a server as 2 clients.
3905        let mut server = TestServer::start(cx_a.foreground(), cx_a.background()).await;
3906        let client_a = server.create_client(cx_a, "user_a").await;
3907        let client_b = server.create_client(cx_b, "user_b").await;
3908        server
3909            .make_contacts(vec![(&client_a, cx_a), (&client_b, cx_b)])
3910            .await;
3911
3912        // Share a project as client A
3913        let project_a = cx_a.update(|cx| {
3914            Project::local(
3915                client_a.clone(),
3916                client_a.user_store.clone(),
3917                lang_registry.clone(),
3918                fs.clone(),
3919                cx,
3920            )
3921        });
3922        let (worktree_a, _) = project_a
3923            .update(cx_a, |p, cx| {
3924                p.find_or_create_local_worktree("/code/crate-1", true, cx)
3925            })
3926            .await
3927            .unwrap();
3928        worktree_a
3929            .read_with(cx_a, |tree, _| tree.as_local().unwrap().scan_complete())
3930            .await;
3931        let project_id = project_a.update(cx_a, |p, _| p.next_remote_id()).await;
3932        let worktree_id = worktree_a.read_with(cx_a, |tree, _| tree.id());
3933        project_a.update(cx_a, |p, cx| p.share(cx)).await.unwrap();
3934
3935        // Join the worktree as client B.
3936        let project_b = Project::remote(
3937            project_id,
3938            client_b.clone(),
3939            client_b.user_store.clone(),
3940            lang_registry.clone(),
3941            fs.clone(),
3942            &mut cx_b.to_async(),
3943        )
3944        .await
3945        .unwrap();
3946
3947        // Cause the language server to start.
3948        let _buffer = cx_b
3949            .background()
3950            .spawn(project_b.update(cx_b, |p, cx| p.open_buffer((worktree_id, "one.rs"), cx)))
3951            .await
3952            .unwrap();
3953
3954        let fake_language_server = fake_language_servers.next().await.unwrap();
3955        fake_language_server.handle_request::<lsp::request::WorkspaceSymbol, _, _>(
3956            |_, _| async move {
3957                #[allow(deprecated)]
3958                Ok(Some(vec![lsp::SymbolInformation {
3959                    name: "TWO".into(),
3960                    location: lsp::Location {
3961                        uri: lsp::Url::from_file_path("/code/crate-2/two.rs").unwrap(),
3962                        range: lsp::Range::new(lsp::Position::new(0, 6), lsp::Position::new(0, 9)),
3963                    },
3964                    kind: lsp::SymbolKind::CONSTANT,
3965                    tags: None,
3966                    container_name: None,
3967                    deprecated: None,
3968                }]))
3969            },
3970        );
3971
3972        // Request the definition of a symbol as the guest.
3973        let symbols = project_b
3974            .update(cx_b, |p, cx| p.symbols("two", cx))
3975            .await
3976            .unwrap();
3977        assert_eq!(symbols.len(), 1);
3978        assert_eq!(symbols[0].name, "TWO");
3979
3980        // Open one of the returned symbols.
3981        let buffer_b_2 = project_b
3982            .update(cx_b, |project, cx| {
3983                project.open_buffer_for_symbol(&symbols[0], cx)
3984            })
3985            .await
3986            .unwrap();
3987        buffer_b_2.read_with(cx_b, |buffer, _| {
3988            assert_eq!(
3989                buffer.file().unwrap().path().as_ref(),
3990                Path::new("../crate-2/two.rs")
3991            );
3992        });
3993
3994        // Attempt to craft a symbol and violate host's privacy by opening an arbitrary file.
3995        let mut fake_symbol = symbols[0].clone();
3996        fake_symbol.path = Path::new("/code/secrets").into();
3997        let error = project_b
3998            .update(cx_b, |project, cx| {
3999                project.open_buffer_for_symbol(&fake_symbol, cx)
4000            })
4001            .await
4002            .unwrap_err();
4003        assert!(error.to_string().contains("invalid symbol signature"));
4004    }
4005
4006    #[gpui::test(iterations = 10)]
4007    async fn test_open_buffer_while_getting_definition_pointing_to_it(
4008        cx_a: &mut TestAppContext,
4009        cx_b: &mut TestAppContext,
4010        mut rng: StdRng,
4011    ) {
4012        cx_a.foreground().forbid_parking();
4013        let lang_registry = Arc::new(LanguageRegistry::test());
4014        let fs = FakeFs::new(cx_a.background());
4015        fs.insert_tree(
4016            "/root",
4017            json!({
4018                "a.rs": "const ONE: usize = b::TWO;",
4019                "b.rs": "const TWO: usize = 2",
4020            }),
4021        )
4022        .await;
4023
4024        // Set up a fake language server.
4025        let mut language = Language::new(
4026            LanguageConfig {
4027                name: "Rust".into(),
4028                path_suffixes: vec!["rs".to_string()],
4029                ..Default::default()
4030            },
4031            Some(tree_sitter_rust::language()),
4032        );
4033        let mut fake_language_servers = language.set_fake_lsp_adapter(Default::default());
4034        lang_registry.add(Arc::new(language));
4035
4036        // Connect to a server as 2 clients.
4037        let mut server = TestServer::start(cx_a.foreground(), cx_a.background()).await;
4038        let client_a = server.create_client(cx_a, "user_a").await;
4039        let client_b = server.create_client(cx_b, "user_b").await;
4040        server
4041            .make_contacts(vec![(&client_a, cx_a), (&client_b, cx_b)])
4042            .await;
4043
4044        // Share a project as client A
4045        let project_a = cx_a.update(|cx| {
4046            Project::local(
4047                client_a.clone(),
4048                client_a.user_store.clone(),
4049                lang_registry.clone(),
4050                fs.clone(),
4051                cx,
4052            )
4053        });
4054
4055        let (worktree_a, _) = project_a
4056            .update(cx_a, |p, cx| {
4057                p.find_or_create_local_worktree("/root", true, cx)
4058            })
4059            .await
4060            .unwrap();
4061        worktree_a
4062            .read_with(cx_a, |tree, _| tree.as_local().unwrap().scan_complete())
4063            .await;
4064        let project_id = project_a.update(cx_a, |p, _| p.next_remote_id()).await;
4065        let worktree_id = worktree_a.read_with(cx_a, |tree, _| tree.id());
4066        project_a.update(cx_a, |p, cx| p.share(cx)).await.unwrap();
4067
4068        // Join the worktree as client B.
4069        let project_b = Project::remote(
4070            project_id,
4071            client_b.clone(),
4072            client_b.user_store.clone(),
4073            lang_registry.clone(),
4074            fs.clone(),
4075            &mut cx_b.to_async(),
4076        )
4077        .await
4078        .unwrap();
4079
4080        let buffer_b1 = cx_b
4081            .background()
4082            .spawn(project_b.update(cx_b, |p, cx| p.open_buffer((worktree_id, "a.rs"), cx)))
4083            .await
4084            .unwrap();
4085
4086        let fake_language_server = fake_language_servers.next().await.unwrap();
4087        fake_language_server.handle_request::<lsp::request::GotoDefinition, _, _>(
4088            |_, _| async move {
4089                Ok(Some(lsp::GotoDefinitionResponse::Scalar(
4090                    lsp::Location::new(
4091                        lsp::Url::from_file_path("/root/b.rs").unwrap(),
4092                        lsp::Range::new(lsp::Position::new(0, 6), lsp::Position::new(0, 9)),
4093                    ),
4094                )))
4095            },
4096        );
4097
4098        let definitions;
4099        let buffer_b2;
4100        if rng.gen() {
4101            definitions = project_b.update(cx_b, |p, cx| p.definition(&buffer_b1, 23, cx));
4102            buffer_b2 = project_b.update(cx_b, |p, cx| p.open_buffer((worktree_id, "b.rs"), cx));
4103        } else {
4104            buffer_b2 = project_b.update(cx_b, |p, cx| p.open_buffer((worktree_id, "b.rs"), cx));
4105            definitions = project_b.update(cx_b, |p, cx| p.definition(&buffer_b1, 23, cx));
4106        }
4107
4108        let buffer_b2 = buffer_b2.await.unwrap();
4109        let definitions = definitions.await.unwrap();
4110        assert_eq!(definitions.len(), 1);
4111        assert_eq!(definitions[0].buffer, buffer_b2);
4112    }
4113
4114    #[gpui::test(iterations = 10)]
4115    async fn test_collaborating_with_code_actions(
4116        cx_a: &mut TestAppContext,
4117        cx_b: &mut TestAppContext,
4118    ) {
4119        cx_a.foreground().forbid_parking();
4120        let lang_registry = Arc::new(LanguageRegistry::test());
4121        let fs = FakeFs::new(cx_a.background());
4122        cx_b.update(|cx| editor::init(cx));
4123
4124        // Set up a fake language server.
4125        let mut language = Language::new(
4126            LanguageConfig {
4127                name: "Rust".into(),
4128                path_suffixes: vec!["rs".to_string()],
4129                ..Default::default()
4130            },
4131            Some(tree_sitter_rust::language()),
4132        );
4133        let mut fake_language_servers = language.set_fake_lsp_adapter(Default::default());
4134        lang_registry.add(Arc::new(language));
4135
4136        // Connect to a server as 2 clients.
4137        let mut server = TestServer::start(cx_a.foreground(), cx_a.background()).await;
4138        let client_a = server.create_client(cx_a, "user_a").await;
4139        let client_b = server.create_client(cx_b, "user_b").await;
4140        server
4141            .make_contacts(vec![(&client_a, cx_a), (&client_b, cx_b)])
4142            .await;
4143
4144        // Share a project as client A
4145        fs.insert_tree(
4146            "/a",
4147            json!({
4148                "main.rs": "mod other;\nfn main() { let foo = other::foo(); }",
4149                "other.rs": "pub fn foo() -> usize { 4 }",
4150            }),
4151        )
4152        .await;
4153        let project_a = cx_a.update(|cx| {
4154            Project::local(
4155                client_a.clone(),
4156                client_a.user_store.clone(),
4157                lang_registry.clone(),
4158                fs.clone(),
4159                cx,
4160            )
4161        });
4162        let (worktree_a, _) = project_a
4163            .update(cx_a, |p, cx| {
4164                p.find_or_create_local_worktree("/a", true, cx)
4165            })
4166            .await
4167            .unwrap();
4168        worktree_a
4169            .read_with(cx_a, |tree, _| tree.as_local().unwrap().scan_complete())
4170            .await;
4171        let project_id = project_a.update(cx_a, |p, _| p.next_remote_id()).await;
4172        let worktree_id = worktree_a.read_with(cx_a, |tree, _| tree.id());
4173        project_a.update(cx_a, |p, cx| p.share(cx)).await.unwrap();
4174
4175        // Join the worktree as client B.
4176        let project_b = Project::remote(
4177            project_id,
4178            client_b.clone(),
4179            client_b.user_store.clone(),
4180            lang_registry.clone(),
4181            fs.clone(),
4182            &mut cx_b.to_async(),
4183        )
4184        .await
4185        .unwrap();
4186        let mut params = cx_b.update(WorkspaceParams::test);
4187        params.languages = lang_registry.clone();
4188        params.client = client_b.client.clone();
4189        params.user_store = client_b.user_store.clone();
4190        params.project = project_b;
4191
4192        let (_window_b, workspace_b) = cx_b.add_window(|cx| Workspace::new(&params, cx));
4193        let editor_b = workspace_b
4194            .update(cx_b, |workspace, cx| {
4195                workspace.open_path((worktree_id, "main.rs"), true, cx)
4196            })
4197            .await
4198            .unwrap()
4199            .downcast::<Editor>()
4200            .unwrap();
4201
4202        let mut fake_language_server = fake_language_servers.next().await.unwrap();
4203        fake_language_server
4204            .handle_request::<lsp::request::CodeActionRequest, _, _>(|params, _| async move {
4205                assert_eq!(
4206                    params.text_document.uri,
4207                    lsp::Url::from_file_path("/a/main.rs").unwrap(),
4208                );
4209                assert_eq!(params.range.start, lsp::Position::new(0, 0));
4210                assert_eq!(params.range.end, lsp::Position::new(0, 0));
4211                Ok(None)
4212            })
4213            .next()
4214            .await;
4215
4216        // Move cursor to a location that contains code actions.
4217        editor_b.update(cx_b, |editor, cx| {
4218            editor.change_selections(None, cx, |s| {
4219                s.select_ranges([Point::new(1, 31)..Point::new(1, 31)])
4220            });
4221            cx.focus(&editor_b);
4222        });
4223
4224        fake_language_server
4225            .handle_request::<lsp::request::CodeActionRequest, _, _>(|params, _| async move {
4226                assert_eq!(
4227                    params.text_document.uri,
4228                    lsp::Url::from_file_path("/a/main.rs").unwrap(),
4229                );
4230                assert_eq!(params.range.start, lsp::Position::new(1, 31));
4231                assert_eq!(params.range.end, lsp::Position::new(1, 31));
4232
4233                Ok(Some(vec![lsp::CodeActionOrCommand::CodeAction(
4234                    lsp::CodeAction {
4235                        title: "Inline into all callers".to_string(),
4236                        edit: Some(lsp::WorkspaceEdit {
4237                            changes: Some(
4238                                [
4239                                    (
4240                                        lsp::Url::from_file_path("/a/main.rs").unwrap(),
4241                                        vec![lsp::TextEdit::new(
4242                                            lsp::Range::new(
4243                                                lsp::Position::new(1, 22),
4244                                                lsp::Position::new(1, 34),
4245                                            ),
4246                                            "4".to_string(),
4247                                        )],
4248                                    ),
4249                                    (
4250                                        lsp::Url::from_file_path("/a/other.rs").unwrap(),
4251                                        vec![lsp::TextEdit::new(
4252                                            lsp::Range::new(
4253                                                lsp::Position::new(0, 0),
4254                                                lsp::Position::new(0, 27),
4255                                            ),
4256                                            "".to_string(),
4257                                        )],
4258                                    ),
4259                                ]
4260                                .into_iter()
4261                                .collect(),
4262                            ),
4263                            ..Default::default()
4264                        }),
4265                        data: Some(json!({
4266                            "codeActionParams": {
4267                                "range": {
4268                                    "start": {"line": 1, "column": 31},
4269                                    "end": {"line": 1, "column": 31},
4270                                }
4271                            }
4272                        })),
4273                        ..Default::default()
4274                    },
4275                )]))
4276            })
4277            .next()
4278            .await;
4279
4280        // Toggle code actions and wait for them to display.
4281        editor_b.update(cx_b, |editor, cx| {
4282            editor.toggle_code_actions(
4283                &ToggleCodeActions {
4284                    deployed_from_indicator: false,
4285                },
4286                cx,
4287            );
4288        });
4289        editor_b
4290            .condition(&cx_b, |editor, _| editor.context_menu_visible())
4291            .await;
4292
4293        fake_language_server.remove_request_handler::<lsp::request::CodeActionRequest>();
4294
4295        // Confirming the code action will trigger a resolve request.
4296        let confirm_action = workspace_b
4297            .update(cx_b, |workspace, cx| {
4298                Editor::confirm_code_action(workspace, &ConfirmCodeAction { item_ix: Some(0) }, cx)
4299            })
4300            .unwrap();
4301        fake_language_server.handle_request::<lsp::request::CodeActionResolveRequest, _, _>(
4302            |_, _| async move {
4303                Ok(lsp::CodeAction {
4304                    title: "Inline into all callers".to_string(),
4305                    edit: Some(lsp::WorkspaceEdit {
4306                        changes: Some(
4307                            [
4308                                (
4309                                    lsp::Url::from_file_path("/a/main.rs").unwrap(),
4310                                    vec![lsp::TextEdit::new(
4311                                        lsp::Range::new(
4312                                            lsp::Position::new(1, 22),
4313                                            lsp::Position::new(1, 34),
4314                                        ),
4315                                        "4".to_string(),
4316                                    )],
4317                                ),
4318                                (
4319                                    lsp::Url::from_file_path("/a/other.rs").unwrap(),
4320                                    vec![lsp::TextEdit::new(
4321                                        lsp::Range::new(
4322                                            lsp::Position::new(0, 0),
4323                                            lsp::Position::new(0, 27),
4324                                        ),
4325                                        "".to_string(),
4326                                    )],
4327                                ),
4328                            ]
4329                            .into_iter()
4330                            .collect(),
4331                        ),
4332                        ..Default::default()
4333                    }),
4334                    ..Default::default()
4335                })
4336            },
4337        );
4338
4339        // After the action is confirmed, an editor containing both modified files is opened.
4340        confirm_action.await.unwrap();
4341        let code_action_editor = workspace_b.read_with(cx_b, |workspace, cx| {
4342            workspace
4343                .active_item(cx)
4344                .unwrap()
4345                .downcast::<Editor>()
4346                .unwrap()
4347        });
4348        code_action_editor.update(cx_b, |editor, cx| {
4349            assert_eq!(editor.text(cx), "mod other;\nfn main() { let foo = 4; }\n");
4350            editor.undo(&Undo, cx);
4351            assert_eq!(
4352                editor.text(cx),
4353                "mod other;\nfn main() { let foo = other::foo(); }\npub fn foo() -> usize { 4 }"
4354            );
4355            editor.redo(&Redo, cx);
4356            assert_eq!(editor.text(cx), "mod other;\nfn main() { let foo = 4; }\n");
4357        });
4358    }
4359
4360    #[gpui::test(iterations = 10)]
4361    async fn test_collaborating_with_renames(cx_a: &mut TestAppContext, cx_b: &mut TestAppContext) {
4362        cx_a.foreground().forbid_parking();
4363        let lang_registry = Arc::new(LanguageRegistry::test());
4364        let fs = FakeFs::new(cx_a.background());
4365        cx_b.update(|cx| editor::init(cx));
4366
4367        // Set up a fake language server.
4368        let mut language = Language::new(
4369            LanguageConfig {
4370                name: "Rust".into(),
4371                path_suffixes: vec!["rs".to_string()],
4372                ..Default::default()
4373            },
4374            Some(tree_sitter_rust::language()),
4375        );
4376        let mut fake_language_servers = language.set_fake_lsp_adapter(FakeLspAdapter {
4377            capabilities: lsp::ServerCapabilities {
4378                rename_provider: Some(lsp::OneOf::Right(lsp::RenameOptions {
4379                    prepare_provider: Some(true),
4380                    work_done_progress_options: Default::default(),
4381                })),
4382                ..Default::default()
4383            },
4384            ..Default::default()
4385        });
4386        lang_registry.add(Arc::new(language));
4387
4388        // Connect to a server as 2 clients.
4389        let mut server = TestServer::start(cx_a.foreground(), cx_a.background()).await;
4390        let client_a = server.create_client(cx_a, "user_a").await;
4391        let client_b = server.create_client(cx_b, "user_b").await;
4392        server
4393            .make_contacts(vec![(&client_a, cx_a), (&client_b, cx_b)])
4394            .await;
4395
4396        // Share a project as client A
4397        fs.insert_tree(
4398            "/dir",
4399            json!({
4400                "one.rs": "const ONE: usize = 1;",
4401                "two.rs": "const TWO: usize = one::ONE + one::ONE;"
4402            }),
4403        )
4404        .await;
4405        let project_a = cx_a.update(|cx| {
4406            Project::local(
4407                client_a.clone(),
4408                client_a.user_store.clone(),
4409                lang_registry.clone(),
4410                fs.clone(),
4411                cx,
4412            )
4413        });
4414        let (worktree_a, _) = project_a
4415            .update(cx_a, |p, cx| {
4416                p.find_or_create_local_worktree("/dir", true, cx)
4417            })
4418            .await
4419            .unwrap();
4420        worktree_a
4421            .read_with(cx_a, |tree, _| tree.as_local().unwrap().scan_complete())
4422            .await;
4423        let project_id = project_a.update(cx_a, |p, _| p.next_remote_id()).await;
4424        let worktree_id = worktree_a.read_with(cx_a, |tree, _| tree.id());
4425        project_a.update(cx_a, |p, cx| p.share(cx)).await.unwrap();
4426
4427        // Join the worktree as client B.
4428        let project_b = Project::remote(
4429            project_id,
4430            client_b.clone(),
4431            client_b.user_store.clone(),
4432            lang_registry.clone(),
4433            fs.clone(),
4434            &mut cx_b.to_async(),
4435        )
4436        .await
4437        .unwrap();
4438        let mut params = cx_b.update(WorkspaceParams::test);
4439        params.languages = lang_registry.clone();
4440        params.client = client_b.client.clone();
4441        params.user_store = client_b.user_store.clone();
4442        params.project = project_b;
4443
4444        let (_window_b, workspace_b) = cx_b.add_window(|cx| Workspace::new(&params, cx));
4445        let editor_b = workspace_b
4446            .update(cx_b, |workspace, cx| {
4447                workspace.open_path((worktree_id, "one.rs"), true, cx)
4448            })
4449            .await
4450            .unwrap()
4451            .downcast::<Editor>()
4452            .unwrap();
4453        let fake_language_server = fake_language_servers.next().await.unwrap();
4454
4455        // Move cursor to a location that can be renamed.
4456        let prepare_rename = editor_b.update(cx_b, |editor, cx| {
4457            editor.change_selections(None, cx, |s| s.select_ranges([7..7]));
4458            editor.rename(&Rename, cx).unwrap()
4459        });
4460
4461        fake_language_server
4462            .handle_request::<lsp::request::PrepareRenameRequest, _, _>(|params, _| async move {
4463                assert_eq!(params.text_document.uri.as_str(), "file:///dir/one.rs");
4464                assert_eq!(params.position, lsp::Position::new(0, 7));
4465                Ok(Some(lsp::PrepareRenameResponse::Range(lsp::Range::new(
4466                    lsp::Position::new(0, 6),
4467                    lsp::Position::new(0, 9),
4468                ))))
4469            })
4470            .next()
4471            .await
4472            .unwrap();
4473        prepare_rename.await.unwrap();
4474        editor_b.update(cx_b, |editor, cx| {
4475            let rename = editor.pending_rename().unwrap();
4476            let buffer = editor.buffer().read(cx).snapshot(cx);
4477            assert_eq!(
4478                rename.range.start.to_offset(&buffer)..rename.range.end.to_offset(&buffer),
4479                6..9
4480            );
4481            rename.editor.update(cx, |rename_editor, cx| {
4482                rename_editor.buffer().update(cx, |rename_buffer, cx| {
4483                    rename_buffer.edit([(0..3, "THREE")], cx);
4484                });
4485            });
4486        });
4487
4488        let confirm_rename = workspace_b.update(cx_b, |workspace, cx| {
4489            Editor::confirm_rename(workspace, &ConfirmRename, cx).unwrap()
4490        });
4491        fake_language_server
4492            .handle_request::<lsp::request::Rename, _, _>(|params, _| async move {
4493                assert_eq!(
4494                    params.text_document_position.text_document.uri.as_str(),
4495                    "file:///dir/one.rs"
4496                );
4497                assert_eq!(
4498                    params.text_document_position.position,
4499                    lsp::Position::new(0, 6)
4500                );
4501                assert_eq!(params.new_name, "THREE");
4502                Ok(Some(lsp::WorkspaceEdit {
4503                    changes: Some(
4504                        [
4505                            (
4506                                lsp::Url::from_file_path("/dir/one.rs").unwrap(),
4507                                vec![lsp::TextEdit::new(
4508                                    lsp::Range::new(
4509                                        lsp::Position::new(0, 6),
4510                                        lsp::Position::new(0, 9),
4511                                    ),
4512                                    "THREE".to_string(),
4513                                )],
4514                            ),
4515                            (
4516                                lsp::Url::from_file_path("/dir/two.rs").unwrap(),
4517                                vec![
4518                                    lsp::TextEdit::new(
4519                                        lsp::Range::new(
4520                                            lsp::Position::new(0, 24),
4521                                            lsp::Position::new(0, 27),
4522                                        ),
4523                                        "THREE".to_string(),
4524                                    ),
4525                                    lsp::TextEdit::new(
4526                                        lsp::Range::new(
4527                                            lsp::Position::new(0, 35),
4528                                            lsp::Position::new(0, 38),
4529                                        ),
4530                                        "THREE".to_string(),
4531                                    ),
4532                                ],
4533                            ),
4534                        ]
4535                        .into_iter()
4536                        .collect(),
4537                    ),
4538                    ..Default::default()
4539                }))
4540            })
4541            .next()
4542            .await
4543            .unwrap();
4544        confirm_rename.await.unwrap();
4545
4546        let rename_editor = workspace_b.read_with(cx_b, |workspace, cx| {
4547            workspace
4548                .active_item(cx)
4549                .unwrap()
4550                .downcast::<Editor>()
4551                .unwrap()
4552        });
4553        rename_editor.update(cx_b, |editor, cx| {
4554            assert_eq!(
4555                editor.text(cx),
4556                "const THREE: usize = 1;\nconst TWO: usize = one::THREE + one::THREE;"
4557            );
4558            editor.undo(&Undo, cx);
4559            assert_eq!(
4560                editor.text(cx),
4561                "const ONE: usize = 1;\nconst TWO: usize = one::ONE + one::ONE;"
4562            );
4563            editor.redo(&Redo, cx);
4564            assert_eq!(
4565                editor.text(cx),
4566                "const THREE: usize = 1;\nconst TWO: usize = one::THREE + one::THREE;"
4567            );
4568        });
4569
4570        // Ensure temporary rename edits cannot be undone/redone.
4571        editor_b.update(cx_b, |editor, cx| {
4572            editor.undo(&Undo, cx);
4573            assert_eq!(editor.text(cx), "const ONE: usize = 1;");
4574            editor.undo(&Undo, cx);
4575            assert_eq!(editor.text(cx), "const ONE: usize = 1;");
4576            editor.redo(&Redo, cx);
4577            assert_eq!(editor.text(cx), "const THREE: usize = 1;");
4578        })
4579    }
4580
4581    #[gpui::test(iterations = 10)]
4582    async fn test_basic_chat(cx_a: &mut TestAppContext, cx_b: &mut TestAppContext) {
4583        cx_a.foreground().forbid_parking();
4584
4585        // Connect to a server as 2 clients.
4586        let mut server = TestServer::start(cx_a.foreground(), cx_a.background()).await;
4587        let client_a = server.create_client(cx_a, "user_a").await;
4588        let client_b = server.create_client(cx_b, "user_b").await;
4589
4590        // Create an org that includes these 2 users.
4591        let db = &server.app_state.db;
4592        let org_id = db.create_org("Test Org", "test-org").await.unwrap();
4593        db.add_org_member(org_id, client_a.current_user_id(&cx_a), false)
4594            .await
4595            .unwrap();
4596        db.add_org_member(org_id, client_b.current_user_id(&cx_b), false)
4597            .await
4598            .unwrap();
4599
4600        // Create a channel that includes all the users.
4601        let channel_id = db.create_org_channel(org_id, "test-channel").await.unwrap();
4602        db.add_channel_member(channel_id, client_a.current_user_id(&cx_a), false)
4603            .await
4604            .unwrap();
4605        db.add_channel_member(channel_id, client_b.current_user_id(&cx_b), false)
4606            .await
4607            .unwrap();
4608        db.create_channel_message(
4609            channel_id,
4610            client_b.current_user_id(&cx_b),
4611            "hello A, it's B.",
4612            OffsetDateTime::now_utc(),
4613            1,
4614        )
4615        .await
4616        .unwrap();
4617
4618        let channels_a = cx_a
4619            .add_model(|cx| ChannelList::new(client_a.user_store.clone(), client_a.clone(), cx));
4620        channels_a
4621            .condition(cx_a, |list, _| list.available_channels().is_some())
4622            .await;
4623        channels_a.read_with(cx_a, |list, _| {
4624            assert_eq!(
4625                list.available_channels().unwrap(),
4626                &[ChannelDetails {
4627                    id: channel_id.to_proto(),
4628                    name: "test-channel".to_string()
4629                }]
4630            )
4631        });
4632        let channel_a = channels_a.update(cx_a, |this, cx| {
4633            this.get_channel(channel_id.to_proto(), cx).unwrap()
4634        });
4635        channel_a.read_with(cx_a, |channel, _| assert!(channel.messages().is_empty()));
4636        channel_a
4637            .condition(&cx_a, |channel, _| {
4638                channel_messages(channel)
4639                    == [("user_b".to_string(), "hello A, it's B.".to_string(), false)]
4640            })
4641            .await;
4642
4643        let channels_b = cx_b
4644            .add_model(|cx| ChannelList::new(client_b.user_store.clone(), client_b.clone(), cx));
4645        channels_b
4646            .condition(cx_b, |list, _| list.available_channels().is_some())
4647            .await;
4648        channels_b.read_with(cx_b, |list, _| {
4649            assert_eq!(
4650                list.available_channels().unwrap(),
4651                &[ChannelDetails {
4652                    id: channel_id.to_proto(),
4653                    name: "test-channel".to_string()
4654                }]
4655            )
4656        });
4657
4658        let channel_b = channels_b.update(cx_b, |this, cx| {
4659            this.get_channel(channel_id.to_proto(), cx).unwrap()
4660        });
4661        channel_b.read_with(cx_b, |channel, _| assert!(channel.messages().is_empty()));
4662        channel_b
4663            .condition(&cx_b, |channel, _| {
4664                channel_messages(channel)
4665                    == [("user_b".to_string(), "hello A, it's B.".to_string(), false)]
4666            })
4667            .await;
4668
4669        channel_a
4670            .update(cx_a, |channel, cx| {
4671                channel
4672                    .send_message("oh, hi B.".to_string(), cx)
4673                    .unwrap()
4674                    .detach();
4675                let task = channel.send_message("sup".to_string(), cx).unwrap();
4676                assert_eq!(
4677                    channel_messages(channel),
4678                    &[
4679                        ("user_b".to_string(), "hello A, it's B.".to_string(), false),
4680                        ("user_a".to_string(), "oh, hi B.".to_string(), true),
4681                        ("user_a".to_string(), "sup".to_string(), true)
4682                    ]
4683                );
4684                task
4685            })
4686            .await
4687            .unwrap();
4688
4689        channel_b
4690            .condition(&cx_b, |channel, _| {
4691                channel_messages(channel)
4692                    == [
4693                        ("user_b".to_string(), "hello A, it's B.".to_string(), false),
4694                        ("user_a".to_string(), "oh, hi B.".to_string(), false),
4695                        ("user_a".to_string(), "sup".to_string(), false),
4696                    ]
4697            })
4698            .await;
4699
4700        assert_eq!(
4701            server
4702                .state()
4703                .await
4704                .channel(channel_id)
4705                .unwrap()
4706                .connection_ids
4707                .len(),
4708            2
4709        );
4710        cx_b.update(|_| drop(channel_b));
4711        server
4712            .condition(|state| state.channel(channel_id).unwrap().connection_ids.len() == 1)
4713            .await;
4714
4715        cx_a.update(|_| drop(channel_a));
4716        server
4717            .condition(|state| state.channel(channel_id).is_none())
4718            .await;
4719    }
4720
4721    #[gpui::test(iterations = 10)]
4722    async fn test_chat_message_validation(cx_a: &mut TestAppContext) {
4723        cx_a.foreground().forbid_parking();
4724
4725        let mut server = TestServer::start(cx_a.foreground(), cx_a.background()).await;
4726        let client_a = server.create_client(cx_a, "user_a").await;
4727
4728        let db = &server.app_state.db;
4729        let org_id = db.create_org("Test Org", "test-org").await.unwrap();
4730        let channel_id = db.create_org_channel(org_id, "test-channel").await.unwrap();
4731        db.add_org_member(org_id, client_a.current_user_id(&cx_a), false)
4732            .await
4733            .unwrap();
4734        db.add_channel_member(channel_id, client_a.current_user_id(&cx_a), false)
4735            .await
4736            .unwrap();
4737
4738        let channels_a = cx_a
4739            .add_model(|cx| ChannelList::new(client_a.user_store.clone(), client_a.clone(), cx));
4740        channels_a
4741            .condition(cx_a, |list, _| list.available_channels().is_some())
4742            .await;
4743        let channel_a = channels_a.update(cx_a, |this, cx| {
4744            this.get_channel(channel_id.to_proto(), cx).unwrap()
4745        });
4746
4747        // Messages aren't allowed to be too long.
4748        channel_a
4749            .update(cx_a, |channel, cx| {
4750                let long_body = "this is long.\n".repeat(1024);
4751                channel.send_message(long_body, cx).unwrap()
4752            })
4753            .await
4754            .unwrap_err();
4755
4756        // Messages aren't allowed to be blank.
4757        channel_a.update(cx_a, |channel, cx| {
4758            channel.send_message(String::new(), cx).unwrap_err()
4759        });
4760
4761        // Leading and trailing whitespace are trimmed.
4762        channel_a
4763            .update(cx_a, |channel, cx| {
4764                channel
4765                    .send_message("\n surrounded by whitespace  \n".to_string(), cx)
4766                    .unwrap()
4767            })
4768            .await
4769            .unwrap();
4770        assert_eq!(
4771            db.get_channel_messages(channel_id, 10, None)
4772                .await
4773                .unwrap()
4774                .iter()
4775                .map(|m| &m.body)
4776                .collect::<Vec<_>>(),
4777            &["surrounded by whitespace"]
4778        );
4779    }
4780
4781    #[gpui::test(iterations = 10)]
4782    async fn test_chat_reconnection(cx_a: &mut TestAppContext, cx_b: &mut TestAppContext) {
4783        cx_a.foreground().forbid_parking();
4784
4785        // Connect to a server as 2 clients.
4786        let mut server = TestServer::start(cx_a.foreground(), cx_a.background()).await;
4787        let client_a = server.create_client(cx_a, "user_a").await;
4788        let client_b = server.create_client(cx_b, "user_b").await;
4789        let mut status_b = client_b.status();
4790
4791        // Create an org that includes these 2 users.
4792        let db = &server.app_state.db;
4793        let org_id = db.create_org("Test Org", "test-org").await.unwrap();
4794        db.add_org_member(org_id, client_a.current_user_id(&cx_a), false)
4795            .await
4796            .unwrap();
4797        db.add_org_member(org_id, client_b.current_user_id(&cx_b), false)
4798            .await
4799            .unwrap();
4800
4801        // Create a channel that includes all the users.
4802        let channel_id = db.create_org_channel(org_id, "test-channel").await.unwrap();
4803        db.add_channel_member(channel_id, client_a.current_user_id(&cx_a), false)
4804            .await
4805            .unwrap();
4806        db.add_channel_member(channel_id, client_b.current_user_id(&cx_b), false)
4807            .await
4808            .unwrap();
4809        db.create_channel_message(
4810            channel_id,
4811            client_b.current_user_id(&cx_b),
4812            "hello A, it's B.",
4813            OffsetDateTime::now_utc(),
4814            2,
4815        )
4816        .await
4817        .unwrap();
4818
4819        let channels_a = cx_a
4820            .add_model(|cx| ChannelList::new(client_a.user_store.clone(), client_a.clone(), cx));
4821        channels_a
4822            .condition(cx_a, |list, _| list.available_channels().is_some())
4823            .await;
4824
4825        channels_a.read_with(cx_a, |list, _| {
4826            assert_eq!(
4827                list.available_channels().unwrap(),
4828                &[ChannelDetails {
4829                    id: channel_id.to_proto(),
4830                    name: "test-channel".to_string()
4831                }]
4832            )
4833        });
4834        let channel_a = channels_a.update(cx_a, |this, cx| {
4835            this.get_channel(channel_id.to_proto(), cx).unwrap()
4836        });
4837        channel_a.read_with(cx_a, |channel, _| assert!(channel.messages().is_empty()));
4838        channel_a
4839            .condition(&cx_a, |channel, _| {
4840                channel_messages(channel)
4841                    == [("user_b".to_string(), "hello A, it's B.".to_string(), false)]
4842            })
4843            .await;
4844
4845        let channels_b = cx_b
4846            .add_model(|cx| ChannelList::new(client_b.user_store.clone(), client_b.clone(), cx));
4847        channels_b
4848            .condition(cx_b, |list, _| list.available_channels().is_some())
4849            .await;
4850        channels_b.read_with(cx_b, |list, _| {
4851            assert_eq!(
4852                list.available_channels().unwrap(),
4853                &[ChannelDetails {
4854                    id: channel_id.to_proto(),
4855                    name: "test-channel".to_string()
4856                }]
4857            )
4858        });
4859
4860        let channel_b = channels_b.update(cx_b, |this, cx| {
4861            this.get_channel(channel_id.to_proto(), cx).unwrap()
4862        });
4863        channel_b.read_with(cx_b, |channel, _| assert!(channel.messages().is_empty()));
4864        channel_b
4865            .condition(&cx_b, |channel, _| {
4866                channel_messages(channel)
4867                    == [("user_b".to_string(), "hello A, it's B.".to_string(), false)]
4868            })
4869            .await;
4870
4871        // Disconnect client B, ensuring we can still access its cached channel data.
4872        server.forbid_connections();
4873        server.disconnect_client(client_b.current_user_id(&cx_b));
4874        cx_b.foreground().advance_clock(rpc::RECEIVE_TIMEOUT);
4875        while !matches!(
4876            status_b.next().await,
4877            Some(client::Status::ReconnectionError { .. })
4878        ) {}
4879
4880        channels_b.read_with(cx_b, |channels, _| {
4881            assert_eq!(
4882                channels.available_channels().unwrap(),
4883                [ChannelDetails {
4884                    id: channel_id.to_proto(),
4885                    name: "test-channel".to_string()
4886                }]
4887            )
4888        });
4889        channel_b.read_with(cx_b, |channel, _| {
4890            assert_eq!(
4891                channel_messages(channel),
4892                [("user_b".to_string(), "hello A, it's B.".to_string(), false)]
4893            )
4894        });
4895
4896        // Send a message from client B while it is disconnected.
4897        channel_b
4898            .update(cx_b, |channel, cx| {
4899                let task = channel
4900                    .send_message("can you see this?".to_string(), cx)
4901                    .unwrap();
4902                assert_eq!(
4903                    channel_messages(channel),
4904                    &[
4905                        ("user_b".to_string(), "hello A, it's B.".to_string(), false),
4906                        ("user_b".to_string(), "can you see this?".to_string(), true)
4907                    ]
4908                );
4909                task
4910            })
4911            .await
4912            .unwrap_err();
4913
4914        // Send a message from client A while B is disconnected.
4915        channel_a
4916            .update(cx_a, |channel, cx| {
4917                channel
4918                    .send_message("oh, hi B.".to_string(), cx)
4919                    .unwrap()
4920                    .detach();
4921                let task = channel.send_message("sup".to_string(), cx).unwrap();
4922                assert_eq!(
4923                    channel_messages(channel),
4924                    &[
4925                        ("user_b".to_string(), "hello A, it's B.".to_string(), false),
4926                        ("user_a".to_string(), "oh, hi B.".to_string(), true),
4927                        ("user_a".to_string(), "sup".to_string(), true)
4928                    ]
4929                );
4930                task
4931            })
4932            .await
4933            .unwrap();
4934
4935        // Give client B a chance to reconnect.
4936        server.allow_connections();
4937        cx_b.foreground().advance_clock(Duration::from_secs(10));
4938
4939        // Verify that B sees the new messages upon reconnection, as well as the message client B
4940        // sent while offline.
4941        channel_b
4942            .condition(&cx_b, |channel, _| {
4943                channel_messages(channel)
4944                    == [
4945                        ("user_b".to_string(), "hello A, it's B.".to_string(), false),
4946                        ("user_a".to_string(), "oh, hi B.".to_string(), false),
4947                        ("user_a".to_string(), "sup".to_string(), false),
4948                        ("user_b".to_string(), "can you see this?".to_string(), false),
4949                    ]
4950            })
4951            .await;
4952
4953        // Ensure client A and B can communicate normally after reconnection.
4954        channel_a
4955            .update(cx_a, |channel, cx| {
4956                channel.send_message("you online?".to_string(), cx).unwrap()
4957            })
4958            .await
4959            .unwrap();
4960        channel_b
4961            .condition(&cx_b, |channel, _| {
4962                channel_messages(channel)
4963                    == [
4964                        ("user_b".to_string(), "hello A, it's B.".to_string(), false),
4965                        ("user_a".to_string(), "oh, hi B.".to_string(), false),
4966                        ("user_a".to_string(), "sup".to_string(), false),
4967                        ("user_b".to_string(), "can you see this?".to_string(), false),
4968                        ("user_a".to_string(), "you online?".to_string(), false),
4969                    ]
4970            })
4971            .await;
4972
4973        channel_b
4974            .update(cx_b, |channel, cx| {
4975                channel.send_message("yep".to_string(), cx).unwrap()
4976            })
4977            .await
4978            .unwrap();
4979        channel_a
4980            .condition(&cx_a, |channel, _| {
4981                channel_messages(channel)
4982                    == [
4983                        ("user_b".to_string(), "hello A, it's B.".to_string(), false),
4984                        ("user_a".to_string(), "oh, hi B.".to_string(), false),
4985                        ("user_a".to_string(), "sup".to_string(), false),
4986                        ("user_b".to_string(), "can you see this?".to_string(), false),
4987                        ("user_a".to_string(), "you online?".to_string(), false),
4988                        ("user_b".to_string(), "yep".to_string(), false),
4989                    ]
4990            })
4991            .await;
4992    }
4993
4994    #[gpui::test(iterations = 10)]
4995    async fn test_contacts(
4996        deterministic: Arc<Deterministic>,
4997        cx_a: &mut TestAppContext,
4998        cx_b: &mut TestAppContext,
4999        cx_c: &mut TestAppContext,
5000    ) {
5001        cx_a.foreground().forbid_parking();
5002
5003        // Connect to a server as 3 clients.
5004        let mut server = TestServer::start(cx_a.foreground(), cx_a.background()).await;
5005        let mut client_a = server.create_client(cx_a, "user_a").await;
5006        let mut client_b = server.create_client(cx_b, "user_b").await;
5007        let client_c = server.create_client(cx_c, "user_c").await;
5008        server
5009            .make_contacts(vec![
5010                (&client_a, cx_a),
5011                (&client_b, cx_b),
5012                (&client_c, cx_c),
5013            ])
5014            .await;
5015
5016        deterministic.run_until_parked();
5017        for (client, cx) in [(&client_a, &cx_a), (&client_b, &cx_b), (&client_c, &cx_c)] {
5018            client.user_store.read_with(*cx, |store, _| {
5019                assert_eq!(
5020                    contacts(store),
5021                    [
5022                        ("user_a", true, vec![]),
5023                        ("user_b", true, vec![]),
5024                        ("user_c", true, vec![])
5025                    ]
5026                )
5027            });
5028        }
5029
5030        // Share a project as client A.
5031        let fs = FakeFs::new(cx_a.background());
5032        fs.create_dir(Path::new("/a")).await.unwrap();
5033        let (project_a, _) = client_a.build_local_project(fs, "/a", cx_a).await;
5034
5035        deterministic.run_until_parked();
5036        for (client, cx) in [(&client_a, &cx_a), (&client_b, &cx_b), (&client_c, &cx_c)] {
5037            client.user_store.read_with(*cx, |store, _| {
5038                assert_eq!(
5039                    contacts(store),
5040                    [
5041                        ("user_a", true, vec![("a", false, vec![])]),
5042                        ("user_b", true, vec![]),
5043                        ("user_c", true, vec![])
5044                    ]
5045                )
5046            });
5047        }
5048
5049        let project_id = project_a
5050            .update(cx_a, |project, _| project.next_remote_id())
5051            .await;
5052        project_a
5053            .update(cx_a, |project, cx| project.share(cx))
5054            .await
5055            .unwrap();
5056
5057        deterministic.run_until_parked();
5058        for (client, cx) in [(&client_a, &cx_a), (&client_b, &cx_b), (&client_c, &cx_c)] {
5059            client.user_store.read_with(*cx, |store, _| {
5060                assert_eq!(
5061                    contacts(store),
5062                    [
5063                        ("user_a", true, vec![("a", true, vec![])]),
5064                        ("user_b", true, vec![]),
5065                        ("user_c", true, vec![])
5066                    ]
5067                )
5068            });
5069        }
5070
5071        let _project_b = client_b.build_remote_project(project_id, cx_b).await;
5072
5073        deterministic.run_until_parked();
5074        for (client, cx) in [(&client_a, &cx_a), (&client_b, &cx_b), (&client_c, &cx_c)] {
5075            client.user_store.read_with(*cx, |store, _| {
5076                assert_eq!(
5077                    contacts(store),
5078                    [
5079                        ("user_a", true, vec![("a", true, vec!["user_b"])]),
5080                        ("user_b", true, vec![]),
5081                        ("user_c", true, vec![])
5082                    ]
5083                )
5084            });
5085        }
5086
5087        // Add a local project as client B
5088        let fs = FakeFs::new(cx_b.background());
5089        fs.create_dir(Path::new("/b")).await.unwrap();
5090        let (_project_b, _) = client_b.build_local_project(fs, "/b", cx_a).await;
5091
5092        deterministic.run_until_parked();
5093        for (client, cx) in [(&client_a, &cx_a), (&client_b, &cx_b), (&client_c, &cx_c)] {
5094            client.user_store.read_with(*cx, |store, _| {
5095                assert_eq!(
5096                    contacts(store),
5097                    [
5098                        ("user_a", true, vec![("a", true, vec!["user_b"])]),
5099                        ("user_b", true, vec![("b", false, vec![])]),
5100                        ("user_c", true, vec![])
5101                    ]
5102                )
5103            });
5104        }
5105
5106        project_a
5107            .condition(&cx_a, |project, _| {
5108                project.collaborators().contains_key(&client_b.peer_id)
5109            })
5110            .await;
5111
5112        client_a.project.take();
5113        cx_a.update(move |_| drop(project_a));
5114        deterministic.run_until_parked();
5115        for (client, cx) in [(&client_a, &cx_a), (&client_b, &cx_b), (&client_c, &cx_c)] {
5116            client.user_store.read_with(*cx, |store, _| {
5117                assert_eq!(
5118                    contacts(store),
5119                    [
5120                        ("user_a", true, vec![]),
5121                        ("user_b", true, vec![("b", false, vec![])]),
5122                        ("user_c", true, vec![])
5123                    ]
5124                )
5125            });
5126        }
5127
5128        server.disconnect_client(client_c.current_user_id(cx_c));
5129        server.forbid_connections();
5130        deterministic.advance_clock(rpc::RECEIVE_TIMEOUT);
5131        for (client, cx) in [(&client_a, &cx_a), (&client_b, &cx_b)] {
5132            client.user_store.read_with(*cx, |store, _| {
5133                assert_eq!(
5134                    contacts(store),
5135                    [
5136                        ("user_a", true, vec![]),
5137                        ("user_b", true, vec![("b", false, vec![])]),
5138                        ("user_c", false, vec![])
5139                    ]
5140                )
5141            });
5142        }
5143        client_c
5144            .user_store
5145            .read_with(cx_c, |store, _| assert_eq!(contacts(store), []));
5146
5147        server.allow_connections();
5148        client_c
5149            .authenticate_and_connect(false, &cx_c.to_async())
5150            .await
5151            .unwrap();
5152
5153        deterministic.run_until_parked();
5154        for (client, cx) in [(&client_a, &cx_a), (&client_b, &cx_b), (&client_c, &cx_c)] {
5155            client.user_store.read_with(*cx, |store, _| {
5156                assert_eq!(
5157                    contacts(store),
5158                    [
5159                        ("user_a", true, vec![]),
5160                        ("user_b", true, vec![("b", false, vec![])]),
5161                        ("user_c", true, vec![])
5162                    ]
5163                )
5164            });
5165        }
5166
5167        fn contacts(user_store: &UserStore) -> Vec<(&str, bool, Vec<(&str, bool, Vec<&str>)>)> {
5168            user_store
5169                .contacts()
5170                .iter()
5171                .map(|contact| {
5172                    let projects = contact
5173                        .projects
5174                        .iter()
5175                        .map(|p| {
5176                            (
5177                                p.worktree_root_names[0].as_str(),
5178                                p.is_shared,
5179                                p.guests.iter().map(|p| p.github_login.as_str()).collect(),
5180                            )
5181                        })
5182                        .collect();
5183                    (contact.user.github_login.as_str(), contact.online, projects)
5184                })
5185                .collect()
5186        }
5187    }
5188
5189    #[gpui::test(iterations = 10)]
5190    async fn test_contact_requests(
5191        executor: Arc<Deterministic>,
5192        cx_a: &mut TestAppContext,
5193        cx_a2: &mut TestAppContext,
5194        cx_b: &mut TestAppContext,
5195        cx_b2: &mut TestAppContext,
5196        cx_c: &mut TestAppContext,
5197        cx_c2: &mut TestAppContext,
5198    ) {
5199        cx_a.foreground().forbid_parking();
5200
5201        // Connect to a server as 3 clients.
5202        let mut server = TestServer::start(cx_a.foreground(), cx_a.background()).await;
5203        let client_a = server.create_client(cx_a, "user_a").await;
5204        let client_a2 = server.create_client(cx_a2, "user_a").await;
5205        let client_b = server.create_client(cx_b, "user_b").await;
5206        let client_b2 = server.create_client(cx_b2, "user_b").await;
5207        let client_c = server.create_client(cx_c, "user_c").await;
5208        let client_c2 = server.create_client(cx_c2, "user_c").await;
5209
5210        assert_eq!(client_a.user_id().unwrap(), client_a2.user_id().unwrap());
5211        assert_eq!(client_b.user_id().unwrap(), client_b2.user_id().unwrap());
5212        assert_eq!(client_c.user_id().unwrap(), client_c2.user_id().unwrap());
5213
5214        // User A and User C request that user B become their contact.
5215        client_a
5216            .user_store
5217            .update(cx_a, |store, cx| {
5218                store.request_contact(client_b.user_id().unwrap(), cx)
5219            })
5220            .await
5221            .unwrap();
5222        client_c
5223            .user_store
5224            .update(cx_c, |store, cx| {
5225                store.request_contact(client_b.user_id().unwrap(), cx)
5226            })
5227            .await
5228            .unwrap();
5229        executor.run_until_parked();
5230
5231        // All users see the pending request appear in all their clients.
5232        assert_eq!(
5233            client_a.summarize_contacts(&cx_a).outgoing_requests,
5234            &["user_b"]
5235        );
5236        assert_eq!(
5237            client_a2.summarize_contacts(&cx_a2).outgoing_requests,
5238            &["user_b"]
5239        );
5240        assert_eq!(
5241            client_b.summarize_contacts(&cx_b).incoming_requests,
5242            &["user_a", "user_c"]
5243        );
5244        assert_eq!(
5245            client_b2.summarize_contacts(&cx_b2).incoming_requests,
5246            &["user_a", "user_c"]
5247        );
5248        assert_eq!(
5249            client_c.summarize_contacts(&cx_c).outgoing_requests,
5250            &["user_b"]
5251        );
5252        assert_eq!(
5253            client_c2.summarize_contacts(&cx_c2).outgoing_requests,
5254            &["user_b"]
5255        );
5256
5257        // Contact requests are present upon connecting (tested here via disconnect/reconnect)
5258        disconnect_and_reconnect(&client_a, cx_a).await;
5259        disconnect_and_reconnect(&client_b, cx_b).await;
5260        disconnect_and_reconnect(&client_c, cx_c).await;
5261        executor.run_until_parked();
5262        assert_eq!(
5263            client_a.summarize_contacts(&cx_a).outgoing_requests,
5264            &["user_b"]
5265        );
5266        assert_eq!(
5267            client_b.summarize_contacts(&cx_b).incoming_requests,
5268            &["user_a", "user_c"]
5269        );
5270        assert_eq!(
5271            client_c.summarize_contacts(&cx_c).outgoing_requests,
5272            &["user_b"]
5273        );
5274
5275        // User B accepts the request from user A.
5276        client_b
5277            .user_store
5278            .update(cx_b, |store, cx| {
5279                store.respond_to_contact_request(client_a.user_id().unwrap(), true, cx)
5280            })
5281            .await
5282            .unwrap();
5283
5284        executor.run_until_parked();
5285
5286        // User B sees user A as their contact now in all client, and the incoming request from them is removed.
5287        let contacts_b = client_b.summarize_contacts(&cx_b);
5288        assert_eq!(contacts_b.current, &["user_a", "user_b"]);
5289        assert_eq!(contacts_b.incoming_requests, &["user_c"]);
5290        let contacts_b2 = client_b2.summarize_contacts(&cx_b2);
5291        assert_eq!(contacts_b2.current, &["user_a", "user_b"]);
5292        assert_eq!(contacts_b2.incoming_requests, &["user_c"]);
5293
5294        // User A sees user B as their contact now in all clients, and the outgoing request to them is removed.
5295        let contacts_a = client_a.summarize_contacts(&cx_a);
5296        assert_eq!(contacts_a.current, &["user_a", "user_b"]);
5297        assert!(contacts_a.outgoing_requests.is_empty());
5298        let contacts_a2 = client_a2.summarize_contacts(&cx_a2);
5299        assert_eq!(contacts_a2.current, &["user_a", "user_b"]);
5300        assert!(contacts_a2.outgoing_requests.is_empty());
5301
5302        // Contacts are present upon connecting (tested here via disconnect/reconnect)
5303        disconnect_and_reconnect(&client_a, cx_a).await;
5304        disconnect_and_reconnect(&client_b, cx_b).await;
5305        disconnect_and_reconnect(&client_c, cx_c).await;
5306        executor.run_until_parked();
5307        assert_eq!(
5308            client_a.summarize_contacts(&cx_a).current,
5309            &["user_a", "user_b"]
5310        );
5311        assert_eq!(
5312            client_b.summarize_contacts(&cx_b).current,
5313            &["user_a", "user_b"]
5314        );
5315        assert_eq!(
5316            client_b.summarize_contacts(&cx_b).incoming_requests,
5317            &["user_c"]
5318        );
5319        assert_eq!(client_c.summarize_contacts(&cx_c).current, &["user_c"]);
5320        assert_eq!(
5321            client_c.summarize_contacts(&cx_c).outgoing_requests,
5322            &["user_b"]
5323        );
5324
5325        // User B rejects the request from user C.
5326        client_b
5327            .user_store
5328            .update(cx_b, |store, cx| {
5329                store.respond_to_contact_request(client_c.user_id().unwrap(), false, cx)
5330            })
5331            .await
5332            .unwrap();
5333
5334        executor.run_until_parked();
5335
5336        // User B doesn't see user C as their contact, and the incoming request from them is removed.
5337        let contacts_b = client_b.summarize_contacts(&cx_b);
5338        assert_eq!(contacts_b.current, &["user_a", "user_b"]);
5339        assert!(contacts_b.incoming_requests.is_empty());
5340        let contacts_b2 = client_b2.summarize_contacts(&cx_b2);
5341        assert_eq!(contacts_b2.current, &["user_a", "user_b"]);
5342        assert!(contacts_b2.incoming_requests.is_empty());
5343
5344        // User C doesn't see user B as their contact, and the outgoing request to them is removed.
5345        let contacts_c = client_c.summarize_contacts(&cx_c);
5346        assert_eq!(contacts_c.current, &["user_c"]);
5347        assert!(contacts_c.outgoing_requests.is_empty());
5348        let contacts_c2 = client_c2.summarize_contacts(&cx_c2);
5349        assert_eq!(contacts_c2.current, &["user_c"]);
5350        assert!(contacts_c2.outgoing_requests.is_empty());
5351
5352        // Incoming/outgoing requests are not present upon connecting (tested here via disconnect/reconnect)
5353        disconnect_and_reconnect(&client_a, cx_a).await;
5354        disconnect_and_reconnect(&client_b, cx_b).await;
5355        disconnect_and_reconnect(&client_c, cx_c).await;
5356        executor.run_until_parked();
5357        assert_eq!(
5358            client_a.summarize_contacts(&cx_a).current,
5359            &["user_a", "user_b"]
5360        );
5361        assert_eq!(
5362            client_b.summarize_contacts(&cx_b).current,
5363            &["user_a", "user_b"]
5364        );
5365        assert!(client_b
5366            .summarize_contacts(&cx_b)
5367            .incoming_requests
5368            .is_empty());
5369        assert_eq!(client_c.summarize_contacts(&cx_c).current, &["user_c"]);
5370        assert!(client_c
5371            .summarize_contacts(&cx_c)
5372            .outgoing_requests
5373            .is_empty());
5374
5375        async fn disconnect_and_reconnect(client: &TestClient, cx: &mut TestAppContext) {
5376            client.disconnect(&cx.to_async()).unwrap();
5377            client.clear_contacts(cx).await;
5378            client
5379                .authenticate_and_connect(false, &cx.to_async())
5380                .await
5381                .unwrap();
5382        }
5383    }
5384
5385    #[gpui::test(iterations = 10)]
5386    async fn test_following(cx_a: &mut TestAppContext, cx_b: &mut TestAppContext) {
5387        cx_a.foreground().forbid_parking();
5388        let fs = FakeFs::new(cx_a.background());
5389
5390        // 2 clients connect to a server.
5391        let mut server = TestServer::start(cx_a.foreground(), cx_a.background()).await;
5392        let mut client_a = server.create_client(cx_a, "user_a").await;
5393        let mut client_b = server.create_client(cx_b, "user_b").await;
5394        server
5395            .make_contacts(vec![(&client_a, cx_a), (&client_b, cx_b)])
5396            .await;
5397        cx_a.update(editor::init);
5398        cx_b.update(editor::init);
5399
5400        // Client A shares a project.
5401        fs.insert_tree(
5402            "/a",
5403            json!({
5404                "1.txt": "one",
5405                "2.txt": "two",
5406                "3.txt": "three",
5407            }),
5408        )
5409        .await;
5410        let (project_a, worktree_id) = client_a.build_local_project(fs.clone(), "/a", cx_a).await;
5411        project_a
5412            .update(cx_a, |project, cx| project.share(cx))
5413            .await
5414            .unwrap();
5415
5416        // Client B joins the project.
5417        let project_b = client_b
5418            .build_remote_project(
5419                project_a
5420                    .read_with(cx_a, |project, _| project.remote_id())
5421                    .unwrap(),
5422                cx_b,
5423            )
5424            .await;
5425
5426        // Client A opens some editors.
5427        let workspace_a = client_a.build_workspace(&project_a, cx_a);
5428        let pane_a = workspace_a.read_with(cx_a, |workspace, _| workspace.active_pane().clone());
5429        let editor_a1 = workspace_a
5430            .update(cx_a, |workspace, cx| {
5431                workspace.open_path((worktree_id, "1.txt"), true, cx)
5432            })
5433            .await
5434            .unwrap()
5435            .downcast::<Editor>()
5436            .unwrap();
5437        let editor_a2 = workspace_a
5438            .update(cx_a, |workspace, cx| {
5439                workspace.open_path((worktree_id, "2.txt"), true, cx)
5440            })
5441            .await
5442            .unwrap()
5443            .downcast::<Editor>()
5444            .unwrap();
5445
5446        // Client B opens an editor.
5447        let workspace_b = client_b.build_workspace(&project_b, cx_b);
5448        let editor_b1 = workspace_b
5449            .update(cx_b, |workspace, cx| {
5450                workspace.open_path((worktree_id, "1.txt"), true, cx)
5451            })
5452            .await
5453            .unwrap()
5454            .downcast::<Editor>()
5455            .unwrap();
5456
5457        let client_a_id = project_b.read_with(cx_b, |project, _| {
5458            project.collaborators().values().next().unwrap().peer_id
5459        });
5460        let client_b_id = project_a.read_with(cx_a, |project, _| {
5461            project.collaborators().values().next().unwrap().peer_id
5462        });
5463
5464        // When client B starts following client A, all visible view states are replicated to client B.
5465        editor_a1.update(cx_a, |editor, cx| {
5466            editor.change_selections(None, cx, |s| s.select_ranges([0..1]))
5467        });
5468        editor_a2.update(cx_a, |editor, cx| {
5469            editor.change_selections(None, cx, |s| s.select_ranges([2..3]))
5470        });
5471        workspace_b
5472            .update(cx_b, |workspace, cx| {
5473                workspace
5474                    .toggle_follow(&ToggleFollow(client_a_id), cx)
5475                    .unwrap()
5476            })
5477            .await
5478            .unwrap();
5479        let editor_b2 = workspace_b.read_with(cx_b, |workspace, cx| {
5480            workspace
5481                .active_item(cx)
5482                .unwrap()
5483                .downcast::<Editor>()
5484                .unwrap()
5485        });
5486        assert!(cx_b.read(|cx| editor_b2.is_focused(cx)));
5487        assert_eq!(
5488            editor_b2.read_with(cx_b, |editor, cx| editor.project_path(cx)),
5489            Some((worktree_id, "2.txt").into())
5490        );
5491        assert_eq!(
5492            editor_b2.read_with(cx_b, |editor, cx| editor.selections.ranges(cx)),
5493            vec![2..3]
5494        );
5495        assert_eq!(
5496            editor_b1.read_with(cx_b, |editor, cx| editor.selections.ranges(cx)),
5497            vec![0..1]
5498        );
5499
5500        // When client A activates a different editor, client B does so as well.
5501        workspace_a.update(cx_a, |workspace, cx| {
5502            workspace.activate_item(&editor_a1, cx)
5503        });
5504        workspace_b
5505            .condition(cx_b, |workspace, cx| {
5506                workspace.active_item(cx).unwrap().id() == editor_b1.id()
5507            })
5508            .await;
5509
5510        // When client A navigates back and forth, client B does so as well.
5511        workspace_a
5512            .update(cx_a, |workspace, cx| {
5513                workspace::Pane::go_back(workspace, None, cx)
5514            })
5515            .await;
5516        workspace_b
5517            .condition(cx_b, |workspace, cx| {
5518                workspace.active_item(cx).unwrap().id() == editor_b2.id()
5519            })
5520            .await;
5521
5522        workspace_a
5523            .update(cx_a, |workspace, cx| {
5524                workspace::Pane::go_forward(workspace, None, cx)
5525            })
5526            .await;
5527        workspace_b
5528            .condition(cx_b, |workspace, cx| {
5529                workspace.active_item(cx).unwrap().id() == editor_b1.id()
5530            })
5531            .await;
5532
5533        // Changes to client A's editor are reflected on client B.
5534        editor_a1.update(cx_a, |editor, cx| {
5535            editor.change_selections(None, cx, |s| s.select_ranges([1..1, 2..2]));
5536        });
5537        editor_b1
5538            .condition(cx_b, |editor, cx| {
5539                editor.selections.ranges(cx) == vec![1..1, 2..2]
5540            })
5541            .await;
5542
5543        editor_a1.update(cx_a, |editor, cx| editor.set_text("TWO", cx));
5544        editor_b1
5545            .condition(cx_b, |editor, cx| editor.text(cx) == "TWO")
5546            .await;
5547
5548        editor_a1.update(cx_a, |editor, cx| {
5549            editor.change_selections(None, cx, |s| s.select_ranges([3..3]));
5550            editor.set_scroll_position(vec2f(0., 100.), cx);
5551        });
5552        editor_b1
5553            .condition(cx_b, |editor, cx| {
5554                editor.selections.ranges(cx) == vec![3..3]
5555            })
5556            .await;
5557
5558        // After unfollowing, client B stops receiving updates from client A.
5559        workspace_b.update(cx_b, |workspace, cx| {
5560            workspace.unfollow(&workspace.active_pane().clone(), cx)
5561        });
5562        workspace_a.update(cx_a, |workspace, cx| {
5563            workspace.activate_item(&editor_a2, cx)
5564        });
5565        cx_a.foreground().run_until_parked();
5566        assert_eq!(
5567            workspace_b.read_with(cx_b, |workspace, cx| workspace
5568                .active_item(cx)
5569                .unwrap()
5570                .id()),
5571            editor_b1.id()
5572        );
5573
5574        // Client A starts following client B.
5575        workspace_a
5576            .update(cx_a, |workspace, cx| {
5577                workspace
5578                    .toggle_follow(&ToggleFollow(client_b_id), cx)
5579                    .unwrap()
5580            })
5581            .await
5582            .unwrap();
5583        assert_eq!(
5584            workspace_a.read_with(cx_a, |workspace, _| workspace.leader_for_pane(&pane_a)),
5585            Some(client_b_id)
5586        );
5587        assert_eq!(
5588            workspace_a.read_with(cx_a, |workspace, cx| workspace
5589                .active_item(cx)
5590                .unwrap()
5591                .id()),
5592            editor_a1.id()
5593        );
5594
5595        // Following interrupts when client B disconnects.
5596        client_b.disconnect(&cx_b.to_async()).unwrap();
5597        cx_a.foreground().run_until_parked();
5598        assert_eq!(
5599            workspace_a.read_with(cx_a, |workspace, _| workspace.leader_for_pane(&pane_a)),
5600            None
5601        );
5602    }
5603
5604    #[gpui::test(iterations = 10)]
5605    async fn test_peers_following_each_other(cx_a: &mut TestAppContext, cx_b: &mut TestAppContext) {
5606        cx_a.foreground().forbid_parking();
5607        let fs = FakeFs::new(cx_a.background());
5608
5609        // 2 clients connect to a server.
5610        let mut server = TestServer::start(cx_a.foreground(), cx_a.background()).await;
5611        let mut client_a = server.create_client(cx_a, "user_a").await;
5612        let mut client_b = server.create_client(cx_b, "user_b").await;
5613        server
5614            .make_contacts(vec![(&client_a, cx_a), (&client_b, cx_b)])
5615            .await;
5616        cx_a.update(editor::init);
5617        cx_b.update(editor::init);
5618
5619        // Client A shares a project.
5620        fs.insert_tree(
5621            "/a",
5622            json!({
5623                "1.txt": "one",
5624                "2.txt": "two",
5625                "3.txt": "three",
5626                "4.txt": "four",
5627            }),
5628        )
5629        .await;
5630        let (project_a, worktree_id) = client_a.build_local_project(fs.clone(), "/a", cx_a).await;
5631        project_a
5632            .update(cx_a, |project, cx| project.share(cx))
5633            .await
5634            .unwrap();
5635
5636        // Client B joins the project.
5637        let project_b = client_b
5638            .build_remote_project(
5639                project_a
5640                    .read_with(cx_a, |project, _| project.remote_id())
5641                    .unwrap(),
5642                cx_b,
5643            )
5644            .await;
5645
5646        // Client A opens some editors.
5647        let workspace_a = client_a.build_workspace(&project_a, cx_a);
5648        let pane_a1 = workspace_a.read_with(cx_a, |workspace, _| workspace.active_pane().clone());
5649        let _editor_a1 = workspace_a
5650            .update(cx_a, |workspace, cx| {
5651                workspace.open_path((worktree_id, "1.txt"), true, cx)
5652            })
5653            .await
5654            .unwrap()
5655            .downcast::<Editor>()
5656            .unwrap();
5657
5658        // Client B opens an editor.
5659        let workspace_b = client_b.build_workspace(&project_b, cx_b);
5660        let pane_b1 = workspace_b.read_with(cx_b, |workspace, _| workspace.active_pane().clone());
5661        let _editor_b1 = workspace_b
5662            .update(cx_b, |workspace, cx| {
5663                workspace.open_path((worktree_id, "2.txt"), true, cx)
5664            })
5665            .await
5666            .unwrap()
5667            .downcast::<Editor>()
5668            .unwrap();
5669
5670        // Clients A and B follow each other in split panes
5671        workspace_a
5672            .update(cx_a, |workspace, cx| {
5673                workspace.split_pane(workspace.active_pane().clone(), SplitDirection::Right, cx);
5674                assert_ne!(*workspace.active_pane(), pane_a1);
5675                let leader_id = *project_a.read(cx).collaborators().keys().next().unwrap();
5676                workspace
5677                    .toggle_follow(&workspace::ToggleFollow(leader_id), cx)
5678                    .unwrap()
5679            })
5680            .await
5681            .unwrap();
5682        workspace_b
5683            .update(cx_b, |workspace, cx| {
5684                workspace.split_pane(workspace.active_pane().clone(), SplitDirection::Right, cx);
5685                assert_ne!(*workspace.active_pane(), pane_b1);
5686                let leader_id = *project_b.read(cx).collaborators().keys().next().unwrap();
5687                workspace
5688                    .toggle_follow(&workspace::ToggleFollow(leader_id), cx)
5689                    .unwrap()
5690            })
5691            .await
5692            .unwrap();
5693
5694        workspace_a
5695            .update(cx_a, |workspace, cx| {
5696                workspace.activate_next_pane(cx);
5697                assert_eq!(*workspace.active_pane(), pane_a1);
5698                workspace.open_path((worktree_id, "3.txt"), true, cx)
5699            })
5700            .await
5701            .unwrap();
5702        workspace_b
5703            .update(cx_b, |workspace, cx| {
5704                workspace.activate_next_pane(cx);
5705                assert_eq!(*workspace.active_pane(), pane_b1);
5706                workspace.open_path((worktree_id, "4.txt"), true, cx)
5707            })
5708            .await
5709            .unwrap();
5710        cx_a.foreground().run_until_parked();
5711
5712        // Ensure leader updates don't change the active pane of followers
5713        workspace_a.read_with(cx_a, |workspace, _| {
5714            assert_eq!(*workspace.active_pane(), pane_a1);
5715        });
5716        workspace_b.read_with(cx_b, |workspace, _| {
5717            assert_eq!(*workspace.active_pane(), pane_b1);
5718        });
5719
5720        // Ensure peers following each other doesn't cause an infinite loop.
5721        assert_eq!(
5722            workspace_a.read_with(cx_a, |workspace, cx| workspace
5723                .active_item(cx)
5724                .unwrap()
5725                .project_path(cx)),
5726            Some((worktree_id, "3.txt").into())
5727        );
5728        workspace_a.update(cx_a, |workspace, cx| {
5729            assert_eq!(
5730                workspace.active_item(cx).unwrap().project_path(cx),
5731                Some((worktree_id, "3.txt").into())
5732            );
5733            workspace.activate_next_pane(cx);
5734            assert_eq!(
5735                workspace.active_item(cx).unwrap().project_path(cx),
5736                Some((worktree_id, "4.txt").into())
5737            );
5738        });
5739        workspace_b.update(cx_b, |workspace, cx| {
5740            assert_eq!(
5741                workspace.active_item(cx).unwrap().project_path(cx),
5742                Some((worktree_id, "4.txt").into())
5743            );
5744            workspace.activate_next_pane(cx);
5745            assert_eq!(
5746                workspace.active_item(cx).unwrap().project_path(cx),
5747                Some((worktree_id, "3.txt").into())
5748            );
5749        });
5750    }
5751
5752    #[gpui::test(iterations = 10)]
5753    async fn test_auto_unfollowing(cx_a: &mut TestAppContext, cx_b: &mut TestAppContext) {
5754        cx_a.foreground().forbid_parking();
5755        let fs = FakeFs::new(cx_a.background());
5756
5757        // 2 clients connect to a server.
5758        let mut server = TestServer::start(cx_a.foreground(), cx_a.background()).await;
5759        let mut client_a = server.create_client(cx_a, "user_a").await;
5760        let mut client_b = server.create_client(cx_b, "user_b").await;
5761        server
5762            .make_contacts(vec![(&client_a, cx_a), (&client_b, cx_b)])
5763            .await;
5764        cx_a.update(editor::init);
5765        cx_b.update(editor::init);
5766
5767        // Client A shares a project.
5768        fs.insert_tree(
5769            "/a",
5770            json!({
5771                "1.txt": "one",
5772                "2.txt": "two",
5773                "3.txt": "three",
5774            }),
5775        )
5776        .await;
5777        let (project_a, worktree_id) = client_a.build_local_project(fs.clone(), "/a", cx_a).await;
5778        project_a
5779            .update(cx_a, |project, cx| project.share(cx))
5780            .await
5781            .unwrap();
5782
5783        // Client B joins the project.
5784        let project_b = client_b
5785            .build_remote_project(
5786                project_a
5787                    .read_with(cx_a, |project, _| project.remote_id())
5788                    .unwrap(),
5789                cx_b,
5790            )
5791            .await;
5792
5793        // Client A opens some editors.
5794        let workspace_a = client_a.build_workspace(&project_a, cx_a);
5795        let _editor_a1 = workspace_a
5796            .update(cx_a, |workspace, cx| {
5797                workspace.open_path((worktree_id, "1.txt"), true, cx)
5798            })
5799            .await
5800            .unwrap()
5801            .downcast::<Editor>()
5802            .unwrap();
5803
5804        // Client B starts following client A.
5805        let workspace_b = client_b.build_workspace(&project_b, cx_b);
5806        let pane_b = workspace_b.read_with(cx_b, |workspace, _| workspace.active_pane().clone());
5807        let leader_id = project_b.read_with(cx_b, |project, _| {
5808            project.collaborators().values().next().unwrap().peer_id
5809        });
5810        workspace_b
5811            .update(cx_b, |workspace, cx| {
5812                workspace
5813                    .toggle_follow(&ToggleFollow(leader_id), cx)
5814                    .unwrap()
5815            })
5816            .await
5817            .unwrap();
5818        assert_eq!(
5819            workspace_b.read_with(cx_b, |workspace, _| workspace.leader_for_pane(&pane_b)),
5820            Some(leader_id)
5821        );
5822        let editor_b2 = workspace_b.read_with(cx_b, |workspace, cx| {
5823            workspace
5824                .active_item(cx)
5825                .unwrap()
5826                .downcast::<Editor>()
5827                .unwrap()
5828        });
5829
5830        // When client B moves, it automatically stops following client A.
5831        editor_b2.update(cx_b, |editor, cx| editor.move_right(&editor::MoveRight, cx));
5832        assert_eq!(
5833            workspace_b.read_with(cx_b, |workspace, _| workspace.leader_for_pane(&pane_b)),
5834            None
5835        );
5836
5837        workspace_b
5838            .update(cx_b, |workspace, cx| {
5839                workspace
5840                    .toggle_follow(&ToggleFollow(leader_id), cx)
5841                    .unwrap()
5842            })
5843            .await
5844            .unwrap();
5845        assert_eq!(
5846            workspace_b.read_with(cx_b, |workspace, _| workspace.leader_for_pane(&pane_b)),
5847            Some(leader_id)
5848        );
5849
5850        // When client B edits, it automatically stops following client A.
5851        editor_b2.update(cx_b, |editor, cx| editor.insert("X", cx));
5852        assert_eq!(
5853            workspace_b.read_with(cx_b, |workspace, _| workspace.leader_for_pane(&pane_b)),
5854            None
5855        );
5856
5857        workspace_b
5858            .update(cx_b, |workspace, cx| {
5859                workspace
5860                    .toggle_follow(&ToggleFollow(leader_id), cx)
5861                    .unwrap()
5862            })
5863            .await
5864            .unwrap();
5865        assert_eq!(
5866            workspace_b.read_with(cx_b, |workspace, _| workspace.leader_for_pane(&pane_b)),
5867            Some(leader_id)
5868        );
5869
5870        // When client B scrolls, it automatically stops following client A.
5871        editor_b2.update(cx_b, |editor, cx| {
5872            editor.set_scroll_position(vec2f(0., 3.), cx)
5873        });
5874        assert_eq!(
5875            workspace_b.read_with(cx_b, |workspace, _| workspace.leader_for_pane(&pane_b)),
5876            None
5877        );
5878
5879        workspace_b
5880            .update(cx_b, |workspace, cx| {
5881                workspace
5882                    .toggle_follow(&ToggleFollow(leader_id), cx)
5883                    .unwrap()
5884            })
5885            .await
5886            .unwrap();
5887        assert_eq!(
5888            workspace_b.read_with(cx_b, |workspace, _| workspace.leader_for_pane(&pane_b)),
5889            Some(leader_id)
5890        );
5891
5892        // When client B activates a different pane, it continues following client A in the original pane.
5893        workspace_b.update(cx_b, |workspace, cx| {
5894            workspace.split_pane(pane_b.clone(), SplitDirection::Right, cx)
5895        });
5896        assert_eq!(
5897            workspace_b.read_with(cx_b, |workspace, _| workspace.leader_for_pane(&pane_b)),
5898            Some(leader_id)
5899        );
5900
5901        workspace_b.update(cx_b, |workspace, cx| workspace.activate_next_pane(cx));
5902        assert_eq!(
5903            workspace_b.read_with(cx_b, |workspace, _| workspace.leader_for_pane(&pane_b)),
5904            Some(leader_id)
5905        );
5906
5907        // When client B activates a different item in the original pane, it automatically stops following client A.
5908        workspace_b
5909            .update(cx_b, |workspace, cx| {
5910                workspace.open_path((worktree_id, "2.txt"), true, cx)
5911            })
5912            .await
5913            .unwrap();
5914        assert_eq!(
5915            workspace_b.read_with(cx_b, |workspace, _| workspace.leader_for_pane(&pane_b)),
5916            None
5917        );
5918    }
5919
5920    #[gpui::test(iterations = 100)]
5921    async fn test_random_collaboration(
5922        cx: &mut TestAppContext,
5923        deterministic: Arc<Deterministic>,
5924        rng: StdRng,
5925    ) {
5926        cx.foreground().forbid_parking();
5927        let max_peers = env::var("MAX_PEERS")
5928            .map(|i| i.parse().expect("invalid `MAX_PEERS` variable"))
5929            .unwrap_or(5);
5930        assert!(max_peers <= 5);
5931
5932        let max_operations = env::var("OPERATIONS")
5933            .map(|i| i.parse().expect("invalid `OPERATIONS` variable"))
5934            .unwrap_or(10);
5935
5936        let rng = Arc::new(Mutex::new(rng));
5937
5938        let guest_lang_registry = Arc::new(LanguageRegistry::test());
5939        let host_language_registry = Arc::new(LanguageRegistry::test());
5940
5941        let fs = FakeFs::new(cx.background());
5942        fs.insert_tree("/_collab", json!({"init": ""})).await;
5943
5944        let mut server = TestServer::start(cx.foreground(), cx.background()).await;
5945        let db = server.app_state.db.clone();
5946        let host_user_id = db.create_user("host", false).await.unwrap();
5947        for username in ["guest-1", "guest-2", "guest-3", "guest-4"] {
5948            let guest_user_id = db.create_user(username, false).await.unwrap();
5949            server
5950                .app_state
5951                .db
5952                .send_contact_request(guest_user_id, host_user_id)
5953                .await
5954                .unwrap();
5955            server
5956                .app_state
5957                .db
5958                .respond_to_contact_request(host_user_id, guest_user_id, true)
5959                .await
5960                .unwrap();
5961        }
5962
5963        let mut clients = Vec::new();
5964        let mut user_ids = Vec::new();
5965        let mut op_start_signals = Vec::new();
5966
5967        let mut next_entity_id = 100000;
5968        let mut host_cx = TestAppContext::new(
5969            cx.foreground_platform(),
5970            cx.platform(),
5971            deterministic.build_foreground(next_entity_id),
5972            deterministic.build_background(),
5973            cx.font_cache(),
5974            cx.leak_detector(),
5975            next_entity_id,
5976        );
5977        let host = server.create_client(&mut host_cx, "host").await;
5978        let host_project = host_cx.update(|cx| {
5979            Project::local(
5980                host.client.clone(),
5981                host.user_store.clone(),
5982                host_language_registry.clone(),
5983                fs.clone(),
5984                cx,
5985            )
5986        });
5987        let host_project_id = host_project
5988            .update(&mut host_cx, |p, _| p.next_remote_id())
5989            .await;
5990
5991        let (collab_worktree, _) = host_project
5992            .update(&mut host_cx, |project, cx| {
5993                project.find_or_create_local_worktree("/_collab", true, cx)
5994            })
5995            .await
5996            .unwrap();
5997        collab_worktree
5998            .read_with(&host_cx, |tree, _| tree.as_local().unwrap().scan_complete())
5999            .await;
6000        host_project
6001            .update(&mut host_cx, |project, cx| project.share(cx))
6002            .await
6003            .unwrap();
6004
6005        // Set up fake language servers.
6006        let mut language = Language::new(
6007            LanguageConfig {
6008                name: "Rust".into(),
6009                path_suffixes: vec!["rs".to_string()],
6010                ..Default::default()
6011            },
6012            None,
6013        );
6014        let _fake_servers = language.set_fake_lsp_adapter(FakeLspAdapter {
6015            name: "the-fake-language-server",
6016            capabilities: lsp::LanguageServer::full_capabilities(),
6017            initializer: Some(Box::new({
6018                let rng = rng.clone();
6019                let fs = fs.clone();
6020                let project = host_project.downgrade();
6021                move |fake_server: &mut FakeLanguageServer| {
6022                    fake_server.handle_request::<lsp::request::Completion, _, _>(
6023                        |_, _| async move {
6024                            Ok(Some(lsp::CompletionResponse::Array(vec![
6025                                lsp::CompletionItem {
6026                                    text_edit: Some(lsp::CompletionTextEdit::Edit(lsp::TextEdit {
6027                                        range: lsp::Range::new(
6028                                            lsp::Position::new(0, 0),
6029                                            lsp::Position::new(0, 0),
6030                                        ),
6031                                        new_text: "the-new-text".to_string(),
6032                                    })),
6033                                    ..Default::default()
6034                                },
6035                            ])))
6036                        },
6037                    );
6038
6039                    fake_server.handle_request::<lsp::request::CodeActionRequest, _, _>(
6040                        |_, _| async move {
6041                            Ok(Some(vec![lsp::CodeActionOrCommand::CodeAction(
6042                                lsp::CodeAction {
6043                                    title: "the-code-action".to_string(),
6044                                    ..Default::default()
6045                                },
6046                            )]))
6047                        },
6048                    );
6049
6050                    fake_server.handle_request::<lsp::request::PrepareRenameRequest, _, _>(
6051                        |params, _| async move {
6052                            Ok(Some(lsp::PrepareRenameResponse::Range(lsp::Range::new(
6053                                params.position,
6054                                params.position,
6055                            ))))
6056                        },
6057                    );
6058
6059                    fake_server.handle_request::<lsp::request::GotoDefinition, _, _>({
6060                        let fs = fs.clone();
6061                        let rng = rng.clone();
6062                        move |_, _| {
6063                            let fs = fs.clone();
6064                            let rng = rng.clone();
6065                            async move {
6066                                let files = fs.files().await;
6067                                let mut rng = rng.lock();
6068                                let count = rng.gen_range::<usize, _>(1..3);
6069                                let files = (0..count)
6070                                    .map(|_| files.choose(&mut *rng).unwrap())
6071                                    .collect::<Vec<_>>();
6072                                log::info!("LSP: Returning definitions in files {:?}", &files);
6073                                Ok(Some(lsp::GotoDefinitionResponse::Array(
6074                                    files
6075                                        .into_iter()
6076                                        .map(|file| lsp::Location {
6077                                            uri: lsp::Url::from_file_path(file).unwrap(),
6078                                            range: Default::default(),
6079                                        })
6080                                        .collect(),
6081                                )))
6082                            }
6083                        }
6084                    });
6085
6086                    fake_server.handle_request::<lsp::request::DocumentHighlightRequest, _, _>({
6087                        let rng = rng.clone();
6088                        let project = project.clone();
6089                        move |params, mut cx| {
6090                            let highlights = if let Some(project) = project.upgrade(&cx) {
6091                                project.update(&mut cx, |project, cx| {
6092                                    let path = params
6093                                        .text_document_position_params
6094                                        .text_document
6095                                        .uri
6096                                        .to_file_path()
6097                                        .unwrap();
6098                                    let (worktree, relative_path) =
6099                                        project.find_local_worktree(&path, cx)?;
6100                                    let project_path =
6101                                        ProjectPath::from((worktree.read(cx).id(), relative_path));
6102                                    let buffer =
6103                                        project.get_open_buffer(&project_path, cx)?.read(cx);
6104
6105                                    let mut highlights = Vec::new();
6106                                    let highlight_count = rng.lock().gen_range(1..=5);
6107                                    let mut prev_end = 0;
6108                                    for _ in 0..highlight_count {
6109                                        let range =
6110                                            buffer.random_byte_range(prev_end, &mut *rng.lock());
6111
6112                                        highlights.push(lsp::DocumentHighlight {
6113                                            range: range_to_lsp(range.to_point_utf16(buffer)),
6114                                            kind: Some(lsp::DocumentHighlightKind::READ),
6115                                        });
6116                                        prev_end = range.end;
6117                                    }
6118                                    Some(highlights)
6119                                })
6120                            } else {
6121                                None
6122                            };
6123                            async move { Ok(highlights) }
6124                        }
6125                    });
6126                }
6127            })),
6128            ..Default::default()
6129        });
6130        host_language_registry.add(Arc::new(language));
6131
6132        let op_start_signal = futures::channel::mpsc::unbounded();
6133        user_ids.push(host.current_user_id(&host_cx));
6134        op_start_signals.push(op_start_signal.0);
6135        clients.push(host_cx.foreground().spawn(host.simulate_host(
6136            host_project,
6137            op_start_signal.1,
6138            rng.clone(),
6139            host_cx,
6140        )));
6141
6142        let disconnect_host_at = if rng.lock().gen_bool(0.2) {
6143            rng.lock().gen_range(0..max_operations)
6144        } else {
6145            max_operations
6146        };
6147        let mut available_guests = vec![
6148            "guest-1".to_string(),
6149            "guest-2".to_string(),
6150            "guest-3".to_string(),
6151            "guest-4".to_string(),
6152        ];
6153        let mut operations = 0;
6154        while operations < max_operations {
6155            if operations == disconnect_host_at {
6156                server.disconnect_client(user_ids[0]);
6157                cx.foreground().advance_clock(RECEIVE_TIMEOUT);
6158                drop(op_start_signals);
6159                let mut clients = futures::future::join_all(clients).await;
6160                cx.foreground().run_until_parked();
6161
6162                let (host, mut host_cx, host_err) = clients.remove(0);
6163                if let Some(host_err) = host_err {
6164                    log::error!("host error - {}", host_err);
6165                }
6166                host.project
6167                    .as_ref()
6168                    .unwrap()
6169                    .read_with(&host_cx, |project, _| assert!(!project.is_shared()));
6170                for (guest, mut guest_cx, guest_err) in clients {
6171                    if let Some(guest_err) = guest_err {
6172                        log::error!("{} error - {}", guest.username, guest_err);
6173                    }
6174                    // TODO
6175                    // let contacts = server
6176                    //     .store
6177                    //     .read()
6178                    //     .await
6179                    //     .contacts_for_user(guest.current_user_id(&guest_cx));
6180                    // assert!(!contacts
6181                    //     .iter()
6182                    //     .flat_map(|contact| &contact.projects)
6183                    //     .any(|project| project.id == host_project_id));
6184                    guest
6185                        .project
6186                        .as_ref()
6187                        .unwrap()
6188                        .read_with(&guest_cx, |project, _| assert!(project.is_read_only()));
6189                    guest_cx.update(|_| drop(guest));
6190                }
6191                host_cx.update(|_| drop(host));
6192
6193                return;
6194            }
6195
6196            let distribution = rng.lock().gen_range(0..100);
6197            match distribution {
6198                0..=19 if !available_guests.is_empty() => {
6199                    let guest_ix = rng.lock().gen_range(0..available_guests.len());
6200                    let guest_username = available_guests.remove(guest_ix);
6201                    log::info!("Adding new connection for {}", guest_username);
6202                    next_entity_id += 100000;
6203                    let mut guest_cx = TestAppContext::new(
6204                        cx.foreground_platform(),
6205                        cx.platform(),
6206                        deterministic.build_foreground(next_entity_id),
6207                        deterministic.build_background(),
6208                        cx.font_cache(),
6209                        cx.leak_detector(),
6210                        next_entity_id,
6211                    );
6212                    let guest = server.create_client(&mut guest_cx, &guest_username).await;
6213                    let guest_project = Project::remote(
6214                        host_project_id,
6215                        guest.client.clone(),
6216                        guest.user_store.clone(),
6217                        guest_lang_registry.clone(),
6218                        FakeFs::new(cx.background()),
6219                        &mut guest_cx.to_async(),
6220                    )
6221                    .await
6222                    .unwrap();
6223                    let op_start_signal = futures::channel::mpsc::unbounded();
6224                    user_ids.push(guest.current_user_id(&guest_cx));
6225                    op_start_signals.push(op_start_signal.0);
6226                    clients.push(guest_cx.foreground().spawn(guest.simulate_guest(
6227                        guest_username.clone(),
6228                        guest_project,
6229                        op_start_signal.1,
6230                        rng.clone(),
6231                        guest_cx,
6232                    )));
6233
6234                    log::info!("Added connection for {}", guest_username);
6235                    operations += 1;
6236                }
6237                20..=29 if clients.len() > 1 => {
6238                    let guest_ix = rng.lock().gen_range(1..clients.len());
6239                    log::info!("Removing guest {}", user_ids[guest_ix]);
6240                    let removed_guest_id = user_ids.remove(guest_ix);
6241                    let guest = clients.remove(guest_ix);
6242                    op_start_signals.remove(guest_ix);
6243                    server.disconnect_client(removed_guest_id);
6244                    cx.foreground().advance_clock(RECEIVE_TIMEOUT);
6245                    let (guest, mut guest_cx, guest_err) = guest.await;
6246                    if let Some(guest_err) = guest_err {
6247                        log::error!("{} error - {}", guest.username, guest_err);
6248                    }
6249                    guest
6250                        .project
6251                        .as_ref()
6252                        .unwrap()
6253                        .read_with(&guest_cx, |project, _| assert!(project.is_read_only()));
6254                    // TODO
6255                    // for user_id in &user_ids {
6256                    //     for contact in server.store.read().await.contacts_for_user(*user_id) {
6257                    //         assert_ne!(
6258                    //             contact.user_id, removed_guest_id.0 as u64,
6259                    //             "removed guest is still a contact of another peer"
6260                    //         );
6261                    //         for project in contact.projects {
6262                    //             for project_guest_id in project.guests {
6263                    //                 assert_ne!(
6264                    //                     project_guest_id, removed_guest_id.0 as u64,
6265                    //                     "removed guest appears as still participating on a project"
6266                    //                 );
6267                    //             }
6268                    //         }
6269                    //     }
6270                    // }
6271
6272                    log::info!("{} removed", guest.username);
6273                    available_guests.push(guest.username.clone());
6274                    guest_cx.update(|_| drop(guest));
6275
6276                    operations += 1;
6277                }
6278                _ => {
6279                    while operations < max_operations && rng.lock().gen_bool(0.7) {
6280                        op_start_signals
6281                            .choose(&mut *rng.lock())
6282                            .unwrap()
6283                            .unbounded_send(())
6284                            .unwrap();
6285                        operations += 1;
6286                    }
6287
6288                    if rng.lock().gen_bool(0.8) {
6289                        cx.foreground().run_until_parked();
6290                    }
6291                }
6292            }
6293        }
6294
6295        drop(op_start_signals);
6296        let mut clients = futures::future::join_all(clients).await;
6297        cx.foreground().run_until_parked();
6298
6299        let (host_client, mut host_cx, host_err) = clients.remove(0);
6300        if let Some(host_err) = host_err {
6301            panic!("host error - {}", host_err);
6302        }
6303        let host_project = host_client.project.as_ref().unwrap();
6304        let host_worktree_snapshots = host_project.read_with(&host_cx, |project, cx| {
6305            project
6306                .worktrees(cx)
6307                .map(|worktree| {
6308                    let snapshot = worktree.read(cx).snapshot();
6309                    (snapshot.id(), snapshot)
6310                })
6311                .collect::<BTreeMap<_, _>>()
6312        });
6313
6314        host_client
6315            .project
6316            .as_ref()
6317            .unwrap()
6318            .read_with(&host_cx, |project, cx| project.check_invariants(cx));
6319
6320        for (guest_client, mut guest_cx, guest_err) in clients.into_iter() {
6321            if let Some(guest_err) = guest_err {
6322                panic!("{} error - {}", guest_client.username, guest_err);
6323            }
6324            let worktree_snapshots =
6325                guest_client
6326                    .project
6327                    .as_ref()
6328                    .unwrap()
6329                    .read_with(&guest_cx, |project, cx| {
6330                        project
6331                            .worktrees(cx)
6332                            .map(|worktree| {
6333                                let worktree = worktree.read(cx);
6334                                (worktree.id(), worktree.snapshot())
6335                            })
6336                            .collect::<BTreeMap<_, _>>()
6337                    });
6338
6339            assert_eq!(
6340                worktree_snapshots.keys().collect::<Vec<_>>(),
6341                host_worktree_snapshots.keys().collect::<Vec<_>>(),
6342                "{} has different worktrees than the host",
6343                guest_client.username
6344            );
6345            for (id, host_snapshot) in &host_worktree_snapshots {
6346                let guest_snapshot = &worktree_snapshots[id];
6347                assert_eq!(
6348                    guest_snapshot.root_name(),
6349                    host_snapshot.root_name(),
6350                    "{} has different root name than the host for worktree {}",
6351                    guest_client.username,
6352                    id
6353                );
6354                assert_eq!(
6355                    guest_snapshot.entries(false).collect::<Vec<_>>(),
6356                    host_snapshot.entries(false).collect::<Vec<_>>(),
6357                    "{} has different snapshot than the host for worktree {}",
6358                    guest_client.username,
6359                    id
6360                );
6361                assert_eq!(guest_snapshot.scan_id(), host_snapshot.scan_id());
6362            }
6363
6364            guest_client
6365                .project
6366                .as_ref()
6367                .unwrap()
6368                .read_with(&guest_cx, |project, cx| project.check_invariants(cx));
6369
6370            for guest_buffer in &guest_client.buffers {
6371                let buffer_id = guest_buffer.read_with(&guest_cx, |buffer, _| buffer.remote_id());
6372                let host_buffer = host_project.read_with(&host_cx, |project, cx| {
6373                    project.buffer_for_id(buffer_id, cx).expect(&format!(
6374                        "host does not have buffer for guest:{}, peer:{}, id:{}",
6375                        guest_client.username, guest_client.peer_id, buffer_id
6376                    ))
6377                });
6378                let path = host_buffer
6379                    .read_with(&host_cx, |buffer, cx| buffer.file().unwrap().full_path(cx));
6380
6381                assert_eq!(
6382                    guest_buffer.read_with(&guest_cx, |buffer, _| buffer.deferred_ops_len()),
6383                    0,
6384                    "{}, buffer {}, path {:?} has deferred operations",
6385                    guest_client.username,
6386                    buffer_id,
6387                    path,
6388                );
6389                assert_eq!(
6390                    guest_buffer.read_with(&guest_cx, |buffer, _| buffer.text()),
6391                    host_buffer.read_with(&host_cx, |buffer, _| buffer.text()),
6392                    "{}, buffer {}, path {:?}, differs from the host's buffer",
6393                    guest_client.username,
6394                    buffer_id,
6395                    path
6396                );
6397            }
6398
6399            guest_cx.update(|_| drop(guest_client));
6400        }
6401
6402        host_cx.update(|_| drop(host_client));
6403    }
6404
6405    struct TestServer {
6406        peer: Arc<Peer>,
6407        app_state: Arc<AppState>,
6408        server: Arc<Server>,
6409        foreground: Rc<executor::Foreground>,
6410        notifications: mpsc::UnboundedReceiver<()>,
6411        connection_killers: Arc<Mutex<HashMap<UserId, Arc<AtomicBool>>>>,
6412        forbid_connections: Arc<AtomicBool>,
6413        _test_db: TestDb,
6414    }
6415
6416    impl TestServer {
6417        async fn start(
6418            foreground: Rc<executor::Foreground>,
6419            background: Arc<executor::Background>,
6420        ) -> Self {
6421            let test_db = TestDb::fake(background);
6422            let app_state = Self::build_app_state(&test_db).await;
6423            let peer = Peer::new();
6424            let notifications = mpsc::unbounded();
6425            let server = Server::new(app_state.clone(), Some(notifications.0));
6426            Self {
6427                peer,
6428                app_state,
6429                server,
6430                foreground,
6431                notifications: notifications.1,
6432                connection_killers: Default::default(),
6433                forbid_connections: Default::default(),
6434                _test_db: test_db,
6435            }
6436        }
6437
6438        async fn create_client(&mut self, cx: &mut TestAppContext, name: &str) -> TestClient {
6439            cx.update(|cx| {
6440                let settings = Settings::test(cx);
6441                cx.set_global(settings);
6442            });
6443
6444            let http = FakeHttpClient::with_404_response();
6445            let user_id =
6446                if let Ok(Some(user)) = self.app_state.db.get_user_by_github_login(name).await {
6447                    user.id
6448                } else {
6449                    self.app_state.db.create_user(name, false).await.unwrap()
6450                };
6451            let client_name = name.to_string();
6452            let mut client = Client::new(http.clone());
6453            let server = self.server.clone();
6454            let db = self.app_state.db.clone();
6455            let connection_killers = self.connection_killers.clone();
6456            let forbid_connections = self.forbid_connections.clone();
6457            let (connection_id_tx, mut connection_id_rx) = mpsc::channel(16);
6458
6459            Arc::get_mut(&mut client)
6460                .unwrap()
6461                .override_authenticate(move |cx| {
6462                    cx.spawn(|_| async move {
6463                        let access_token = "the-token".to_string();
6464                        Ok(Credentials {
6465                            user_id: user_id.0 as u64,
6466                            access_token,
6467                        })
6468                    })
6469                })
6470                .override_establish_connection(move |credentials, cx| {
6471                    assert_eq!(credentials.user_id, user_id.0 as u64);
6472                    assert_eq!(credentials.access_token, "the-token");
6473
6474                    let server = server.clone();
6475                    let db = db.clone();
6476                    let connection_killers = connection_killers.clone();
6477                    let forbid_connections = forbid_connections.clone();
6478                    let client_name = client_name.clone();
6479                    let connection_id_tx = connection_id_tx.clone();
6480                    cx.spawn(move |cx| async move {
6481                        if forbid_connections.load(SeqCst) {
6482                            Err(EstablishConnectionError::other(anyhow!(
6483                                "server is forbidding connections"
6484                            )))
6485                        } else {
6486                            let (client_conn, server_conn, killed) =
6487                                Connection::in_memory(cx.background());
6488                            connection_killers.lock().insert(user_id, killed);
6489                            let user = db.get_user_by_id(user_id).await.unwrap().unwrap();
6490                            cx.background()
6491                                .spawn(server.handle_connection(
6492                                    server_conn,
6493                                    client_name,
6494                                    user,
6495                                    Some(connection_id_tx),
6496                                    cx.background(),
6497                                ))
6498                                .detach();
6499                            Ok(client_conn)
6500                        }
6501                    })
6502                });
6503
6504            client
6505                .authenticate_and_connect(false, &cx.to_async())
6506                .await
6507                .unwrap();
6508
6509            Channel::init(&client);
6510            Project::init(&client);
6511            cx.update(|cx| {
6512                workspace::init(&client, cx);
6513            });
6514
6515            let peer_id = PeerId(connection_id_rx.next().await.unwrap().0);
6516            let user_store = cx.add_model(|cx| UserStore::new(client.clone(), http, cx));
6517
6518            let client = TestClient {
6519                client,
6520                peer_id,
6521                username: name.to_string(),
6522                user_store,
6523                language_registry: Arc::new(LanguageRegistry::test()),
6524                project: Default::default(),
6525                buffers: Default::default(),
6526            };
6527            client.wait_for_current_user(cx).await;
6528            client
6529        }
6530
6531        fn disconnect_client(&self, user_id: UserId) {
6532            self.connection_killers
6533                .lock()
6534                .remove(&user_id)
6535                .unwrap()
6536                .store(true, SeqCst);
6537        }
6538
6539        fn forbid_connections(&self) {
6540            self.forbid_connections.store(true, SeqCst);
6541        }
6542
6543        fn allow_connections(&self) {
6544            self.forbid_connections.store(false, SeqCst);
6545        }
6546
6547        async fn make_contacts(&self, mut clients: Vec<(&TestClient, &mut TestAppContext)>) {
6548            while let Some((client_a, cx_a)) = clients.pop() {
6549                for (client_b, cx_b) in &mut clients {
6550                    client_a
6551                        .user_store
6552                        .update(cx_a, |store, cx| {
6553                            store.request_contact(client_b.user_id().unwrap(), cx)
6554                        })
6555                        .await
6556                        .unwrap();
6557                    cx_a.foreground().run_until_parked();
6558                    client_b
6559                        .user_store
6560                        .update(*cx_b, |store, cx| {
6561                            store.respond_to_contact_request(client_a.user_id().unwrap(), true, cx)
6562                        })
6563                        .await
6564                        .unwrap();
6565                }
6566            }
6567        }
6568
6569        async fn build_app_state(test_db: &TestDb) -> Arc<AppState> {
6570            Arc::new(AppState {
6571                db: test_db.db().clone(),
6572                api_token: Default::default(),
6573            })
6574        }
6575
6576        async fn state<'a>(&'a self) -> RwLockReadGuard<'a, Store> {
6577            self.server.store.read().await
6578        }
6579
6580        async fn condition<F>(&mut self, mut predicate: F)
6581        where
6582            F: FnMut(&Store) -> bool,
6583        {
6584            assert!(
6585                self.foreground.parking_forbidden(),
6586                "you must call forbid_parking to use server conditions so we don't block indefinitely"
6587            );
6588            while !(predicate)(&*self.server.store.read().await) {
6589                self.foreground.start_waiting();
6590                self.notifications.next().await;
6591                self.foreground.finish_waiting();
6592            }
6593        }
6594    }
6595
6596    impl Deref for TestServer {
6597        type Target = Server;
6598
6599        fn deref(&self) -> &Self::Target {
6600            &self.server
6601        }
6602    }
6603
6604    impl Drop for TestServer {
6605        fn drop(&mut self) {
6606            self.peer.reset();
6607        }
6608    }
6609
6610    struct TestClient {
6611        client: Arc<Client>,
6612        username: String,
6613        pub peer_id: PeerId,
6614        pub user_store: ModelHandle<UserStore>,
6615        language_registry: Arc<LanguageRegistry>,
6616        project: Option<ModelHandle<Project>>,
6617        buffers: HashSet<ModelHandle<language::Buffer>>,
6618    }
6619
6620    impl Deref for TestClient {
6621        type Target = Arc<Client>;
6622
6623        fn deref(&self) -> &Self::Target {
6624            &self.client
6625        }
6626    }
6627
6628    struct ContactsSummary {
6629        pub current: Vec<String>,
6630        pub outgoing_requests: Vec<String>,
6631        pub incoming_requests: Vec<String>,
6632    }
6633
6634    impl TestClient {
6635        pub fn current_user_id(&self, cx: &TestAppContext) -> UserId {
6636            UserId::from_proto(
6637                self.user_store
6638                    .read_with(cx, |user_store, _| user_store.current_user().unwrap().id),
6639            )
6640        }
6641
6642        async fn wait_for_current_user(&self, cx: &TestAppContext) {
6643            let mut authed_user = self
6644                .user_store
6645                .read_with(cx, |user_store, _| user_store.watch_current_user());
6646            while authed_user.next().await.unwrap().is_none() {}
6647        }
6648
6649        async fn clear_contacts(&self, cx: &mut TestAppContext) {
6650            self.user_store
6651                .update(cx, |store, _| store.clear_contacts())
6652                .await;
6653        }
6654
6655        fn summarize_contacts(&self, cx: &TestAppContext) -> ContactsSummary {
6656            self.user_store.read_with(cx, |store, _| ContactsSummary {
6657                current: store
6658                    .contacts()
6659                    .iter()
6660                    .map(|contact| contact.user.github_login.clone())
6661                    .collect(),
6662                outgoing_requests: store
6663                    .outgoing_contact_requests()
6664                    .iter()
6665                    .map(|user| user.github_login.clone())
6666                    .collect(),
6667                incoming_requests: store
6668                    .incoming_contact_requests()
6669                    .iter()
6670                    .map(|user| user.github_login.clone())
6671                    .collect(),
6672            })
6673        }
6674
6675        async fn build_local_project(
6676            &mut self,
6677            fs: Arc<FakeFs>,
6678            root_path: impl AsRef<Path>,
6679            cx: &mut TestAppContext,
6680        ) -> (ModelHandle<Project>, WorktreeId) {
6681            let project = cx.update(|cx| {
6682                Project::local(
6683                    self.client.clone(),
6684                    self.user_store.clone(),
6685                    self.language_registry.clone(),
6686                    fs,
6687                    cx,
6688                )
6689            });
6690            self.project = Some(project.clone());
6691            let (worktree, _) = project
6692                .update(cx, |p, cx| {
6693                    p.find_or_create_local_worktree(root_path, true, cx)
6694                })
6695                .await
6696                .unwrap();
6697            worktree
6698                .read_with(cx, |tree, _| tree.as_local().unwrap().scan_complete())
6699                .await;
6700            project
6701                .update(cx, |project, _| project.next_remote_id())
6702                .await;
6703            (project, worktree.read_with(cx, |tree, _| tree.id()))
6704        }
6705
6706        async fn build_remote_project(
6707            &mut self,
6708            project_id: u64,
6709            cx: &mut TestAppContext,
6710        ) -> ModelHandle<Project> {
6711            let project = Project::remote(
6712                project_id,
6713                self.client.clone(),
6714                self.user_store.clone(),
6715                self.language_registry.clone(),
6716                FakeFs::new(cx.background()),
6717                &mut cx.to_async(),
6718            )
6719            .await
6720            .unwrap();
6721            self.project = Some(project.clone());
6722            project
6723        }
6724
6725        fn build_workspace(
6726            &self,
6727            project: &ModelHandle<Project>,
6728            cx: &mut TestAppContext,
6729        ) -> ViewHandle<Workspace> {
6730            let (window_id, _) = cx.add_window(|_| EmptyView);
6731            cx.add_view(window_id, |cx| {
6732                let fs = project.read(cx).fs().clone();
6733                Workspace::new(
6734                    &WorkspaceParams {
6735                        fs,
6736                        project: project.clone(),
6737                        user_store: self.user_store.clone(),
6738                        languages: self.language_registry.clone(),
6739                        themes: ThemeRegistry::new((), cx.font_cache().clone()),
6740                        channel_list: cx.add_model(|cx| {
6741                            ChannelList::new(self.user_store.clone(), self.client.clone(), cx)
6742                        }),
6743                        client: self.client.clone(),
6744                    },
6745                    cx,
6746                )
6747            })
6748        }
6749
6750        async fn simulate_host(
6751            mut self,
6752            project: ModelHandle<Project>,
6753            op_start_signal: futures::channel::mpsc::UnboundedReceiver<()>,
6754            rng: Arc<Mutex<StdRng>>,
6755            mut cx: TestAppContext,
6756        ) -> (Self, TestAppContext, Option<anyhow::Error>) {
6757            async fn simulate_host_internal(
6758                client: &mut TestClient,
6759                project: ModelHandle<Project>,
6760                mut op_start_signal: futures::channel::mpsc::UnboundedReceiver<()>,
6761                rng: Arc<Mutex<StdRng>>,
6762                cx: &mut TestAppContext,
6763            ) -> anyhow::Result<()> {
6764                let fs = project.read_with(cx, |project, _| project.fs().clone());
6765
6766                while op_start_signal.next().await.is_some() {
6767                    let distribution = rng.lock().gen_range::<usize, _>(0..100);
6768                    let files = fs.as_fake().files().await;
6769                    match distribution {
6770                        0..=20 if !files.is_empty() => {
6771                            let path = files.choose(&mut *rng.lock()).unwrap();
6772                            let mut path = path.as_path();
6773                            while let Some(parent_path) = path.parent() {
6774                                path = parent_path;
6775                                if rng.lock().gen() {
6776                                    break;
6777                                }
6778                            }
6779
6780                            log::info!("Host: find/create local worktree {:?}", path);
6781                            let find_or_create_worktree = project.update(cx, |project, cx| {
6782                                project.find_or_create_local_worktree(path, true, cx)
6783                            });
6784                            if rng.lock().gen() {
6785                                cx.background().spawn(find_or_create_worktree).detach();
6786                            } else {
6787                                find_or_create_worktree.await?;
6788                            }
6789                        }
6790                        10..=80 if !files.is_empty() => {
6791                            let buffer = if client.buffers.is_empty() || rng.lock().gen() {
6792                                let file = files.choose(&mut *rng.lock()).unwrap();
6793                                let (worktree, path) = project
6794                                    .update(cx, |project, cx| {
6795                                        project.find_or_create_local_worktree(
6796                                            file.clone(),
6797                                            true,
6798                                            cx,
6799                                        )
6800                                    })
6801                                    .await?;
6802                                let project_path =
6803                                    worktree.read_with(cx, |worktree, _| (worktree.id(), path));
6804                                log::info!(
6805                                    "Host: opening path {:?}, worktree {}, relative_path {:?}",
6806                                    file,
6807                                    project_path.0,
6808                                    project_path.1
6809                                );
6810                                let buffer = project
6811                                    .update(cx, |project, cx| project.open_buffer(project_path, cx))
6812                                    .await
6813                                    .unwrap();
6814                                client.buffers.insert(buffer.clone());
6815                                buffer
6816                            } else {
6817                                client
6818                                    .buffers
6819                                    .iter()
6820                                    .choose(&mut *rng.lock())
6821                                    .unwrap()
6822                                    .clone()
6823                            };
6824
6825                            if rng.lock().gen_bool(0.1) {
6826                                cx.update(|cx| {
6827                                    log::info!(
6828                                        "Host: dropping buffer {:?}",
6829                                        buffer.read(cx).file().unwrap().full_path(cx)
6830                                    );
6831                                    client.buffers.remove(&buffer);
6832                                    drop(buffer);
6833                                });
6834                            } else {
6835                                buffer.update(cx, |buffer, cx| {
6836                                    log::info!(
6837                                        "Host: updating buffer {:?} ({})",
6838                                        buffer.file().unwrap().full_path(cx),
6839                                        buffer.remote_id()
6840                                    );
6841
6842                                    if rng.lock().gen_bool(0.7) {
6843                                        buffer.randomly_edit(&mut *rng.lock(), 5, cx);
6844                                    } else {
6845                                        buffer.randomly_undo_redo(&mut *rng.lock(), cx);
6846                                    }
6847                                });
6848                            }
6849                        }
6850                        _ => loop {
6851                            let path_component_count = rng.lock().gen_range::<usize, _>(1..=5);
6852                            let mut path = PathBuf::new();
6853                            path.push("/");
6854                            for _ in 0..path_component_count {
6855                                let letter = rng.lock().gen_range(b'a'..=b'z');
6856                                path.push(std::str::from_utf8(&[letter]).unwrap());
6857                            }
6858                            path.set_extension("rs");
6859                            let parent_path = path.parent().unwrap();
6860
6861                            log::info!("Host: creating file {:?}", path,);
6862
6863                            if fs.create_dir(&parent_path).await.is_ok()
6864                                && fs.create_file(&path, Default::default()).await.is_ok()
6865                            {
6866                                break;
6867                            } else {
6868                                log::info!("Host: cannot create file");
6869                            }
6870                        },
6871                    }
6872
6873                    cx.background().simulate_random_delay().await;
6874                }
6875
6876                Ok(())
6877            }
6878
6879            let result =
6880                simulate_host_internal(&mut self, project.clone(), op_start_signal, rng, &mut cx)
6881                    .await;
6882            log::info!("Host done");
6883            self.project = Some(project);
6884            (self, cx, result.err())
6885        }
6886
6887        pub async fn simulate_guest(
6888            mut self,
6889            guest_username: String,
6890            project: ModelHandle<Project>,
6891            op_start_signal: futures::channel::mpsc::UnboundedReceiver<()>,
6892            rng: Arc<Mutex<StdRng>>,
6893            mut cx: TestAppContext,
6894        ) -> (Self, TestAppContext, Option<anyhow::Error>) {
6895            async fn simulate_guest_internal(
6896                client: &mut TestClient,
6897                guest_username: &str,
6898                project: ModelHandle<Project>,
6899                mut op_start_signal: futures::channel::mpsc::UnboundedReceiver<()>,
6900                rng: Arc<Mutex<StdRng>>,
6901                cx: &mut TestAppContext,
6902            ) -> anyhow::Result<()> {
6903                while op_start_signal.next().await.is_some() {
6904                    let buffer = if client.buffers.is_empty() || rng.lock().gen() {
6905                        let worktree = if let Some(worktree) =
6906                            project.read_with(cx, |project, cx| {
6907                                project
6908                                    .worktrees(&cx)
6909                                    .filter(|worktree| {
6910                                        let worktree = worktree.read(cx);
6911                                        worktree.is_visible()
6912                                            && worktree.entries(false).any(|e| e.is_file())
6913                                    })
6914                                    .choose(&mut *rng.lock())
6915                            }) {
6916                            worktree
6917                        } else {
6918                            cx.background().simulate_random_delay().await;
6919                            continue;
6920                        };
6921
6922                        let (worktree_root_name, project_path) =
6923                            worktree.read_with(cx, |worktree, _| {
6924                                let entry = worktree
6925                                    .entries(false)
6926                                    .filter(|e| e.is_file())
6927                                    .choose(&mut *rng.lock())
6928                                    .unwrap();
6929                                (
6930                                    worktree.root_name().to_string(),
6931                                    (worktree.id(), entry.path.clone()),
6932                                )
6933                            });
6934                        log::info!(
6935                            "{}: opening path {:?} in worktree {} ({})",
6936                            guest_username,
6937                            project_path.1,
6938                            project_path.0,
6939                            worktree_root_name,
6940                        );
6941                        let buffer = project
6942                            .update(cx, |project, cx| {
6943                                project.open_buffer(project_path.clone(), cx)
6944                            })
6945                            .await?;
6946                        log::info!(
6947                            "{}: opened path {:?} in worktree {} ({}) with buffer id {}",
6948                            guest_username,
6949                            project_path.1,
6950                            project_path.0,
6951                            worktree_root_name,
6952                            buffer.read_with(cx, |buffer, _| buffer.remote_id())
6953                        );
6954                        client.buffers.insert(buffer.clone());
6955                        buffer
6956                    } else {
6957                        client
6958                            .buffers
6959                            .iter()
6960                            .choose(&mut *rng.lock())
6961                            .unwrap()
6962                            .clone()
6963                    };
6964
6965                    let choice = rng.lock().gen_range(0..100);
6966                    match choice {
6967                        0..=9 => {
6968                            cx.update(|cx| {
6969                                log::info!(
6970                                    "{}: dropping buffer {:?}",
6971                                    guest_username,
6972                                    buffer.read(cx).file().unwrap().full_path(cx)
6973                                );
6974                                client.buffers.remove(&buffer);
6975                                drop(buffer);
6976                            });
6977                        }
6978                        10..=19 => {
6979                            let completions = project.update(cx, |project, cx| {
6980                                log::info!(
6981                                    "{}: requesting completions for buffer {} ({:?})",
6982                                    guest_username,
6983                                    buffer.read(cx).remote_id(),
6984                                    buffer.read(cx).file().unwrap().full_path(cx)
6985                                );
6986                                let offset = rng.lock().gen_range(0..=buffer.read(cx).len());
6987                                project.completions(&buffer, offset, cx)
6988                            });
6989                            let completions = cx.background().spawn(async move {
6990                                completions
6991                                    .await
6992                                    .map_err(|err| anyhow!("completions request failed: {:?}", err))
6993                            });
6994                            if rng.lock().gen_bool(0.3) {
6995                                log::info!("{}: detaching completions request", guest_username);
6996                                cx.update(|cx| completions.detach_and_log_err(cx));
6997                            } else {
6998                                completions.await?;
6999                            }
7000                        }
7001                        20..=29 => {
7002                            let code_actions = project.update(cx, |project, cx| {
7003                                log::info!(
7004                                    "{}: requesting code actions for buffer {} ({:?})",
7005                                    guest_username,
7006                                    buffer.read(cx).remote_id(),
7007                                    buffer.read(cx).file().unwrap().full_path(cx)
7008                                );
7009                                let range = buffer.read(cx).random_byte_range(0, &mut *rng.lock());
7010                                project.code_actions(&buffer, range, cx)
7011                            });
7012                            let code_actions = cx.background().spawn(async move {
7013                                code_actions.await.map_err(|err| {
7014                                    anyhow!("code actions request failed: {:?}", err)
7015                                })
7016                            });
7017                            if rng.lock().gen_bool(0.3) {
7018                                log::info!("{}: detaching code actions request", guest_username);
7019                                cx.update(|cx| code_actions.detach_and_log_err(cx));
7020                            } else {
7021                                code_actions.await?;
7022                            }
7023                        }
7024                        30..=39 if buffer.read_with(cx, |buffer, _| buffer.is_dirty()) => {
7025                            let (requested_version, save) = buffer.update(cx, |buffer, cx| {
7026                                log::info!(
7027                                    "{}: saving buffer {} ({:?})",
7028                                    guest_username,
7029                                    buffer.remote_id(),
7030                                    buffer.file().unwrap().full_path(cx)
7031                                );
7032                                (buffer.version(), buffer.save(cx))
7033                            });
7034                            let save = cx.background().spawn(async move {
7035                                let (saved_version, _) = save
7036                                    .await
7037                                    .map_err(|err| anyhow!("save request failed: {:?}", err))?;
7038                                assert!(saved_version.observed_all(&requested_version));
7039                                Ok::<_, anyhow::Error>(())
7040                            });
7041                            if rng.lock().gen_bool(0.3) {
7042                                log::info!("{}: detaching save request", guest_username);
7043                                cx.update(|cx| save.detach_and_log_err(cx));
7044                            } else {
7045                                save.await?;
7046                            }
7047                        }
7048                        40..=44 => {
7049                            let prepare_rename = project.update(cx, |project, cx| {
7050                                log::info!(
7051                                    "{}: preparing rename for buffer {} ({:?})",
7052                                    guest_username,
7053                                    buffer.read(cx).remote_id(),
7054                                    buffer.read(cx).file().unwrap().full_path(cx)
7055                                );
7056                                let offset = rng.lock().gen_range(0..=buffer.read(cx).len());
7057                                project.prepare_rename(buffer, offset, cx)
7058                            });
7059                            let prepare_rename = cx.background().spawn(async move {
7060                                prepare_rename.await.map_err(|err| {
7061                                    anyhow!("prepare rename request failed: {:?}", err)
7062                                })
7063                            });
7064                            if rng.lock().gen_bool(0.3) {
7065                                log::info!("{}: detaching prepare rename request", guest_username);
7066                                cx.update(|cx| prepare_rename.detach_and_log_err(cx));
7067                            } else {
7068                                prepare_rename.await?;
7069                            }
7070                        }
7071                        45..=49 => {
7072                            let definitions = project.update(cx, |project, cx| {
7073                                log::info!(
7074                                    "{}: requesting definitions for buffer {} ({:?})",
7075                                    guest_username,
7076                                    buffer.read(cx).remote_id(),
7077                                    buffer.read(cx).file().unwrap().full_path(cx)
7078                                );
7079                                let offset = rng.lock().gen_range(0..=buffer.read(cx).len());
7080                                project.definition(&buffer, offset, cx)
7081                            });
7082                            let definitions = cx.background().spawn(async move {
7083                                definitions
7084                                    .await
7085                                    .map_err(|err| anyhow!("definitions request failed: {:?}", err))
7086                            });
7087                            if rng.lock().gen_bool(0.3) {
7088                                log::info!("{}: detaching definitions request", guest_username);
7089                                cx.update(|cx| definitions.detach_and_log_err(cx));
7090                            } else {
7091                                client
7092                                    .buffers
7093                                    .extend(definitions.await?.into_iter().map(|loc| loc.buffer));
7094                            }
7095                        }
7096                        50..=54 => {
7097                            let highlights = project.update(cx, |project, cx| {
7098                                log::info!(
7099                                    "{}: requesting highlights for buffer {} ({:?})",
7100                                    guest_username,
7101                                    buffer.read(cx).remote_id(),
7102                                    buffer.read(cx).file().unwrap().full_path(cx)
7103                                );
7104                                let offset = rng.lock().gen_range(0..=buffer.read(cx).len());
7105                                project.document_highlights(&buffer, offset, cx)
7106                            });
7107                            let highlights = cx.background().spawn(async move {
7108                                highlights
7109                                    .await
7110                                    .map_err(|err| anyhow!("highlights request failed: {:?}", err))
7111                            });
7112                            if rng.lock().gen_bool(0.3) {
7113                                log::info!("{}: detaching highlights request", guest_username);
7114                                cx.update(|cx| highlights.detach_and_log_err(cx));
7115                            } else {
7116                                highlights.await?;
7117                            }
7118                        }
7119                        55..=59 => {
7120                            let search = project.update(cx, |project, cx| {
7121                                let query = rng.lock().gen_range('a'..='z');
7122                                log::info!("{}: project-wide search {:?}", guest_username, query);
7123                                project.search(SearchQuery::text(query, false, false), cx)
7124                            });
7125                            let search = cx.background().spawn(async move {
7126                                search
7127                                    .await
7128                                    .map_err(|err| anyhow!("search request failed: {:?}", err))
7129                            });
7130                            if rng.lock().gen_bool(0.3) {
7131                                log::info!("{}: detaching search request", guest_username);
7132                                cx.update(|cx| search.detach_and_log_err(cx));
7133                            } else {
7134                                client.buffers.extend(search.await?.into_keys());
7135                            }
7136                        }
7137                        60..=69 => {
7138                            let worktree = project
7139                                .read_with(cx, |project, cx| {
7140                                    project
7141                                        .worktrees(&cx)
7142                                        .filter(|worktree| {
7143                                            let worktree = worktree.read(cx);
7144                                            worktree.is_visible()
7145                                                && worktree.entries(false).any(|e| e.is_file())
7146                                                && worktree
7147                                                    .root_entry()
7148                                                    .map_or(false, |e| e.is_dir())
7149                                        })
7150                                        .choose(&mut *rng.lock())
7151                                })
7152                                .unwrap();
7153                            let (worktree_id, worktree_root_name) = worktree
7154                                .read_with(cx, |worktree, _| {
7155                                    (worktree.id(), worktree.root_name().to_string())
7156                                });
7157
7158                            let mut new_name = String::new();
7159                            for _ in 0..10 {
7160                                let letter = rng.lock().gen_range('a'..='z');
7161                                new_name.push(letter);
7162                            }
7163                            let mut new_path = PathBuf::new();
7164                            new_path.push(new_name);
7165                            new_path.set_extension("rs");
7166                            log::info!(
7167                                "{}: creating {:?} in worktree {} ({})",
7168                                guest_username,
7169                                new_path,
7170                                worktree_id,
7171                                worktree_root_name,
7172                            );
7173                            project
7174                                .update(cx, |project, cx| {
7175                                    project.create_entry((worktree_id, new_path), false, cx)
7176                                })
7177                                .unwrap()
7178                                .await?;
7179                        }
7180                        _ => {
7181                            buffer.update(cx, |buffer, cx| {
7182                                log::info!(
7183                                    "{}: updating buffer {} ({:?})",
7184                                    guest_username,
7185                                    buffer.remote_id(),
7186                                    buffer.file().unwrap().full_path(cx)
7187                                );
7188                                if rng.lock().gen_bool(0.7) {
7189                                    buffer.randomly_edit(&mut *rng.lock(), 5, cx);
7190                                } else {
7191                                    buffer.randomly_undo_redo(&mut *rng.lock(), cx);
7192                                }
7193                            });
7194                        }
7195                    }
7196                    cx.background().simulate_random_delay().await;
7197                }
7198                Ok(())
7199            }
7200
7201            let result = simulate_guest_internal(
7202                &mut self,
7203                &guest_username,
7204                project.clone(),
7205                op_start_signal,
7206                rng,
7207                &mut cx,
7208            )
7209            .await;
7210            log::info!("{}: done", guest_username);
7211
7212            self.project = Some(project);
7213            (self, cx, result.err())
7214        }
7215    }
7216
7217    impl Drop for TestClient {
7218        fn drop(&mut self) {
7219            self.client.tear_down();
7220        }
7221    }
7222
7223    impl Executor for Arc<gpui::executor::Background> {
7224        type Sleep = gpui::executor::Timer;
7225
7226        fn spawn_detached<F: 'static + Send + Future<Output = ()>>(&self, future: F) {
7227            self.spawn(future).detach();
7228        }
7229
7230        fn sleep(&self, duration: Duration) -> Self::Sleep {
7231            self.as_ref().timer(duration)
7232        }
7233    }
7234
7235    fn channel_messages(channel: &Channel) -> Vec<(String, String, bool)> {
7236        channel
7237            .messages()
7238            .cursor::<()>()
7239            .map(|m| {
7240                (
7241                    m.sender.github_login.clone(),
7242                    m.body.clone(),
7243                    m.is_pending(),
7244                )
7245            })
7246            .collect()
7247    }
7248
7249    struct EmptyView;
7250
7251    impl gpui::Entity for EmptyView {
7252        type Event = ();
7253    }
7254
7255    impl gpui::View for EmptyView {
7256        fn ui_name() -> &'static str {
7257            "empty view"
7258        }
7259
7260        fn render(&mut self, _: &mut gpui::RenderContext<Self>) -> gpui::ElementBox {
7261            gpui::Element::boxed(gpui::elements::Empty::new())
7262        }
7263    }
7264}