rpc.rs

   1mod store;
   2
   3use crate::{
   4    auth,
   5    db::{self, ChannelId, MessageId, User, UserId},
   6    AppState, Result,
   7};
   8use anyhow::anyhow;
   9use async_tungstenite::tungstenite::{
  10    protocol::CloseFrame as TungsteniteCloseFrame, Message as TungsteniteMessage,
  11};
  12use axum::{
  13    body::Body,
  14    extract::{
  15        ws::{CloseFrame as AxumCloseFrame, Message as AxumMessage},
  16        ConnectInfo, WebSocketUpgrade,
  17    },
  18    headers::{Header, HeaderName},
  19    http::StatusCode,
  20    middleware,
  21    response::IntoResponse,
  22    routing::get,
  23    Extension, Router, TypedHeader,
  24};
  25use collections::HashMap;
  26use futures::{channel::mpsc, future::BoxFuture, FutureExt, SinkExt, StreamExt, TryStreamExt};
  27use lazy_static::lazy_static;
  28use rpc::{
  29    proto::{self, AnyTypedEnvelope, EntityMessage, EnvelopedMessage, RequestMessage},
  30    Connection, ConnectionId, Peer, Receipt, TypedEnvelope,
  31};
  32use std::{
  33    any::TypeId,
  34    future::Future,
  35    marker::PhantomData,
  36    net::SocketAddr,
  37    ops::{Deref, DerefMut},
  38    rc::Rc,
  39    sync::{
  40        atomic::{AtomicBool, Ordering::SeqCst},
  41        Arc,
  42    },
  43    time::Duration,
  44};
  45use store::{Store, Worktree};
  46use time::OffsetDateTime;
  47use tokio::{
  48    sync::{RwLock, RwLockReadGuard, RwLockWriteGuard},
  49    time::Sleep,
  50};
  51use tower::ServiceBuilder;
  52use tracing::{info_span, instrument, Instrument};
  53
  54type MessageHandler =
  55    Box<dyn Send + Sync + Fn(Arc<Server>, Box<dyn AnyTypedEnvelope>) -> BoxFuture<'static, ()>>;
  56
  57struct Response<R> {
  58    server: Arc<Server>,
  59    receipt: Receipt<R>,
  60    responded: Arc<AtomicBool>,
  61}
  62
  63impl<R: RequestMessage> Response<R> {
  64    fn send(self, payload: R::Response) -> Result<()> {
  65        self.responded.store(true, SeqCst);
  66        self.server.peer.respond(self.receipt, payload)?;
  67        Ok(())
  68    }
  69}
  70
  71pub struct Server {
  72    peer: Arc<Peer>,
  73    store: RwLock<Store>,
  74    app_state: Arc<AppState>,
  75    handlers: HashMap<TypeId, MessageHandler>,
  76    notifications: Option<mpsc::UnboundedSender<()>>,
  77}
  78
  79pub trait Executor: Send + Clone {
  80    type Sleep: Send + Future;
  81    fn spawn_detached<F: 'static + Send + Future<Output = ()>>(&self, future: F);
  82    fn sleep(&self, duration: Duration) -> Self::Sleep;
  83}
  84
  85#[derive(Clone)]
  86pub struct RealExecutor;
  87
  88const MESSAGE_COUNT_PER_PAGE: usize = 100;
  89const MAX_MESSAGE_LEN: usize = 1024;
  90
  91struct StoreReadGuard<'a> {
  92    guard: RwLockReadGuard<'a, Store>,
  93    _not_send: PhantomData<Rc<()>>,
  94}
  95
  96struct StoreWriteGuard<'a> {
  97    guard: RwLockWriteGuard<'a, Store>,
  98    _not_send: PhantomData<Rc<()>>,
  99}
 100
 101impl Server {
 102    pub fn new(
 103        app_state: Arc<AppState>,
 104        notifications: Option<mpsc::UnboundedSender<()>>,
 105    ) -> Arc<Self> {
 106        let mut server = Self {
 107            peer: Peer::new(),
 108            app_state,
 109            store: Default::default(),
 110            handlers: Default::default(),
 111            notifications,
 112        };
 113
 114        server
 115            .add_request_handler(Server::ping)
 116            .add_request_handler(Server::register_project)
 117            .add_message_handler(Server::unregister_project)
 118            .add_request_handler(Server::share_project)
 119            .add_message_handler(Server::unshare_project)
 120            .add_request_handler(Server::join_project)
 121            .add_message_handler(Server::leave_project)
 122            .add_request_handler(Server::register_worktree)
 123            .add_message_handler(Server::unregister_worktree)
 124            .add_request_handler(Server::update_worktree)
 125            .add_message_handler(Server::start_language_server)
 126            .add_message_handler(Server::update_language_server)
 127            .add_message_handler(Server::update_diagnostic_summary)
 128            .add_request_handler(Server::forward_project_request::<proto::GetDefinition>)
 129            .add_request_handler(Server::forward_project_request::<proto::GetReferences>)
 130            .add_request_handler(Server::forward_project_request::<proto::SearchProject>)
 131            .add_request_handler(Server::forward_project_request::<proto::GetDocumentHighlights>)
 132            .add_request_handler(Server::forward_project_request::<proto::GetProjectSymbols>)
 133            .add_request_handler(Server::forward_project_request::<proto::OpenBufferForSymbol>)
 134            .add_request_handler(Server::forward_project_request::<proto::OpenBufferById>)
 135            .add_request_handler(Server::forward_project_request::<proto::OpenBufferByPath>)
 136            .add_request_handler(Server::forward_project_request::<proto::GetCompletions>)
 137            .add_request_handler(
 138                Server::forward_project_request::<proto::ApplyCompletionAdditionalEdits>,
 139            )
 140            .add_request_handler(Server::forward_project_request::<proto::GetCodeActions>)
 141            .add_request_handler(Server::forward_project_request::<proto::ApplyCodeAction>)
 142            .add_request_handler(Server::forward_project_request::<proto::PrepareRename>)
 143            .add_request_handler(Server::forward_project_request::<proto::PerformRename>)
 144            .add_request_handler(Server::forward_project_request::<proto::ReloadBuffers>)
 145            .add_request_handler(Server::forward_project_request::<proto::FormatBuffers>)
 146            .add_request_handler(Server::forward_project_request::<proto::CreateProjectEntry>)
 147            .add_request_handler(Server::forward_project_request::<proto::RenameProjectEntry>)
 148            .add_request_handler(Server::forward_project_request::<proto::DeleteProjectEntry>)
 149            .add_request_handler(Server::update_buffer)
 150            .add_message_handler(Server::update_buffer_file)
 151            .add_message_handler(Server::buffer_reloaded)
 152            .add_message_handler(Server::buffer_saved)
 153            .add_request_handler(Server::save_buffer)
 154            .add_request_handler(Server::get_channels)
 155            .add_request_handler(Server::get_users)
 156            .add_request_handler(Server::fuzzy_search_users)
 157            .add_request_handler(Server::request_contact)
 158            .add_request_handler(Server::remove_contact)
 159            .add_request_handler(Server::respond_to_contact_request)
 160            .add_request_handler(Server::join_channel)
 161            .add_message_handler(Server::leave_channel)
 162            .add_request_handler(Server::send_channel_message)
 163            .add_request_handler(Server::follow)
 164            .add_message_handler(Server::unfollow)
 165            .add_message_handler(Server::update_followers)
 166            .add_request_handler(Server::get_channel_messages);
 167
 168        Arc::new(server)
 169    }
 170
 171    fn add_message_handler<F, Fut, M>(&mut self, handler: F) -> &mut Self
 172    where
 173        F: 'static + Send + Sync + Fn(Arc<Self>, TypedEnvelope<M>) -> Fut,
 174        Fut: 'static + Send + Future<Output = Result<()>>,
 175        M: EnvelopedMessage,
 176    {
 177        let prev_handler = self.handlers.insert(
 178            TypeId::of::<M>(),
 179            Box::new(move |server, envelope| {
 180                let envelope = envelope.into_any().downcast::<TypedEnvelope<M>>().unwrap();
 181                let span = info_span!(
 182                    "handle message",
 183                    payload_type = envelope.payload_type_name(),
 184                    payload = format!("{:?}", envelope.payload).as_str(),
 185                );
 186                let future = (handler)(server, *envelope);
 187                async move {
 188                    if let Err(error) = future.await {
 189                        tracing::error!(%error, "error handling message");
 190                    }
 191                }
 192                .instrument(span)
 193                .boxed()
 194            }),
 195        );
 196        if prev_handler.is_some() {
 197            panic!("registered a handler for the same message twice");
 198        }
 199        self
 200    }
 201
 202    /// Handle a request while holding a lock to the store. This is useful when we're registering
 203    /// a connection but we want to respond on the connection before anybody else can send on it.
 204    fn add_request_handler<F, Fut, M>(&mut self, handler: F) -> &mut Self
 205    where
 206        F: 'static + Send + Sync + Fn(Arc<Self>, TypedEnvelope<M>, Response<M>) -> Fut,
 207        Fut: Send + Future<Output = Result<()>>,
 208        M: RequestMessage,
 209    {
 210        let handler = Arc::new(handler);
 211        self.add_message_handler(move |server, envelope| {
 212            let receipt = envelope.receipt();
 213            let handler = handler.clone();
 214            async move {
 215                let responded = Arc::new(AtomicBool::default());
 216                let response = Response {
 217                    server: server.clone(),
 218                    responded: responded.clone(),
 219                    receipt: envelope.receipt(),
 220                };
 221                match (handler)(server.clone(), envelope, response).await {
 222                    Ok(()) => {
 223                        if responded.load(std::sync::atomic::Ordering::SeqCst) {
 224                            Ok(())
 225                        } else {
 226                            Err(anyhow!("handler did not send a response"))?
 227                        }
 228                    }
 229                    Err(error) => {
 230                        server.peer.respond_with_error(
 231                            receipt,
 232                            proto::Error {
 233                                message: error.to_string(),
 234                            },
 235                        )?;
 236                        Err(error)
 237                    }
 238                }
 239            }
 240        })
 241    }
 242
 243    pub fn handle_connection<E: Executor>(
 244        self: &Arc<Self>,
 245        connection: Connection,
 246        address: String,
 247        user: User,
 248        mut send_connection_id: Option<mpsc::Sender<ConnectionId>>,
 249        executor: E,
 250    ) -> impl Future<Output = Result<()>> {
 251        let mut this = self.clone();
 252        let user_id = user.id;
 253        let login = user.github_login;
 254        let span = info_span!("handle connection", %user_id, %login, %address);
 255        async move {
 256            let (connection_id, handle_io, mut incoming_rx) = this
 257                .peer
 258                .add_connection(connection, {
 259                    let executor = executor.clone();
 260                    move |duration| {
 261                        let timer = executor.sleep(duration);
 262                        async move {
 263                            timer.await;
 264                        }
 265                    }
 266                })
 267                .await;
 268
 269            tracing::info!(%user_id, %login, %connection_id, %address, "connection opened");
 270
 271            if let Some(send_connection_id) = send_connection_id.as_mut() {
 272                let _ = send_connection_id.send(connection_id).await;
 273            }
 274
 275            let contacts = this.app_state.db.get_contacts(user_id).await?;
 276
 277            {
 278                let mut store = this.store_mut().await;
 279                store.add_connection(connection_id, user_id);
 280                this.peer.send(connection_id, store.build_initial_contacts_update(contacts))?;
 281            }
 282            this.update_user_contacts(user_id).await?;
 283
 284            let handle_io = handle_io.fuse();
 285            futures::pin_mut!(handle_io);
 286            loop {
 287                let next_message = incoming_rx.next().fuse();
 288                futures::pin_mut!(next_message);
 289                futures::select_biased! {
 290                    result = handle_io => {
 291                        if let Err(error) = result {
 292                            tracing::error!(?error, %user_id, %login, %connection_id, %address, "error handling I/O");
 293                        }
 294                        break;
 295                    }
 296                    message = next_message => {
 297                        if let Some(message) = message {
 298                            let type_name = message.payload_type_name();
 299                            let span = tracing::info_span!("receive message", %user_id, %login, %connection_id, %address, type_name);
 300                            async {
 301                                if let Some(handler) = this.handlers.get(&message.payload_type_id()) {
 302                                    let notifications = this.notifications.clone();
 303                                    let is_background = message.is_background();
 304                                    let handle_message = (handler)(this.clone(), message);
 305                                    let handle_message = async move {
 306                                        handle_message.await;
 307                                        if let Some(mut notifications) = notifications {
 308                                            let _ = notifications.send(()).await;
 309                                        }
 310                                    };
 311                                    if is_background {
 312                                        executor.spawn_detached(handle_message);
 313                                    } else {
 314                                        handle_message.await;
 315                                    }
 316                                } else {
 317                                    tracing::error!(%user_id, %login, %connection_id, %address, "no message handler");
 318                                }
 319                            }.instrument(span).await;
 320                        } else {
 321                            tracing::info!(%user_id, %login, %connection_id, %address, "connection closed");
 322                            break;
 323                        }
 324                    }
 325                }
 326            }
 327
 328            tracing::info!(%user_id, %login, %connection_id, %address, "signing out");
 329            if let Err(error) = this.sign_out(connection_id).await {
 330                tracing::error!(%user_id, %login, %connection_id, %address, ?error, "error signing out");
 331            }
 332
 333            Ok(())
 334        }.instrument(span)
 335    }
 336
 337    #[instrument(skip(self), err)]
 338    async fn sign_out(self: &mut Arc<Self>, connection_id: ConnectionId) -> Result<()> {
 339        self.peer.disconnect(connection_id);
 340        let removed_connection = self.store_mut().await.remove_connection(connection_id)?;
 341
 342        for (project_id, project) in removed_connection.hosted_projects {
 343            if let Some(share) = project.share {
 344                broadcast(connection_id, share.guests.keys().copied(), |conn_id| {
 345                    self.peer
 346                        .send(conn_id, proto::UnshareProject { project_id })
 347                });
 348            }
 349        }
 350
 351        for (project_id, peer_ids) in removed_connection.guest_project_ids {
 352            broadcast(connection_id, peer_ids, |conn_id| {
 353                self.peer.send(
 354                    conn_id,
 355                    proto::RemoveProjectCollaborator {
 356                        project_id,
 357                        peer_id: connection_id.0,
 358                    },
 359                )
 360            });
 361        }
 362
 363        self.update_user_contacts(removed_connection.user_id)
 364            .await?;
 365
 366        Ok(())
 367    }
 368
 369    async fn ping(
 370        self: Arc<Server>,
 371        _: TypedEnvelope<proto::Ping>,
 372        response: Response<proto::Ping>,
 373    ) -> Result<()> {
 374        response.send(proto::Ack {})?;
 375        Ok(())
 376    }
 377
 378    async fn register_project(
 379        self: Arc<Server>,
 380        request: TypedEnvelope<proto::RegisterProject>,
 381        response: Response<proto::RegisterProject>,
 382    ) -> Result<()> {
 383        let user_id;
 384        let project_id;
 385        {
 386            let mut state = self.store_mut().await;
 387            user_id = state.user_id_for_connection(request.sender_id)?;
 388            project_id = state.register_project(request.sender_id, user_id);
 389        };
 390        self.update_user_contacts(user_id).await?;
 391        response.send(proto::RegisterProjectResponse { project_id })?;
 392        Ok(())
 393    }
 394
 395    async fn unregister_project(
 396        self: Arc<Server>,
 397        request: TypedEnvelope<proto::UnregisterProject>,
 398    ) -> Result<()> {
 399        let user_id = {
 400            let mut state = self.store_mut().await;
 401            state.unregister_project(request.payload.project_id, request.sender_id)?;
 402            state.user_id_for_connection(request.sender_id)?
 403        };
 404
 405        self.update_user_contacts(user_id).await?;
 406        Ok(())
 407    }
 408
 409    async fn share_project(
 410        self: Arc<Server>,
 411        request: TypedEnvelope<proto::ShareProject>,
 412        response: Response<proto::ShareProject>,
 413    ) -> Result<()> {
 414        let user_id = {
 415            let mut state = self.store_mut().await;
 416            state.share_project(request.payload.project_id, request.sender_id)?;
 417            state.user_id_for_connection(request.sender_id)?
 418        };
 419        self.update_user_contacts(user_id).await?;
 420        response.send(proto::Ack {})?;
 421        Ok(())
 422    }
 423
 424    async fn update_user_contacts(self: &Arc<Server>, user_id: UserId) -> Result<()> {
 425        let contacts = self.app_state.db.get_contacts(user_id).await?;
 426        let store = self.store().await;
 427        let updated_contact = store.contact_for_user(user_id, false);
 428        for contact in contacts {
 429            if let db::Contact::Accepted {
 430                user_id: contact_user_id,
 431                ..
 432            } = contact
 433            {
 434                for contact_conn_id in store.connection_ids_for_user(contact_user_id) {
 435                    self.peer
 436                        .send(
 437                            contact_conn_id,
 438                            proto::UpdateContacts {
 439                                contacts: vec![updated_contact.clone()],
 440                                remove_contacts: Default::default(),
 441                                incoming_requests: Default::default(),
 442                                remove_incoming_requests: Default::default(),
 443                                outgoing_requests: Default::default(),
 444                                remove_outgoing_requests: Default::default(),
 445                            },
 446                        )
 447                        .trace_err();
 448                }
 449            }
 450        }
 451        Ok(())
 452    }
 453
 454    async fn unshare_project(
 455        self: Arc<Server>,
 456        request: TypedEnvelope<proto::UnshareProject>,
 457    ) -> Result<()> {
 458        let project_id = request.payload.project_id;
 459        let project;
 460        {
 461            let mut state = self.store_mut().await;
 462            project = state.unshare_project(project_id, request.sender_id)?;
 463            broadcast(request.sender_id, project.connection_ids, |conn_id| {
 464                self.peer
 465                    .send(conn_id, proto::UnshareProject { project_id })
 466            });
 467        }
 468        self.update_user_contacts(project.host_user_id).await?;
 469        Ok(())
 470    }
 471
 472    async fn join_project(
 473        self: Arc<Server>,
 474        request: TypedEnvelope<proto::JoinProject>,
 475        response: Response<proto::JoinProject>,
 476    ) -> Result<()> {
 477        let project_id = request.payload.project_id;
 478        let host_user_id;
 479        let guest_user_id;
 480        {
 481            let state = self.store().await;
 482            host_user_id = state.project(project_id)?.host_user_id;
 483            guest_user_id = state.user_id_for_connection(request.sender_id)?;
 484        };
 485
 486        let has_contact = self
 487            .app_state
 488            .db
 489            .has_contact(guest_user_id, host_user_id)
 490            .await?;
 491        if !has_contact {
 492            return Err(anyhow!("no such project"))?;
 493        }
 494
 495        {
 496            let state = &mut *self.store_mut().await;
 497            let joined = state.join_project(request.sender_id, guest_user_id, project_id)?;
 498            let share = joined.project.share()?;
 499            let peer_count = share.guests.len();
 500            let mut collaborators = Vec::with_capacity(peer_count);
 501            collaborators.push(proto::Collaborator {
 502                peer_id: joined.project.host_connection_id.0,
 503                replica_id: 0,
 504                user_id: joined.project.host_user_id.to_proto(),
 505            });
 506            let worktrees = share
 507                .worktrees
 508                .iter()
 509                .filter_map(|(id, shared_worktree)| {
 510                    let worktree = joined.project.worktrees.get(&id)?;
 511                    Some(proto::Worktree {
 512                        id: *id,
 513                        root_name: worktree.root_name.clone(),
 514                        entries: shared_worktree.entries.values().cloned().collect(),
 515                        diagnostic_summaries: shared_worktree
 516                            .diagnostic_summaries
 517                            .values()
 518                            .cloned()
 519                            .collect(),
 520                        visible: worktree.visible,
 521                        scan_id: shared_worktree.scan_id,
 522                    })
 523                })
 524                .collect();
 525            for (peer_conn_id, (peer_replica_id, peer_user_id)) in &share.guests {
 526                if *peer_conn_id != request.sender_id {
 527                    collaborators.push(proto::Collaborator {
 528                        peer_id: peer_conn_id.0,
 529                        replica_id: *peer_replica_id as u32,
 530                        user_id: peer_user_id.to_proto(),
 531                    });
 532                }
 533            }
 534            broadcast(
 535                request.sender_id,
 536                joined.project.connection_ids(),
 537                |conn_id| {
 538                    self.peer.send(
 539                        conn_id,
 540                        proto::AddProjectCollaborator {
 541                            project_id,
 542                            collaborator: Some(proto::Collaborator {
 543                                peer_id: request.sender_id.0,
 544                                replica_id: joined.replica_id as u32,
 545                                user_id: guest_user_id.to_proto(),
 546                            }),
 547                        },
 548                    )
 549                },
 550            );
 551            response.send(proto::JoinProjectResponse {
 552                worktrees,
 553                replica_id: joined.replica_id as u32,
 554                collaborators,
 555                language_servers: joined.project.language_servers.clone(),
 556            })?;
 557        }
 558        self.update_user_contacts(host_user_id).await?;
 559        Ok(())
 560    }
 561
 562    async fn leave_project(
 563        self: Arc<Server>,
 564        request: TypedEnvelope<proto::LeaveProject>,
 565    ) -> Result<()> {
 566        let sender_id = request.sender_id;
 567        let project_id = request.payload.project_id;
 568        let project;
 569        {
 570            let mut state = self.store_mut().await;
 571            project = state.leave_project(sender_id, project_id)?;
 572            broadcast(sender_id, project.connection_ids, |conn_id| {
 573                self.peer.send(
 574                    conn_id,
 575                    proto::RemoveProjectCollaborator {
 576                        project_id,
 577                        peer_id: sender_id.0,
 578                    },
 579                )
 580            });
 581        }
 582        self.update_user_contacts(project.host_user_id).await?;
 583        Ok(())
 584    }
 585
 586    async fn register_worktree(
 587        self: Arc<Server>,
 588        request: TypedEnvelope<proto::RegisterWorktree>,
 589        response: Response<proto::RegisterWorktree>,
 590    ) -> Result<()> {
 591        let host_user_id;
 592        {
 593            let mut state = self.store_mut().await;
 594            host_user_id = state.user_id_for_connection(request.sender_id)?;
 595
 596            let guest_connection_ids = state
 597                .read_project(request.payload.project_id, request.sender_id)?
 598                .guest_connection_ids();
 599            state.register_worktree(
 600                request.payload.project_id,
 601                request.payload.worktree_id,
 602                request.sender_id,
 603                Worktree {
 604                    root_name: request.payload.root_name.clone(),
 605                    visible: request.payload.visible,
 606                },
 607            )?;
 608
 609            broadcast(request.sender_id, guest_connection_ids, |connection_id| {
 610                self.peer
 611                    .forward_send(request.sender_id, connection_id, request.payload.clone())
 612            });
 613        }
 614        self.update_user_contacts(host_user_id).await?;
 615        response.send(proto::Ack {})?;
 616        Ok(())
 617    }
 618
 619    async fn unregister_worktree(
 620        self: Arc<Server>,
 621        request: TypedEnvelope<proto::UnregisterWorktree>,
 622    ) -> Result<()> {
 623        let host_user_id;
 624        let project_id = request.payload.project_id;
 625        let worktree_id = request.payload.worktree_id;
 626        {
 627            let mut state = self.store_mut().await;
 628            let (_, guest_connection_ids) =
 629                state.unregister_worktree(project_id, worktree_id, request.sender_id)?;
 630            host_user_id = state.user_id_for_connection(request.sender_id)?;
 631            broadcast(request.sender_id, guest_connection_ids, |conn_id| {
 632                self.peer.send(
 633                    conn_id,
 634                    proto::UnregisterWorktree {
 635                        project_id,
 636                        worktree_id,
 637                    },
 638                )
 639            });
 640        }
 641        self.update_user_contacts(host_user_id).await?;
 642        Ok(())
 643    }
 644
 645    async fn update_worktree(
 646        self: Arc<Server>,
 647        request: TypedEnvelope<proto::UpdateWorktree>,
 648        response: Response<proto::UpdateWorktree>,
 649    ) -> Result<()> {
 650        let connection_ids = self.store_mut().await.update_worktree(
 651            request.sender_id,
 652            request.payload.project_id,
 653            request.payload.worktree_id,
 654            &request.payload.removed_entries,
 655            &request.payload.updated_entries,
 656            request.payload.scan_id,
 657        )?;
 658
 659        broadcast(request.sender_id, connection_ids, |connection_id| {
 660            self.peer
 661                .forward_send(request.sender_id, connection_id, request.payload.clone())
 662        });
 663        response.send(proto::Ack {})?;
 664        Ok(())
 665    }
 666
 667    async fn update_diagnostic_summary(
 668        self: Arc<Server>,
 669        request: TypedEnvelope<proto::UpdateDiagnosticSummary>,
 670    ) -> Result<()> {
 671        let summary = request
 672            .payload
 673            .summary
 674            .clone()
 675            .ok_or_else(|| anyhow!("invalid summary"))?;
 676        let receiver_ids = self.store_mut().await.update_diagnostic_summary(
 677            request.payload.project_id,
 678            request.payload.worktree_id,
 679            request.sender_id,
 680            summary,
 681        )?;
 682
 683        broadcast(request.sender_id, receiver_ids, |connection_id| {
 684            self.peer
 685                .forward_send(request.sender_id, connection_id, request.payload.clone())
 686        });
 687        Ok(())
 688    }
 689
 690    async fn start_language_server(
 691        self: Arc<Server>,
 692        request: TypedEnvelope<proto::StartLanguageServer>,
 693    ) -> Result<()> {
 694        let receiver_ids = self.store_mut().await.start_language_server(
 695            request.payload.project_id,
 696            request.sender_id,
 697            request
 698                .payload
 699                .server
 700                .clone()
 701                .ok_or_else(|| anyhow!("invalid language server"))?,
 702        )?;
 703        broadcast(request.sender_id, receiver_ids, |connection_id| {
 704            self.peer
 705                .forward_send(request.sender_id, connection_id, request.payload.clone())
 706        });
 707        Ok(())
 708    }
 709
 710    async fn update_language_server(
 711        self: Arc<Server>,
 712        request: TypedEnvelope<proto::UpdateLanguageServer>,
 713    ) -> Result<()> {
 714        let receiver_ids = self
 715            .store()
 716            .await
 717            .project_connection_ids(request.payload.project_id, request.sender_id)?;
 718        broadcast(request.sender_id, receiver_ids, |connection_id| {
 719            self.peer
 720                .forward_send(request.sender_id, connection_id, request.payload.clone())
 721        });
 722        Ok(())
 723    }
 724
 725    async fn forward_project_request<T>(
 726        self: Arc<Server>,
 727        request: TypedEnvelope<T>,
 728        response: Response<T>,
 729    ) -> Result<()>
 730    where
 731        T: EntityMessage + RequestMessage,
 732    {
 733        let host_connection_id = self
 734            .store()
 735            .await
 736            .read_project(request.payload.remote_entity_id(), request.sender_id)?
 737            .host_connection_id;
 738
 739        response.send(
 740            self.peer
 741                .forward_request(request.sender_id, host_connection_id, request.payload)
 742                .await?,
 743        )?;
 744        Ok(())
 745    }
 746
 747    async fn save_buffer(
 748        self: Arc<Server>,
 749        request: TypedEnvelope<proto::SaveBuffer>,
 750        response: Response<proto::SaveBuffer>,
 751    ) -> Result<()> {
 752        let host = self
 753            .store()
 754            .await
 755            .read_project(request.payload.project_id, request.sender_id)?
 756            .host_connection_id;
 757        let response_payload = self
 758            .peer
 759            .forward_request(request.sender_id, host, request.payload.clone())
 760            .await?;
 761
 762        let mut guests = self
 763            .store()
 764            .await
 765            .read_project(request.payload.project_id, request.sender_id)?
 766            .connection_ids();
 767        guests.retain(|guest_connection_id| *guest_connection_id != request.sender_id);
 768        broadcast(host, guests, |conn_id| {
 769            self.peer
 770                .forward_send(host, conn_id, response_payload.clone())
 771        });
 772        response.send(response_payload)?;
 773        Ok(())
 774    }
 775
 776    async fn update_buffer(
 777        self: Arc<Server>,
 778        request: TypedEnvelope<proto::UpdateBuffer>,
 779        response: Response<proto::UpdateBuffer>,
 780    ) -> Result<()> {
 781        let receiver_ids = self
 782            .store()
 783            .await
 784            .project_connection_ids(request.payload.project_id, request.sender_id)?;
 785        broadcast(request.sender_id, receiver_ids, |connection_id| {
 786            self.peer
 787                .forward_send(request.sender_id, connection_id, request.payload.clone())
 788        });
 789        response.send(proto::Ack {})?;
 790        Ok(())
 791    }
 792
 793    async fn update_buffer_file(
 794        self: Arc<Server>,
 795        request: TypedEnvelope<proto::UpdateBufferFile>,
 796    ) -> Result<()> {
 797        let receiver_ids = self
 798            .store()
 799            .await
 800            .project_connection_ids(request.payload.project_id, request.sender_id)?;
 801        broadcast(request.sender_id, receiver_ids, |connection_id| {
 802            self.peer
 803                .forward_send(request.sender_id, connection_id, request.payload.clone())
 804        });
 805        Ok(())
 806    }
 807
 808    async fn buffer_reloaded(
 809        self: Arc<Server>,
 810        request: TypedEnvelope<proto::BufferReloaded>,
 811    ) -> Result<()> {
 812        let receiver_ids = self
 813            .store()
 814            .await
 815            .project_connection_ids(request.payload.project_id, request.sender_id)?;
 816        broadcast(request.sender_id, receiver_ids, |connection_id| {
 817            self.peer
 818                .forward_send(request.sender_id, connection_id, request.payload.clone())
 819        });
 820        Ok(())
 821    }
 822
 823    async fn buffer_saved(
 824        self: Arc<Server>,
 825        request: TypedEnvelope<proto::BufferSaved>,
 826    ) -> Result<()> {
 827        let receiver_ids = self
 828            .store()
 829            .await
 830            .project_connection_ids(request.payload.project_id, request.sender_id)?;
 831        broadcast(request.sender_id, receiver_ids, |connection_id| {
 832            self.peer
 833                .forward_send(request.sender_id, connection_id, request.payload.clone())
 834        });
 835        Ok(())
 836    }
 837
 838    async fn follow(
 839        self: Arc<Self>,
 840        request: TypedEnvelope<proto::Follow>,
 841        response: Response<proto::Follow>,
 842    ) -> Result<()> {
 843        let leader_id = ConnectionId(request.payload.leader_id);
 844        let follower_id = request.sender_id;
 845        if !self
 846            .store()
 847            .await
 848            .project_connection_ids(request.payload.project_id, follower_id)?
 849            .contains(&leader_id)
 850        {
 851            Err(anyhow!("no such peer"))?;
 852        }
 853        let mut response_payload = self
 854            .peer
 855            .forward_request(request.sender_id, leader_id, request.payload)
 856            .await?;
 857        response_payload
 858            .views
 859            .retain(|view| view.leader_id != Some(follower_id.0));
 860        response.send(response_payload)?;
 861        Ok(())
 862    }
 863
 864    async fn unfollow(self: Arc<Self>, request: TypedEnvelope<proto::Unfollow>) -> Result<()> {
 865        let leader_id = ConnectionId(request.payload.leader_id);
 866        if !self
 867            .store()
 868            .await
 869            .project_connection_ids(request.payload.project_id, request.sender_id)?
 870            .contains(&leader_id)
 871        {
 872            Err(anyhow!("no such peer"))?;
 873        }
 874        self.peer
 875            .forward_send(request.sender_id, leader_id, request.payload)?;
 876        Ok(())
 877    }
 878
 879    async fn update_followers(
 880        self: Arc<Self>,
 881        request: TypedEnvelope<proto::UpdateFollowers>,
 882    ) -> Result<()> {
 883        let connection_ids = self
 884            .store()
 885            .await
 886            .project_connection_ids(request.payload.project_id, request.sender_id)?;
 887        let leader_id = request
 888            .payload
 889            .variant
 890            .as_ref()
 891            .and_then(|variant| match variant {
 892                proto::update_followers::Variant::CreateView(payload) => payload.leader_id,
 893                proto::update_followers::Variant::UpdateView(payload) => payload.leader_id,
 894                proto::update_followers::Variant::UpdateActiveView(payload) => payload.leader_id,
 895            });
 896        for follower_id in &request.payload.follower_ids {
 897            let follower_id = ConnectionId(*follower_id);
 898            if connection_ids.contains(&follower_id) && Some(follower_id.0) != leader_id {
 899                self.peer
 900                    .forward_send(request.sender_id, follower_id, request.payload.clone())?;
 901            }
 902        }
 903        Ok(())
 904    }
 905
 906    async fn get_channels(
 907        self: Arc<Server>,
 908        request: TypedEnvelope<proto::GetChannels>,
 909        response: Response<proto::GetChannels>,
 910    ) -> Result<()> {
 911        let user_id = self
 912            .store()
 913            .await
 914            .user_id_for_connection(request.sender_id)?;
 915        let channels = self.app_state.db.get_accessible_channels(user_id).await?;
 916        response.send(proto::GetChannelsResponse {
 917            channels: channels
 918                .into_iter()
 919                .map(|chan| proto::Channel {
 920                    id: chan.id.to_proto(),
 921                    name: chan.name,
 922                })
 923                .collect(),
 924        })?;
 925        Ok(())
 926    }
 927
 928    async fn get_users(
 929        self: Arc<Server>,
 930        request: TypedEnvelope<proto::GetUsers>,
 931        response: Response<proto::GetUsers>,
 932    ) -> Result<()> {
 933        let user_ids = request
 934            .payload
 935            .user_ids
 936            .into_iter()
 937            .map(UserId::from_proto)
 938            .collect();
 939        let users = self
 940            .app_state
 941            .db
 942            .get_users_by_ids(user_ids)
 943            .await?
 944            .into_iter()
 945            .map(|user| proto::User {
 946                id: user.id.to_proto(),
 947                avatar_url: format!("https://github.com/{}.png?size=128", user.github_login),
 948                github_login: user.github_login,
 949            })
 950            .collect();
 951        response.send(proto::UsersResponse { users })?;
 952        Ok(())
 953    }
 954
 955    async fn fuzzy_search_users(
 956        self: Arc<Server>,
 957        request: TypedEnvelope<proto::FuzzySearchUsers>,
 958        response: Response<proto::FuzzySearchUsers>,
 959    ) -> Result<()> {
 960        let user_id = self
 961            .store()
 962            .await
 963            .user_id_for_connection(request.sender_id)?;
 964        let query = request.payload.query;
 965        let db = &self.app_state.db;
 966        let users = match query.len() {
 967            0 => vec![],
 968            1 | 2 => db
 969                .get_user_by_github_login(&query)
 970                .await?
 971                .into_iter()
 972                .collect(),
 973            _ => db.fuzzy_search_users(&query, 10).await?,
 974        };
 975        let users = users
 976            .into_iter()
 977            .filter(|user| user.id != user_id)
 978            .map(|user| proto::User {
 979                id: user.id.to_proto(),
 980                avatar_url: format!("https://github.com/{}.png?size=128", user.github_login),
 981                github_login: user.github_login,
 982            })
 983            .collect();
 984        response.send(proto::UsersResponse { users })?;
 985        Ok(())
 986    }
 987
 988    async fn request_contact(
 989        self: Arc<Server>,
 990        request: TypedEnvelope<proto::RequestContact>,
 991        response: Response<proto::RequestContact>,
 992    ) -> Result<()> {
 993        let requester_id = self
 994            .store()
 995            .await
 996            .user_id_for_connection(request.sender_id)?;
 997        let responder_id = UserId::from_proto(request.payload.responder_id);
 998        if requester_id == responder_id {
 999            return Err(anyhow!("cannot add yourself as a contact"))?;
1000        }
1001
1002        self.app_state
1003            .db
1004            .send_contact_request(requester_id, responder_id)
1005            .await?;
1006
1007        // Update outgoing contact requests of requester
1008        let mut update = proto::UpdateContacts::default();
1009        update.outgoing_requests.push(responder_id.to_proto());
1010        for connection_id in self.store().await.connection_ids_for_user(requester_id) {
1011            self.peer.send(connection_id, update.clone())?;
1012        }
1013
1014        // Update incoming contact requests of responder
1015        let mut update = proto::UpdateContacts::default();
1016        update
1017            .incoming_requests
1018            .push(proto::IncomingContactRequest {
1019                requester_id: requester_id.to_proto(),
1020                should_notify: true,
1021            });
1022        for connection_id in self.store().await.connection_ids_for_user(responder_id) {
1023            self.peer.send(connection_id, update.clone())?;
1024        }
1025
1026        response.send(proto::Ack {})?;
1027        Ok(())
1028    }
1029
1030    async fn respond_to_contact_request(
1031        self: Arc<Server>,
1032        request: TypedEnvelope<proto::RespondToContactRequest>,
1033        response: Response<proto::RespondToContactRequest>,
1034    ) -> Result<()> {
1035        let responder_id = self
1036            .store()
1037            .await
1038            .user_id_for_connection(request.sender_id)?;
1039        let requester_id = UserId::from_proto(request.payload.requester_id);
1040        if request.payload.response == proto::ContactRequestResponse::Dismiss as i32 {
1041            self.app_state
1042                .db
1043                .dismiss_contact_notification(responder_id, requester_id)
1044                .await?;
1045        } else {
1046            let accept = request.payload.response == proto::ContactRequestResponse::Accept as i32;
1047            self.app_state
1048                .db
1049                .respond_to_contact_request(responder_id, requester_id, accept)
1050                .await?;
1051
1052            let store = self.store().await;
1053            // Update responder with new contact
1054            let mut update = proto::UpdateContacts::default();
1055            if accept {
1056                update
1057                    .contacts
1058                    .push(store.contact_for_user(requester_id, false));
1059            }
1060            update
1061                .remove_incoming_requests
1062                .push(requester_id.to_proto());
1063            for connection_id in store.connection_ids_for_user(responder_id) {
1064                self.peer.send(connection_id, update.clone())?;
1065            }
1066
1067            // Update requester with new contact
1068            let mut update = proto::UpdateContacts::default();
1069            if accept {
1070                update
1071                    .contacts
1072                    .push(store.contact_for_user(responder_id, true));
1073            }
1074            update
1075                .remove_outgoing_requests
1076                .push(responder_id.to_proto());
1077            for connection_id in store.connection_ids_for_user(requester_id) {
1078                self.peer.send(connection_id, update.clone())?;
1079            }
1080        }
1081
1082        response.send(proto::Ack {})?;
1083        Ok(())
1084    }
1085
1086    async fn remove_contact(
1087        self: Arc<Server>,
1088        request: TypedEnvelope<proto::RemoveContact>,
1089        response: Response<proto::RemoveContact>,
1090    ) -> Result<()> {
1091        let requester_id = self
1092            .store()
1093            .await
1094            .user_id_for_connection(request.sender_id)?;
1095        let responder_id = UserId::from_proto(request.payload.user_id);
1096        self.app_state
1097            .db
1098            .remove_contact(requester_id, responder_id)
1099            .await?;
1100
1101        // Update outgoing contact requests of requester
1102        let mut update = proto::UpdateContacts::default();
1103        update
1104            .remove_outgoing_requests
1105            .push(responder_id.to_proto());
1106        for connection_id in self.store().await.connection_ids_for_user(requester_id) {
1107            self.peer.send(connection_id, update.clone())?;
1108        }
1109
1110        // Update incoming contact requests of responder
1111        let mut update = proto::UpdateContacts::default();
1112        update
1113            .remove_incoming_requests
1114            .push(requester_id.to_proto());
1115        for connection_id in self.store().await.connection_ids_for_user(responder_id) {
1116            self.peer.send(connection_id, update.clone())?;
1117        }
1118
1119        response.send(proto::Ack {})?;
1120        Ok(())
1121    }
1122
1123    async fn join_channel(
1124        self: Arc<Self>,
1125        request: TypedEnvelope<proto::JoinChannel>,
1126        response: Response<proto::JoinChannel>,
1127    ) -> Result<()> {
1128        let user_id = self
1129            .store()
1130            .await
1131            .user_id_for_connection(request.sender_id)?;
1132        let channel_id = ChannelId::from_proto(request.payload.channel_id);
1133        if !self
1134            .app_state
1135            .db
1136            .can_user_access_channel(user_id, channel_id)
1137            .await?
1138        {
1139            Err(anyhow!("access denied"))?;
1140        }
1141
1142        self.store_mut()
1143            .await
1144            .join_channel(request.sender_id, channel_id);
1145        let messages = self
1146            .app_state
1147            .db
1148            .get_channel_messages(channel_id, MESSAGE_COUNT_PER_PAGE, None)
1149            .await?
1150            .into_iter()
1151            .map(|msg| proto::ChannelMessage {
1152                id: msg.id.to_proto(),
1153                body: msg.body,
1154                timestamp: msg.sent_at.unix_timestamp() as u64,
1155                sender_id: msg.sender_id.to_proto(),
1156                nonce: Some(msg.nonce.as_u128().into()),
1157            })
1158            .collect::<Vec<_>>();
1159        response.send(proto::JoinChannelResponse {
1160            done: messages.len() < MESSAGE_COUNT_PER_PAGE,
1161            messages,
1162        })?;
1163        Ok(())
1164    }
1165
1166    async fn leave_channel(
1167        self: Arc<Self>,
1168        request: TypedEnvelope<proto::LeaveChannel>,
1169    ) -> Result<()> {
1170        let user_id = self
1171            .store()
1172            .await
1173            .user_id_for_connection(request.sender_id)?;
1174        let channel_id = ChannelId::from_proto(request.payload.channel_id);
1175        if !self
1176            .app_state
1177            .db
1178            .can_user_access_channel(user_id, channel_id)
1179            .await?
1180        {
1181            Err(anyhow!("access denied"))?;
1182        }
1183
1184        self.store_mut()
1185            .await
1186            .leave_channel(request.sender_id, channel_id);
1187
1188        Ok(())
1189    }
1190
1191    async fn send_channel_message(
1192        self: Arc<Self>,
1193        request: TypedEnvelope<proto::SendChannelMessage>,
1194        response: Response<proto::SendChannelMessage>,
1195    ) -> Result<()> {
1196        let channel_id = ChannelId::from_proto(request.payload.channel_id);
1197        let user_id;
1198        let connection_ids;
1199        {
1200            let state = self.store().await;
1201            user_id = state.user_id_for_connection(request.sender_id)?;
1202            connection_ids = state.channel_connection_ids(channel_id)?;
1203        }
1204
1205        // Validate the message body.
1206        let body = request.payload.body.trim().to_string();
1207        if body.len() > MAX_MESSAGE_LEN {
1208            return Err(anyhow!("message is too long"))?;
1209        }
1210        if body.is_empty() {
1211            return Err(anyhow!("message can't be blank"))?;
1212        }
1213
1214        let timestamp = OffsetDateTime::now_utc();
1215        let nonce = request
1216            .payload
1217            .nonce
1218            .ok_or_else(|| anyhow!("nonce can't be blank"))?;
1219
1220        let message_id = self
1221            .app_state
1222            .db
1223            .create_channel_message(channel_id, user_id, &body, timestamp, nonce.clone().into())
1224            .await?
1225            .to_proto();
1226        let message = proto::ChannelMessage {
1227            sender_id: user_id.to_proto(),
1228            id: message_id,
1229            body,
1230            timestamp: timestamp.unix_timestamp() as u64,
1231            nonce: Some(nonce),
1232        };
1233        broadcast(request.sender_id, connection_ids, |conn_id| {
1234            self.peer.send(
1235                conn_id,
1236                proto::ChannelMessageSent {
1237                    channel_id: channel_id.to_proto(),
1238                    message: Some(message.clone()),
1239                },
1240            )
1241        });
1242        response.send(proto::SendChannelMessageResponse {
1243            message: Some(message),
1244        })?;
1245        Ok(())
1246    }
1247
1248    async fn get_channel_messages(
1249        self: Arc<Self>,
1250        request: TypedEnvelope<proto::GetChannelMessages>,
1251        response: Response<proto::GetChannelMessages>,
1252    ) -> Result<()> {
1253        let user_id = self
1254            .store()
1255            .await
1256            .user_id_for_connection(request.sender_id)?;
1257        let channel_id = ChannelId::from_proto(request.payload.channel_id);
1258        if !self
1259            .app_state
1260            .db
1261            .can_user_access_channel(user_id, channel_id)
1262            .await?
1263        {
1264            Err(anyhow!("access denied"))?;
1265        }
1266
1267        let messages = self
1268            .app_state
1269            .db
1270            .get_channel_messages(
1271                channel_id,
1272                MESSAGE_COUNT_PER_PAGE,
1273                Some(MessageId::from_proto(request.payload.before_message_id)),
1274            )
1275            .await?
1276            .into_iter()
1277            .map(|msg| proto::ChannelMessage {
1278                id: msg.id.to_proto(),
1279                body: msg.body,
1280                timestamp: msg.sent_at.unix_timestamp() as u64,
1281                sender_id: msg.sender_id.to_proto(),
1282                nonce: Some(msg.nonce.as_u128().into()),
1283            })
1284            .collect::<Vec<_>>();
1285        response.send(proto::GetChannelMessagesResponse {
1286            done: messages.len() < MESSAGE_COUNT_PER_PAGE,
1287            messages,
1288        })?;
1289        Ok(())
1290    }
1291
1292    async fn store<'a>(self: &'a Arc<Self>) -> StoreReadGuard<'a> {
1293        #[cfg(test)]
1294        tokio::task::yield_now().await;
1295        let guard = self.store.read().await;
1296        #[cfg(test)]
1297        tokio::task::yield_now().await;
1298        StoreReadGuard {
1299            guard,
1300            _not_send: PhantomData,
1301        }
1302    }
1303
1304    async fn store_mut<'a>(self: &'a Arc<Self>) -> StoreWriteGuard<'a> {
1305        #[cfg(test)]
1306        tokio::task::yield_now().await;
1307        let guard = self.store.write().await;
1308        #[cfg(test)]
1309        tokio::task::yield_now().await;
1310        StoreWriteGuard {
1311            guard,
1312            _not_send: PhantomData,
1313        }
1314    }
1315}
1316
1317impl<'a> Deref for StoreReadGuard<'a> {
1318    type Target = Store;
1319
1320    fn deref(&self) -> &Self::Target {
1321        &*self.guard
1322    }
1323}
1324
1325impl<'a> Deref for StoreWriteGuard<'a> {
1326    type Target = Store;
1327
1328    fn deref(&self) -> &Self::Target {
1329        &*self.guard
1330    }
1331}
1332
1333impl<'a> DerefMut for StoreWriteGuard<'a> {
1334    fn deref_mut(&mut self) -> &mut Self::Target {
1335        &mut *self.guard
1336    }
1337}
1338
1339impl<'a> Drop for StoreWriteGuard<'a> {
1340    fn drop(&mut self) {
1341        #[cfg(test)]
1342        self.check_invariants();
1343
1344        let metrics = self.metrics();
1345        tracing::info!(
1346            connections = metrics.connections,
1347            registered_projects = metrics.registered_projects,
1348            shared_projects = metrics.shared_projects,
1349            "metrics"
1350        );
1351    }
1352}
1353
1354impl Executor for RealExecutor {
1355    type Sleep = Sleep;
1356
1357    fn spawn_detached<F: 'static + Send + Future<Output = ()>>(&self, future: F) {
1358        tokio::task::spawn(future);
1359    }
1360
1361    fn sleep(&self, duration: Duration) -> Self::Sleep {
1362        tokio::time::sleep(duration)
1363    }
1364}
1365
1366fn broadcast<F>(
1367    sender_id: ConnectionId,
1368    receiver_ids: impl IntoIterator<Item = ConnectionId>,
1369    mut f: F,
1370) where
1371    F: FnMut(ConnectionId) -> anyhow::Result<()>,
1372{
1373    for receiver_id in receiver_ids {
1374        if receiver_id != sender_id {
1375            f(receiver_id).trace_err();
1376        }
1377    }
1378}
1379
1380lazy_static! {
1381    static ref ZED_PROTOCOL_VERSION: HeaderName = HeaderName::from_static("x-zed-protocol-version");
1382}
1383
1384pub struct ProtocolVersion(u32);
1385
1386impl Header for ProtocolVersion {
1387    fn name() -> &'static HeaderName {
1388        &ZED_PROTOCOL_VERSION
1389    }
1390
1391    fn decode<'i, I>(values: &mut I) -> Result<Self, axum::headers::Error>
1392    where
1393        Self: Sized,
1394        I: Iterator<Item = &'i axum::http::HeaderValue>,
1395    {
1396        let version = values
1397            .next()
1398            .ok_or_else(|| axum::headers::Error::invalid())?
1399            .to_str()
1400            .map_err(|_| axum::headers::Error::invalid())?
1401            .parse()
1402            .map_err(|_| axum::headers::Error::invalid())?;
1403        Ok(Self(version))
1404    }
1405
1406    fn encode<E: Extend<axum::http::HeaderValue>>(&self, values: &mut E) {
1407        values.extend([self.0.to_string().parse().unwrap()]);
1408    }
1409}
1410
1411pub fn routes(app_state: Arc<AppState>) -> Router<Body> {
1412    let server = Server::new(app_state.clone(), None);
1413    Router::new()
1414        .route("/rpc", get(handle_websocket_request))
1415        .layer(
1416            ServiceBuilder::new()
1417                .layer(Extension(app_state))
1418                .layer(middleware::from_fn(auth::validate_header))
1419                .layer(Extension(server)),
1420        )
1421}
1422
1423pub async fn handle_websocket_request(
1424    TypedHeader(ProtocolVersion(protocol_version)): TypedHeader<ProtocolVersion>,
1425    ConnectInfo(socket_address): ConnectInfo<SocketAddr>,
1426    Extension(server): Extension<Arc<Server>>,
1427    Extension(user): Extension<User>,
1428    ws: WebSocketUpgrade,
1429) -> axum::response::Response {
1430    if protocol_version != rpc::PROTOCOL_VERSION {
1431        return (
1432            StatusCode::UPGRADE_REQUIRED,
1433            "client must be upgraded".to_string(),
1434        )
1435            .into_response();
1436    }
1437    let socket_address = socket_address.to_string();
1438    ws.on_upgrade(move |socket| {
1439        use util::ResultExt;
1440        let socket = socket
1441            .map_ok(to_tungstenite_message)
1442            .err_into()
1443            .with(|message| async move { Ok(to_axum_message(message)) });
1444        let connection = Connection::new(Box::pin(socket));
1445        async move {
1446            server
1447                .handle_connection(connection, socket_address, user, None, RealExecutor)
1448                .await
1449                .log_err();
1450        }
1451    })
1452}
1453
1454fn to_axum_message(message: TungsteniteMessage) -> AxumMessage {
1455    match message {
1456        TungsteniteMessage::Text(payload) => AxumMessage::Text(payload),
1457        TungsteniteMessage::Binary(payload) => AxumMessage::Binary(payload),
1458        TungsteniteMessage::Ping(payload) => AxumMessage::Ping(payload),
1459        TungsteniteMessage::Pong(payload) => AxumMessage::Pong(payload),
1460        TungsteniteMessage::Close(frame) => AxumMessage::Close(frame.map(|frame| AxumCloseFrame {
1461            code: frame.code.into(),
1462            reason: frame.reason,
1463        })),
1464    }
1465}
1466
1467fn to_tungstenite_message(message: AxumMessage) -> TungsteniteMessage {
1468    match message {
1469        AxumMessage::Text(payload) => TungsteniteMessage::Text(payload),
1470        AxumMessage::Binary(payload) => TungsteniteMessage::Binary(payload),
1471        AxumMessage::Ping(payload) => TungsteniteMessage::Ping(payload),
1472        AxumMessage::Pong(payload) => TungsteniteMessage::Pong(payload),
1473        AxumMessage::Close(frame) => {
1474            TungsteniteMessage::Close(frame.map(|frame| TungsteniteCloseFrame {
1475                code: frame.code.into(),
1476                reason: frame.reason,
1477            }))
1478        }
1479    }
1480}
1481
1482pub trait ResultExt {
1483    type Ok;
1484
1485    fn trace_err(self) -> Option<Self::Ok>;
1486}
1487
1488impl<T, E> ResultExt for Result<T, E>
1489where
1490    E: std::fmt::Debug,
1491{
1492    type Ok = T;
1493
1494    fn trace_err(self) -> Option<T> {
1495        match self {
1496            Ok(value) => Some(value),
1497            Err(error) => {
1498                tracing::error!("{:?}", error);
1499                None
1500            }
1501        }
1502    }
1503}
1504
1505#[cfg(test)]
1506mod tests {
1507    use super::*;
1508    use crate::{
1509        db::{tests::TestDb, UserId},
1510        AppState,
1511    };
1512    use ::rpc::Peer;
1513    use client::{
1514        self, test::FakeHttpClient, Channel, ChannelDetails, ChannelList, Client, Credentials,
1515        EstablishConnectionError, UserStore, RECEIVE_TIMEOUT,
1516    };
1517    use collections::{BTreeMap, HashSet};
1518    use editor::{
1519        self, ConfirmCodeAction, ConfirmCompletion, ConfirmRename, Editor, Input, Redo, Rename,
1520        ToOffset, ToggleCodeActions, Undo,
1521    };
1522    use gpui::{
1523        executor::{self, Deterministic},
1524        geometry::vector::vec2f,
1525        ModelHandle, TestAppContext, ViewHandle,
1526    };
1527    use language::{
1528        range_to_lsp, tree_sitter_rust, Diagnostic, DiagnosticEntry, FakeLspAdapter, Language,
1529        LanguageConfig, LanguageRegistry, OffsetRangeExt, Point, Rope,
1530    };
1531    use lsp::{self, FakeLanguageServer};
1532    use parking_lot::Mutex;
1533    use project::{
1534        fs::{FakeFs, Fs as _},
1535        search::SearchQuery,
1536        worktree::WorktreeHandle,
1537        DiagnosticSummary, Project, ProjectPath, WorktreeId,
1538    };
1539    use rand::prelude::*;
1540    use rpc::PeerId;
1541    use serde_json::json;
1542    use settings::Settings;
1543    use sqlx::types::time::OffsetDateTime;
1544    use std::{
1545        env,
1546        ops::Deref,
1547        path::{Path, PathBuf},
1548        rc::Rc,
1549        sync::{
1550            atomic::{AtomicBool, Ordering::SeqCst},
1551            Arc,
1552        },
1553        time::Duration,
1554    };
1555    use theme::ThemeRegistry;
1556    use workspace::{Item, SplitDirection, ToggleFollow, Workspace, WorkspaceParams};
1557
1558    #[cfg(test)]
1559    #[ctor::ctor]
1560    fn init_logger() {
1561        if std::env::var("RUST_LOG").is_ok() {
1562            env_logger::init();
1563        }
1564    }
1565
1566    #[gpui::test(iterations = 10)]
1567    async fn test_share_project(cx_a: &mut TestAppContext, cx_b: &mut TestAppContext) {
1568        let (window_b, _) = cx_b.add_window(|_| EmptyView);
1569        let lang_registry = Arc::new(LanguageRegistry::test());
1570        let fs = FakeFs::new(cx_a.background());
1571        cx_a.foreground().forbid_parking();
1572
1573        // Connect to a server as 2 clients.
1574        let mut server = TestServer::start(cx_a.foreground(), cx_a.background()).await;
1575        let client_a = server.create_client(cx_a, "user_a").await;
1576        let client_b = server.create_client(cx_b, "user_b").await;
1577        server
1578            .make_contacts(vec![(&client_a, cx_a), (&client_b, cx_b)])
1579            .await;
1580
1581        // Share a project as client A
1582        fs.insert_tree(
1583            "/a",
1584            json!({
1585                "a.txt": "a-contents",
1586                "b.txt": "b-contents",
1587            }),
1588        )
1589        .await;
1590        let project_a = cx_a.update(|cx| {
1591            Project::local(
1592                client_a.clone(),
1593                client_a.user_store.clone(),
1594                lang_registry.clone(),
1595                fs.clone(),
1596                cx,
1597            )
1598        });
1599        let (worktree_a, _) = project_a
1600            .update(cx_a, |p, cx| {
1601                p.find_or_create_local_worktree("/a", true, cx)
1602            })
1603            .await
1604            .unwrap();
1605        let worktree_id = worktree_a.read_with(cx_a, |tree, _| tree.id());
1606        worktree_a
1607            .read_with(cx_a, |tree, _| tree.as_local().unwrap().scan_complete())
1608            .await;
1609        let project_id = project_a.update(cx_a, |p, _| p.next_remote_id()).await;
1610        project_a.update(cx_a, |p, cx| p.share(cx)).await.unwrap();
1611
1612        // Join that project as client B
1613        let project_b = Project::remote(
1614            project_id,
1615            client_b.clone(),
1616            client_b.user_store.clone(),
1617            lang_registry.clone(),
1618            fs.clone(),
1619            &mut cx_b.to_async(),
1620        )
1621        .await
1622        .unwrap();
1623
1624        let replica_id_b = project_b.read_with(cx_b, |project, _| {
1625            assert_eq!(
1626                project
1627                    .collaborators()
1628                    .get(&client_a.peer_id)
1629                    .unwrap()
1630                    .user
1631                    .github_login,
1632                "user_a"
1633            );
1634            project.replica_id()
1635        });
1636        project_a
1637            .condition(&cx_a, |tree, _| {
1638                tree.collaborators()
1639                    .get(&client_b.peer_id)
1640                    .map_or(false, |collaborator| {
1641                        collaborator.replica_id == replica_id_b
1642                            && collaborator.user.github_login == "user_b"
1643                    })
1644            })
1645            .await;
1646
1647        // Open the same file as client B and client A.
1648        let buffer_b = project_b
1649            .update(cx_b, |p, cx| p.open_buffer((worktree_id, "b.txt"), cx))
1650            .await
1651            .unwrap();
1652        buffer_b.read_with(cx_b, |buf, _| assert_eq!(buf.text(), "b-contents"));
1653        project_a.read_with(cx_a, |project, cx| {
1654            assert!(project.has_open_buffer((worktree_id, "b.txt"), cx))
1655        });
1656        let buffer_a = project_a
1657            .update(cx_a, |p, cx| p.open_buffer((worktree_id, "b.txt"), cx))
1658            .await
1659            .unwrap();
1660
1661        let editor_b = cx_b.add_view(window_b, |cx| Editor::for_buffer(buffer_b, None, cx));
1662
1663        // TODO
1664        // // Create a selection set as client B and see that selection set as client A.
1665        // buffer_a
1666        //     .condition(&cx_a, |buffer, _| buffer.selection_sets().count() == 1)
1667        //     .await;
1668
1669        // Edit the buffer as client B and see that edit as client A.
1670        editor_b.update(cx_b, |editor, cx| {
1671            editor.handle_input(&Input("ok, ".into()), cx)
1672        });
1673        buffer_a
1674            .condition(&cx_a, |buffer, _| buffer.text() == "ok, b-contents")
1675            .await;
1676
1677        // TODO
1678        // // Remove the selection set as client B, see those selections disappear as client A.
1679        cx_b.update(move |_| drop(editor_b));
1680        // buffer_a
1681        //     .condition(&cx_a, |buffer, _| buffer.selection_sets().count() == 0)
1682        //     .await;
1683
1684        // Dropping the client B's project removes client B from client A's collaborators.
1685        cx_b.update(move |_| drop(project_b));
1686        project_a
1687            .condition(&cx_a, |project, _| project.collaborators().is_empty())
1688            .await;
1689    }
1690
1691    #[gpui::test(iterations = 10)]
1692    async fn test_unshare_project(cx_a: &mut TestAppContext, cx_b: &mut TestAppContext) {
1693        let lang_registry = Arc::new(LanguageRegistry::test());
1694        let fs = FakeFs::new(cx_a.background());
1695        cx_a.foreground().forbid_parking();
1696
1697        // Connect to a server as 2 clients.
1698        let mut server = TestServer::start(cx_a.foreground(), cx_a.background()).await;
1699        let client_a = server.create_client(cx_a, "user_a").await;
1700        let client_b = server.create_client(cx_b, "user_b").await;
1701        server
1702            .make_contacts(vec![(&client_a, cx_a), (&client_b, cx_b)])
1703            .await;
1704
1705        // Share a project as client A
1706        fs.insert_tree(
1707            "/a",
1708            json!({
1709                "a.txt": "a-contents",
1710                "b.txt": "b-contents",
1711            }),
1712        )
1713        .await;
1714        let project_a = cx_a.update(|cx| {
1715            Project::local(
1716                client_a.clone(),
1717                client_a.user_store.clone(),
1718                lang_registry.clone(),
1719                fs.clone(),
1720                cx,
1721            )
1722        });
1723        let (worktree_a, _) = project_a
1724            .update(cx_a, |p, cx| {
1725                p.find_or_create_local_worktree("/a", true, cx)
1726            })
1727            .await
1728            .unwrap();
1729        worktree_a
1730            .read_with(cx_a, |tree, _| tree.as_local().unwrap().scan_complete())
1731            .await;
1732        let project_id = project_a.update(cx_a, |p, _| p.next_remote_id()).await;
1733        let worktree_id = worktree_a.read_with(cx_a, |tree, _| tree.id());
1734        project_a.update(cx_a, |p, cx| p.share(cx)).await.unwrap();
1735        assert!(worktree_a.read_with(cx_a, |tree, _| tree.as_local().unwrap().is_shared()));
1736
1737        // Join that project as client B
1738        let project_b = Project::remote(
1739            project_id,
1740            client_b.clone(),
1741            client_b.user_store.clone(),
1742            lang_registry.clone(),
1743            fs.clone(),
1744            &mut cx_b.to_async(),
1745        )
1746        .await
1747        .unwrap();
1748        project_b
1749            .update(cx_b, |p, cx| p.open_buffer((worktree_id, "a.txt"), cx))
1750            .await
1751            .unwrap();
1752
1753        // Unshare the project as client A
1754        project_a.update(cx_a, |project, cx| project.unshare(cx));
1755        project_b
1756            .condition(cx_b, |project, _| project.is_read_only())
1757            .await;
1758        assert!(worktree_a.read_with(cx_a, |tree, _| !tree.as_local().unwrap().is_shared()));
1759        cx_b.update(|_| {
1760            drop(project_b);
1761        });
1762
1763        // Share the project again and ensure guests can still join.
1764        project_a
1765            .update(cx_a, |project, cx| project.share(cx))
1766            .await
1767            .unwrap();
1768        assert!(worktree_a.read_with(cx_a, |tree, _| tree.as_local().unwrap().is_shared()));
1769
1770        let project_b2 = Project::remote(
1771            project_id,
1772            client_b.clone(),
1773            client_b.user_store.clone(),
1774            lang_registry.clone(),
1775            fs.clone(),
1776            &mut cx_b.to_async(),
1777        )
1778        .await
1779        .unwrap();
1780        project_b2
1781            .update(cx_b, |p, cx| p.open_buffer((worktree_id, "a.txt"), cx))
1782            .await
1783            .unwrap();
1784    }
1785
1786    #[gpui::test(iterations = 10)]
1787    async fn test_host_disconnect(cx_a: &mut TestAppContext, cx_b: &mut TestAppContext) {
1788        let lang_registry = Arc::new(LanguageRegistry::test());
1789        let fs = FakeFs::new(cx_a.background());
1790        cx_a.foreground().forbid_parking();
1791
1792        // Connect to a server as 2 clients.
1793        let mut server = TestServer::start(cx_a.foreground(), cx_a.background()).await;
1794        let client_a = server.create_client(cx_a, "user_a").await;
1795        let client_b = server.create_client(cx_b, "user_b").await;
1796        server
1797            .make_contacts(vec![(&client_a, cx_a), (&client_b, cx_b)])
1798            .await;
1799
1800        // Share a project as client A
1801        fs.insert_tree(
1802            "/a",
1803            json!({
1804                "a.txt": "a-contents",
1805                "b.txt": "b-contents",
1806            }),
1807        )
1808        .await;
1809        let project_a = cx_a.update(|cx| {
1810            Project::local(
1811                client_a.clone(),
1812                client_a.user_store.clone(),
1813                lang_registry.clone(),
1814                fs.clone(),
1815                cx,
1816            )
1817        });
1818        let (worktree_a, _) = project_a
1819            .update(cx_a, |p, cx| {
1820                p.find_or_create_local_worktree("/a", true, cx)
1821            })
1822            .await
1823            .unwrap();
1824        worktree_a
1825            .read_with(cx_a, |tree, _| tree.as_local().unwrap().scan_complete())
1826            .await;
1827        let project_id = project_a.update(cx_a, |p, _| p.next_remote_id()).await;
1828        let worktree_id = worktree_a.read_with(cx_a, |tree, _| tree.id());
1829        project_a.update(cx_a, |p, cx| p.share(cx)).await.unwrap();
1830        assert!(worktree_a.read_with(cx_a, |tree, _| tree.as_local().unwrap().is_shared()));
1831
1832        // Join that project as client B
1833        let project_b = Project::remote(
1834            project_id,
1835            client_b.clone(),
1836            client_b.user_store.clone(),
1837            lang_registry.clone(),
1838            fs.clone(),
1839            &mut cx_b.to_async(),
1840        )
1841        .await
1842        .unwrap();
1843        project_b
1844            .update(cx_b, |p, cx| p.open_buffer((worktree_id, "a.txt"), cx))
1845            .await
1846            .unwrap();
1847
1848        // Drop client A's connection. Collaborators should disappear and the project should not be shown as shared.
1849        server.disconnect_client(client_a.current_user_id(cx_a));
1850        cx_a.foreground().advance_clock(rpc::RECEIVE_TIMEOUT);
1851        project_a
1852            .condition(cx_a, |project, _| project.collaborators().is_empty())
1853            .await;
1854        project_a.read_with(cx_a, |project, _| assert!(!project.is_shared()));
1855        project_b
1856            .condition(cx_b, |project, _| project.is_read_only())
1857            .await;
1858        assert!(worktree_a.read_with(cx_a, |tree, _| !tree.as_local().unwrap().is_shared()));
1859        cx_b.update(|_| {
1860            drop(project_b);
1861        });
1862
1863        // Await reconnection
1864        let project_id = project_a.update(cx_a, |p, _| p.next_remote_id()).await;
1865
1866        // Share the project again and ensure guests can still join.
1867        project_a
1868            .update(cx_a, |project, cx| project.share(cx))
1869            .await
1870            .unwrap();
1871        assert!(worktree_a.read_with(cx_a, |tree, _| tree.as_local().unwrap().is_shared()));
1872
1873        let project_b2 = Project::remote(
1874            project_id,
1875            client_b.clone(),
1876            client_b.user_store.clone(),
1877            lang_registry.clone(),
1878            fs.clone(),
1879            &mut cx_b.to_async(),
1880        )
1881        .await
1882        .unwrap();
1883        project_b2
1884            .update(cx_b, |p, cx| p.open_buffer((worktree_id, "a.txt"), cx))
1885            .await
1886            .unwrap();
1887    }
1888
1889    #[gpui::test(iterations = 10)]
1890    async fn test_propagate_saves_and_fs_changes(
1891        cx_a: &mut TestAppContext,
1892        cx_b: &mut TestAppContext,
1893        cx_c: &mut TestAppContext,
1894    ) {
1895        let lang_registry = Arc::new(LanguageRegistry::test());
1896        let fs = FakeFs::new(cx_a.background());
1897        cx_a.foreground().forbid_parking();
1898
1899        // Connect to a server as 3 clients.
1900        let mut server = TestServer::start(cx_a.foreground(), cx_a.background()).await;
1901        let client_a = server.create_client(cx_a, "user_a").await;
1902        let client_b = server.create_client(cx_b, "user_b").await;
1903        let client_c = server.create_client(cx_c, "user_c").await;
1904        server
1905            .make_contacts(vec![
1906                (&client_a, cx_a),
1907                (&client_b, cx_b),
1908                (&client_c, cx_c),
1909            ])
1910            .await;
1911
1912        // Share a worktree as client A.
1913        fs.insert_tree(
1914            "/a",
1915            json!({
1916                "file1": "",
1917                "file2": ""
1918            }),
1919        )
1920        .await;
1921        let project_a = cx_a.update(|cx| {
1922            Project::local(
1923                client_a.clone(),
1924                client_a.user_store.clone(),
1925                lang_registry.clone(),
1926                fs.clone(),
1927                cx,
1928            )
1929        });
1930        let (worktree_a, _) = project_a
1931            .update(cx_a, |p, cx| {
1932                p.find_or_create_local_worktree("/a", true, cx)
1933            })
1934            .await
1935            .unwrap();
1936        worktree_a
1937            .read_with(cx_a, |tree, _| tree.as_local().unwrap().scan_complete())
1938            .await;
1939        let project_id = project_a.update(cx_a, |p, _| p.next_remote_id()).await;
1940        let worktree_id = worktree_a.read_with(cx_a, |tree, _| tree.id());
1941        project_a.update(cx_a, |p, cx| p.share(cx)).await.unwrap();
1942
1943        // Join that worktree as clients B and C.
1944        let project_b = Project::remote(
1945            project_id,
1946            client_b.clone(),
1947            client_b.user_store.clone(),
1948            lang_registry.clone(),
1949            fs.clone(),
1950            &mut cx_b.to_async(),
1951        )
1952        .await
1953        .unwrap();
1954        let project_c = Project::remote(
1955            project_id,
1956            client_c.clone(),
1957            client_c.user_store.clone(),
1958            lang_registry.clone(),
1959            fs.clone(),
1960            &mut cx_c.to_async(),
1961        )
1962        .await
1963        .unwrap();
1964        let worktree_b = project_b.read_with(cx_b, |p, cx| p.worktrees(cx).next().unwrap());
1965        let worktree_c = project_c.read_with(cx_c, |p, cx| p.worktrees(cx).next().unwrap());
1966
1967        // Open and edit a buffer as both guests B and C.
1968        let buffer_b = project_b
1969            .update(cx_b, |p, cx| p.open_buffer((worktree_id, "file1"), cx))
1970            .await
1971            .unwrap();
1972        let buffer_c = project_c
1973            .update(cx_c, |p, cx| p.open_buffer((worktree_id, "file1"), cx))
1974            .await
1975            .unwrap();
1976        buffer_b.update(cx_b, |buf, cx| buf.edit([(0..0, "i-am-b, ")], cx));
1977        buffer_c.update(cx_c, |buf, cx| buf.edit([(0..0, "i-am-c, ")], cx));
1978
1979        // Open and edit that buffer as the host.
1980        let buffer_a = project_a
1981            .update(cx_a, |p, cx| p.open_buffer((worktree_id, "file1"), cx))
1982            .await
1983            .unwrap();
1984
1985        buffer_a
1986            .condition(cx_a, |buf, _| buf.text() == "i-am-c, i-am-b, ")
1987            .await;
1988        buffer_a.update(cx_a, |buf, cx| {
1989            buf.edit([(buf.len()..buf.len(), "i-am-a")], cx)
1990        });
1991
1992        // Wait for edits to propagate
1993        buffer_a
1994            .condition(cx_a, |buf, _| buf.text() == "i-am-c, i-am-b, i-am-a")
1995            .await;
1996        buffer_b
1997            .condition(cx_b, |buf, _| buf.text() == "i-am-c, i-am-b, i-am-a")
1998            .await;
1999        buffer_c
2000            .condition(cx_c, |buf, _| buf.text() == "i-am-c, i-am-b, i-am-a")
2001            .await;
2002
2003        // Edit the buffer as the host and concurrently save as guest B.
2004        let save_b = buffer_b.update(cx_b, |buf, cx| buf.save(cx));
2005        buffer_a.update(cx_a, |buf, cx| buf.edit([(0..0, "hi-a, ")], cx));
2006        save_b.await.unwrap();
2007        assert_eq!(
2008            fs.load("/a/file1".as_ref()).await.unwrap(),
2009            "hi-a, i-am-c, i-am-b, i-am-a"
2010        );
2011        buffer_a.read_with(cx_a, |buf, _| assert!(!buf.is_dirty()));
2012        buffer_b.read_with(cx_b, |buf, _| assert!(!buf.is_dirty()));
2013        buffer_c.condition(cx_c, |buf, _| !buf.is_dirty()).await;
2014
2015        worktree_a.flush_fs_events(cx_a).await;
2016
2017        // Make changes on host's file system, see those changes on guest worktrees.
2018        fs.rename(
2019            "/a/file1".as_ref(),
2020            "/a/file1-renamed".as_ref(),
2021            Default::default(),
2022        )
2023        .await
2024        .unwrap();
2025
2026        fs.rename("/a/file2".as_ref(), "/a/file3".as_ref(), Default::default())
2027            .await
2028            .unwrap();
2029        fs.insert_file(Path::new("/a/file4"), "4".into()).await;
2030
2031        worktree_a
2032            .condition(&cx_a, |tree, _| {
2033                tree.paths()
2034                    .map(|p| p.to_string_lossy())
2035                    .collect::<Vec<_>>()
2036                    == ["file1-renamed", "file3", "file4"]
2037            })
2038            .await;
2039        worktree_b
2040            .condition(&cx_b, |tree, _| {
2041                tree.paths()
2042                    .map(|p| p.to_string_lossy())
2043                    .collect::<Vec<_>>()
2044                    == ["file1-renamed", "file3", "file4"]
2045            })
2046            .await;
2047        worktree_c
2048            .condition(&cx_c, |tree, _| {
2049                tree.paths()
2050                    .map(|p| p.to_string_lossy())
2051                    .collect::<Vec<_>>()
2052                    == ["file1-renamed", "file3", "file4"]
2053            })
2054            .await;
2055
2056        // Ensure buffer files are updated as well.
2057        buffer_a
2058            .condition(&cx_a, |buf, _| {
2059                buf.file().unwrap().path().to_str() == Some("file1-renamed")
2060            })
2061            .await;
2062        buffer_b
2063            .condition(&cx_b, |buf, _| {
2064                buf.file().unwrap().path().to_str() == Some("file1-renamed")
2065            })
2066            .await;
2067        buffer_c
2068            .condition(&cx_c, |buf, _| {
2069                buf.file().unwrap().path().to_str() == Some("file1-renamed")
2070            })
2071            .await;
2072    }
2073
2074    #[gpui::test(iterations = 10)]
2075    async fn test_fs_operations(
2076        executor: Arc<Deterministic>,
2077        cx_a: &mut TestAppContext,
2078        cx_b: &mut TestAppContext,
2079    ) {
2080        executor.forbid_parking();
2081        let fs = FakeFs::new(cx_a.background());
2082
2083        // Connect to a server as 2 clients.
2084        let mut server = TestServer::start(cx_a.foreground(), cx_a.background()).await;
2085        let mut client_a = server.create_client(cx_a, "user_a").await;
2086        let mut client_b = server.create_client(cx_b, "user_b").await;
2087        server
2088            .make_contacts(vec![(&client_a, cx_a), (&client_b, cx_b)])
2089            .await;
2090
2091        // Share a project as client A
2092        fs.insert_tree(
2093            "/dir",
2094            json!({
2095                "a.txt": "a-contents",
2096                "b.txt": "b-contents",
2097            }),
2098        )
2099        .await;
2100
2101        let (project_a, worktree_id) = client_a.build_local_project(fs, "/dir", cx_a).await;
2102        let project_id = project_a.read_with(cx_a, |project, _| project.remote_id().unwrap());
2103        project_a
2104            .update(cx_a, |project, cx| project.share(cx))
2105            .await
2106            .unwrap();
2107
2108        let project_b = client_b.build_remote_project(project_id, cx_b).await;
2109
2110        let worktree_a =
2111            project_a.read_with(cx_a, |project, cx| project.worktrees(cx).next().unwrap());
2112        let worktree_b =
2113            project_b.read_with(cx_b, |project, cx| project.worktrees(cx).next().unwrap());
2114
2115        let entry = project_b
2116            .update(cx_b, |project, cx| {
2117                project
2118                    .create_entry((worktree_id, "c.txt"), false, cx)
2119                    .unwrap()
2120            })
2121            .await
2122            .unwrap();
2123        worktree_a.read_with(cx_a, |worktree, _| {
2124            assert_eq!(
2125                worktree
2126                    .paths()
2127                    .map(|p| p.to_string_lossy())
2128                    .collect::<Vec<_>>(),
2129                ["a.txt", "b.txt", "c.txt"]
2130            );
2131        });
2132        worktree_b.read_with(cx_b, |worktree, _| {
2133            assert_eq!(
2134                worktree
2135                    .paths()
2136                    .map(|p| p.to_string_lossy())
2137                    .collect::<Vec<_>>(),
2138                ["a.txt", "b.txt", "c.txt"]
2139            );
2140        });
2141
2142        project_b
2143            .update(cx_b, |project, cx| {
2144                project.rename_entry(entry.id, Path::new("d.txt"), cx)
2145            })
2146            .unwrap()
2147            .await
2148            .unwrap();
2149        worktree_a.read_with(cx_a, |worktree, _| {
2150            assert_eq!(
2151                worktree
2152                    .paths()
2153                    .map(|p| p.to_string_lossy())
2154                    .collect::<Vec<_>>(),
2155                ["a.txt", "b.txt", "d.txt"]
2156            );
2157        });
2158        worktree_b.read_with(cx_b, |worktree, _| {
2159            assert_eq!(
2160                worktree
2161                    .paths()
2162                    .map(|p| p.to_string_lossy())
2163                    .collect::<Vec<_>>(),
2164                ["a.txt", "b.txt", "d.txt"]
2165            );
2166        });
2167
2168        let dir_entry = project_b
2169            .update(cx_b, |project, cx| {
2170                project
2171                    .create_entry((worktree_id, "DIR"), true, cx)
2172                    .unwrap()
2173            })
2174            .await
2175            .unwrap();
2176        worktree_a.read_with(cx_a, |worktree, _| {
2177            assert_eq!(
2178                worktree
2179                    .paths()
2180                    .map(|p| p.to_string_lossy())
2181                    .collect::<Vec<_>>(),
2182                ["DIR", "a.txt", "b.txt", "d.txt"]
2183            );
2184        });
2185        worktree_b.read_with(cx_b, |worktree, _| {
2186            assert_eq!(
2187                worktree
2188                    .paths()
2189                    .map(|p| p.to_string_lossy())
2190                    .collect::<Vec<_>>(),
2191                ["DIR", "a.txt", "b.txt", "d.txt"]
2192            );
2193        });
2194
2195        project_b
2196            .update(cx_b, |project, cx| {
2197                project.delete_entry(dir_entry.id, cx).unwrap()
2198            })
2199            .await
2200            .unwrap();
2201        worktree_a.read_with(cx_a, |worktree, _| {
2202            assert_eq!(
2203                worktree
2204                    .paths()
2205                    .map(|p| p.to_string_lossy())
2206                    .collect::<Vec<_>>(),
2207                ["a.txt", "b.txt", "d.txt"]
2208            );
2209        });
2210        worktree_b.read_with(cx_b, |worktree, _| {
2211            assert_eq!(
2212                worktree
2213                    .paths()
2214                    .map(|p| p.to_string_lossy())
2215                    .collect::<Vec<_>>(),
2216                ["a.txt", "b.txt", "d.txt"]
2217            );
2218        });
2219
2220        project_b
2221            .update(cx_b, |project, cx| {
2222                project.delete_entry(entry.id, cx).unwrap()
2223            })
2224            .await
2225            .unwrap();
2226        worktree_a.read_with(cx_a, |worktree, _| {
2227            assert_eq!(
2228                worktree
2229                    .paths()
2230                    .map(|p| p.to_string_lossy())
2231                    .collect::<Vec<_>>(),
2232                ["a.txt", "b.txt"]
2233            );
2234        });
2235        worktree_b.read_with(cx_b, |worktree, _| {
2236            assert_eq!(
2237                worktree
2238                    .paths()
2239                    .map(|p| p.to_string_lossy())
2240                    .collect::<Vec<_>>(),
2241                ["a.txt", "b.txt"]
2242            );
2243        });
2244    }
2245
2246    #[gpui::test(iterations = 10)]
2247    async fn test_buffer_conflict_after_save(cx_a: &mut TestAppContext, cx_b: &mut TestAppContext) {
2248        cx_a.foreground().forbid_parking();
2249        let lang_registry = Arc::new(LanguageRegistry::test());
2250        let fs = FakeFs::new(cx_a.background());
2251
2252        // Connect to a server as 2 clients.
2253        let mut server = TestServer::start(cx_a.foreground(), cx_a.background()).await;
2254        let client_a = server.create_client(cx_a, "user_a").await;
2255        let client_b = server.create_client(cx_b, "user_b").await;
2256        server
2257            .make_contacts(vec![(&client_a, cx_a), (&client_b, cx_b)])
2258            .await;
2259
2260        // Share a project as client A
2261        fs.insert_tree(
2262            "/dir",
2263            json!({
2264                "a.txt": "a-contents",
2265            }),
2266        )
2267        .await;
2268
2269        let project_a = cx_a.update(|cx| {
2270            Project::local(
2271                client_a.clone(),
2272                client_a.user_store.clone(),
2273                lang_registry.clone(),
2274                fs.clone(),
2275                cx,
2276            )
2277        });
2278        let (worktree_a, _) = project_a
2279            .update(cx_a, |p, cx| {
2280                p.find_or_create_local_worktree("/dir", true, cx)
2281            })
2282            .await
2283            .unwrap();
2284        worktree_a
2285            .read_with(cx_a, |tree, _| tree.as_local().unwrap().scan_complete())
2286            .await;
2287        let project_id = project_a.update(cx_a, |p, _| p.next_remote_id()).await;
2288        let worktree_id = worktree_a.read_with(cx_a, |tree, _| tree.id());
2289        project_a.update(cx_a, |p, cx| p.share(cx)).await.unwrap();
2290
2291        // Join that project as client B
2292        let project_b = Project::remote(
2293            project_id,
2294            client_b.clone(),
2295            client_b.user_store.clone(),
2296            lang_registry.clone(),
2297            fs.clone(),
2298            &mut cx_b.to_async(),
2299        )
2300        .await
2301        .unwrap();
2302
2303        // Open a buffer as client B
2304        let buffer_b = project_b
2305            .update(cx_b, |p, cx| p.open_buffer((worktree_id, "a.txt"), cx))
2306            .await
2307            .unwrap();
2308
2309        buffer_b.update(cx_b, |buf, cx| buf.edit([(0..0, "world ")], cx));
2310        buffer_b.read_with(cx_b, |buf, _| {
2311            assert!(buf.is_dirty());
2312            assert!(!buf.has_conflict());
2313        });
2314
2315        buffer_b.update(cx_b, |buf, cx| buf.save(cx)).await.unwrap();
2316        buffer_b
2317            .condition(&cx_b, |buffer_b, _| !buffer_b.is_dirty())
2318            .await;
2319        buffer_b.read_with(cx_b, |buf, _| {
2320            assert!(!buf.has_conflict());
2321        });
2322
2323        buffer_b.update(cx_b, |buf, cx| buf.edit([(0..0, "hello ")], cx));
2324        buffer_b.read_with(cx_b, |buf, _| {
2325            assert!(buf.is_dirty());
2326            assert!(!buf.has_conflict());
2327        });
2328    }
2329
2330    #[gpui::test(iterations = 10)]
2331    async fn test_buffer_reloading(cx_a: &mut TestAppContext, cx_b: &mut TestAppContext) {
2332        cx_a.foreground().forbid_parking();
2333        let lang_registry = Arc::new(LanguageRegistry::test());
2334        let fs = FakeFs::new(cx_a.background());
2335
2336        // Connect to a server as 2 clients.
2337        let mut server = TestServer::start(cx_a.foreground(), cx_a.background()).await;
2338        let client_a = server.create_client(cx_a, "user_a").await;
2339        let client_b = server.create_client(cx_b, "user_b").await;
2340        server
2341            .make_contacts(vec![(&client_a, cx_a), (&client_b, cx_b)])
2342            .await;
2343
2344        // Share a project as client A
2345        fs.insert_tree(
2346            "/dir",
2347            json!({
2348                "a.txt": "a-contents",
2349            }),
2350        )
2351        .await;
2352
2353        let project_a = cx_a.update(|cx| {
2354            Project::local(
2355                client_a.clone(),
2356                client_a.user_store.clone(),
2357                lang_registry.clone(),
2358                fs.clone(),
2359                cx,
2360            )
2361        });
2362        let (worktree_a, _) = project_a
2363            .update(cx_a, |p, cx| {
2364                p.find_or_create_local_worktree("/dir", true, cx)
2365            })
2366            .await
2367            .unwrap();
2368        worktree_a
2369            .read_with(cx_a, |tree, _| tree.as_local().unwrap().scan_complete())
2370            .await;
2371        let project_id = project_a.update(cx_a, |p, _| p.next_remote_id()).await;
2372        let worktree_id = worktree_a.read_with(cx_a, |tree, _| tree.id());
2373        project_a.update(cx_a, |p, cx| p.share(cx)).await.unwrap();
2374
2375        // Join that project as client B
2376        let project_b = Project::remote(
2377            project_id,
2378            client_b.clone(),
2379            client_b.user_store.clone(),
2380            lang_registry.clone(),
2381            fs.clone(),
2382            &mut cx_b.to_async(),
2383        )
2384        .await
2385        .unwrap();
2386        let _worktree_b = project_b.update(cx_b, |p, cx| p.worktrees(cx).next().unwrap());
2387
2388        // Open a buffer as client B
2389        let buffer_b = project_b
2390            .update(cx_b, |p, cx| p.open_buffer((worktree_id, "a.txt"), cx))
2391            .await
2392            .unwrap();
2393        buffer_b.read_with(cx_b, |buf, _| {
2394            assert!(!buf.is_dirty());
2395            assert!(!buf.has_conflict());
2396        });
2397
2398        fs.save(Path::new("/dir/a.txt"), &"new contents".into())
2399            .await
2400            .unwrap();
2401        buffer_b
2402            .condition(&cx_b, |buf, _| {
2403                buf.text() == "new contents" && !buf.is_dirty()
2404            })
2405            .await;
2406        buffer_b.read_with(cx_b, |buf, _| {
2407            assert!(!buf.has_conflict());
2408        });
2409    }
2410
2411    #[gpui::test(iterations = 10)]
2412    async fn test_editing_while_guest_opens_buffer(
2413        cx_a: &mut TestAppContext,
2414        cx_b: &mut TestAppContext,
2415    ) {
2416        cx_a.foreground().forbid_parking();
2417        let lang_registry = Arc::new(LanguageRegistry::test());
2418        let fs = FakeFs::new(cx_a.background());
2419
2420        // Connect to a server as 2 clients.
2421        let mut server = TestServer::start(cx_a.foreground(), cx_a.background()).await;
2422        let client_a = server.create_client(cx_a, "user_a").await;
2423        let client_b = server.create_client(cx_b, "user_b").await;
2424        server
2425            .make_contacts(vec![(&client_a, cx_a), (&client_b, cx_b)])
2426            .await;
2427
2428        // Share a project as client A
2429        fs.insert_tree(
2430            "/dir",
2431            json!({
2432                "a.txt": "a-contents",
2433            }),
2434        )
2435        .await;
2436        let project_a = cx_a.update(|cx| {
2437            Project::local(
2438                client_a.clone(),
2439                client_a.user_store.clone(),
2440                lang_registry.clone(),
2441                fs.clone(),
2442                cx,
2443            )
2444        });
2445        let (worktree_a, _) = project_a
2446            .update(cx_a, |p, cx| {
2447                p.find_or_create_local_worktree("/dir", true, cx)
2448            })
2449            .await
2450            .unwrap();
2451        worktree_a
2452            .read_with(cx_a, |tree, _| tree.as_local().unwrap().scan_complete())
2453            .await;
2454        let project_id = project_a.update(cx_a, |p, _| p.next_remote_id()).await;
2455        let worktree_id = worktree_a.read_with(cx_a, |tree, _| tree.id());
2456        project_a.update(cx_a, |p, cx| p.share(cx)).await.unwrap();
2457
2458        // Join that project as client B
2459        let project_b = Project::remote(
2460            project_id,
2461            client_b.clone(),
2462            client_b.user_store.clone(),
2463            lang_registry.clone(),
2464            fs.clone(),
2465            &mut cx_b.to_async(),
2466        )
2467        .await
2468        .unwrap();
2469
2470        // Open a buffer as client A
2471        let buffer_a = project_a
2472            .update(cx_a, |p, cx| p.open_buffer((worktree_id, "a.txt"), cx))
2473            .await
2474            .unwrap();
2475
2476        // Start opening the same buffer as client B
2477        let buffer_b = cx_b
2478            .background()
2479            .spawn(project_b.update(cx_b, |p, cx| p.open_buffer((worktree_id, "a.txt"), cx)));
2480
2481        // Edit the buffer as client A while client B is still opening it.
2482        cx_b.background().simulate_random_delay().await;
2483        buffer_a.update(cx_a, |buf, cx| buf.edit([(0..0, "X")], cx));
2484        cx_b.background().simulate_random_delay().await;
2485        buffer_a.update(cx_a, |buf, cx| buf.edit([(1..1, "Y")], cx));
2486
2487        let text = buffer_a.read_with(cx_a, |buf, _| buf.text());
2488        let buffer_b = buffer_b.await.unwrap();
2489        buffer_b.condition(&cx_b, |buf, _| buf.text() == text).await;
2490    }
2491
2492    #[gpui::test(iterations = 10)]
2493    async fn test_leaving_worktree_while_opening_buffer(
2494        cx_a: &mut TestAppContext,
2495        cx_b: &mut TestAppContext,
2496    ) {
2497        cx_a.foreground().forbid_parking();
2498        let lang_registry = Arc::new(LanguageRegistry::test());
2499        let fs = FakeFs::new(cx_a.background());
2500
2501        // Connect to a server as 2 clients.
2502        let mut server = TestServer::start(cx_a.foreground(), cx_a.background()).await;
2503        let client_a = server.create_client(cx_a, "user_a").await;
2504        let client_b = server.create_client(cx_b, "user_b").await;
2505        server
2506            .make_contacts(vec![(&client_a, cx_a), (&client_b, cx_b)])
2507            .await;
2508
2509        // Share a project as client A
2510        fs.insert_tree(
2511            "/dir",
2512            json!({
2513                "a.txt": "a-contents",
2514            }),
2515        )
2516        .await;
2517        let project_a = cx_a.update(|cx| {
2518            Project::local(
2519                client_a.clone(),
2520                client_a.user_store.clone(),
2521                lang_registry.clone(),
2522                fs.clone(),
2523                cx,
2524            )
2525        });
2526        let (worktree_a, _) = project_a
2527            .update(cx_a, |p, cx| {
2528                p.find_or_create_local_worktree("/dir", true, cx)
2529            })
2530            .await
2531            .unwrap();
2532        worktree_a
2533            .read_with(cx_a, |tree, _| tree.as_local().unwrap().scan_complete())
2534            .await;
2535        let project_id = project_a.update(cx_a, |p, _| p.next_remote_id()).await;
2536        let worktree_id = worktree_a.read_with(cx_a, |tree, _| tree.id());
2537        project_a.update(cx_a, |p, cx| p.share(cx)).await.unwrap();
2538
2539        // Join that project as client B
2540        let project_b = Project::remote(
2541            project_id,
2542            client_b.clone(),
2543            client_b.user_store.clone(),
2544            lang_registry.clone(),
2545            fs.clone(),
2546            &mut cx_b.to_async(),
2547        )
2548        .await
2549        .unwrap();
2550
2551        // See that a guest has joined as client A.
2552        project_a
2553            .condition(&cx_a, |p, _| p.collaborators().len() == 1)
2554            .await;
2555
2556        // Begin opening a buffer as client B, but leave the project before the open completes.
2557        let buffer_b = cx_b
2558            .background()
2559            .spawn(project_b.update(cx_b, |p, cx| p.open_buffer((worktree_id, "a.txt"), cx)));
2560        cx_b.update(|_| drop(project_b));
2561        drop(buffer_b);
2562
2563        // See that the guest has left.
2564        project_a
2565            .condition(&cx_a, |p, _| p.collaborators().len() == 0)
2566            .await;
2567    }
2568
2569    #[gpui::test(iterations = 10)]
2570    async fn test_leaving_project(cx_a: &mut TestAppContext, cx_b: &mut TestAppContext) {
2571        cx_a.foreground().forbid_parking();
2572        let lang_registry = Arc::new(LanguageRegistry::test());
2573        let fs = FakeFs::new(cx_a.background());
2574
2575        // Connect to a server as 2 clients.
2576        let mut server = TestServer::start(cx_a.foreground(), cx_a.background()).await;
2577        let client_a = server.create_client(cx_a, "user_a").await;
2578        let client_b = server.create_client(cx_b, "user_b").await;
2579        server
2580            .make_contacts(vec![(&client_a, cx_a), (&client_b, cx_b)])
2581            .await;
2582
2583        // Share a project as client A
2584        fs.insert_tree(
2585            "/a",
2586            json!({
2587                "a.txt": "a-contents",
2588                "b.txt": "b-contents",
2589            }),
2590        )
2591        .await;
2592        let project_a = cx_a.update(|cx| {
2593            Project::local(
2594                client_a.clone(),
2595                client_a.user_store.clone(),
2596                lang_registry.clone(),
2597                fs.clone(),
2598                cx,
2599            )
2600        });
2601        let (worktree_a, _) = project_a
2602            .update(cx_a, |p, cx| {
2603                p.find_or_create_local_worktree("/a", true, cx)
2604            })
2605            .await
2606            .unwrap();
2607        worktree_a
2608            .read_with(cx_a, |tree, _| tree.as_local().unwrap().scan_complete())
2609            .await;
2610        let project_id = project_a
2611            .update(cx_a, |project, _| project.next_remote_id())
2612            .await;
2613        project_a
2614            .update(cx_a, |project, cx| project.share(cx))
2615            .await
2616            .unwrap();
2617
2618        // Join that project as client B
2619        let _project_b = Project::remote(
2620            project_id,
2621            client_b.clone(),
2622            client_b.user_store.clone(),
2623            lang_registry.clone(),
2624            fs.clone(),
2625            &mut cx_b.to_async(),
2626        )
2627        .await
2628        .unwrap();
2629
2630        // Client A sees that a guest has joined.
2631        project_a
2632            .condition(cx_a, |p, _| p.collaborators().len() == 1)
2633            .await;
2634
2635        // Drop client B's connection and ensure client A observes client B leaving the project.
2636        client_b.disconnect(&cx_b.to_async()).unwrap();
2637        project_a
2638            .condition(cx_a, |p, _| p.collaborators().len() == 0)
2639            .await;
2640
2641        // Rejoin the project as client B
2642        let _project_b = Project::remote(
2643            project_id,
2644            client_b.clone(),
2645            client_b.user_store.clone(),
2646            lang_registry.clone(),
2647            fs.clone(),
2648            &mut cx_b.to_async(),
2649        )
2650        .await
2651        .unwrap();
2652
2653        // Client A sees that a guest has re-joined.
2654        project_a
2655            .condition(cx_a, |p, _| p.collaborators().len() == 1)
2656            .await;
2657
2658        // Simulate connection loss for client B and ensure client A observes client B leaving the project.
2659        client_b.wait_for_current_user(cx_b).await;
2660        server.disconnect_client(client_b.current_user_id(cx_b));
2661        cx_a.foreground().advance_clock(rpc::RECEIVE_TIMEOUT);
2662        project_a
2663            .condition(cx_a, |p, _| p.collaborators().len() == 0)
2664            .await;
2665    }
2666
2667    #[gpui::test(iterations = 10)]
2668    async fn test_collaborating_with_diagnostics(
2669        cx_a: &mut TestAppContext,
2670        cx_b: &mut TestAppContext,
2671    ) {
2672        cx_a.foreground().forbid_parking();
2673        let lang_registry = Arc::new(LanguageRegistry::test());
2674        let fs = FakeFs::new(cx_a.background());
2675
2676        // Set up a fake language server.
2677        let mut language = Language::new(
2678            LanguageConfig {
2679                name: "Rust".into(),
2680                path_suffixes: vec!["rs".to_string()],
2681                ..Default::default()
2682            },
2683            Some(tree_sitter_rust::language()),
2684        );
2685        let mut fake_language_servers = language.set_fake_lsp_adapter(Default::default());
2686        lang_registry.add(Arc::new(language));
2687
2688        // Connect to a server as 2 clients.
2689        let mut server = TestServer::start(cx_a.foreground(), cx_a.background()).await;
2690        let client_a = server.create_client(cx_a, "user_a").await;
2691        let client_b = server.create_client(cx_b, "user_b").await;
2692        server
2693            .make_contacts(vec![(&client_a, cx_a), (&client_b, cx_b)])
2694            .await;
2695
2696        // Share a project as client A
2697        fs.insert_tree(
2698            "/a",
2699            json!({
2700                "a.rs": "let one = two",
2701                "other.rs": "",
2702            }),
2703        )
2704        .await;
2705        let project_a = cx_a.update(|cx| {
2706            Project::local(
2707                client_a.clone(),
2708                client_a.user_store.clone(),
2709                lang_registry.clone(),
2710                fs.clone(),
2711                cx,
2712            )
2713        });
2714        let (worktree_a, _) = project_a
2715            .update(cx_a, |p, cx| {
2716                p.find_or_create_local_worktree("/a", true, cx)
2717            })
2718            .await
2719            .unwrap();
2720        worktree_a
2721            .read_with(cx_a, |tree, _| tree.as_local().unwrap().scan_complete())
2722            .await;
2723        let project_id = project_a.update(cx_a, |p, _| p.next_remote_id()).await;
2724        let worktree_id = worktree_a.read_with(cx_a, |tree, _| tree.id());
2725        project_a.update(cx_a, |p, cx| p.share(cx)).await.unwrap();
2726
2727        // Cause the language server to start.
2728        let _ = cx_a
2729            .background()
2730            .spawn(project_a.update(cx_a, |project, cx| {
2731                project.open_buffer(
2732                    ProjectPath {
2733                        worktree_id,
2734                        path: Path::new("other.rs").into(),
2735                    },
2736                    cx,
2737                )
2738            }))
2739            .await
2740            .unwrap();
2741
2742        // Simulate a language server reporting errors for a file.
2743        let mut fake_language_server = fake_language_servers.next().await.unwrap();
2744        fake_language_server
2745            .receive_notification::<lsp::notification::DidOpenTextDocument>()
2746            .await;
2747        fake_language_server.notify::<lsp::notification::PublishDiagnostics>(
2748            lsp::PublishDiagnosticsParams {
2749                uri: lsp::Url::from_file_path("/a/a.rs").unwrap(),
2750                version: None,
2751                diagnostics: vec![lsp::Diagnostic {
2752                    severity: Some(lsp::DiagnosticSeverity::ERROR),
2753                    range: lsp::Range::new(lsp::Position::new(0, 4), lsp::Position::new(0, 7)),
2754                    message: "message 1".to_string(),
2755                    ..Default::default()
2756                }],
2757            },
2758        );
2759
2760        // Wait for server to see the diagnostics update.
2761        server
2762            .condition(|store| {
2763                let worktree = store
2764                    .project(project_id)
2765                    .unwrap()
2766                    .share
2767                    .as_ref()
2768                    .unwrap()
2769                    .worktrees
2770                    .get(&worktree_id.to_proto())
2771                    .unwrap();
2772
2773                !worktree.diagnostic_summaries.is_empty()
2774            })
2775            .await;
2776
2777        // Join the worktree as client B.
2778        let project_b = Project::remote(
2779            project_id,
2780            client_b.clone(),
2781            client_b.user_store.clone(),
2782            lang_registry.clone(),
2783            fs.clone(),
2784            &mut cx_b.to_async(),
2785        )
2786        .await
2787        .unwrap();
2788
2789        project_b.read_with(cx_b, |project, cx| {
2790            assert_eq!(
2791                project.diagnostic_summaries(cx).collect::<Vec<_>>(),
2792                &[(
2793                    ProjectPath {
2794                        worktree_id,
2795                        path: Arc::from(Path::new("a.rs")),
2796                    },
2797                    DiagnosticSummary {
2798                        error_count: 1,
2799                        warning_count: 0,
2800                        ..Default::default()
2801                    },
2802                )]
2803            )
2804        });
2805
2806        // Simulate a language server reporting more errors for a file.
2807        fake_language_server.notify::<lsp::notification::PublishDiagnostics>(
2808            lsp::PublishDiagnosticsParams {
2809                uri: lsp::Url::from_file_path("/a/a.rs").unwrap(),
2810                version: None,
2811                diagnostics: vec![
2812                    lsp::Diagnostic {
2813                        severity: Some(lsp::DiagnosticSeverity::ERROR),
2814                        range: lsp::Range::new(lsp::Position::new(0, 4), lsp::Position::new(0, 7)),
2815                        message: "message 1".to_string(),
2816                        ..Default::default()
2817                    },
2818                    lsp::Diagnostic {
2819                        severity: Some(lsp::DiagnosticSeverity::WARNING),
2820                        range: lsp::Range::new(
2821                            lsp::Position::new(0, 10),
2822                            lsp::Position::new(0, 13),
2823                        ),
2824                        message: "message 2".to_string(),
2825                        ..Default::default()
2826                    },
2827                ],
2828            },
2829        );
2830
2831        // Client b gets the updated summaries
2832        project_b
2833            .condition(&cx_b, |project, cx| {
2834                project.diagnostic_summaries(cx).collect::<Vec<_>>()
2835                    == &[(
2836                        ProjectPath {
2837                            worktree_id,
2838                            path: Arc::from(Path::new("a.rs")),
2839                        },
2840                        DiagnosticSummary {
2841                            error_count: 1,
2842                            warning_count: 1,
2843                            ..Default::default()
2844                        },
2845                    )]
2846            })
2847            .await;
2848
2849        // Open the file with the errors on client B. They should be present.
2850        let buffer_b = cx_b
2851            .background()
2852            .spawn(project_b.update(cx_b, |p, cx| p.open_buffer((worktree_id, "a.rs"), cx)))
2853            .await
2854            .unwrap();
2855
2856        buffer_b.read_with(cx_b, |buffer, _| {
2857            assert_eq!(
2858                buffer
2859                    .snapshot()
2860                    .diagnostics_in_range::<_, Point>(0..buffer.len(), false)
2861                    .map(|entry| entry)
2862                    .collect::<Vec<_>>(),
2863                &[
2864                    DiagnosticEntry {
2865                        range: Point::new(0, 4)..Point::new(0, 7),
2866                        diagnostic: Diagnostic {
2867                            group_id: 0,
2868                            message: "message 1".to_string(),
2869                            severity: lsp::DiagnosticSeverity::ERROR,
2870                            is_primary: true,
2871                            ..Default::default()
2872                        }
2873                    },
2874                    DiagnosticEntry {
2875                        range: Point::new(0, 10)..Point::new(0, 13),
2876                        diagnostic: Diagnostic {
2877                            group_id: 1,
2878                            severity: lsp::DiagnosticSeverity::WARNING,
2879                            message: "message 2".to_string(),
2880                            is_primary: true,
2881                            ..Default::default()
2882                        }
2883                    }
2884                ]
2885            );
2886        });
2887
2888        // Simulate a language server reporting no errors for a file.
2889        fake_language_server.notify::<lsp::notification::PublishDiagnostics>(
2890            lsp::PublishDiagnosticsParams {
2891                uri: lsp::Url::from_file_path("/a/a.rs").unwrap(),
2892                version: None,
2893                diagnostics: vec![],
2894            },
2895        );
2896        project_a
2897            .condition(cx_a, |project, cx| {
2898                project.diagnostic_summaries(cx).collect::<Vec<_>>() == &[]
2899            })
2900            .await;
2901        project_b
2902            .condition(cx_b, |project, cx| {
2903                project.diagnostic_summaries(cx).collect::<Vec<_>>() == &[]
2904            })
2905            .await;
2906    }
2907
2908    #[gpui::test(iterations = 10)]
2909    async fn test_collaborating_with_completion(
2910        cx_a: &mut TestAppContext,
2911        cx_b: &mut TestAppContext,
2912    ) {
2913        cx_a.foreground().forbid_parking();
2914        let lang_registry = Arc::new(LanguageRegistry::test());
2915        let fs = FakeFs::new(cx_a.background());
2916
2917        // Set up a fake language server.
2918        let mut language = Language::new(
2919            LanguageConfig {
2920                name: "Rust".into(),
2921                path_suffixes: vec!["rs".to_string()],
2922                ..Default::default()
2923            },
2924            Some(tree_sitter_rust::language()),
2925        );
2926        let mut fake_language_servers = language.set_fake_lsp_adapter(FakeLspAdapter {
2927            capabilities: lsp::ServerCapabilities {
2928                completion_provider: Some(lsp::CompletionOptions {
2929                    trigger_characters: Some(vec![".".to_string()]),
2930                    ..Default::default()
2931                }),
2932                ..Default::default()
2933            },
2934            ..Default::default()
2935        });
2936        lang_registry.add(Arc::new(language));
2937
2938        // Connect to a server as 2 clients.
2939        let mut server = TestServer::start(cx_a.foreground(), cx_a.background()).await;
2940        let client_a = server.create_client(cx_a, "user_a").await;
2941        let client_b = server.create_client(cx_b, "user_b").await;
2942        server
2943            .make_contacts(vec![(&client_a, cx_a), (&client_b, cx_b)])
2944            .await;
2945
2946        // Share a project as client A
2947        fs.insert_tree(
2948            "/a",
2949            json!({
2950                "main.rs": "fn main() { a }",
2951                "other.rs": "",
2952            }),
2953        )
2954        .await;
2955        let project_a = cx_a.update(|cx| {
2956            Project::local(
2957                client_a.clone(),
2958                client_a.user_store.clone(),
2959                lang_registry.clone(),
2960                fs.clone(),
2961                cx,
2962            )
2963        });
2964        let (worktree_a, _) = project_a
2965            .update(cx_a, |p, cx| {
2966                p.find_or_create_local_worktree("/a", true, cx)
2967            })
2968            .await
2969            .unwrap();
2970        worktree_a
2971            .read_with(cx_a, |tree, _| tree.as_local().unwrap().scan_complete())
2972            .await;
2973        let project_id = project_a.update(cx_a, |p, _| p.next_remote_id()).await;
2974        let worktree_id = worktree_a.read_with(cx_a, |tree, _| tree.id());
2975        project_a.update(cx_a, |p, cx| p.share(cx)).await.unwrap();
2976
2977        // Join the worktree as client B.
2978        let project_b = Project::remote(
2979            project_id,
2980            client_b.clone(),
2981            client_b.user_store.clone(),
2982            lang_registry.clone(),
2983            fs.clone(),
2984            &mut cx_b.to_async(),
2985        )
2986        .await
2987        .unwrap();
2988
2989        // Open a file in an editor as the guest.
2990        let buffer_b = project_b
2991            .update(cx_b, |p, cx| p.open_buffer((worktree_id, "main.rs"), cx))
2992            .await
2993            .unwrap();
2994        let (window_b, _) = cx_b.add_window(|_| EmptyView);
2995        let editor_b = cx_b.add_view(window_b, |cx| {
2996            Editor::for_buffer(buffer_b.clone(), Some(project_b.clone()), cx)
2997        });
2998
2999        let fake_language_server = fake_language_servers.next().await.unwrap();
3000        buffer_b
3001            .condition(&cx_b, |buffer, _| !buffer.completion_triggers().is_empty())
3002            .await;
3003
3004        // Type a completion trigger character as the guest.
3005        editor_b.update(cx_b, |editor, cx| {
3006            editor.select_ranges([13..13], None, cx);
3007            editor.handle_input(&Input(".".into()), cx);
3008            cx.focus(&editor_b);
3009        });
3010
3011        // Receive a completion request as the host's language server.
3012        // Return some completions from the host's language server.
3013        cx_a.foreground().start_waiting();
3014        fake_language_server
3015            .handle_request::<lsp::request::Completion, _, _>(|params, _| async move {
3016                assert_eq!(
3017                    params.text_document_position.text_document.uri,
3018                    lsp::Url::from_file_path("/a/main.rs").unwrap(),
3019                );
3020                assert_eq!(
3021                    params.text_document_position.position,
3022                    lsp::Position::new(0, 14),
3023                );
3024
3025                Ok(Some(lsp::CompletionResponse::Array(vec![
3026                    lsp::CompletionItem {
3027                        label: "first_method(…)".into(),
3028                        detail: Some("fn(&mut self, B) -> C".into()),
3029                        text_edit: Some(lsp::CompletionTextEdit::Edit(lsp::TextEdit {
3030                            new_text: "first_method($1)".to_string(),
3031                            range: lsp::Range::new(
3032                                lsp::Position::new(0, 14),
3033                                lsp::Position::new(0, 14),
3034                            ),
3035                        })),
3036                        insert_text_format: Some(lsp::InsertTextFormat::SNIPPET),
3037                        ..Default::default()
3038                    },
3039                    lsp::CompletionItem {
3040                        label: "second_method(…)".into(),
3041                        detail: Some("fn(&mut self, C) -> D<E>".into()),
3042                        text_edit: Some(lsp::CompletionTextEdit::Edit(lsp::TextEdit {
3043                            new_text: "second_method()".to_string(),
3044                            range: lsp::Range::new(
3045                                lsp::Position::new(0, 14),
3046                                lsp::Position::new(0, 14),
3047                            ),
3048                        })),
3049                        insert_text_format: Some(lsp::InsertTextFormat::SNIPPET),
3050                        ..Default::default()
3051                    },
3052                ])))
3053            })
3054            .next()
3055            .await
3056            .unwrap();
3057        cx_a.foreground().finish_waiting();
3058
3059        // Open the buffer on the host.
3060        let buffer_a = project_a
3061            .update(cx_a, |p, cx| p.open_buffer((worktree_id, "main.rs"), cx))
3062            .await
3063            .unwrap();
3064        buffer_a
3065            .condition(&cx_a, |buffer, _| buffer.text() == "fn main() { a. }")
3066            .await;
3067
3068        // Confirm a completion on the guest.
3069        editor_b
3070            .condition(&cx_b, |editor, _| editor.context_menu_visible())
3071            .await;
3072        editor_b.update(cx_b, |editor, cx| {
3073            editor.confirm_completion(&ConfirmCompletion { item_ix: Some(0) }, cx);
3074            assert_eq!(editor.text(cx), "fn main() { a.first_method() }");
3075        });
3076
3077        // Return a resolved completion from the host's language server.
3078        // The resolved completion has an additional text edit.
3079        fake_language_server.handle_request::<lsp::request::ResolveCompletionItem, _, _>(
3080            |params, _| async move {
3081                assert_eq!(params.label, "first_method(…)");
3082                Ok(lsp::CompletionItem {
3083                    label: "first_method(…)".into(),
3084                    detail: Some("fn(&mut self, B) -> C".into()),
3085                    text_edit: Some(lsp::CompletionTextEdit::Edit(lsp::TextEdit {
3086                        new_text: "first_method($1)".to_string(),
3087                        range: lsp::Range::new(
3088                            lsp::Position::new(0, 14),
3089                            lsp::Position::new(0, 14),
3090                        ),
3091                    })),
3092                    additional_text_edits: Some(vec![lsp::TextEdit {
3093                        new_text: "use d::SomeTrait;\n".to_string(),
3094                        range: lsp::Range::new(lsp::Position::new(0, 0), lsp::Position::new(0, 0)),
3095                    }]),
3096                    insert_text_format: Some(lsp::InsertTextFormat::SNIPPET),
3097                    ..Default::default()
3098                })
3099            },
3100        );
3101
3102        // The additional edit is applied.
3103        buffer_a
3104            .condition(&cx_a, |buffer, _| {
3105                buffer.text() == "use d::SomeTrait;\nfn main() { a.first_method() }"
3106            })
3107            .await;
3108        buffer_b
3109            .condition(&cx_b, |buffer, _| {
3110                buffer.text() == "use d::SomeTrait;\nfn main() { a.first_method() }"
3111            })
3112            .await;
3113    }
3114
3115    #[gpui::test(iterations = 10)]
3116    async fn test_reloading_buffer_manually(cx_a: &mut TestAppContext, cx_b: &mut TestAppContext) {
3117        cx_a.foreground().forbid_parking();
3118        let lang_registry = Arc::new(LanguageRegistry::test());
3119        let fs = FakeFs::new(cx_a.background());
3120
3121        // Connect to a server as 2 clients.
3122        let mut server = TestServer::start(cx_a.foreground(), cx_a.background()).await;
3123        let client_a = server.create_client(cx_a, "user_a").await;
3124        let client_b = server.create_client(cx_b, "user_b").await;
3125        server
3126            .make_contacts(vec![(&client_a, cx_a), (&client_b, cx_b)])
3127            .await;
3128
3129        // Share a project as client A
3130        fs.insert_tree(
3131            "/a",
3132            json!({
3133                "a.rs": "let one = 1;",
3134            }),
3135        )
3136        .await;
3137        let project_a = cx_a.update(|cx| {
3138            Project::local(
3139                client_a.clone(),
3140                client_a.user_store.clone(),
3141                lang_registry.clone(),
3142                fs.clone(),
3143                cx,
3144            )
3145        });
3146        let (worktree_a, _) = project_a
3147            .update(cx_a, |p, cx| {
3148                p.find_or_create_local_worktree("/a", true, cx)
3149            })
3150            .await
3151            .unwrap();
3152        worktree_a
3153            .read_with(cx_a, |tree, _| tree.as_local().unwrap().scan_complete())
3154            .await;
3155        let project_id = project_a.update(cx_a, |p, _| p.next_remote_id()).await;
3156        let worktree_id = worktree_a.read_with(cx_a, |tree, _| tree.id());
3157        project_a.update(cx_a, |p, cx| p.share(cx)).await.unwrap();
3158        let buffer_a = project_a
3159            .update(cx_a, |p, cx| p.open_buffer((worktree_id, "a.rs"), cx))
3160            .await
3161            .unwrap();
3162
3163        // Join the worktree as client B.
3164        let project_b = Project::remote(
3165            project_id,
3166            client_b.clone(),
3167            client_b.user_store.clone(),
3168            lang_registry.clone(),
3169            fs.clone(),
3170            &mut cx_b.to_async(),
3171        )
3172        .await
3173        .unwrap();
3174
3175        let buffer_b = cx_b
3176            .background()
3177            .spawn(project_b.update(cx_b, |p, cx| p.open_buffer((worktree_id, "a.rs"), cx)))
3178            .await
3179            .unwrap();
3180        buffer_b.update(cx_b, |buffer, cx| {
3181            buffer.edit([(4..7, "six")], cx);
3182            buffer.edit([(10..11, "6")], cx);
3183            assert_eq!(buffer.text(), "let six = 6;");
3184            assert!(buffer.is_dirty());
3185            assert!(!buffer.has_conflict());
3186        });
3187        buffer_a
3188            .condition(cx_a, |buffer, _| buffer.text() == "let six = 6;")
3189            .await;
3190
3191        fs.save(Path::new("/a/a.rs"), &Rope::from("let seven = 7;"))
3192            .await
3193            .unwrap();
3194        buffer_a
3195            .condition(cx_a, |buffer, _| buffer.has_conflict())
3196            .await;
3197        buffer_b
3198            .condition(cx_b, |buffer, _| buffer.has_conflict())
3199            .await;
3200
3201        project_b
3202            .update(cx_b, |project, cx| {
3203                project.reload_buffers(HashSet::from_iter([buffer_b.clone()]), true, cx)
3204            })
3205            .await
3206            .unwrap();
3207        buffer_a.read_with(cx_a, |buffer, _| {
3208            assert_eq!(buffer.text(), "let seven = 7;");
3209            assert!(!buffer.is_dirty());
3210            assert!(!buffer.has_conflict());
3211        });
3212        buffer_b.read_with(cx_b, |buffer, _| {
3213            assert_eq!(buffer.text(), "let seven = 7;");
3214            assert!(!buffer.is_dirty());
3215            assert!(!buffer.has_conflict());
3216        });
3217
3218        buffer_a.update(cx_a, |buffer, cx| {
3219            // Undoing on the host is a no-op when the reload was initiated by the guest.
3220            buffer.undo(cx);
3221            assert_eq!(buffer.text(), "let seven = 7;");
3222            assert!(!buffer.is_dirty());
3223            assert!(!buffer.has_conflict());
3224        });
3225        buffer_b.update(cx_b, |buffer, cx| {
3226            // Undoing on the guest rolls back the buffer to before it was reloaded but the conflict gets cleared.
3227            buffer.undo(cx);
3228            assert_eq!(buffer.text(), "let six = 6;");
3229            assert!(buffer.is_dirty());
3230            assert!(!buffer.has_conflict());
3231        });
3232    }
3233
3234    #[gpui::test(iterations = 10)]
3235    async fn test_formatting_buffer(cx_a: &mut TestAppContext, cx_b: &mut TestAppContext) {
3236        cx_a.foreground().forbid_parking();
3237        let lang_registry = Arc::new(LanguageRegistry::test());
3238        let fs = FakeFs::new(cx_a.background());
3239
3240        // Set up a fake language server.
3241        let mut language = Language::new(
3242            LanguageConfig {
3243                name: "Rust".into(),
3244                path_suffixes: vec!["rs".to_string()],
3245                ..Default::default()
3246            },
3247            Some(tree_sitter_rust::language()),
3248        );
3249        let mut fake_language_servers = language.set_fake_lsp_adapter(Default::default());
3250        lang_registry.add(Arc::new(language));
3251
3252        // Connect to a server as 2 clients.
3253        let mut server = TestServer::start(cx_a.foreground(), cx_a.background()).await;
3254        let client_a = server.create_client(cx_a, "user_a").await;
3255        let client_b = server.create_client(cx_b, "user_b").await;
3256        server
3257            .make_contacts(vec![(&client_a, cx_a), (&client_b, cx_b)])
3258            .await;
3259
3260        // Share a project as client A
3261        fs.insert_tree(
3262            "/a",
3263            json!({
3264                "a.rs": "let one = two",
3265            }),
3266        )
3267        .await;
3268        let project_a = cx_a.update(|cx| {
3269            Project::local(
3270                client_a.clone(),
3271                client_a.user_store.clone(),
3272                lang_registry.clone(),
3273                fs.clone(),
3274                cx,
3275            )
3276        });
3277        let (worktree_a, _) = project_a
3278            .update(cx_a, |p, cx| {
3279                p.find_or_create_local_worktree("/a", true, cx)
3280            })
3281            .await
3282            .unwrap();
3283        worktree_a
3284            .read_with(cx_a, |tree, _| tree.as_local().unwrap().scan_complete())
3285            .await;
3286        let project_id = project_a.update(cx_a, |p, _| p.next_remote_id()).await;
3287        let worktree_id = worktree_a.read_with(cx_a, |tree, _| tree.id());
3288        project_a.update(cx_a, |p, cx| p.share(cx)).await.unwrap();
3289
3290        // Join the worktree as client B.
3291        let project_b = Project::remote(
3292            project_id,
3293            client_b.clone(),
3294            client_b.user_store.clone(),
3295            lang_registry.clone(),
3296            fs.clone(),
3297            &mut cx_b.to_async(),
3298        )
3299        .await
3300        .unwrap();
3301
3302        let buffer_b = cx_b
3303            .background()
3304            .spawn(project_b.update(cx_b, |p, cx| p.open_buffer((worktree_id, "a.rs"), cx)))
3305            .await
3306            .unwrap();
3307
3308        let fake_language_server = fake_language_servers.next().await.unwrap();
3309        fake_language_server.handle_request::<lsp::request::Formatting, _, _>(|_, _| async move {
3310            Ok(Some(vec![
3311                lsp::TextEdit {
3312                    range: lsp::Range::new(lsp::Position::new(0, 4), lsp::Position::new(0, 4)),
3313                    new_text: "h".to_string(),
3314                },
3315                lsp::TextEdit {
3316                    range: lsp::Range::new(lsp::Position::new(0, 7), lsp::Position::new(0, 7)),
3317                    new_text: "y".to_string(),
3318                },
3319            ]))
3320        });
3321
3322        project_b
3323            .update(cx_b, |project, cx| {
3324                project.format(HashSet::from_iter([buffer_b.clone()]), true, cx)
3325            })
3326            .await
3327            .unwrap();
3328        assert_eq!(
3329            buffer_b.read_with(cx_b, |buffer, _| buffer.text()),
3330            "let honey = two"
3331        );
3332    }
3333
3334    #[gpui::test(iterations = 10)]
3335    async fn test_definition(cx_a: &mut TestAppContext, cx_b: &mut TestAppContext) {
3336        cx_a.foreground().forbid_parking();
3337        let lang_registry = Arc::new(LanguageRegistry::test());
3338        let fs = FakeFs::new(cx_a.background());
3339        fs.insert_tree(
3340            "/root-1",
3341            json!({
3342                "a.rs": "const ONE: usize = b::TWO + b::THREE;",
3343            }),
3344        )
3345        .await;
3346        fs.insert_tree(
3347            "/root-2",
3348            json!({
3349                "b.rs": "const TWO: usize = 2;\nconst THREE: usize = 3;",
3350            }),
3351        )
3352        .await;
3353
3354        // Set up a fake language server.
3355        let mut language = Language::new(
3356            LanguageConfig {
3357                name: "Rust".into(),
3358                path_suffixes: vec!["rs".to_string()],
3359                ..Default::default()
3360            },
3361            Some(tree_sitter_rust::language()),
3362        );
3363        let mut fake_language_servers = language.set_fake_lsp_adapter(Default::default());
3364        lang_registry.add(Arc::new(language));
3365
3366        // Connect to a server as 2 clients.
3367        let mut server = TestServer::start(cx_a.foreground(), cx_a.background()).await;
3368        let client_a = server.create_client(cx_a, "user_a").await;
3369        let client_b = server.create_client(cx_b, "user_b").await;
3370        server
3371            .make_contacts(vec![(&client_a, cx_a), (&client_b, cx_b)])
3372            .await;
3373
3374        // Share a project as client A
3375        let project_a = cx_a.update(|cx| {
3376            Project::local(
3377                client_a.clone(),
3378                client_a.user_store.clone(),
3379                lang_registry.clone(),
3380                fs.clone(),
3381                cx,
3382            )
3383        });
3384        let (worktree_a, _) = project_a
3385            .update(cx_a, |p, cx| {
3386                p.find_or_create_local_worktree("/root-1", true, cx)
3387            })
3388            .await
3389            .unwrap();
3390        worktree_a
3391            .read_with(cx_a, |tree, _| tree.as_local().unwrap().scan_complete())
3392            .await;
3393        let project_id = project_a.update(cx_a, |p, _| p.next_remote_id()).await;
3394        let worktree_id = worktree_a.read_with(cx_a, |tree, _| tree.id());
3395        project_a.update(cx_a, |p, cx| p.share(cx)).await.unwrap();
3396
3397        // Join the worktree as client B.
3398        let project_b = Project::remote(
3399            project_id,
3400            client_b.clone(),
3401            client_b.user_store.clone(),
3402            lang_registry.clone(),
3403            fs.clone(),
3404            &mut cx_b.to_async(),
3405        )
3406        .await
3407        .unwrap();
3408
3409        // Open the file on client B.
3410        let buffer_b = cx_b
3411            .background()
3412            .spawn(project_b.update(cx_b, |p, cx| p.open_buffer((worktree_id, "a.rs"), cx)))
3413            .await
3414            .unwrap();
3415
3416        // Request the definition of a symbol as the guest.
3417        let fake_language_server = fake_language_servers.next().await.unwrap();
3418        fake_language_server.handle_request::<lsp::request::GotoDefinition, _, _>(
3419            |_, _| async move {
3420                Ok(Some(lsp::GotoDefinitionResponse::Scalar(
3421                    lsp::Location::new(
3422                        lsp::Url::from_file_path("/root-2/b.rs").unwrap(),
3423                        lsp::Range::new(lsp::Position::new(0, 6), lsp::Position::new(0, 9)),
3424                    ),
3425                )))
3426            },
3427        );
3428
3429        let definitions_1 = project_b
3430            .update(cx_b, |p, cx| p.definition(&buffer_b, 23, cx))
3431            .await
3432            .unwrap();
3433        cx_b.read(|cx| {
3434            assert_eq!(definitions_1.len(), 1);
3435            assert_eq!(project_b.read(cx).worktrees(cx).count(), 2);
3436            let target_buffer = definitions_1[0].buffer.read(cx);
3437            assert_eq!(
3438                target_buffer.text(),
3439                "const TWO: usize = 2;\nconst THREE: usize = 3;"
3440            );
3441            assert_eq!(
3442                definitions_1[0].range.to_point(target_buffer),
3443                Point::new(0, 6)..Point::new(0, 9)
3444            );
3445        });
3446
3447        // Try getting more definitions for the same buffer, ensuring the buffer gets reused from
3448        // the previous call to `definition`.
3449        fake_language_server.handle_request::<lsp::request::GotoDefinition, _, _>(
3450            |_, _| async move {
3451                Ok(Some(lsp::GotoDefinitionResponse::Scalar(
3452                    lsp::Location::new(
3453                        lsp::Url::from_file_path("/root-2/b.rs").unwrap(),
3454                        lsp::Range::new(lsp::Position::new(1, 6), lsp::Position::new(1, 11)),
3455                    ),
3456                )))
3457            },
3458        );
3459
3460        let definitions_2 = project_b
3461            .update(cx_b, |p, cx| p.definition(&buffer_b, 33, cx))
3462            .await
3463            .unwrap();
3464        cx_b.read(|cx| {
3465            assert_eq!(definitions_2.len(), 1);
3466            assert_eq!(project_b.read(cx).worktrees(cx).count(), 2);
3467            let target_buffer = definitions_2[0].buffer.read(cx);
3468            assert_eq!(
3469                target_buffer.text(),
3470                "const TWO: usize = 2;\nconst THREE: usize = 3;"
3471            );
3472            assert_eq!(
3473                definitions_2[0].range.to_point(target_buffer),
3474                Point::new(1, 6)..Point::new(1, 11)
3475            );
3476        });
3477        assert_eq!(definitions_1[0].buffer, definitions_2[0].buffer);
3478    }
3479
3480    #[gpui::test(iterations = 10)]
3481    async fn test_references(cx_a: &mut TestAppContext, cx_b: &mut TestAppContext) {
3482        cx_a.foreground().forbid_parking();
3483        let lang_registry = Arc::new(LanguageRegistry::test());
3484        let fs = FakeFs::new(cx_a.background());
3485        fs.insert_tree(
3486            "/root-1",
3487            json!({
3488                "one.rs": "const ONE: usize = 1;",
3489                "two.rs": "const TWO: usize = one::ONE + one::ONE;",
3490            }),
3491        )
3492        .await;
3493        fs.insert_tree(
3494            "/root-2",
3495            json!({
3496                "three.rs": "const THREE: usize = two::TWO + one::ONE;",
3497            }),
3498        )
3499        .await;
3500
3501        // Set up a fake language server.
3502        let mut language = Language::new(
3503            LanguageConfig {
3504                name: "Rust".into(),
3505                path_suffixes: vec!["rs".to_string()],
3506                ..Default::default()
3507            },
3508            Some(tree_sitter_rust::language()),
3509        );
3510        let mut fake_language_servers = language.set_fake_lsp_adapter(Default::default());
3511        lang_registry.add(Arc::new(language));
3512
3513        // Connect to a server as 2 clients.
3514        let mut server = TestServer::start(cx_a.foreground(), cx_a.background()).await;
3515        let client_a = server.create_client(cx_a, "user_a").await;
3516        let client_b = server.create_client(cx_b, "user_b").await;
3517        server
3518            .make_contacts(vec![(&client_a, cx_a), (&client_b, cx_b)])
3519            .await;
3520
3521        // Share a project as client A
3522        let project_a = cx_a.update(|cx| {
3523            Project::local(
3524                client_a.clone(),
3525                client_a.user_store.clone(),
3526                lang_registry.clone(),
3527                fs.clone(),
3528                cx,
3529            )
3530        });
3531        let (worktree_a, _) = project_a
3532            .update(cx_a, |p, cx| {
3533                p.find_or_create_local_worktree("/root-1", true, cx)
3534            })
3535            .await
3536            .unwrap();
3537        worktree_a
3538            .read_with(cx_a, |tree, _| tree.as_local().unwrap().scan_complete())
3539            .await;
3540        let project_id = project_a.update(cx_a, |p, _| p.next_remote_id()).await;
3541        let worktree_id = worktree_a.read_with(cx_a, |tree, _| tree.id());
3542        project_a.update(cx_a, |p, cx| p.share(cx)).await.unwrap();
3543
3544        // Join the worktree as client B.
3545        let project_b = Project::remote(
3546            project_id,
3547            client_b.clone(),
3548            client_b.user_store.clone(),
3549            lang_registry.clone(),
3550            fs.clone(),
3551            &mut cx_b.to_async(),
3552        )
3553        .await
3554        .unwrap();
3555
3556        // Open the file on client B.
3557        let buffer_b = cx_b
3558            .background()
3559            .spawn(project_b.update(cx_b, |p, cx| p.open_buffer((worktree_id, "one.rs"), cx)))
3560            .await
3561            .unwrap();
3562
3563        // Request references to a symbol as the guest.
3564        let fake_language_server = fake_language_servers.next().await.unwrap();
3565        fake_language_server.handle_request::<lsp::request::References, _, _>(
3566            |params, _| async move {
3567                assert_eq!(
3568                    params.text_document_position.text_document.uri.as_str(),
3569                    "file:///root-1/one.rs"
3570                );
3571                Ok(Some(vec![
3572                    lsp::Location {
3573                        uri: lsp::Url::from_file_path("/root-1/two.rs").unwrap(),
3574                        range: lsp::Range::new(
3575                            lsp::Position::new(0, 24),
3576                            lsp::Position::new(0, 27),
3577                        ),
3578                    },
3579                    lsp::Location {
3580                        uri: lsp::Url::from_file_path("/root-1/two.rs").unwrap(),
3581                        range: lsp::Range::new(
3582                            lsp::Position::new(0, 35),
3583                            lsp::Position::new(0, 38),
3584                        ),
3585                    },
3586                    lsp::Location {
3587                        uri: lsp::Url::from_file_path("/root-2/three.rs").unwrap(),
3588                        range: lsp::Range::new(
3589                            lsp::Position::new(0, 37),
3590                            lsp::Position::new(0, 40),
3591                        ),
3592                    },
3593                ]))
3594            },
3595        );
3596
3597        let references = project_b
3598            .update(cx_b, |p, cx| p.references(&buffer_b, 7, cx))
3599            .await
3600            .unwrap();
3601        cx_b.read(|cx| {
3602            assert_eq!(references.len(), 3);
3603            assert_eq!(project_b.read(cx).worktrees(cx).count(), 2);
3604
3605            let two_buffer = references[0].buffer.read(cx);
3606            let three_buffer = references[2].buffer.read(cx);
3607            assert_eq!(
3608                two_buffer.file().unwrap().path().as_ref(),
3609                Path::new("two.rs")
3610            );
3611            assert_eq!(references[1].buffer, references[0].buffer);
3612            assert_eq!(
3613                three_buffer.file().unwrap().full_path(cx),
3614                Path::new("three.rs")
3615            );
3616
3617            assert_eq!(references[0].range.to_offset(&two_buffer), 24..27);
3618            assert_eq!(references[1].range.to_offset(&two_buffer), 35..38);
3619            assert_eq!(references[2].range.to_offset(&three_buffer), 37..40);
3620        });
3621    }
3622
3623    #[gpui::test(iterations = 10)]
3624    async fn test_project_search(cx_a: &mut TestAppContext, cx_b: &mut TestAppContext) {
3625        cx_a.foreground().forbid_parking();
3626        let lang_registry = Arc::new(LanguageRegistry::test());
3627        let fs = FakeFs::new(cx_a.background());
3628        fs.insert_tree(
3629            "/root-1",
3630            json!({
3631                "a": "hello world",
3632                "b": "goodnight moon",
3633                "c": "a world of goo",
3634                "d": "world champion of clown world",
3635            }),
3636        )
3637        .await;
3638        fs.insert_tree(
3639            "/root-2",
3640            json!({
3641                "e": "disney world is fun",
3642            }),
3643        )
3644        .await;
3645
3646        // Connect to a server as 2 clients.
3647        let mut server = TestServer::start(cx_a.foreground(), cx_a.background()).await;
3648        let client_a = server.create_client(cx_a, "user_a").await;
3649        let client_b = server.create_client(cx_b, "user_b").await;
3650        server
3651            .make_contacts(vec![(&client_a, cx_a), (&client_b, cx_b)])
3652            .await;
3653
3654        // Share a project as client A
3655        let project_a = cx_a.update(|cx| {
3656            Project::local(
3657                client_a.clone(),
3658                client_a.user_store.clone(),
3659                lang_registry.clone(),
3660                fs.clone(),
3661                cx,
3662            )
3663        });
3664        let project_id = project_a.update(cx_a, |p, _| p.next_remote_id()).await;
3665
3666        let (worktree_1, _) = project_a
3667            .update(cx_a, |p, cx| {
3668                p.find_or_create_local_worktree("/root-1", true, cx)
3669            })
3670            .await
3671            .unwrap();
3672        worktree_1
3673            .read_with(cx_a, |tree, _| tree.as_local().unwrap().scan_complete())
3674            .await;
3675        let (worktree_2, _) = project_a
3676            .update(cx_a, |p, cx| {
3677                p.find_or_create_local_worktree("/root-2", true, cx)
3678            })
3679            .await
3680            .unwrap();
3681        worktree_2
3682            .read_with(cx_a, |tree, _| tree.as_local().unwrap().scan_complete())
3683            .await;
3684
3685        project_a.update(cx_a, |p, cx| p.share(cx)).await.unwrap();
3686
3687        // Join the worktree as client B.
3688        let project_b = Project::remote(
3689            project_id,
3690            client_b.clone(),
3691            client_b.user_store.clone(),
3692            lang_registry.clone(),
3693            fs.clone(),
3694            &mut cx_b.to_async(),
3695        )
3696        .await
3697        .unwrap();
3698
3699        let results = project_b
3700            .update(cx_b, |project, cx| {
3701                project.search(SearchQuery::text("world", false, false), cx)
3702            })
3703            .await
3704            .unwrap();
3705
3706        let mut ranges_by_path = results
3707            .into_iter()
3708            .map(|(buffer, ranges)| {
3709                buffer.read_with(cx_b, |buffer, cx| {
3710                    let path = buffer.file().unwrap().full_path(cx);
3711                    let offset_ranges = ranges
3712                        .into_iter()
3713                        .map(|range| range.to_offset(buffer))
3714                        .collect::<Vec<_>>();
3715                    (path, offset_ranges)
3716                })
3717            })
3718            .collect::<Vec<_>>();
3719        ranges_by_path.sort_by_key(|(path, _)| path.clone());
3720
3721        assert_eq!(
3722            ranges_by_path,
3723            &[
3724                (PathBuf::from("root-1/a"), vec![6..11]),
3725                (PathBuf::from("root-1/c"), vec![2..7]),
3726                (PathBuf::from("root-1/d"), vec![0..5, 24..29]),
3727                (PathBuf::from("root-2/e"), vec![7..12]),
3728            ]
3729        );
3730    }
3731
3732    #[gpui::test(iterations = 10)]
3733    async fn test_document_highlights(cx_a: &mut TestAppContext, cx_b: &mut TestAppContext) {
3734        cx_a.foreground().forbid_parking();
3735        let lang_registry = Arc::new(LanguageRegistry::test());
3736        let fs = FakeFs::new(cx_a.background());
3737        fs.insert_tree(
3738            "/root-1",
3739            json!({
3740                "main.rs": "fn double(number: i32) -> i32 { number + number }",
3741            }),
3742        )
3743        .await;
3744
3745        // Set up a fake language server.
3746        let mut language = Language::new(
3747            LanguageConfig {
3748                name: "Rust".into(),
3749                path_suffixes: vec!["rs".to_string()],
3750                ..Default::default()
3751            },
3752            Some(tree_sitter_rust::language()),
3753        );
3754        let mut fake_language_servers = language.set_fake_lsp_adapter(Default::default());
3755        lang_registry.add(Arc::new(language));
3756
3757        // Connect to a server as 2 clients.
3758        let mut server = TestServer::start(cx_a.foreground(), cx_a.background()).await;
3759        let client_a = server.create_client(cx_a, "user_a").await;
3760        let client_b = server.create_client(cx_b, "user_b").await;
3761        server
3762            .make_contacts(vec![(&client_a, cx_a), (&client_b, cx_b)])
3763            .await;
3764
3765        // Share a project as client A
3766        let project_a = cx_a.update(|cx| {
3767            Project::local(
3768                client_a.clone(),
3769                client_a.user_store.clone(),
3770                lang_registry.clone(),
3771                fs.clone(),
3772                cx,
3773            )
3774        });
3775        let (worktree_a, _) = project_a
3776            .update(cx_a, |p, cx| {
3777                p.find_or_create_local_worktree("/root-1", true, cx)
3778            })
3779            .await
3780            .unwrap();
3781        worktree_a
3782            .read_with(cx_a, |tree, _| tree.as_local().unwrap().scan_complete())
3783            .await;
3784        let project_id = project_a.update(cx_a, |p, _| p.next_remote_id()).await;
3785        let worktree_id = worktree_a.read_with(cx_a, |tree, _| tree.id());
3786        project_a.update(cx_a, |p, cx| p.share(cx)).await.unwrap();
3787
3788        // Join the worktree as client B.
3789        let project_b = Project::remote(
3790            project_id,
3791            client_b.clone(),
3792            client_b.user_store.clone(),
3793            lang_registry.clone(),
3794            fs.clone(),
3795            &mut cx_b.to_async(),
3796        )
3797        .await
3798        .unwrap();
3799
3800        // Open the file on client B.
3801        let buffer_b = cx_b
3802            .background()
3803            .spawn(project_b.update(cx_b, |p, cx| p.open_buffer((worktree_id, "main.rs"), cx)))
3804            .await
3805            .unwrap();
3806
3807        // Request document highlights as the guest.
3808        let fake_language_server = fake_language_servers.next().await.unwrap();
3809        fake_language_server.handle_request::<lsp::request::DocumentHighlightRequest, _, _>(
3810            |params, _| async move {
3811                assert_eq!(
3812                    params
3813                        .text_document_position_params
3814                        .text_document
3815                        .uri
3816                        .as_str(),
3817                    "file:///root-1/main.rs"
3818                );
3819                assert_eq!(
3820                    params.text_document_position_params.position,
3821                    lsp::Position::new(0, 34)
3822                );
3823                Ok(Some(vec![
3824                    lsp::DocumentHighlight {
3825                        kind: Some(lsp::DocumentHighlightKind::WRITE),
3826                        range: lsp::Range::new(
3827                            lsp::Position::new(0, 10),
3828                            lsp::Position::new(0, 16),
3829                        ),
3830                    },
3831                    lsp::DocumentHighlight {
3832                        kind: Some(lsp::DocumentHighlightKind::READ),
3833                        range: lsp::Range::new(
3834                            lsp::Position::new(0, 32),
3835                            lsp::Position::new(0, 38),
3836                        ),
3837                    },
3838                    lsp::DocumentHighlight {
3839                        kind: Some(lsp::DocumentHighlightKind::READ),
3840                        range: lsp::Range::new(
3841                            lsp::Position::new(0, 41),
3842                            lsp::Position::new(0, 47),
3843                        ),
3844                    },
3845                ]))
3846            },
3847        );
3848
3849        let highlights = project_b
3850            .update(cx_b, |p, cx| p.document_highlights(&buffer_b, 34, cx))
3851            .await
3852            .unwrap();
3853        buffer_b.read_with(cx_b, |buffer, _| {
3854            let snapshot = buffer.snapshot();
3855
3856            let highlights = highlights
3857                .into_iter()
3858                .map(|highlight| (highlight.kind, highlight.range.to_offset(&snapshot)))
3859                .collect::<Vec<_>>();
3860            assert_eq!(
3861                highlights,
3862                &[
3863                    (lsp::DocumentHighlightKind::WRITE, 10..16),
3864                    (lsp::DocumentHighlightKind::READ, 32..38),
3865                    (lsp::DocumentHighlightKind::READ, 41..47)
3866                ]
3867            )
3868        });
3869    }
3870
3871    #[gpui::test(iterations = 10)]
3872    async fn test_project_symbols(cx_a: &mut TestAppContext, cx_b: &mut TestAppContext) {
3873        cx_a.foreground().forbid_parking();
3874        let lang_registry = Arc::new(LanguageRegistry::test());
3875        let fs = FakeFs::new(cx_a.background());
3876        fs.insert_tree(
3877            "/code",
3878            json!({
3879                "crate-1": {
3880                    "one.rs": "const ONE: usize = 1;",
3881                },
3882                "crate-2": {
3883                    "two.rs": "const TWO: usize = 2; const THREE: usize = 3;",
3884                },
3885                "private": {
3886                    "passwords.txt": "the-password",
3887                }
3888            }),
3889        )
3890        .await;
3891
3892        // Set up a fake language server.
3893        let mut language = Language::new(
3894            LanguageConfig {
3895                name: "Rust".into(),
3896                path_suffixes: vec!["rs".to_string()],
3897                ..Default::default()
3898            },
3899            Some(tree_sitter_rust::language()),
3900        );
3901        let mut fake_language_servers = language.set_fake_lsp_adapter(Default::default());
3902        lang_registry.add(Arc::new(language));
3903
3904        // Connect to a server as 2 clients.
3905        let mut server = TestServer::start(cx_a.foreground(), cx_a.background()).await;
3906        let client_a = server.create_client(cx_a, "user_a").await;
3907        let client_b = server.create_client(cx_b, "user_b").await;
3908        server
3909            .make_contacts(vec![(&client_a, cx_a), (&client_b, cx_b)])
3910            .await;
3911
3912        // Share a project as client A
3913        let project_a = cx_a.update(|cx| {
3914            Project::local(
3915                client_a.clone(),
3916                client_a.user_store.clone(),
3917                lang_registry.clone(),
3918                fs.clone(),
3919                cx,
3920            )
3921        });
3922        let (worktree_a, _) = project_a
3923            .update(cx_a, |p, cx| {
3924                p.find_or_create_local_worktree("/code/crate-1", true, cx)
3925            })
3926            .await
3927            .unwrap();
3928        worktree_a
3929            .read_with(cx_a, |tree, _| tree.as_local().unwrap().scan_complete())
3930            .await;
3931        let project_id = project_a.update(cx_a, |p, _| p.next_remote_id()).await;
3932        let worktree_id = worktree_a.read_with(cx_a, |tree, _| tree.id());
3933        project_a.update(cx_a, |p, cx| p.share(cx)).await.unwrap();
3934
3935        // Join the worktree as client B.
3936        let project_b = Project::remote(
3937            project_id,
3938            client_b.clone(),
3939            client_b.user_store.clone(),
3940            lang_registry.clone(),
3941            fs.clone(),
3942            &mut cx_b.to_async(),
3943        )
3944        .await
3945        .unwrap();
3946
3947        // Cause the language server to start.
3948        let _buffer = cx_b
3949            .background()
3950            .spawn(project_b.update(cx_b, |p, cx| p.open_buffer((worktree_id, "one.rs"), cx)))
3951            .await
3952            .unwrap();
3953
3954        let fake_language_server = fake_language_servers.next().await.unwrap();
3955        fake_language_server.handle_request::<lsp::request::WorkspaceSymbol, _, _>(
3956            |_, _| async move {
3957                #[allow(deprecated)]
3958                Ok(Some(vec![lsp::SymbolInformation {
3959                    name: "TWO".into(),
3960                    location: lsp::Location {
3961                        uri: lsp::Url::from_file_path("/code/crate-2/two.rs").unwrap(),
3962                        range: lsp::Range::new(lsp::Position::new(0, 6), lsp::Position::new(0, 9)),
3963                    },
3964                    kind: lsp::SymbolKind::CONSTANT,
3965                    tags: None,
3966                    container_name: None,
3967                    deprecated: None,
3968                }]))
3969            },
3970        );
3971
3972        // Request the definition of a symbol as the guest.
3973        let symbols = project_b
3974            .update(cx_b, |p, cx| p.symbols("two", cx))
3975            .await
3976            .unwrap();
3977        assert_eq!(symbols.len(), 1);
3978        assert_eq!(symbols[0].name, "TWO");
3979
3980        // Open one of the returned symbols.
3981        let buffer_b_2 = project_b
3982            .update(cx_b, |project, cx| {
3983                project.open_buffer_for_symbol(&symbols[0], cx)
3984            })
3985            .await
3986            .unwrap();
3987        buffer_b_2.read_with(cx_b, |buffer, _| {
3988            assert_eq!(
3989                buffer.file().unwrap().path().as_ref(),
3990                Path::new("../crate-2/two.rs")
3991            );
3992        });
3993
3994        // Attempt to craft a symbol and violate host's privacy by opening an arbitrary file.
3995        let mut fake_symbol = symbols[0].clone();
3996        fake_symbol.path = Path::new("/code/secrets").into();
3997        let error = project_b
3998            .update(cx_b, |project, cx| {
3999                project.open_buffer_for_symbol(&fake_symbol, cx)
4000            })
4001            .await
4002            .unwrap_err();
4003        assert!(error.to_string().contains("invalid symbol signature"));
4004    }
4005
4006    #[gpui::test(iterations = 10)]
4007    async fn test_open_buffer_while_getting_definition_pointing_to_it(
4008        cx_a: &mut TestAppContext,
4009        cx_b: &mut TestAppContext,
4010        mut rng: StdRng,
4011    ) {
4012        cx_a.foreground().forbid_parking();
4013        let lang_registry = Arc::new(LanguageRegistry::test());
4014        let fs = FakeFs::new(cx_a.background());
4015        fs.insert_tree(
4016            "/root",
4017            json!({
4018                "a.rs": "const ONE: usize = b::TWO;",
4019                "b.rs": "const TWO: usize = 2",
4020            }),
4021        )
4022        .await;
4023
4024        // Set up a fake language server.
4025        let mut language = Language::new(
4026            LanguageConfig {
4027                name: "Rust".into(),
4028                path_suffixes: vec!["rs".to_string()],
4029                ..Default::default()
4030            },
4031            Some(tree_sitter_rust::language()),
4032        );
4033        let mut fake_language_servers = language.set_fake_lsp_adapter(Default::default());
4034        lang_registry.add(Arc::new(language));
4035
4036        // Connect to a server as 2 clients.
4037        let mut server = TestServer::start(cx_a.foreground(), cx_a.background()).await;
4038        let client_a = server.create_client(cx_a, "user_a").await;
4039        let client_b = server.create_client(cx_b, "user_b").await;
4040        server
4041            .make_contacts(vec![(&client_a, cx_a), (&client_b, cx_b)])
4042            .await;
4043
4044        // Share a project as client A
4045        let project_a = cx_a.update(|cx| {
4046            Project::local(
4047                client_a.clone(),
4048                client_a.user_store.clone(),
4049                lang_registry.clone(),
4050                fs.clone(),
4051                cx,
4052            )
4053        });
4054
4055        let (worktree_a, _) = project_a
4056            .update(cx_a, |p, cx| {
4057                p.find_or_create_local_worktree("/root", true, cx)
4058            })
4059            .await
4060            .unwrap();
4061        worktree_a
4062            .read_with(cx_a, |tree, _| tree.as_local().unwrap().scan_complete())
4063            .await;
4064        let project_id = project_a.update(cx_a, |p, _| p.next_remote_id()).await;
4065        let worktree_id = worktree_a.read_with(cx_a, |tree, _| tree.id());
4066        project_a.update(cx_a, |p, cx| p.share(cx)).await.unwrap();
4067
4068        // Join the worktree as client B.
4069        let project_b = Project::remote(
4070            project_id,
4071            client_b.clone(),
4072            client_b.user_store.clone(),
4073            lang_registry.clone(),
4074            fs.clone(),
4075            &mut cx_b.to_async(),
4076        )
4077        .await
4078        .unwrap();
4079
4080        let buffer_b1 = cx_b
4081            .background()
4082            .spawn(project_b.update(cx_b, |p, cx| p.open_buffer((worktree_id, "a.rs"), cx)))
4083            .await
4084            .unwrap();
4085
4086        let fake_language_server = fake_language_servers.next().await.unwrap();
4087        fake_language_server.handle_request::<lsp::request::GotoDefinition, _, _>(
4088            |_, _| async move {
4089                Ok(Some(lsp::GotoDefinitionResponse::Scalar(
4090                    lsp::Location::new(
4091                        lsp::Url::from_file_path("/root/b.rs").unwrap(),
4092                        lsp::Range::new(lsp::Position::new(0, 6), lsp::Position::new(0, 9)),
4093                    ),
4094                )))
4095            },
4096        );
4097
4098        let definitions;
4099        let buffer_b2;
4100        if rng.gen() {
4101            definitions = project_b.update(cx_b, |p, cx| p.definition(&buffer_b1, 23, cx));
4102            buffer_b2 = project_b.update(cx_b, |p, cx| p.open_buffer((worktree_id, "b.rs"), cx));
4103        } else {
4104            buffer_b2 = project_b.update(cx_b, |p, cx| p.open_buffer((worktree_id, "b.rs"), cx));
4105            definitions = project_b.update(cx_b, |p, cx| p.definition(&buffer_b1, 23, cx));
4106        }
4107
4108        let buffer_b2 = buffer_b2.await.unwrap();
4109        let definitions = definitions.await.unwrap();
4110        assert_eq!(definitions.len(), 1);
4111        assert_eq!(definitions[0].buffer, buffer_b2);
4112    }
4113
4114    #[gpui::test(iterations = 10)]
4115    async fn test_collaborating_with_code_actions(
4116        cx_a: &mut TestAppContext,
4117        cx_b: &mut TestAppContext,
4118    ) {
4119        cx_a.foreground().forbid_parking();
4120        let lang_registry = Arc::new(LanguageRegistry::test());
4121        let fs = FakeFs::new(cx_a.background());
4122        cx_b.update(|cx| editor::init(cx));
4123
4124        // Set up a fake language server.
4125        let mut language = Language::new(
4126            LanguageConfig {
4127                name: "Rust".into(),
4128                path_suffixes: vec!["rs".to_string()],
4129                ..Default::default()
4130            },
4131            Some(tree_sitter_rust::language()),
4132        );
4133        let mut fake_language_servers = language.set_fake_lsp_adapter(Default::default());
4134        lang_registry.add(Arc::new(language));
4135
4136        // Connect to a server as 2 clients.
4137        let mut server = TestServer::start(cx_a.foreground(), cx_a.background()).await;
4138        let client_a = server.create_client(cx_a, "user_a").await;
4139        let client_b = server.create_client(cx_b, "user_b").await;
4140        server
4141            .make_contacts(vec![(&client_a, cx_a), (&client_b, cx_b)])
4142            .await;
4143
4144        // Share a project as client A
4145        fs.insert_tree(
4146            "/a",
4147            json!({
4148                "main.rs": "mod other;\nfn main() { let foo = other::foo(); }",
4149                "other.rs": "pub fn foo() -> usize { 4 }",
4150            }),
4151        )
4152        .await;
4153        let project_a = cx_a.update(|cx| {
4154            Project::local(
4155                client_a.clone(),
4156                client_a.user_store.clone(),
4157                lang_registry.clone(),
4158                fs.clone(),
4159                cx,
4160            )
4161        });
4162        let (worktree_a, _) = project_a
4163            .update(cx_a, |p, cx| {
4164                p.find_or_create_local_worktree("/a", true, cx)
4165            })
4166            .await
4167            .unwrap();
4168        worktree_a
4169            .read_with(cx_a, |tree, _| tree.as_local().unwrap().scan_complete())
4170            .await;
4171        let project_id = project_a.update(cx_a, |p, _| p.next_remote_id()).await;
4172        let worktree_id = worktree_a.read_with(cx_a, |tree, _| tree.id());
4173        project_a.update(cx_a, |p, cx| p.share(cx)).await.unwrap();
4174
4175        // Join the worktree as client B.
4176        let project_b = Project::remote(
4177            project_id,
4178            client_b.clone(),
4179            client_b.user_store.clone(),
4180            lang_registry.clone(),
4181            fs.clone(),
4182            &mut cx_b.to_async(),
4183        )
4184        .await
4185        .unwrap();
4186        let mut params = cx_b.update(WorkspaceParams::test);
4187        params.languages = lang_registry.clone();
4188        params.client = client_b.client.clone();
4189        params.user_store = client_b.user_store.clone();
4190        params.project = project_b;
4191
4192        let (_window_b, workspace_b) = cx_b.add_window(|cx| Workspace::new(&params, cx));
4193        let editor_b = workspace_b
4194            .update(cx_b, |workspace, cx| {
4195                workspace.open_path((worktree_id, "main.rs"), true, cx)
4196            })
4197            .await
4198            .unwrap()
4199            .downcast::<Editor>()
4200            .unwrap();
4201
4202        let mut fake_language_server = fake_language_servers.next().await.unwrap();
4203        fake_language_server
4204            .handle_request::<lsp::request::CodeActionRequest, _, _>(|params, _| async move {
4205                assert_eq!(
4206                    params.text_document.uri,
4207                    lsp::Url::from_file_path("/a/main.rs").unwrap(),
4208                );
4209                assert_eq!(params.range.start, lsp::Position::new(0, 0));
4210                assert_eq!(params.range.end, lsp::Position::new(0, 0));
4211                Ok(None)
4212            })
4213            .next()
4214            .await;
4215
4216        // Move cursor to a location that contains code actions.
4217        editor_b.update(cx_b, |editor, cx| {
4218            editor.select_ranges([Point::new(1, 31)..Point::new(1, 31)], None, cx);
4219            cx.focus(&editor_b);
4220        });
4221
4222        fake_language_server
4223            .handle_request::<lsp::request::CodeActionRequest, _, _>(|params, _| async move {
4224                assert_eq!(
4225                    params.text_document.uri,
4226                    lsp::Url::from_file_path("/a/main.rs").unwrap(),
4227                );
4228                assert_eq!(params.range.start, lsp::Position::new(1, 31));
4229                assert_eq!(params.range.end, lsp::Position::new(1, 31));
4230
4231                Ok(Some(vec![lsp::CodeActionOrCommand::CodeAction(
4232                    lsp::CodeAction {
4233                        title: "Inline into all callers".to_string(),
4234                        edit: Some(lsp::WorkspaceEdit {
4235                            changes: Some(
4236                                [
4237                                    (
4238                                        lsp::Url::from_file_path("/a/main.rs").unwrap(),
4239                                        vec![lsp::TextEdit::new(
4240                                            lsp::Range::new(
4241                                                lsp::Position::new(1, 22),
4242                                                lsp::Position::new(1, 34),
4243                                            ),
4244                                            "4".to_string(),
4245                                        )],
4246                                    ),
4247                                    (
4248                                        lsp::Url::from_file_path("/a/other.rs").unwrap(),
4249                                        vec![lsp::TextEdit::new(
4250                                            lsp::Range::new(
4251                                                lsp::Position::new(0, 0),
4252                                                lsp::Position::new(0, 27),
4253                                            ),
4254                                            "".to_string(),
4255                                        )],
4256                                    ),
4257                                ]
4258                                .into_iter()
4259                                .collect(),
4260                            ),
4261                            ..Default::default()
4262                        }),
4263                        data: Some(json!({
4264                            "codeActionParams": {
4265                                "range": {
4266                                    "start": {"line": 1, "column": 31},
4267                                    "end": {"line": 1, "column": 31},
4268                                }
4269                            }
4270                        })),
4271                        ..Default::default()
4272                    },
4273                )]))
4274            })
4275            .next()
4276            .await;
4277
4278        // Toggle code actions and wait for them to display.
4279        editor_b.update(cx_b, |editor, cx| {
4280            editor.toggle_code_actions(
4281                &ToggleCodeActions {
4282                    deployed_from_indicator: false,
4283                },
4284                cx,
4285            );
4286        });
4287        editor_b
4288            .condition(&cx_b, |editor, _| editor.context_menu_visible())
4289            .await;
4290
4291        fake_language_server.remove_request_handler::<lsp::request::CodeActionRequest>();
4292
4293        // Confirming the code action will trigger a resolve request.
4294        let confirm_action = workspace_b
4295            .update(cx_b, |workspace, cx| {
4296                Editor::confirm_code_action(workspace, &ConfirmCodeAction { item_ix: Some(0) }, cx)
4297            })
4298            .unwrap();
4299        fake_language_server.handle_request::<lsp::request::CodeActionResolveRequest, _, _>(
4300            |_, _| async move {
4301                Ok(lsp::CodeAction {
4302                    title: "Inline into all callers".to_string(),
4303                    edit: Some(lsp::WorkspaceEdit {
4304                        changes: Some(
4305                            [
4306                                (
4307                                    lsp::Url::from_file_path("/a/main.rs").unwrap(),
4308                                    vec![lsp::TextEdit::new(
4309                                        lsp::Range::new(
4310                                            lsp::Position::new(1, 22),
4311                                            lsp::Position::new(1, 34),
4312                                        ),
4313                                        "4".to_string(),
4314                                    )],
4315                                ),
4316                                (
4317                                    lsp::Url::from_file_path("/a/other.rs").unwrap(),
4318                                    vec![lsp::TextEdit::new(
4319                                        lsp::Range::new(
4320                                            lsp::Position::new(0, 0),
4321                                            lsp::Position::new(0, 27),
4322                                        ),
4323                                        "".to_string(),
4324                                    )],
4325                                ),
4326                            ]
4327                            .into_iter()
4328                            .collect(),
4329                        ),
4330                        ..Default::default()
4331                    }),
4332                    ..Default::default()
4333                })
4334            },
4335        );
4336
4337        // After the action is confirmed, an editor containing both modified files is opened.
4338        confirm_action.await.unwrap();
4339        let code_action_editor = workspace_b.read_with(cx_b, |workspace, cx| {
4340            workspace
4341                .active_item(cx)
4342                .unwrap()
4343                .downcast::<Editor>()
4344                .unwrap()
4345        });
4346        code_action_editor.update(cx_b, |editor, cx| {
4347            assert_eq!(editor.text(cx), "mod other;\nfn main() { let foo = 4; }\n");
4348            editor.undo(&Undo, cx);
4349            assert_eq!(
4350                editor.text(cx),
4351                "mod other;\nfn main() { let foo = other::foo(); }\npub fn foo() -> usize { 4 }"
4352            );
4353            editor.redo(&Redo, cx);
4354            assert_eq!(editor.text(cx), "mod other;\nfn main() { let foo = 4; }\n");
4355        });
4356    }
4357
4358    #[gpui::test(iterations = 10)]
4359    async fn test_collaborating_with_renames(cx_a: &mut TestAppContext, cx_b: &mut TestAppContext) {
4360        cx_a.foreground().forbid_parking();
4361        let lang_registry = Arc::new(LanguageRegistry::test());
4362        let fs = FakeFs::new(cx_a.background());
4363        cx_b.update(|cx| editor::init(cx));
4364
4365        // Set up a fake language server.
4366        let mut language = Language::new(
4367            LanguageConfig {
4368                name: "Rust".into(),
4369                path_suffixes: vec!["rs".to_string()],
4370                ..Default::default()
4371            },
4372            Some(tree_sitter_rust::language()),
4373        );
4374        let mut fake_language_servers = language.set_fake_lsp_adapter(FakeLspAdapter {
4375            capabilities: lsp::ServerCapabilities {
4376                rename_provider: Some(lsp::OneOf::Right(lsp::RenameOptions {
4377                    prepare_provider: Some(true),
4378                    work_done_progress_options: Default::default(),
4379                })),
4380                ..Default::default()
4381            },
4382            ..Default::default()
4383        });
4384        lang_registry.add(Arc::new(language));
4385
4386        // Connect to a server as 2 clients.
4387        let mut server = TestServer::start(cx_a.foreground(), cx_a.background()).await;
4388        let client_a = server.create_client(cx_a, "user_a").await;
4389        let client_b = server.create_client(cx_b, "user_b").await;
4390        server
4391            .make_contacts(vec![(&client_a, cx_a), (&client_b, cx_b)])
4392            .await;
4393
4394        // Share a project as client A
4395        fs.insert_tree(
4396            "/dir",
4397            json!({
4398                "one.rs": "const ONE: usize = 1;",
4399                "two.rs": "const TWO: usize = one::ONE + one::ONE;"
4400            }),
4401        )
4402        .await;
4403        let project_a = cx_a.update(|cx| {
4404            Project::local(
4405                client_a.clone(),
4406                client_a.user_store.clone(),
4407                lang_registry.clone(),
4408                fs.clone(),
4409                cx,
4410            )
4411        });
4412        let (worktree_a, _) = project_a
4413            .update(cx_a, |p, cx| {
4414                p.find_or_create_local_worktree("/dir", true, cx)
4415            })
4416            .await
4417            .unwrap();
4418        worktree_a
4419            .read_with(cx_a, |tree, _| tree.as_local().unwrap().scan_complete())
4420            .await;
4421        let project_id = project_a.update(cx_a, |p, _| p.next_remote_id()).await;
4422        let worktree_id = worktree_a.read_with(cx_a, |tree, _| tree.id());
4423        project_a.update(cx_a, |p, cx| p.share(cx)).await.unwrap();
4424
4425        // Join the worktree as client B.
4426        let project_b = Project::remote(
4427            project_id,
4428            client_b.clone(),
4429            client_b.user_store.clone(),
4430            lang_registry.clone(),
4431            fs.clone(),
4432            &mut cx_b.to_async(),
4433        )
4434        .await
4435        .unwrap();
4436        let mut params = cx_b.update(WorkspaceParams::test);
4437        params.languages = lang_registry.clone();
4438        params.client = client_b.client.clone();
4439        params.user_store = client_b.user_store.clone();
4440        params.project = project_b;
4441
4442        let (_window_b, workspace_b) = cx_b.add_window(|cx| Workspace::new(&params, cx));
4443        let editor_b = workspace_b
4444            .update(cx_b, |workspace, cx| {
4445                workspace.open_path((worktree_id, "one.rs"), true, cx)
4446            })
4447            .await
4448            .unwrap()
4449            .downcast::<Editor>()
4450            .unwrap();
4451        let fake_language_server = fake_language_servers.next().await.unwrap();
4452
4453        // Move cursor to a location that can be renamed.
4454        let prepare_rename = editor_b.update(cx_b, |editor, cx| {
4455            editor.select_ranges([7..7], None, cx);
4456            editor.rename(&Rename, cx).unwrap()
4457        });
4458
4459        fake_language_server
4460            .handle_request::<lsp::request::PrepareRenameRequest, _, _>(|params, _| async move {
4461                assert_eq!(params.text_document.uri.as_str(), "file:///dir/one.rs");
4462                assert_eq!(params.position, lsp::Position::new(0, 7));
4463                Ok(Some(lsp::PrepareRenameResponse::Range(lsp::Range::new(
4464                    lsp::Position::new(0, 6),
4465                    lsp::Position::new(0, 9),
4466                ))))
4467            })
4468            .next()
4469            .await
4470            .unwrap();
4471        prepare_rename.await.unwrap();
4472        editor_b.update(cx_b, |editor, cx| {
4473            let rename = editor.pending_rename().unwrap();
4474            let buffer = editor.buffer().read(cx).snapshot(cx);
4475            assert_eq!(
4476                rename.range.start.to_offset(&buffer)..rename.range.end.to_offset(&buffer),
4477                6..9
4478            );
4479            rename.editor.update(cx, |rename_editor, cx| {
4480                rename_editor.buffer().update(cx, |rename_buffer, cx| {
4481                    rename_buffer.edit([(0..3, "THREE")], cx);
4482                });
4483            });
4484        });
4485
4486        let confirm_rename = workspace_b.update(cx_b, |workspace, cx| {
4487            Editor::confirm_rename(workspace, &ConfirmRename, cx).unwrap()
4488        });
4489        fake_language_server
4490            .handle_request::<lsp::request::Rename, _, _>(|params, _| async move {
4491                assert_eq!(
4492                    params.text_document_position.text_document.uri.as_str(),
4493                    "file:///dir/one.rs"
4494                );
4495                assert_eq!(
4496                    params.text_document_position.position,
4497                    lsp::Position::new(0, 6)
4498                );
4499                assert_eq!(params.new_name, "THREE");
4500                Ok(Some(lsp::WorkspaceEdit {
4501                    changes: Some(
4502                        [
4503                            (
4504                                lsp::Url::from_file_path("/dir/one.rs").unwrap(),
4505                                vec![lsp::TextEdit::new(
4506                                    lsp::Range::new(
4507                                        lsp::Position::new(0, 6),
4508                                        lsp::Position::new(0, 9),
4509                                    ),
4510                                    "THREE".to_string(),
4511                                )],
4512                            ),
4513                            (
4514                                lsp::Url::from_file_path("/dir/two.rs").unwrap(),
4515                                vec![
4516                                    lsp::TextEdit::new(
4517                                        lsp::Range::new(
4518                                            lsp::Position::new(0, 24),
4519                                            lsp::Position::new(0, 27),
4520                                        ),
4521                                        "THREE".to_string(),
4522                                    ),
4523                                    lsp::TextEdit::new(
4524                                        lsp::Range::new(
4525                                            lsp::Position::new(0, 35),
4526                                            lsp::Position::new(0, 38),
4527                                        ),
4528                                        "THREE".to_string(),
4529                                    ),
4530                                ],
4531                            ),
4532                        ]
4533                        .into_iter()
4534                        .collect(),
4535                    ),
4536                    ..Default::default()
4537                }))
4538            })
4539            .next()
4540            .await
4541            .unwrap();
4542        confirm_rename.await.unwrap();
4543
4544        let rename_editor = workspace_b.read_with(cx_b, |workspace, cx| {
4545            workspace
4546                .active_item(cx)
4547                .unwrap()
4548                .downcast::<Editor>()
4549                .unwrap()
4550        });
4551        rename_editor.update(cx_b, |editor, cx| {
4552            assert_eq!(
4553                editor.text(cx),
4554                "const THREE: usize = 1;\nconst TWO: usize = one::THREE + one::THREE;"
4555            );
4556            editor.undo(&Undo, cx);
4557            assert_eq!(
4558                editor.text(cx),
4559                "const ONE: usize = 1;\nconst TWO: usize = one::ONE + one::ONE;"
4560            );
4561            editor.redo(&Redo, cx);
4562            assert_eq!(
4563                editor.text(cx),
4564                "const THREE: usize = 1;\nconst TWO: usize = one::THREE + one::THREE;"
4565            );
4566        });
4567
4568        // Ensure temporary rename edits cannot be undone/redone.
4569        editor_b.update(cx_b, |editor, cx| {
4570            editor.undo(&Undo, cx);
4571            assert_eq!(editor.text(cx), "const ONE: usize = 1;");
4572            editor.undo(&Undo, cx);
4573            assert_eq!(editor.text(cx), "const ONE: usize = 1;");
4574            editor.redo(&Redo, cx);
4575            assert_eq!(editor.text(cx), "const THREE: usize = 1;");
4576        })
4577    }
4578
4579    #[gpui::test(iterations = 10)]
4580    async fn test_basic_chat(cx_a: &mut TestAppContext, cx_b: &mut TestAppContext) {
4581        cx_a.foreground().forbid_parking();
4582
4583        // Connect to a server as 2 clients.
4584        let mut server = TestServer::start(cx_a.foreground(), cx_a.background()).await;
4585        let client_a = server.create_client(cx_a, "user_a").await;
4586        let client_b = server.create_client(cx_b, "user_b").await;
4587
4588        // Create an org that includes these 2 users.
4589        let db = &server.app_state.db;
4590        let org_id = db.create_org("Test Org", "test-org").await.unwrap();
4591        db.add_org_member(org_id, client_a.current_user_id(&cx_a), false)
4592            .await
4593            .unwrap();
4594        db.add_org_member(org_id, client_b.current_user_id(&cx_b), false)
4595            .await
4596            .unwrap();
4597
4598        // Create a channel that includes all the users.
4599        let channel_id = db.create_org_channel(org_id, "test-channel").await.unwrap();
4600        db.add_channel_member(channel_id, client_a.current_user_id(&cx_a), false)
4601            .await
4602            .unwrap();
4603        db.add_channel_member(channel_id, client_b.current_user_id(&cx_b), false)
4604            .await
4605            .unwrap();
4606        db.create_channel_message(
4607            channel_id,
4608            client_b.current_user_id(&cx_b),
4609            "hello A, it's B.",
4610            OffsetDateTime::now_utc(),
4611            1,
4612        )
4613        .await
4614        .unwrap();
4615
4616        let channels_a = cx_a
4617            .add_model(|cx| ChannelList::new(client_a.user_store.clone(), client_a.clone(), cx));
4618        channels_a
4619            .condition(cx_a, |list, _| list.available_channels().is_some())
4620            .await;
4621        channels_a.read_with(cx_a, |list, _| {
4622            assert_eq!(
4623                list.available_channels().unwrap(),
4624                &[ChannelDetails {
4625                    id: channel_id.to_proto(),
4626                    name: "test-channel".to_string()
4627                }]
4628            )
4629        });
4630        let channel_a = channels_a.update(cx_a, |this, cx| {
4631            this.get_channel(channel_id.to_proto(), cx).unwrap()
4632        });
4633        channel_a.read_with(cx_a, |channel, _| assert!(channel.messages().is_empty()));
4634        channel_a
4635            .condition(&cx_a, |channel, _| {
4636                channel_messages(channel)
4637                    == [("user_b".to_string(), "hello A, it's B.".to_string(), false)]
4638            })
4639            .await;
4640
4641        let channels_b = cx_b
4642            .add_model(|cx| ChannelList::new(client_b.user_store.clone(), client_b.clone(), cx));
4643        channels_b
4644            .condition(cx_b, |list, _| list.available_channels().is_some())
4645            .await;
4646        channels_b.read_with(cx_b, |list, _| {
4647            assert_eq!(
4648                list.available_channels().unwrap(),
4649                &[ChannelDetails {
4650                    id: channel_id.to_proto(),
4651                    name: "test-channel".to_string()
4652                }]
4653            )
4654        });
4655
4656        let channel_b = channels_b.update(cx_b, |this, cx| {
4657            this.get_channel(channel_id.to_proto(), cx).unwrap()
4658        });
4659        channel_b.read_with(cx_b, |channel, _| assert!(channel.messages().is_empty()));
4660        channel_b
4661            .condition(&cx_b, |channel, _| {
4662                channel_messages(channel)
4663                    == [("user_b".to_string(), "hello A, it's B.".to_string(), false)]
4664            })
4665            .await;
4666
4667        channel_a
4668            .update(cx_a, |channel, cx| {
4669                channel
4670                    .send_message("oh, hi B.".to_string(), cx)
4671                    .unwrap()
4672                    .detach();
4673                let task = channel.send_message("sup".to_string(), cx).unwrap();
4674                assert_eq!(
4675                    channel_messages(channel),
4676                    &[
4677                        ("user_b".to_string(), "hello A, it's B.".to_string(), false),
4678                        ("user_a".to_string(), "oh, hi B.".to_string(), true),
4679                        ("user_a".to_string(), "sup".to_string(), true)
4680                    ]
4681                );
4682                task
4683            })
4684            .await
4685            .unwrap();
4686
4687        channel_b
4688            .condition(&cx_b, |channel, _| {
4689                channel_messages(channel)
4690                    == [
4691                        ("user_b".to_string(), "hello A, it's B.".to_string(), false),
4692                        ("user_a".to_string(), "oh, hi B.".to_string(), false),
4693                        ("user_a".to_string(), "sup".to_string(), false),
4694                    ]
4695            })
4696            .await;
4697
4698        assert_eq!(
4699            server
4700                .state()
4701                .await
4702                .channel(channel_id)
4703                .unwrap()
4704                .connection_ids
4705                .len(),
4706            2
4707        );
4708        cx_b.update(|_| drop(channel_b));
4709        server
4710            .condition(|state| state.channel(channel_id).unwrap().connection_ids.len() == 1)
4711            .await;
4712
4713        cx_a.update(|_| drop(channel_a));
4714        server
4715            .condition(|state| state.channel(channel_id).is_none())
4716            .await;
4717    }
4718
4719    #[gpui::test(iterations = 10)]
4720    async fn test_chat_message_validation(cx_a: &mut TestAppContext) {
4721        cx_a.foreground().forbid_parking();
4722
4723        let mut server = TestServer::start(cx_a.foreground(), cx_a.background()).await;
4724        let client_a = server.create_client(cx_a, "user_a").await;
4725
4726        let db = &server.app_state.db;
4727        let org_id = db.create_org("Test Org", "test-org").await.unwrap();
4728        let channel_id = db.create_org_channel(org_id, "test-channel").await.unwrap();
4729        db.add_org_member(org_id, client_a.current_user_id(&cx_a), false)
4730            .await
4731            .unwrap();
4732        db.add_channel_member(channel_id, client_a.current_user_id(&cx_a), false)
4733            .await
4734            .unwrap();
4735
4736        let channels_a = cx_a
4737            .add_model(|cx| ChannelList::new(client_a.user_store.clone(), client_a.clone(), cx));
4738        channels_a
4739            .condition(cx_a, |list, _| list.available_channels().is_some())
4740            .await;
4741        let channel_a = channels_a.update(cx_a, |this, cx| {
4742            this.get_channel(channel_id.to_proto(), cx).unwrap()
4743        });
4744
4745        // Messages aren't allowed to be too long.
4746        channel_a
4747            .update(cx_a, |channel, cx| {
4748                let long_body = "this is long.\n".repeat(1024);
4749                channel.send_message(long_body, cx).unwrap()
4750            })
4751            .await
4752            .unwrap_err();
4753
4754        // Messages aren't allowed to be blank.
4755        channel_a.update(cx_a, |channel, cx| {
4756            channel.send_message(String::new(), cx).unwrap_err()
4757        });
4758
4759        // Leading and trailing whitespace are trimmed.
4760        channel_a
4761            .update(cx_a, |channel, cx| {
4762                channel
4763                    .send_message("\n surrounded by whitespace  \n".to_string(), cx)
4764                    .unwrap()
4765            })
4766            .await
4767            .unwrap();
4768        assert_eq!(
4769            db.get_channel_messages(channel_id, 10, None)
4770                .await
4771                .unwrap()
4772                .iter()
4773                .map(|m| &m.body)
4774                .collect::<Vec<_>>(),
4775            &["surrounded by whitespace"]
4776        );
4777    }
4778
4779    #[gpui::test(iterations = 10)]
4780    async fn test_chat_reconnection(cx_a: &mut TestAppContext, cx_b: &mut TestAppContext) {
4781        cx_a.foreground().forbid_parking();
4782
4783        // Connect to a server as 2 clients.
4784        let mut server = TestServer::start(cx_a.foreground(), cx_a.background()).await;
4785        let client_a = server.create_client(cx_a, "user_a").await;
4786        let client_b = server.create_client(cx_b, "user_b").await;
4787        let mut status_b = client_b.status();
4788
4789        // Create an org that includes these 2 users.
4790        let db = &server.app_state.db;
4791        let org_id = db.create_org("Test Org", "test-org").await.unwrap();
4792        db.add_org_member(org_id, client_a.current_user_id(&cx_a), false)
4793            .await
4794            .unwrap();
4795        db.add_org_member(org_id, client_b.current_user_id(&cx_b), false)
4796            .await
4797            .unwrap();
4798
4799        // Create a channel that includes all the users.
4800        let channel_id = db.create_org_channel(org_id, "test-channel").await.unwrap();
4801        db.add_channel_member(channel_id, client_a.current_user_id(&cx_a), false)
4802            .await
4803            .unwrap();
4804        db.add_channel_member(channel_id, client_b.current_user_id(&cx_b), false)
4805            .await
4806            .unwrap();
4807        db.create_channel_message(
4808            channel_id,
4809            client_b.current_user_id(&cx_b),
4810            "hello A, it's B.",
4811            OffsetDateTime::now_utc(),
4812            2,
4813        )
4814        .await
4815        .unwrap();
4816
4817        let channels_a = cx_a
4818            .add_model(|cx| ChannelList::new(client_a.user_store.clone(), client_a.clone(), cx));
4819        channels_a
4820            .condition(cx_a, |list, _| list.available_channels().is_some())
4821            .await;
4822
4823        channels_a.read_with(cx_a, |list, _| {
4824            assert_eq!(
4825                list.available_channels().unwrap(),
4826                &[ChannelDetails {
4827                    id: channel_id.to_proto(),
4828                    name: "test-channel".to_string()
4829                }]
4830            )
4831        });
4832        let channel_a = channels_a.update(cx_a, |this, cx| {
4833            this.get_channel(channel_id.to_proto(), cx).unwrap()
4834        });
4835        channel_a.read_with(cx_a, |channel, _| assert!(channel.messages().is_empty()));
4836        channel_a
4837            .condition(&cx_a, |channel, _| {
4838                channel_messages(channel)
4839                    == [("user_b".to_string(), "hello A, it's B.".to_string(), false)]
4840            })
4841            .await;
4842
4843        let channels_b = cx_b
4844            .add_model(|cx| ChannelList::new(client_b.user_store.clone(), client_b.clone(), cx));
4845        channels_b
4846            .condition(cx_b, |list, _| list.available_channels().is_some())
4847            .await;
4848        channels_b.read_with(cx_b, |list, _| {
4849            assert_eq!(
4850                list.available_channels().unwrap(),
4851                &[ChannelDetails {
4852                    id: channel_id.to_proto(),
4853                    name: "test-channel".to_string()
4854                }]
4855            )
4856        });
4857
4858        let channel_b = channels_b.update(cx_b, |this, cx| {
4859            this.get_channel(channel_id.to_proto(), cx).unwrap()
4860        });
4861        channel_b.read_with(cx_b, |channel, _| assert!(channel.messages().is_empty()));
4862        channel_b
4863            .condition(&cx_b, |channel, _| {
4864                channel_messages(channel)
4865                    == [("user_b".to_string(), "hello A, it's B.".to_string(), false)]
4866            })
4867            .await;
4868
4869        // Disconnect client B, ensuring we can still access its cached channel data.
4870        server.forbid_connections();
4871        server.disconnect_client(client_b.current_user_id(&cx_b));
4872        cx_b.foreground().advance_clock(rpc::RECEIVE_TIMEOUT);
4873        while !matches!(
4874            status_b.next().await,
4875            Some(client::Status::ReconnectionError { .. })
4876        ) {}
4877
4878        channels_b.read_with(cx_b, |channels, _| {
4879            assert_eq!(
4880                channels.available_channels().unwrap(),
4881                [ChannelDetails {
4882                    id: channel_id.to_proto(),
4883                    name: "test-channel".to_string()
4884                }]
4885            )
4886        });
4887        channel_b.read_with(cx_b, |channel, _| {
4888            assert_eq!(
4889                channel_messages(channel),
4890                [("user_b".to_string(), "hello A, it's B.".to_string(), false)]
4891            )
4892        });
4893
4894        // Send a message from client B while it is disconnected.
4895        channel_b
4896            .update(cx_b, |channel, cx| {
4897                let task = channel
4898                    .send_message("can you see this?".to_string(), cx)
4899                    .unwrap();
4900                assert_eq!(
4901                    channel_messages(channel),
4902                    &[
4903                        ("user_b".to_string(), "hello A, it's B.".to_string(), false),
4904                        ("user_b".to_string(), "can you see this?".to_string(), true)
4905                    ]
4906                );
4907                task
4908            })
4909            .await
4910            .unwrap_err();
4911
4912        // Send a message from client A while B is disconnected.
4913        channel_a
4914            .update(cx_a, |channel, cx| {
4915                channel
4916                    .send_message("oh, hi B.".to_string(), cx)
4917                    .unwrap()
4918                    .detach();
4919                let task = channel.send_message("sup".to_string(), cx).unwrap();
4920                assert_eq!(
4921                    channel_messages(channel),
4922                    &[
4923                        ("user_b".to_string(), "hello A, it's B.".to_string(), false),
4924                        ("user_a".to_string(), "oh, hi B.".to_string(), true),
4925                        ("user_a".to_string(), "sup".to_string(), true)
4926                    ]
4927                );
4928                task
4929            })
4930            .await
4931            .unwrap();
4932
4933        // Give client B a chance to reconnect.
4934        server.allow_connections();
4935        cx_b.foreground().advance_clock(Duration::from_secs(10));
4936
4937        // Verify that B sees the new messages upon reconnection, as well as the message client B
4938        // sent while offline.
4939        channel_b
4940            .condition(&cx_b, |channel, _| {
4941                channel_messages(channel)
4942                    == [
4943                        ("user_b".to_string(), "hello A, it's B.".to_string(), false),
4944                        ("user_a".to_string(), "oh, hi B.".to_string(), false),
4945                        ("user_a".to_string(), "sup".to_string(), false),
4946                        ("user_b".to_string(), "can you see this?".to_string(), false),
4947                    ]
4948            })
4949            .await;
4950
4951        // Ensure client A and B can communicate normally after reconnection.
4952        channel_a
4953            .update(cx_a, |channel, cx| {
4954                channel.send_message("you online?".to_string(), cx).unwrap()
4955            })
4956            .await
4957            .unwrap();
4958        channel_b
4959            .condition(&cx_b, |channel, _| {
4960                channel_messages(channel)
4961                    == [
4962                        ("user_b".to_string(), "hello A, it's B.".to_string(), false),
4963                        ("user_a".to_string(), "oh, hi B.".to_string(), false),
4964                        ("user_a".to_string(), "sup".to_string(), false),
4965                        ("user_b".to_string(), "can you see this?".to_string(), false),
4966                        ("user_a".to_string(), "you online?".to_string(), false),
4967                    ]
4968            })
4969            .await;
4970
4971        channel_b
4972            .update(cx_b, |channel, cx| {
4973                channel.send_message("yep".to_string(), cx).unwrap()
4974            })
4975            .await
4976            .unwrap();
4977        channel_a
4978            .condition(&cx_a, |channel, _| {
4979                channel_messages(channel)
4980                    == [
4981                        ("user_b".to_string(), "hello A, it's B.".to_string(), false),
4982                        ("user_a".to_string(), "oh, hi B.".to_string(), false),
4983                        ("user_a".to_string(), "sup".to_string(), false),
4984                        ("user_b".to_string(), "can you see this?".to_string(), false),
4985                        ("user_a".to_string(), "you online?".to_string(), false),
4986                        ("user_b".to_string(), "yep".to_string(), false),
4987                    ]
4988            })
4989            .await;
4990    }
4991
4992    #[gpui::test(iterations = 10)]
4993    async fn test_contacts(
4994        deterministic: Arc<Deterministic>,
4995        cx_a: &mut TestAppContext,
4996        cx_b: &mut TestAppContext,
4997        cx_c: &mut TestAppContext,
4998    ) {
4999        cx_a.foreground().forbid_parking();
5000
5001        // Connect to a server as 3 clients.
5002        let mut server = TestServer::start(cx_a.foreground(), cx_a.background()).await;
5003        let mut client_a = server.create_client(cx_a, "user_a").await;
5004        let mut client_b = server.create_client(cx_b, "user_b").await;
5005        let client_c = server.create_client(cx_c, "user_c").await;
5006        server
5007            .make_contacts(vec![
5008                (&client_a, cx_a),
5009                (&client_b, cx_b),
5010                (&client_c, cx_c),
5011            ])
5012            .await;
5013
5014        deterministic.run_until_parked();
5015        for (client, cx) in [(&client_a, &cx_a), (&client_b, &cx_b), (&client_c, &cx_c)] {
5016            client.user_store.read_with(*cx, |store, _| {
5017                assert_eq!(
5018                    contacts(store),
5019                    [
5020                        ("user_a", true, vec![]),
5021                        ("user_b", true, vec![]),
5022                        ("user_c", true, vec![])
5023                    ]
5024                )
5025            });
5026        }
5027
5028        // Share a project as client A.
5029        let fs = FakeFs::new(cx_a.background());
5030        fs.create_dir(Path::new("/a")).await.unwrap();
5031        let (project_a, _) = client_a.build_local_project(fs, "/a", cx_a).await;
5032
5033        deterministic.run_until_parked();
5034        for (client, cx) in [(&client_a, &cx_a), (&client_b, &cx_b), (&client_c, &cx_c)] {
5035            client.user_store.read_with(*cx, |store, _| {
5036                assert_eq!(
5037                    contacts(store),
5038                    [
5039                        ("user_a", true, vec![("a", false, vec![])]),
5040                        ("user_b", true, vec![]),
5041                        ("user_c", true, vec![])
5042                    ]
5043                )
5044            });
5045        }
5046
5047        let project_id = project_a
5048            .update(cx_a, |project, _| project.next_remote_id())
5049            .await;
5050        project_a
5051            .update(cx_a, |project, cx| project.share(cx))
5052            .await
5053            .unwrap();
5054
5055        deterministic.run_until_parked();
5056        for (client, cx) in [(&client_a, &cx_a), (&client_b, &cx_b), (&client_c, &cx_c)] {
5057            client.user_store.read_with(*cx, |store, _| {
5058                assert_eq!(
5059                    contacts(store),
5060                    [
5061                        ("user_a", true, vec![("a", true, vec![])]),
5062                        ("user_b", true, vec![]),
5063                        ("user_c", true, vec![])
5064                    ]
5065                )
5066            });
5067        }
5068
5069        let _project_b = client_b.build_remote_project(project_id, cx_b).await;
5070
5071        deterministic.run_until_parked();
5072        for (client, cx) in [(&client_a, &cx_a), (&client_b, &cx_b), (&client_c, &cx_c)] {
5073            client.user_store.read_with(*cx, |store, _| {
5074                assert_eq!(
5075                    contacts(store),
5076                    [
5077                        ("user_a", true, vec![("a", true, vec!["user_b"])]),
5078                        ("user_b", true, vec![]),
5079                        ("user_c", true, vec![])
5080                    ]
5081                )
5082            });
5083        }
5084
5085        // Add a local project as client B
5086        let fs = FakeFs::new(cx_b.background());
5087        fs.create_dir(Path::new("/b")).await.unwrap();
5088        let (_project_b, _) = client_b.build_local_project(fs, "/b", cx_a).await;
5089
5090        deterministic.run_until_parked();
5091        for (client, cx) in [(&client_a, &cx_a), (&client_b, &cx_b), (&client_c, &cx_c)] {
5092            client.user_store.read_with(*cx, |store, _| {
5093                assert_eq!(
5094                    contacts(store),
5095                    [
5096                        ("user_a", true, vec![("a", true, vec!["user_b"])]),
5097                        ("user_b", true, vec![("b", false, vec![])]),
5098                        ("user_c", true, vec![])
5099                    ]
5100                )
5101            });
5102        }
5103
5104        project_a
5105            .condition(&cx_a, |project, _| {
5106                project.collaborators().contains_key(&client_b.peer_id)
5107            })
5108            .await;
5109
5110        client_a.project.take();
5111        cx_a.update(move |_| drop(project_a));
5112        deterministic.run_until_parked();
5113        for (client, cx) in [(&client_a, &cx_a), (&client_b, &cx_b), (&client_c, &cx_c)] {
5114            client.user_store.read_with(*cx, |store, _| {
5115                assert_eq!(
5116                    contacts(store),
5117                    [
5118                        ("user_a", true, vec![]),
5119                        ("user_b", true, vec![("b", false, vec![])]),
5120                        ("user_c", true, vec![])
5121                    ]
5122                )
5123            });
5124        }
5125
5126        server.disconnect_client(client_c.current_user_id(cx_c));
5127        server.forbid_connections();
5128        deterministic.advance_clock(rpc::RECEIVE_TIMEOUT);
5129        for (client, cx) in [(&client_a, &cx_a), (&client_b, &cx_b)] {
5130            client.user_store.read_with(*cx, |store, _| {
5131                assert_eq!(
5132                    contacts(store),
5133                    [
5134                        ("user_a", true, vec![]),
5135                        ("user_b", true, vec![("b", false, vec![])]),
5136                        ("user_c", false, vec![])
5137                    ]
5138                )
5139            });
5140        }
5141        client_c
5142            .user_store
5143            .read_with(cx_c, |store, _| assert_eq!(contacts(store), []));
5144
5145        server.allow_connections();
5146        client_c
5147            .authenticate_and_connect(false, &cx_c.to_async())
5148            .await
5149            .unwrap();
5150
5151        deterministic.run_until_parked();
5152        for (client, cx) in [(&client_a, &cx_a), (&client_b, &cx_b), (&client_c, &cx_c)] {
5153            client.user_store.read_with(*cx, |store, _| {
5154                assert_eq!(
5155                    contacts(store),
5156                    [
5157                        ("user_a", true, vec![]),
5158                        ("user_b", true, vec![("b", false, vec![])]),
5159                        ("user_c", true, vec![])
5160                    ]
5161                )
5162            });
5163        }
5164
5165        fn contacts(user_store: &UserStore) -> Vec<(&str, bool, Vec<(&str, bool, Vec<&str>)>)> {
5166            user_store
5167                .contacts()
5168                .iter()
5169                .map(|contact| {
5170                    let projects = contact
5171                        .projects
5172                        .iter()
5173                        .map(|p| {
5174                            (
5175                                p.worktree_root_names[0].as_str(),
5176                                p.is_shared,
5177                                p.guests.iter().map(|p| p.github_login.as_str()).collect(),
5178                            )
5179                        })
5180                        .collect();
5181                    (contact.user.github_login.as_str(), contact.online, projects)
5182                })
5183                .collect()
5184        }
5185    }
5186
5187    #[gpui::test(iterations = 10)]
5188    async fn test_contact_requests(
5189        executor: Arc<Deterministic>,
5190        cx_a: &mut TestAppContext,
5191        cx_a2: &mut TestAppContext,
5192        cx_b: &mut TestAppContext,
5193        cx_b2: &mut TestAppContext,
5194        cx_c: &mut TestAppContext,
5195        cx_c2: &mut TestAppContext,
5196    ) {
5197        cx_a.foreground().forbid_parking();
5198
5199        // Connect to a server as 3 clients.
5200        let mut server = TestServer::start(cx_a.foreground(), cx_a.background()).await;
5201        let client_a = server.create_client(cx_a, "user_a").await;
5202        let client_a2 = server.create_client(cx_a2, "user_a").await;
5203        let client_b = server.create_client(cx_b, "user_b").await;
5204        let client_b2 = server.create_client(cx_b2, "user_b").await;
5205        let client_c = server.create_client(cx_c, "user_c").await;
5206        let client_c2 = server.create_client(cx_c2, "user_c").await;
5207
5208        assert_eq!(client_a.user_id().unwrap(), client_a2.user_id().unwrap());
5209        assert_eq!(client_b.user_id().unwrap(), client_b2.user_id().unwrap());
5210        assert_eq!(client_c.user_id().unwrap(), client_c2.user_id().unwrap());
5211
5212        // User A and User C request that user B become their contact.
5213        client_a
5214            .user_store
5215            .update(cx_a, |store, cx| {
5216                store.request_contact(client_b.user_id().unwrap(), cx)
5217            })
5218            .await
5219            .unwrap();
5220        client_c
5221            .user_store
5222            .update(cx_c, |store, cx| {
5223                store.request_contact(client_b.user_id().unwrap(), cx)
5224            })
5225            .await
5226            .unwrap();
5227        executor.run_until_parked();
5228
5229        // All users see the pending request appear in all their clients.
5230        assert_eq!(
5231            client_a.summarize_contacts(&cx_a).outgoing_requests,
5232            &["user_b"]
5233        );
5234        assert_eq!(
5235            client_a2.summarize_contacts(&cx_a2).outgoing_requests,
5236            &["user_b"]
5237        );
5238        assert_eq!(
5239            client_b.summarize_contacts(&cx_b).incoming_requests,
5240            &["user_a", "user_c"]
5241        );
5242        assert_eq!(
5243            client_b2.summarize_contacts(&cx_b2).incoming_requests,
5244            &["user_a", "user_c"]
5245        );
5246        assert_eq!(
5247            client_c.summarize_contacts(&cx_c).outgoing_requests,
5248            &["user_b"]
5249        );
5250        assert_eq!(
5251            client_c2.summarize_contacts(&cx_c2).outgoing_requests,
5252            &["user_b"]
5253        );
5254
5255        // Contact requests are present upon connecting (tested here via disconnect/reconnect)
5256        disconnect_and_reconnect(&client_a, cx_a).await;
5257        disconnect_and_reconnect(&client_b, cx_b).await;
5258        disconnect_and_reconnect(&client_c, cx_c).await;
5259        executor.run_until_parked();
5260        assert_eq!(
5261            client_a.summarize_contacts(&cx_a).outgoing_requests,
5262            &["user_b"]
5263        );
5264        assert_eq!(
5265            client_b.summarize_contacts(&cx_b).incoming_requests,
5266            &["user_a", "user_c"]
5267        );
5268        assert_eq!(
5269            client_c.summarize_contacts(&cx_c).outgoing_requests,
5270            &["user_b"]
5271        );
5272
5273        // User B accepts the request from user A.
5274        client_b
5275            .user_store
5276            .update(cx_b, |store, cx| {
5277                store.respond_to_contact_request(client_a.user_id().unwrap(), true, cx)
5278            })
5279            .await
5280            .unwrap();
5281
5282        executor.run_until_parked();
5283
5284        // User B sees user A as their contact now in all client, and the incoming request from them is removed.
5285        let contacts_b = client_b.summarize_contacts(&cx_b);
5286        assert_eq!(contacts_b.current, &["user_a", "user_b"]);
5287        assert_eq!(contacts_b.incoming_requests, &["user_c"]);
5288        let contacts_b2 = client_b2.summarize_contacts(&cx_b2);
5289        assert_eq!(contacts_b2.current, &["user_a", "user_b"]);
5290        assert_eq!(contacts_b2.incoming_requests, &["user_c"]);
5291
5292        // User A sees user B as their contact now in all clients, and the outgoing request to them is removed.
5293        let contacts_a = client_a.summarize_contacts(&cx_a);
5294        assert_eq!(contacts_a.current, &["user_a", "user_b"]);
5295        assert!(contacts_a.outgoing_requests.is_empty());
5296        let contacts_a2 = client_a2.summarize_contacts(&cx_a2);
5297        assert_eq!(contacts_a2.current, &["user_a", "user_b"]);
5298        assert!(contacts_a2.outgoing_requests.is_empty());
5299
5300        // Contacts are present upon connecting (tested here via disconnect/reconnect)
5301        disconnect_and_reconnect(&client_a, cx_a).await;
5302        disconnect_and_reconnect(&client_b, cx_b).await;
5303        disconnect_and_reconnect(&client_c, cx_c).await;
5304        executor.run_until_parked();
5305        assert_eq!(
5306            client_a.summarize_contacts(&cx_a).current,
5307            &["user_a", "user_b"]
5308        );
5309        assert_eq!(
5310            client_b.summarize_contacts(&cx_b).current,
5311            &["user_a", "user_b"]
5312        );
5313        assert_eq!(
5314            client_b.summarize_contacts(&cx_b).incoming_requests,
5315            &["user_c"]
5316        );
5317        assert_eq!(client_c.summarize_contacts(&cx_c).current, &["user_c"]);
5318        assert_eq!(
5319            client_c.summarize_contacts(&cx_c).outgoing_requests,
5320            &["user_b"]
5321        );
5322
5323        // User B rejects the request from user C.
5324        client_b
5325            .user_store
5326            .update(cx_b, |store, cx| {
5327                store.respond_to_contact_request(client_c.user_id().unwrap(), false, cx)
5328            })
5329            .await
5330            .unwrap();
5331
5332        executor.run_until_parked();
5333
5334        // User B doesn't see user C as their contact, and the incoming request from them is removed.
5335        let contacts_b = client_b.summarize_contacts(&cx_b);
5336        assert_eq!(contacts_b.current, &["user_a", "user_b"]);
5337        assert!(contacts_b.incoming_requests.is_empty());
5338        let contacts_b2 = client_b2.summarize_contacts(&cx_b2);
5339        assert_eq!(contacts_b2.current, &["user_a", "user_b"]);
5340        assert!(contacts_b2.incoming_requests.is_empty());
5341
5342        // User C doesn't see user B as their contact, and the outgoing request to them is removed.
5343        let contacts_c = client_c.summarize_contacts(&cx_c);
5344        assert_eq!(contacts_c.current, &["user_c"]);
5345        assert!(contacts_c.outgoing_requests.is_empty());
5346        let contacts_c2 = client_c2.summarize_contacts(&cx_c2);
5347        assert_eq!(contacts_c2.current, &["user_c"]);
5348        assert!(contacts_c2.outgoing_requests.is_empty());
5349
5350        // Incoming/outgoing requests are not present upon connecting (tested here via disconnect/reconnect)
5351        disconnect_and_reconnect(&client_a, cx_a).await;
5352        disconnect_and_reconnect(&client_b, cx_b).await;
5353        disconnect_and_reconnect(&client_c, cx_c).await;
5354        executor.run_until_parked();
5355        assert_eq!(
5356            client_a.summarize_contacts(&cx_a).current,
5357            &["user_a", "user_b"]
5358        );
5359        assert_eq!(
5360            client_b.summarize_contacts(&cx_b).current,
5361            &["user_a", "user_b"]
5362        );
5363        assert!(client_b
5364            .summarize_contacts(&cx_b)
5365            .incoming_requests
5366            .is_empty());
5367        assert_eq!(client_c.summarize_contacts(&cx_c).current, &["user_c"]);
5368        assert!(client_c
5369            .summarize_contacts(&cx_c)
5370            .outgoing_requests
5371            .is_empty());
5372
5373        async fn disconnect_and_reconnect(client: &TestClient, cx: &mut TestAppContext) {
5374            client.disconnect(&cx.to_async()).unwrap();
5375            client.clear_contacts(cx).await;
5376            client
5377                .authenticate_and_connect(false, &cx.to_async())
5378                .await
5379                .unwrap();
5380        }
5381    }
5382
5383    #[gpui::test(iterations = 10)]
5384    async fn test_following(cx_a: &mut TestAppContext, cx_b: &mut TestAppContext) {
5385        cx_a.foreground().forbid_parking();
5386        let fs = FakeFs::new(cx_a.background());
5387
5388        // 2 clients connect to a server.
5389        let mut server = TestServer::start(cx_a.foreground(), cx_a.background()).await;
5390        let mut client_a = server.create_client(cx_a, "user_a").await;
5391        let mut client_b = server.create_client(cx_b, "user_b").await;
5392        server
5393            .make_contacts(vec![(&client_a, cx_a), (&client_b, cx_b)])
5394            .await;
5395        cx_a.update(editor::init);
5396        cx_b.update(editor::init);
5397
5398        // Client A shares a project.
5399        fs.insert_tree(
5400            "/a",
5401            json!({
5402                "1.txt": "one",
5403                "2.txt": "two",
5404                "3.txt": "three",
5405            }),
5406        )
5407        .await;
5408        let (project_a, worktree_id) = client_a.build_local_project(fs.clone(), "/a", cx_a).await;
5409        project_a
5410            .update(cx_a, |project, cx| project.share(cx))
5411            .await
5412            .unwrap();
5413
5414        // Client B joins the project.
5415        let project_b = client_b
5416            .build_remote_project(
5417                project_a
5418                    .read_with(cx_a, |project, _| project.remote_id())
5419                    .unwrap(),
5420                cx_b,
5421            )
5422            .await;
5423
5424        // Client A opens some editors.
5425        let workspace_a = client_a.build_workspace(&project_a, cx_a);
5426        let pane_a = workspace_a.read_with(cx_a, |workspace, _| workspace.active_pane().clone());
5427        let editor_a1 = workspace_a
5428            .update(cx_a, |workspace, cx| {
5429                workspace.open_path((worktree_id, "1.txt"), true, cx)
5430            })
5431            .await
5432            .unwrap()
5433            .downcast::<Editor>()
5434            .unwrap();
5435        let editor_a2 = workspace_a
5436            .update(cx_a, |workspace, cx| {
5437                workspace.open_path((worktree_id, "2.txt"), true, cx)
5438            })
5439            .await
5440            .unwrap()
5441            .downcast::<Editor>()
5442            .unwrap();
5443
5444        // Client B opens an editor.
5445        let workspace_b = client_b.build_workspace(&project_b, cx_b);
5446        let editor_b1 = workspace_b
5447            .update(cx_b, |workspace, cx| {
5448                workspace.open_path((worktree_id, "1.txt"), true, cx)
5449            })
5450            .await
5451            .unwrap()
5452            .downcast::<Editor>()
5453            .unwrap();
5454
5455        let client_a_id = project_b.read_with(cx_b, |project, _| {
5456            project.collaborators().values().next().unwrap().peer_id
5457        });
5458        let client_b_id = project_a.read_with(cx_a, |project, _| {
5459            project.collaborators().values().next().unwrap().peer_id
5460        });
5461
5462        // When client B starts following client A, all visible view states are replicated to client B.
5463        editor_a1.update(cx_a, |editor, cx| editor.select_ranges([0..1], None, cx));
5464        editor_a2.update(cx_a, |editor, cx| editor.select_ranges([2..3], None, cx));
5465        workspace_b
5466            .update(cx_b, |workspace, cx| {
5467                workspace
5468                    .toggle_follow(&ToggleFollow(client_a_id), cx)
5469                    .unwrap()
5470            })
5471            .await
5472            .unwrap();
5473        let editor_b2 = workspace_b.read_with(cx_b, |workspace, cx| {
5474            workspace
5475                .active_item(cx)
5476                .unwrap()
5477                .downcast::<Editor>()
5478                .unwrap()
5479        });
5480        assert!(cx_b.read(|cx| editor_b2.is_focused(cx)));
5481        assert_eq!(
5482            editor_b2.read_with(cx_b, |editor, cx| editor.project_path(cx)),
5483            Some((worktree_id, "2.txt").into())
5484        );
5485        assert_eq!(
5486            editor_b2.read_with(cx_b, |editor, cx| editor.selected_ranges(cx)),
5487            vec![2..3]
5488        );
5489        assert_eq!(
5490            editor_b1.read_with(cx_b, |editor, cx| editor.selected_ranges(cx)),
5491            vec![0..1]
5492        );
5493
5494        // When client A activates a different editor, client B does so as well.
5495        workspace_a.update(cx_a, |workspace, cx| {
5496            workspace.activate_item(&editor_a1, cx)
5497        });
5498        workspace_b
5499            .condition(cx_b, |workspace, cx| {
5500                workspace.active_item(cx).unwrap().id() == editor_b1.id()
5501            })
5502            .await;
5503
5504        // When client A navigates back and forth, client B does so as well.
5505        workspace_a
5506            .update(cx_a, |workspace, cx| {
5507                workspace::Pane::go_back(workspace, None, cx)
5508            })
5509            .await;
5510        workspace_b
5511            .condition(cx_b, |workspace, cx| {
5512                workspace.active_item(cx).unwrap().id() == editor_b2.id()
5513            })
5514            .await;
5515
5516        workspace_a
5517            .update(cx_a, |workspace, cx| {
5518                workspace::Pane::go_forward(workspace, None, cx)
5519            })
5520            .await;
5521        workspace_b
5522            .condition(cx_b, |workspace, cx| {
5523                workspace.active_item(cx).unwrap().id() == editor_b1.id()
5524            })
5525            .await;
5526
5527        // Changes to client A's editor are reflected on client B.
5528        editor_a1.update(cx_a, |editor, cx| {
5529            editor.select_ranges([1..1, 2..2], None, cx);
5530        });
5531        editor_b1
5532            .condition(cx_b, |editor, cx| {
5533                editor.selected_ranges(cx) == vec![1..1, 2..2]
5534            })
5535            .await;
5536
5537        editor_a1.update(cx_a, |editor, cx| editor.set_text("TWO", cx));
5538        editor_b1
5539            .condition(cx_b, |editor, cx| editor.text(cx) == "TWO")
5540            .await;
5541
5542        editor_a1.update(cx_a, |editor, cx| {
5543            editor.select_ranges([3..3], None, cx);
5544            editor.set_scroll_position(vec2f(0., 100.), cx);
5545        });
5546        editor_b1
5547            .condition(cx_b, |editor, cx| editor.selected_ranges(cx) == vec![3..3])
5548            .await;
5549
5550        // After unfollowing, client B stops receiving updates from client A.
5551        workspace_b.update(cx_b, |workspace, cx| {
5552            workspace.unfollow(&workspace.active_pane().clone(), cx)
5553        });
5554        workspace_a.update(cx_a, |workspace, cx| {
5555            workspace.activate_item(&editor_a2, cx)
5556        });
5557        cx_a.foreground().run_until_parked();
5558        assert_eq!(
5559            workspace_b.read_with(cx_b, |workspace, cx| workspace
5560                .active_item(cx)
5561                .unwrap()
5562                .id()),
5563            editor_b1.id()
5564        );
5565
5566        // Client A starts following client B.
5567        workspace_a
5568            .update(cx_a, |workspace, cx| {
5569                workspace
5570                    .toggle_follow(&ToggleFollow(client_b_id), cx)
5571                    .unwrap()
5572            })
5573            .await
5574            .unwrap();
5575        assert_eq!(
5576            workspace_a.read_with(cx_a, |workspace, _| workspace.leader_for_pane(&pane_a)),
5577            Some(client_b_id)
5578        );
5579        assert_eq!(
5580            workspace_a.read_with(cx_a, |workspace, cx| workspace
5581                .active_item(cx)
5582                .unwrap()
5583                .id()),
5584            editor_a1.id()
5585        );
5586
5587        // Following interrupts when client B disconnects.
5588        client_b.disconnect(&cx_b.to_async()).unwrap();
5589        cx_a.foreground().run_until_parked();
5590        assert_eq!(
5591            workspace_a.read_with(cx_a, |workspace, _| workspace.leader_for_pane(&pane_a)),
5592            None
5593        );
5594    }
5595
5596    #[gpui::test(iterations = 10)]
5597    async fn test_peers_following_each_other(cx_a: &mut TestAppContext, cx_b: &mut TestAppContext) {
5598        cx_a.foreground().forbid_parking();
5599        let fs = FakeFs::new(cx_a.background());
5600
5601        // 2 clients connect to a server.
5602        let mut server = TestServer::start(cx_a.foreground(), cx_a.background()).await;
5603        let mut client_a = server.create_client(cx_a, "user_a").await;
5604        let mut client_b = server.create_client(cx_b, "user_b").await;
5605        server
5606            .make_contacts(vec![(&client_a, cx_a), (&client_b, cx_b)])
5607            .await;
5608        cx_a.update(editor::init);
5609        cx_b.update(editor::init);
5610
5611        // Client A shares a project.
5612        fs.insert_tree(
5613            "/a",
5614            json!({
5615                "1.txt": "one",
5616                "2.txt": "two",
5617                "3.txt": "three",
5618                "4.txt": "four",
5619            }),
5620        )
5621        .await;
5622        let (project_a, worktree_id) = client_a.build_local_project(fs.clone(), "/a", cx_a).await;
5623        project_a
5624            .update(cx_a, |project, cx| project.share(cx))
5625            .await
5626            .unwrap();
5627
5628        // Client B joins the project.
5629        let project_b = client_b
5630            .build_remote_project(
5631                project_a
5632                    .read_with(cx_a, |project, _| project.remote_id())
5633                    .unwrap(),
5634                cx_b,
5635            )
5636            .await;
5637
5638        // Client A opens some editors.
5639        let workspace_a = client_a.build_workspace(&project_a, cx_a);
5640        let pane_a1 = workspace_a.read_with(cx_a, |workspace, _| workspace.active_pane().clone());
5641        let _editor_a1 = workspace_a
5642            .update(cx_a, |workspace, cx| {
5643                workspace.open_path((worktree_id, "1.txt"), true, cx)
5644            })
5645            .await
5646            .unwrap()
5647            .downcast::<Editor>()
5648            .unwrap();
5649
5650        // Client B opens an editor.
5651        let workspace_b = client_b.build_workspace(&project_b, cx_b);
5652        let pane_b1 = workspace_b.read_with(cx_b, |workspace, _| workspace.active_pane().clone());
5653        let _editor_b1 = workspace_b
5654            .update(cx_b, |workspace, cx| {
5655                workspace.open_path((worktree_id, "2.txt"), true, cx)
5656            })
5657            .await
5658            .unwrap()
5659            .downcast::<Editor>()
5660            .unwrap();
5661
5662        // Clients A and B follow each other in split panes
5663        workspace_a
5664            .update(cx_a, |workspace, cx| {
5665                workspace.split_pane(workspace.active_pane().clone(), SplitDirection::Right, cx);
5666                assert_ne!(*workspace.active_pane(), pane_a1);
5667                let leader_id = *project_a.read(cx).collaborators().keys().next().unwrap();
5668                workspace
5669                    .toggle_follow(&workspace::ToggleFollow(leader_id), cx)
5670                    .unwrap()
5671            })
5672            .await
5673            .unwrap();
5674        workspace_b
5675            .update(cx_b, |workspace, cx| {
5676                workspace.split_pane(workspace.active_pane().clone(), SplitDirection::Right, cx);
5677                assert_ne!(*workspace.active_pane(), pane_b1);
5678                let leader_id = *project_b.read(cx).collaborators().keys().next().unwrap();
5679                workspace
5680                    .toggle_follow(&workspace::ToggleFollow(leader_id), cx)
5681                    .unwrap()
5682            })
5683            .await
5684            .unwrap();
5685
5686        workspace_a
5687            .update(cx_a, |workspace, cx| {
5688                workspace.activate_next_pane(cx);
5689                assert_eq!(*workspace.active_pane(), pane_a1);
5690                workspace.open_path((worktree_id, "3.txt"), true, cx)
5691            })
5692            .await
5693            .unwrap();
5694        workspace_b
5695            .update(cx_b, |workspace, cx| {
5696                workspace.activate_next_pane(cx);
5697                assert_eq!(*workspace.active_pane(), pane_b1);
5698                workspace.open_path((worktree_id, "4.txt"), true, cx)
5699            })
5700            .await
5701            .unwrap();
5702        cx_a.foreground().run_until_parked();
5703
5704        // Ensure leader updates don't change the active pane of followers
5705        workspace_a.read_with(cx_a, |workspace, _| {
5706            assert_eq!(*workspace.active_pane(), pane_a1);
5707        });
5708        workspace_b.read_with(cx_b, |workspace, _| {
5709            assert_eq!(*workspace.active_pane(), pane_b1);
5710        });
5711
5712        // Ensure peers following each other doesn't cause an infinite loop.
5713        assert_eq!(
5714            workspace_a.read_with(cx_a, |workspace, cx| workspace
5715                .active_item(cx)
5716                .unwrap()
5717                .project_path(cx)),
5718            Some((worktree_id, "3.txt").into())
5719        );
5720        workspace_a.update(cx_a, |workspace, cx| {
5721            assert_eq!(
5722                workspace.active_item(cx).unwrap().project_path(cx),
5723                Some((worktree_id, "3.txt").into())
5724            );
5725            workspace.activate_next_pane(cx);
5726            assert_eq!(
5727                workspace.active_item(cx).unwrap().project_path(cx),
5728                Some((worktree_id, "4.txt").into())
5729            );
5730        });
5731        workspace_b.update(cx_b, |workspace, cx| {
5732            assert_eq!(
5733                workspace.active_item(cx).unwrap().project_path(cx),
5734                Some((worktree_id, "4.txt").into())
5735            );
5736            workspace.activate_next_pane(cx);
5737            assert_eq!(
5738                workspace.active_item(cx).unwrap().project_path(cx),
5739                Some((worktree_id, "3.txt").into())
5740            );
5741        });
5742    }
5743
5744    #[gpui::test(iterations = 10)]
5745    async fn test_auto_unfollowing(cx_a: &mut TestAppContext, cx_b: &mut TestAppContext) {
5746        cx_a.foreground().forbid_parking();
5747        let fs = FakeFs::new(cx_a.background());
5748
5749        // 2 clients connect to a server.
5750        let mut server = TestServer::start(cx_a.foreground(), cx_a.background()).await;
5751        let mut client_a = server.create_client(cx_a, "user_a").await;
5752        let mut client_b = server.create_client(cx_b, "user_b").await;
5753        server
5754            .make_contacts(vec![(&client_a, cx_a), (&client_b, cx_b)])
5755            .await;
5756        cx_a.update(editor::init);
5757        cx_b.update(editor::init);
5758
5759        // Client A shares a project.
5760        fs.insert_tree(
5761            "/a",
5762            json!({
5763                "1.txt": "one",
5764                "2.txt": "two",
5765                "3.txt": "three",
5766            }),
5767        )
5768        .await;
5769        let (project_a, worktree_id) = client_a.build_local_project(fs.clone(), "/a", cx_a).await;
5770        project_a
5771            .update(cx_a, |project, cx| project.share(cx))
5772            .await
5773            .unwrap();
5774
5775        // Client B joins the project.
5776        let project_b = client_b
5777            .build_remote_project(
5778                project_a
5779                    .read_with(cx_a, |project, _| project.remote_id())
5780                    .unwrap(),
5781                cx_b,
5782            )
5783            .await;
5784
5785        // Client A opens some editors.
5786        let workspace_a = client_a.build_workspace(&project_a, cx_a);
5787        let _editor_a1 = workspace_a
5788            .update(cx_a, |workspace, cx| {
5789                workspace.open_path((worktree_id, "1.txt"), true, cx)
5790            })
5791            .await
5792            .unwrap()
5793            .downcast::<Editor>()
5794            .unwrap();
5795
5796        // Client B starts following client A.
5797        let workspace_b = client_b.build_workspace(&project_b, cx_b);
5798        let pane_b = workspace_b.read_with(cx_b, |workspace, _| workspace.active_pane().clone());
5799        let leader_id = project_b.read_with(cx_b, |project, _| {
5800            project.collaborators().values().next().unwrap().peer_id
5801        });
5802        workspace_b
5803            .update(cx_b, |workspace, cx| {
5804                workspace
5805                    .toggle_follow(&ToggleFollow(leader_id), cx)
5806                    .unwrap()
5807            })
5808            .await
5809            .unwrap();
5810        assert_eq!(
5811            workspace_b.read_with(cx_b, |workspace, _| workspace.leader_for_pane(&pane_b)),
5812            Some(leader_id)
5813        );
5814        let editor_b2 = workspace_b.read_with(cx_b, |workspace, cx| {
5815            workspace
5816                .active_item(cx)
5817                .unwrap()
5818                .downcast::<Editor>()
5819                .unwrap()
5820        });
5821
5822        // When client B moves, it automatically stops following client A.
5823        editor_b2.update(cx_b, |editor, cx| editor.move_right(&editor::MoveRight, cx));
5824        assert_eq!(
5825            workspace_b.read_with(cx_b, |workspace, _| workspace.leader_for_pane(&pane_b)),
5826            None
5827        );
5828
5829        workspace_b
5830            .update(cx_b, |workspace, cx| {
5831                workspace
5832                    .toggle_follow(&ToggleFollow(leader_id), cx)
5833                    .unwrap()
5834            })
5835            .await
5836            .unwrap();
5837        assert_eq!(
5838            workspace_b.read_with(cx_b, |workspace, _| workspace.leader_for_pane(&pane_b)),
5839            Some(leader_id)
5840        );
5841
5842        // When client B edits, it automatically stops following client A.
5843        editor_b2.update(cx_b, |editor, cx| editor.insert("X", cx));
5844        assert_eq!(
5845            workspace_b.read_with(cx_b, |workspace, _| workspace.leader_for_pane(&pane_b)),
5846            None
5847        );
5848
5849        workspace_b
5850            .update(cx_b, |workspace, cx| {
5851                workspace
5852                    .toggle_follow(&ToggleFollow(leader_id), cx)
5853                    .unwrap()
5854            })
5855            .await
5856            .unwrap();
5857        assert_eq!(
5858            workspace_b.read_with(cx_b, |workspace, _| workspace.leader_for_pane(&pane_b)),
5859            Some(leader_id)
5860        );
5861
5862        // When client B scrolls, it automatically stops following client A.
5863        editor_b2.update(cx_b, |editor, cx| {
5864            editor.set_scroll_position(vec2f(0., 3.), cx)
5865        });
5866        assert_eq!(
5867            workspace_b.read_with(cx_b, |workspace, _| workspace.leader_for_pane(&pane_b)),
5868            None
5869        );
5870
5871        workspace_b
5872            .update(cx_b, |workspace, cx| {
5873                workspace
5874                    .toggle_follow(&ToggleFollow(leader_id), cx)
5875                    .unwrap()
5876            })
5877            .await
5878            .unwrap();
5879        assert_eq!(
5880            workspace_b.read_with(cx_b, |workspace, _| workspace.leader_for_pane(&pane_b)),
5881            Some(leader_id)
5882        );
5883
5884        // When client B activates a different pane, it continues following client A in the original pane.
5885        workspace_b.update(cx_b, |workspace, cx| {
5886            workspace.split_pane(pane_b.clone(), SplitDirection::Right, cx)
5887        });
5888        assert_eq!(
5889            workspace_b.read_with(cx_b, |workspace, _| workspace.leader_for_pane(&pane_b)),
5890            Some(leader_id)
5891        );
5892
5893        workspace_b.update(cx_b, |workspace, cx| workspace.activate_next_pane(cx));
5894        assert_eq!(
5895            workspace_b.read_with(cx_b, |workspace, _| workspace.leader_for_pane(&pane_b)),
5896            Some(leader_id)
5897        );
5898
5899        // When client B activates a different item in the original pane, it automatically stops following client A.
5900        workspace_b
5901            .update(cx_b, |workspace, cx| {
5902                workspace.open_path((worktree_id, "2.txt"), true, cx)
5903            })
5904            .await
5905            .unwrap();
5906        assert_eq!(
5907            workspace_b.read_with(cx_b, |workspace, _| workspace.leader_for_pane(&pane_b)),
5908            None
5909        );
5910    }
5911
5912    #[gpui::test(iterations = 100)]
5913    async fn test_random_collaboration(
5914        cx: &mut TestAppContext,
5915        deterministic: Arc<Deterministic>,
5916        rng: StdRng,
5917    ) {
5918        cx.foreground().forbid_parking();
5919        let max_peers = env::var("MAX_PEERS")
5920            .map(|i| i.parse().expect("invalid `MAX_PEERS` variable"))
5921            .unwrap_or(5);
5922        assert!(max_peers <= 5);
5923
5924        let max_operations = env::var("OPERATIONS")
5925            .map(|i| i.parse().expect("invalid `OPERATIONS` variable"))
5926            .unwrap_or(10);
5927
5928        let rng = Arc::new(Mutex::new(rng));
5929
5930        let guest_lang_registry = Arc::new(LanguageRegistry::test());
5931        let host_language_registry = Arc::new(LanguageRegistry::test());
5932
5933        let fs = FakeFs::new(cx.background());
5934        fs.insert_tree("/_collab", json!({"init": ""})).await;
5935
5936        let mut server = TestServer::start(cx.foreground(), cx.background()).await;
5937        let db = server.app_state.db.clone();
5938        let host_user_id = db.create_user("host", false).await.unwrap();
5939        for username in ["guest-1", "guest-2", "guest-3", "guest-4"] {
5940            let guest_user_id = db.create_user(username, false).await.unwrap();
5941            server
5942                .app_state
5943                .db
5944                .send_contact_request(guest_user_id, host_user_id)
5945                .await
5946                .unwrap();
5947            server
5948                .app_state
5949                .db
5950                .respond_to_contact_request(host_user_id, guest_user_id, true)
5951                .await
5952                .unwrap();
5953        }
5954
5955        let mut clients = Vec::new();
5956        let mut user_ids = Vec::new();
5957        let mut op_start_signals = Vec::new();
5958
5959        let mut next_entity_id = 100000;
5960        let mut host_cx = TestAppContext::new(
5961            cx.foreground_platform(),
5962            cx.platform(),
5963            deterministic.build_foreground(next_entity_id),
5964            deterministic.build_background(),
5965            cx.font_cache(),
5966            cx.leak_detector(),
5967            next_entity_id,
5968        );
5969        let host = server.create_client(&mut host_cx, "host").await;
5970        let host_project = host_cx.update(|cx| {
5971            Project::local(
5972                host.client.clone(),
5973                host.user_store.clone(),
5974                host_language_registry.clone(),
5975                fs.clone(),
5976                cx,
5977            )
5978        });
5979        let host_project_id = host_project
5980            .update(&mut host_cx, |p, _| p.next_remote_id())
5981            .await;
5982
5983        let (collab_worktree, _) = host_project
5984            .update(&mut host_cx, |project, cx| {
5985                project.find_or_create_local_worktree("/_collab", true, cx)
5986            })
5987            .await
5988            .unwrap();
5989        collab_worktree
5990            .read_with(&host_cx, |tree, _| tree.as_local().unwrap().scan_complete())
5991            .await;
5992        host_project
5993            .update(&mut host_cx, |project, cx| project.share(cx))
5994            .await
5995            .unwrap();
5996
5997        // Set up fake language servers.
5998        let mut language = Language::new(
5999            LanguageConfig {
6000                name: "Rust".into(),
6001                path_suffixes: vec!["rs".to_string()],
6002                ..Default::default()
6003            },
6004            None,
6005        );
6006        let _fake_servers = language.set_fake_lsp_adapter(FakeLspAdapter {
6007            name: "the-fake-language-server",
6008            capabilities: lsp::LanguageServer::full_capabilities(),
6009            initializer: Some(Box::new({
6010                let rng = rng.clone();
6011                let fs = fs.clone();
6012                let project = host_project.downgrade();
6013                move |fake_server: &mut FakeLanguageServer| {
6014                    fake_server.handle_request::<lsp::request::Completion, _, _>(
6015                        |_, _| async move {
6016                            Ok(Some(lsp::CompletionResponse::Array(vec![
6017                                lsp::CompletionItem {
6018                                    text_edit: Some(lsp::CompletionTextEdit::Edit(lsp::TextEdit {
6019                                        range: lsp::Range::new(
6020                                            lsp::Position::new(0, 0),
6021                                            lsp::Position::new(0, 0),
6022                                        ),
6023                                        new_text: "the-new-text".to_string(),
6024                                    })),
6025                                    ..Default::default()
6026                                },
6027                            ])))
6028                        },
6029                    );
6030
6031                    fake_server.handle_request::<lsp::request::CodeActionRequest, _, _>(
6032                        |_, _| async move {
6033                            Ok(Some(vec![lsp::CodeActionOrCommand::CodeAction(
6034                                lsp::CodeAction {
6035                                    title: "the-code-action".to_string(),
6036                                    ..Default::default()
6037                                },
6038                            )]))
6039                        },
6040                    );
6041
6042                    fake_server.handle_request::<lsp::request::PrepareRenameRequest, _, _>(
6043                        |params, _| async move {
6044                            Ok(Some(lsp::PrepareRenameResponse::Range(lsp::Range::new(
6045                                params.position,
6046                                params.position,
6047                            ))))
6048                        },
6049                    );
6050
6051                    fake_server.handle_request::<lsp::request::GotoDefinition, _, _>({
6052                        let fs = fs.clone();
6053                        let rng = rng.clone();
6054                        move |_, _| {
6055                            let fs = fs.clone();
6056                            let rng = rng.clone();
6057                            async move {
6058                                let files = fs.files().await;
6059                                let mut rng = rng.lock();
6060                                let count = rng.gen_range::<usize, _>(1..3);
6061                                let files = (0..count)
6062                                    .map(|_| files.choose(&mut *rng).unwrap())
6063                                    .collect::<Vec<_>>();
6064                                log::info!("LSP: Returning definitions in files {:?}", &files);
6065                                Ok(Some(lsp::GotoDefinitionResponse::Array(
6066                                    files
6067                                        .into_iter()
6068                                        .map(|file| lsp::Location {
6069                                            uri: lsp::Url::from_file_path(file).unwrap(),
6070                                            range: Default::default(),
6071                                        })
6072                                        .collect(),
6073                                )))
6074                            }
6075                        }
6076                    });
6077
6078                    fake_server.handle_request::<lsp::request::DocumentHighlightRequest, _, _>({
6079                        let rng = rng.clone();
6080                        let project = project.clone();
6081                        move |params, mut cx| {
6082                            let highlights = if let Some(project) = project.upgrade(&cx) {
6083                                project.update(&mut cx, |project, cx| {
6084                                    let path = params
6085                                        .text_document_position_params
6086                                        .text_document
6087                                        .uri
6088                                        .to_file_path()
6089                                        .unwrap();
6090                                    let (worktree, relative_path) =
6091                                        project.find_local_worktree(&path, cx)?;
6092                                    let project_path =
6093                                        ProjectPath::from((worktree.read(cx).id(), relative_path));
6094                                    let buffer =
6095                                        project.get_open_buffer(&project_path, cx)?.read(cx);
6096
6097                                    let mut highlights = Vec::new();
6098                                    let highlight_count = rng.lock().gen_range(1..=5);
6099                                    let mut prev_end = 0;
6100                                    for _ in 0..highlight_count {
6101                                        let range =
6102                                            buffer.random_byte_range(prev_end, &mut *rng.lock());
6103
6104                                        highlights.push(lsp::DocumentHighlight {
6105                                            range: range_to_lsp(range.to_point_utf16(buffer)),
6106                                            kind: Some(lsp::DocumentHighlightKind::READ),
6107                                        });
6108                                        prev_end = range.end;
6109                                    }
6110                                    Some(highlights)
6111                                })
6112                            } else {
6113                                None
6114                            };
6115                            async move { Ok(highlights) }
6116                        }
6117                    });
6118                }
6119            })),
6120            ..Default::default()
6121        });
6122        host_language_registry.add(Arc::new(language));
6123
6124        let op_start_signal = futures::channel::mpsc::unbounded();
6125        user_ids.push(host.current_user_id(&host_cx));
6126        op_start_signals.push(op_start_signal.0);
6127        clients.push(host_cx.foreground().spawn(host.simulate_host(
6128            host_project,
6129            op_start_signal.1,
6130            rng.clone(),
6131            host_cx,
6132        )));
6133
6134        let disconnect_host_at = if rng.lock().gen_bool(0.2) {
6135            rng.lock().gen_range(0..max_operations)
6136        } else {
6137            max_operations
6138        };
6139        let mut available_guests = vec![
6140            "guest-1".to_string(),
6141            "guest-2".to_string(),
6142            "guest-3".to_string(),
6143            "guest-4".to_string(),
6144        ];
6145        let mut operations = 0;
6146        while operations < max_operations {
6147            if operations == disconnect_host_at {
6148                server.disconnect_client(user_ids[0]);
6149                cx.foreground().advance_clock(RECEIVE_TIMEOUT);
6150                drop(op_start_signals);
6151                let mut clients = futures::future::join_all(clients).await;
6152                cx.foreground().run_until_parked();
6153
6154                let (host, mut host_cx, host_err) = clients.remove(0);
6155                if let Some(host_err) = host_err {
6156                    log::error!("host error - {}", host_err);
6157                }
6158                host.project
6159                    .as_ref()
6160                    .unwrap()
6161                    .read_with(&host_cx, |project, _| assert!(!project.is_shared()));
6162                for (guest, mut guest_cx, guest_err) in clients {
6163                    if let Some(guest_err) = guest_err {
6164                        log::error!("{} error - {}", guest.username, guest_err);
6165                    }
6166                    // TODO
6167                    // let contacts = server
6168                    //     .store
6169                    //     .read()
6170                    //     .await
6171                    //     .contacts_for_user(guest.current_user_id(&guest_cx));
6172                    // assert!(!contacts
6173                    //     .iter()
6174                    //     .flat_map(|contact| &contact.projects)
6175                    //     .any(|project| project.id == host_project_id));
6176                    guest
6177                        .project
6178                        .as_ref()
6179                        .unwrap()
6180                        .read_with(&guest_cx, |project, _| assert!(project.is_read_only()));
6181                    guest_cx.update(|_| drop(guest));
6182                }
6183                host_cx.update(|_| drop(host));
6184
6185                return;
6186            }
6187
6188            let distribution = rng.lock().gen_range(0..100);
6189            match distribution {
6190                0..=19 if !available_guests.is_empty() => {
6191                    let guest_ix = rng.lock().gen_range(0..available_guests.len());
6192                    let guest_username = available_guests.remove(guest_ix);
6193                    log::info!("Adding new connection for {}", guest_username);
6194                    next_entity_id += 100000;
6195                    let mut guest_cx = TestAppContext::new(
6196                        cx.foreground_platform(),
6197                        cx.platform(),
6198                        deterministic.build_foreground(next_entity_id),
6199                        deterministic.build_background(),
6200                        cx.font_cache(),
6201                        cx.leak_detector(),
6202                        next_entity_id,
6203                    );
6204                    let guest = server.create_client(&mut guest_cx, &guest_username).await;
6205                    let guest_project = Project::remote(
6206                        host_project_id,
6207                        guest.client.clone(),
6208                        guest.user_store.clone(),
6209                        guest_lang_registry.clone(),
6210                        FakeFs::new(cx.background()),
6211                        &mut guest_cx.to_async(),
6212                    )
6213                    .await
6214                    .unwrap();
6215                    let op_start_signal = futures::channel::mpsc::unbounded();
6216                    user_ids.push(guest.current_user_id(&guest_cx));
6217                    op_start_signals.push(op_start_signal.0);
6218                    clients.push(guest_cx.foreground().spawn(guest.simulate_guest(
6219                        guest_username.clone(),
6220                        guest_project,
6221                        op_start_signal.1,
6222                        rng.clone(),
6223                        guest_cx,
6224                    )));
6225
6226                    log::info!("Added connection for {}", guest_username);
6227                    operations += 1;
6228                }
6229                20..=29 if clients.len() > 1 => {
6230                    let guest_ix = rng.lock().gen_range(1..clients.len());
6231                    log::info!("Removing guest {}", user_ids[guest_ix]);
6232                    let removed_guest_id = user_ids.remove(guest_ix);
6233                    let guest = clients.remove(guest_ix);
6234                    op_start_signals.remove(guest_ix);
6235                    server.disconnect_client(removed_guest_id);
6236                    cx.foreground().advance_clock(RECEIVE_TIMEOUT);
6237                    let (guest, mut guest_cx, guest_err) = guest.await;
6238                    if let Some(guest_err) = guest_err {
6239                        log::error!("{} error - {}", guest.username, guest_err);
6240                    }
6241                    guest
6242                        .project
6243                        .as_ref()
6244                        .unwrap()
6245                        .read_with(&guest_cx, |project, _| assert!(project.is_read_only()));
6246                    // TODO
6247                    // for user_id in &user_ids {
6248                    //     for contact in server.store.read().await.contacts_for_user(*user_id) {
6249                    //         assert_ne!(
6250                    //             contact.user_id, removed_guest_id.0 as u64,
6251                    //             "removed guest is still a contact of another peer"
6252                    //         );
6253                    //         for project in contact.projects {
6254                    //             for project_guest_id in project.guests {
6255                    //                 assert_ne!(
6256                    //                     project_guest_id, removed_guest_id.0 as u64,
6257                    //                     "removed guest appears as still participating on a project"
6258                    //                 );
6259                    //             }
6260                    //         }
6261                    //     }
6262                    // }
6263
6264                    log::info!("{} removed", guest.username);
6265                    available_guests.push(guest.username.clone());
6266                    guest_cx.update(|_| drop(guest));
6267
6268                    operations += 1;
6269                }
6270                _ => {
6271                    while operations < max_operations && rng.lock().gen_bool(0.7) {
6272                        op_start_signals
6273                            .choose(&mut *rng.lock())
6274                            .unwrap()
6275                            .unbounded_send(())
6276                            .unwrap();
6277                        operations += 1;
6278                    }
6279
6280                    if rng.lock().gen_bool(0.8) {
6281                        cx.foreground().run_until_parked();
6282                    }
6283                }
6284            }
6285        }
6286
6287        drop(op_start_signals);
6288        let mut clients = futures::future::join_all(clients).await;
6289        cx.foreground().run_until_parked();
6290
6291        let (host_client, mut host_cx, host_err) = clients.remove(0);
6292        if let Some(host_err) = host_err {
6293            panic!("host error - {}", host_err);
6294        }
6295        let host_project = host_client.project.as_ref().unwrap();
6296        let host_worktree_snapshots = host_project.read_with(&host_cx, |project, cx| {
6297            project
6298                .worktrees(cx)
6299                .map(|worktree| {
6300                    let snapshot = worktree.read(cx).snapshot();
6301                    (snapshot.id(), snapshot)
6302                })
6303                .collect::<BTreeMap<_, _>>()
6304        });
6305
6306        host_client
6307            .project
6308            .as_ref()
6309            .unwrap()
6310            .read_with(&host_cx, |project, cx| project.check_invariants(cx));
6311
6312        for (guest_client, mut guest_cx, guest_err) in clients.into_iter() {
6313            if let Some(guest_err) = guest_err {
6314                panic!("{} error - {}", guest_client.username, guest_err);
6315            }
6316            let worktree_snapshots =
6317                guest_client
6318                    .project
6319                    .as_ref()
6320                    .unwrap()
6321                    .read_with(&guest_cx, |project, cx| {
6322                        project
6323                            .worktrees(cx)
6324                            .map(|worktree| {
6325                                let worktree = worktree.read(cx);
6326                                (worktree.id(), worktree.snapshot())
6327                            })
6328                            .collect::<BTreeMap<_, _>>()
6329                    });
6330
6331            assert_eq!(
6332                worktree_snapshots.keys().collect::<Vec<_>>(),
6333                host_worktree_snapshots.keys().collect::<Vec<_>>(),
6334                "{} has different worktrees than the host",
6335                guest_client.username
6336            );
6337            for (id, host_snapshot) in &host_worktree_snapshots {
6338                let guest_snapshot = &worktree_snapshots[id];
6339                assert_eq!(
6340                    guest_snapshot.root_name(),
6341                    host_snapshot.root_name(),
6342                    "{} has different root name than the host for worktree {}",
6343                    guest_client.username,
6344                    id
6345                );
6346                assert_eq!(
6347                    guest_snapshot.entries(false).collect::<Vec<_>>(),
6348                    host_snapshot.entries(false).collect::<Vec<_>>(),
6349                    "{} has different snapshot than the host for worktree {}",
6350                    guest_client.username,
6351                    id
6352                );
6353                assert_eq!(guest_snapshot.scan_id(), host_snapshot.scan_id());
6354            }
6355
6356            guest_client
6357                .project
6358                .as_ref()
6359                .unwrap()
6360                .read_with(&guest_cx, |project, cx| project.check_invariants(cx));
6361
6362            for guest_buffer in &guest_client.buffers {
6363                let buffer_id = guest_buffer.read_with(&guest_cx, |buffer, _| buffer.remote_id());
6364                let host_buffer = host_project.read_with(&host_cx, |project, cx| {
6365                    project.buffer_for_id(buffer_id, cx).expect(&format!(
6366                        "host does not have buffer for guest:{}, peer:{}, id:{}",
6367                        guest_client.username, guest_client.peer_id, buffer_id
6368                    ))
6369                });
6370                let path = host_buffer
6371                    .read_with(&host_cx, |buffer, cx| buffer.file().unwrap().full_path(cx));
6372
6373                assert_eq!(
6374                    guest_buffer.read_with(&guest_cx, |buffer, _| buffer.deferred_ops_len()),
6375                    0,
6376                    "{}, buffer {}, path {:?} has deferred operations",
6377                    guest_client.username,
6378                    buffer_id,
6379                    path,
6380                );
6381                assert_eq!(
6382                    guest_buffer.read_with(&guest_cx, |buffer, _| buffer.text()),
6383                    host_buffer.read_with(&host_cx, |buffer, _| buffer.text()),
6384                    "{}, buffer {}, path {:?}, differs from the host's buffer",
6385                    guest_client.username,
6386                    buffer_id,
6387                    path
6388                );
6389            }
6390
6391            guest_cx.update(|_| drop(guest_client));
6392        }
6393
6394        host_cx.update(|_| drop(host_client));
6395    }
6396
6397    struct TestServer {
6398        peer: Arc<Peer>,
6399        app_state: Arc<AppState>,
6400        server: Arc<Server>,
6401        foreground: Rc<executor::Foreground>,
6402        notifications: mpsc::UnboundedReceiver<()>,
6403        connection_killers: Arc<Mutex<HashMap<UserId, Arc<AtomicBool>>>>,
6404        forbid_connections: Arc<AtomicBool>,
6405        _test_db: TestDb,
6406    }
6407
6408    impl TestServer {
6409        async fn start(
6410            foreground: Rc<executor::Foreground>,
6411            background: Arc<executor::Background>,
6412        ) -> Self {
6413            let test_db = TestDb::fake(background);
6414            let app_state = Self::build_app_state(&test_db).await;
6415            let peer = Peer::new();
6416            let notifications = mpsc::unbounded();
6417            let server = Server::new(app_state.clone(), Some(notifications.0));
6418            Self {
6419                peer,
6420                app_state,
6421                server,
6422                foreground,
6423                notifications: notifications.1,
6424                connection_killers: Default::default(),
6425                forbid_connections: Default::default(),
6426                _test_db: test_db,
6427            }
6428        }
6429
6430        async fn create_client(&mut self, cx: &mut TestAppContext, name: &str) -> TestClient {
6431            cx.update(|cx| {
6432                let settings = Settings::test(cx);
6433                cx.set_global(settings);
6434            });
6435
6436            let http = FakeHttpClient::with_404_response();
6437            let user_id =
6438                if let Ok(Some(user)) = self.app_state.db.get_user_by_github_login(name).await {
6439                    user.id
6440                } else {
6441                    self.app_state.db.create_user(name, false).await.unwrap()
6442                };
6443            let client_name = name.to_string();
6444            let mut client = Client::new(http.clone());
6445            let server = self.server.clone();
6446            let db = self.app_state.db.clone();
6447            let connection_killers = self.connection_killers.clone();
6448            let forbid_connections = self.forbid_connections.clone();
6449            let (connection_id_tx, mut connection_id_rx) = mpsc::channel(16);
6450
6451            Arc::get_mut(&mut client)
6452                .unwrap()
6453                .override_authenticate(move |cx| {
6454                    cx.spawn(|_| async move {
6455                        let access_token = "the-token".to_string();
6456                        Ok(Credentials {
6457                            user_id: user_id.0 as u64,
6458                            access_token,
6459                        })
6460                    })
6461                })
6462                .override_establish_connection(move |credentials, cx| {
6463                    assert_eq!(credentials.user_id, user_id.0 as u64);
6464                    assert_eq!(credentials.access_token, "the-token");
6465
6466                    let server = server.clone();
6467                    let db = db.clone();
6468                    let connection_killers = connection_killers.clone();
6469                    let forbid_connections = forbid_connections.clone();
6470                    let client_name = client_name.clone();
6471                    let connection_id_tx = connection_id_tx.clone();
6472                    cx.spawn(move |cx| async move {
6473                        if forbid_connections.load(SeqCst) {
6474                            Err(EstablishConnectionError::other(anyhow!(
6475                                "server is forbidding connections"
6476                            )))
6477                        } else {
6478                            let (client_conn, server_conn, killed) =
6479                                Connection::in_memory(cx.background());
6480                            connection_killers.lock().insert(user_id, killed);
6481                            let user = db.get_user_by_id(user_id).await.unwrap().unwrap();
6482                            cx.background()
6483                                .spawn(server.handle_connection(
6484                                    server_conn,
6485                                    client_name,
6486                                    user,
6487                                    Some(connection_id_tx),
6488                                    cx.background(),
6489                                ))
6490                                .detach();
6491                            Ok(client_conn)
6492                        }
6493                    })
6494                });
6495
6496            client
6497                .authenticate_and_connect(false, &cx.to_async())
6498                .await
6499                .unwrap();
6500
6501            Channel::init(&client);
6502            Project::init(&client);
6503            cx.update(|cx| {
6504                workspace::init(&client, cx);
6505            });
6506
6507            let peer_id = PeerId(connection_id_rx.next().await.unwrap().0);
6508            let user_store = cx.add_model(|cx| UserStore::new(client.clone(), http, cx));
6509
6510            let client = TestClient {
6511                client,
6512                peer_id,
6513                username: name.to_string(),
6514                user_store,
6515                language_registry: Arc::new(LanguageRegistry::test()),
6516                project: Default::default(),
6517                buffers: Default::default(),
6518            };
6519            client.wait_for_current_user(cx).await;
6520            client
6521        }
6522
6523        fn disconnect_client(&self, user_id: UserId) {
6524            self.connection_killers
6525                .lock()
6526                .remove(&user_id)
6527                .unwrap()
6528                .store(true, SeqCst);
6529        }
6530
6531        fn forbid_connections(&self) {
6532            self.forbid_connections.store(true, SeqCst);
6533        }
6534
6535        fn allow_connections(&self) {
6536            self.forbid_connections.store(false, SeqCst);
6537        }
6538
6539        async fn make_contacts(&self, mut clients: Vec<(&TestClient, &mut TestAppContext)>) {
6540            while let Some((client_a, cx_a)) = clients.pop() {
6541                for (client_b, cx_b) in &mut clients {
6542                    client_a
6543                        .user_store
6544                        .update(cx_a, |store, cx| {
6545                            store.request_contact(client_b.user_id().unwrap(), cx)
6546                        })
6547                        .await
6548                        .unwrap();
6549                    cx_a.foreground().run_until_parked();
6550                    client_b
6551                        .user_store
6552                        .update(*cx_b, |store, cx| {
6553                            store.respond_to_contact_request(client_a.user_id().unwrap(), true, cx)
6554                        })
6555                        .await
6556                        .unwrap();
6557                }
6558            }
6559        }
6560
6561        async fn build_app_state(test_db: &TestDb) -> Arc<AppState> {
6562            Arc::new(AppState {
6563                db: test_db.db().clone(),
6564                api_token: Default::default(),
6565            })
6566        }
6567
6568        async fn state<'a>(&'a self) -> RwLockReadGuard<'a, Store> {
6569            self.server.store.read().await
6570        }
6571
6572        async fn condition<F>(&mut self, mut predicate: F)
6573        where
6574            F: FnMut(&Store) -> bool,
6575        {
6576            assert!(
6577                self.foreground.parking_forbidden(),
6578                "you must call forbid_parking to use server conditions so we don't block indefinitely"
6579            );
6580            while !(predicate)(&*self.server.store.read().await) {
6581                self.foreground.start_waiting();
6582                self.notifications.next().await;
6583                self.foreground.finish_waiting();
6584            }
6585        }
6586    }
6587
6588    impl Deref for TestServer {
6589        type Target = Server;
6590
6591        fn deref(&self) -> &Self::Target {
6592            &self.server
6593        }
6594    }
6595
6596    impl Drop for TestServer {
6597        fn drop(&mut self) {
6598            self.peer.reset();
6599        }
6600    }
6601
6602    struct TestClient {
6603        client: Arc<Client>,
6604        username: String,
6605        pub peer_id: PeerId,
6606        pub user_store: ModelHandle<UserStore>,
6607        language_registry: Arc<LanguageRegistry>,
6608        project: Option<ModelHandle<Project>>,
6609        buffers: HashSet<ModelHandle<language::Buffer>>,
6610    }
6611
6612    impl Deref for TestClient {
6613        type Target = Arc<Client>;
6614
6615        fn deref(&self) -> &Self::Target {
6616            &self.client
6617        }
6618    }
6619
6620    struct ContactsSummary {
6621        pub current: Vec<String>,
6622        pub outgoing_requests: Vec<String>,
6623        pub incoming_requests: Vec<String>,
6624    }
6625
6626    impl TestClient {
6627        pub fn current_user_id(&self, cx: &TestAppContext) -> UserId {
6628            UserId::from_proto(
6629                self.user_store
6630                    .read_with(cx, |user_store, _| user_store.current_user().unwrap().id),
6631            )
6632        }
6633
6634        async fn wait_for_current_user(&self, cx: &TestAppContext) {
6635            let mut authed_user = self
6636                .user_store
6637                .read_with(cx, |user_store, _| user_store.watch_current_user());
6638            while authed_user.next().await.unwrap().is_none() {}
6639        }
6640
6641        async fn clear_contacts(&self, cx: &mut TestAppContext) {
6642            self.user_store
6643                .update(cx, |store, _| store.clear_contacts())
6644                .await;
6645        }
6646
6647        fn summarize_contacts(&self, cx: &TestAppContext) -> ContactsSummary {
6648            self.user_store.read_with(cx, |store, _| ContactsSummary {
6649                current: store
6650                    .contacts()
6651                    .iter()
6652                    .map(|contact| contact.user.github_login.clone())
6653                    .collect(),
6654                outgoing_requests: store
6655                    .outgoing_contact_requests()
6656                    .iter()
6657                    .map(|user| user.github_login.clone())
6658                    .collect(),
6659                incoming_requests: store
6660                    .incoming_contact_requests()
6661                    .iter()
6662                    .map(|user| user.github_login.clone())
6663                    .collect(),
6664            })
6665        }
6666
6667        async fn build_local_project(
6668            &mut self,
6669            fs: Arc<FakeFs>,
6670            root_path: impl AsRef<Path>,
6671            cx: &mut TestAppContext,
6672        ) -> (ModelHandle<Project>, WorktreeId) {
6673            let project = cx.update(|cx| {
6674                Project::local(
6675                    self.client.clone(),
6676                    self.user_store.clone(),
6677                    self.language_registry.clone(),
6678                    fs,
6679                    cx,
6680                )
6681            });
6682            self.project = Some(project.clone());
6683            let (worktree, _) = project
6684                .update(cx, |p, cx| {
6685                    p.find_or_create_local_worktree(root_path, true, cx)
6686                })
6687                .await
6688                .unwrap();
6689            worktree
6690                .read_with(cx, |tree, _| tree.as_local().unwrap().scan_complete())
6691                .await;
6692            project
6693                .update(cx, |project, _| project.next_remote_id())
6694                .await;
6695            (project, worktree.read_with(cx, |tree, _| tree.id()))
6696        }
6697
6698        async fn build_remote_project(
6699            &mut self,
6700            project_id: u64,
6701            cx: &mut TestAppContext,
6702        ) -> ModelHandle<Project> {
6703            let project = Project::remote(
6704                project_id,
6705                self.client.clone(),
6706                self.user_store.clone(),
6707                self.language_registry.clone(),
6708                FakeFs::new(cx.background()),
6709                &mut cx.to_async(),
6710            )
6711            .await
6712            .unwrap();
6713            self.project = Some(project.clone());
6714            project
6715        }
6716
6717        fn build_workspace(
6718            &self,
6719            project: &ModelHandle<Project>,
6720            cx: &mut TestAppContext,
6721        ) -> ViewHandle<Workspace> {
6722            let (window_id, _) = cx.add_window(|_| EmptyView);
6723            cx.add_view(window_id, |cx| {
6724                let fs = project.read(cx).fs().clone();
6725                Workspace::new(
6726                    &WorkspaceParams {
6727                        fs,
6728                        project: project.clone(),
6729                        user_store: self.user_store.clone(),
6730                        languages: self.language_registry.clone(),
6731                        themes: ThemeRegistry::new((), cx.font_cache().clone()),
6732                        channel_list: cx.add_model(|cx| {
6733                            ChannelList::new(self.user_store.clone(), self.client.clone(), cx)
6734                        }),
6735                        client: self.client.clone(),
6736                    },
6737                    cx,
6738                )
6739            })
6740        }
6741
6742        async fn simulate_host(
6743            mut self,
6744            project: ModelHandle<Project>,
6745            op_start_signal: futures::channel::mpsc::UnboundedReceiver<()>,
6746            rng: Arc<Mutex<StdRng>>,
6747            mut cx: TestAppContext,
6748        ) -> (Self, TestAppContext, Option<anyhow::Error>) {
6749            async fn simulate_host_internal(
6750                client: &mut TestClient,
6751                project: ModelHandle<Project>,
6752                mut op_start_signal: futures::channel::mpsc::UnboundedReceiver<()>,
6753                rng: Arc<Mutex<StdRng>>,
6754                cx: &mut TestAppContext,
6755            ) -> anyhow::Result<()> {
6756                let fs = project.read_with(cx, |project, _| project.fs().clone());
6757
6758                while op_start_signal.next().await.is_some() {
6759                    let distribution = rng.lock().gen_range::<usize, _>(0..100);
6760                    let files = fs.as_fake().files().await;
6761                    match distribution {
6762                        0..=20 if !files.is_empty() => {
6763                            let path = files.choose(&mut *rng.lock()).unwrap();
6764                            let mut path = path.as_path();
6765                            while let Some(parent_path) = path.parent() {
6766                                path = parent_path;
6767                                if rng.lock().gen() {
6768                                    break;
6769                                }
6770                            }
6771
6772                            log::info!("Host: find/create local worktree {:?}", path);
6773                            let find_or_create_worktree = project.update(cx, |project, cx| {
6774                                project.find_or_create_local_worktree(path, true, cx)
6775                            });
6776                            if rng.lock().gen() {
6777                                cx.background().spawn(find_or_create_worktree).detach();
6778                            } else {
6779                                find_or_create_worktree.await?;
6780                            }
6781                        }
6782                        10..=80 if !files.is_empty() => {
6783                            let buffer = if client.buffers.is_empty() || rng.lock().gen() {
6784                                let file = files.choose(&mut *rng.lock()).unwrap();
6785                                let (worktree, path) = project
6786                                    .update(cx, |project, cx| {
6787                                        project.find_or_create_local_worktree(
6788                                            file.clone(),
6789                                            true,
6790                                            cx,
6791                                        )
6792                                    })
6793                                    .await?;
6794                                let project_path =
6795                                    worktree.read_with(cx, |worktree, _| (worktree.id(), path));
6796                                log::info!(
6797                                    "Host: opening path {:?}, worktree {}, relative_path {:?}",
6798                                    file,
6799                                    project_path.0,
6800                                    project_path.1
6801                                );
6802                                let buffer = project
6803                                    .update(cx, |project, cx| project.open_buffer(project_path, cx))
6804                                    .await
6805                                    .unwrap();
6806                                client.buffers.insert(buffer.clone());
6807                                buffer
6808                            } else {
6809                                client
6810                                    .buffers
6811                                    .iter()
6812                                    .choose(&mut *rng.lock())
6813                                    .unwrap()
6814                                    .clone()
6815                            };
6816
6817                            if rng.lock().gen_bool(0.1) {
6818                                cx.update(|cx| {
6819                                    log::info!(
6820                                        "Host: dropping buffer {:?}",
6821                                        buffer.read(cx).file().unwrap().full_path(cx)
6822                                    );
6823                                    client.buffers.remove(&buffer);
6824                                    drop(buffer);
6825                                });
6826                            } else {
6827                                buffer.update(cx, |buffer, cx| {
6828                                    log::info!(
6829                                        "Host: updating buffer {:?} ({})",
6830                                        buffer.file().unwrap().full_path(cx),
6831                                        buffer.remote_id()
6832                                    );
6833
6834                                    if rng.lock().gen_bool(0.7) {
6835                                        buffer.randomly_edit(&mut *rng.lock(), 5, cx);
6836                                    } else {
6837                                        buffer.randomly_undo_redo(&mut *rng.lock(), cx);
6838                                    }
6839                                });
6840                            }
6841                        }
6842                        _ => loop {
6843                            let path_component_count = rng.lock().gen_range::<usize, _>(1..=5);
6844                            let mut path = PathBuf::new();
6845                            path.push("/");
6846                            for _ in 0..path_component_count {
6847                                let letter = rng.lock().gen_range(b'a'..=b'z');
6848                                path.push(std::str::from_utf8(&[letter]).unwrap());
6849                            }
6850                            path.set_extension("rs");
6851                            let parent_path = path.parent().unwrap();
6852
6853                            log::info!("Host: creating file {:?}", path,);
6854
6855                            if fs.create_dir(&parent_path).await.is_ok()
6856                                && fs.create_file(&path, Default::default()).await.is_ok()
6857                            {
6858                                break;
6859                            } else {
6860                                log::info!("Host: cannot create file");
6861                            }
6862                        },
6863                    }
6864
6865                    cx.background().simulate_random_delay().await;
6866                }
6867
6868                Ok(())
6869            }
6870
6871            let result =
6872                simulate_host_internal(&mut self, project.clone(), op_start_signal, rng, &mut cx)
6873                    .await;
6874            log::info!("Host done");
6875            self.project = Some(project);
6876            (self, cx, result.err())
6877        }
6878
6879        pub async fn simulate_guest(
6880            mut self,
6881            guest_username: String,
6882            project: ModelHandle<Project>,
6883            op_start_signal: futures::channel::mpsc::UnboundedReceiver<()>,
6884            rng: Arc<Mutex<StdRng>>,
6885            mut cx: TestAppContext,
6886        ) -> (Self, TestAppContext, Option<anyhow::Error>) {
6887            async fn simulate_guest_internal(
6888                client: &mut TestClient,
6889                guest_username: &str,
6890                project: ModelHandle<Project>,
6891                mut op_start_signal: futures::channel::mpsc::UnboundedReceiver<()>,
6892                rng: Arc<Mutex<StdRng>>,
6893                cx: &mut TestAppContext,
6894            ) -> anyhow::Result<()> {
6895                while op_start_signal.next().await.is_some() {
6896                    let buffer = if client.buffers.is_empty() || rng.lock().gen() {
6897                        let worktree = if let Some(worktree) =
6898                            project.read_with(cx, |project, cx| {
6899                                project
6900                                    .worktrees(&cx)
6901                                    .filter(|worktree| {
6902                                        let worktree = worktree.read(cx);
6903                                        worktree.is_visible()
6904                                            && worktree.entries(false).any(|e| e.is_file())
6905                                    })
6906                                    .choose(&mut *rng.lock())
6907                            }) {
6908                            worktree
6909                        } else {
6910                            cx.background().simulate_random_delay().await;
6911                            continue;
6912                        };
6913
6914                        let (worktree_root_name, project_path) =
6915                            worktree.read_with(cx, |worktree, _| {
6916                                let entry = worktree
6917                                    .entries(false)
6918                                    .filter(|e| e.is_file())
6919                                    .choose(&mut *rng.lock())
6920                                    .unwrap();
6921                                (
6922                                    worktree.root_name().to_string(),
6923                                    (worktree.id(), entry.path.clone()),
6924                                )
6925                            });
6926                        log::info!(
6927                            "{}: opening path {:?} in worktree {} ({})",
6928                            guest_username,
6929                            project_path.1,
6930                            project_path.0,
6931                            worktree_root_name,
6932                        );
6933                        let buffer = project
6934                            .update(cx, |project, cx| {
6935                                project.open_buffer(project_path.clone(), cx)
6936                            })
6937                            .await?;
6938                        log::info!(
6939                            "{}: opened path {:?} in worktree {} ({}) with buffer id {}",
6940                            guest_username,
6941                            project_path.1,
6942                            project_path.0,
6943                            worktree_root_name,
6944                            buffer.read_with(cx, |buffer, _| buffer.remote_id())
6945                        );
6946                        client.buffers.insert(buffer.clone());
6947                        buffer
6948                    } else {
6949                        client
6950                            .buffers
6951                            .iter()
6952                            .choose(&mut *rng.lock())
6953                            .unwrap()
6954                            .clone()
6955                    };
6956
6957                    let choice = rng.lock().gen_range(0..100);
6958                    match choice {
6959                        0..=9 => {
6960                            cx.update(|cx| {
6961                                log::info!(
6962                                    "{}: dropping buffer {:?}",
6963                                    guest_username,
6964                                    buffer.read(cx).file().unwrap().full_path(cx)
6965                                );
6966                                client.buffers.remove(&buffer);
6967                                drop(buffer);
6968                            });
6969                        }
6970                        10..=19 => {
6971                            let completions = project.update(cx, |project, cx| {
6972                                log::info!(
6973                                    "{}: requesting completions for buffer {} ({:?})",
6974                                    guest_username,
6975                                    buffer.read(cx).remote_id(),
6976                                    buffer.read(cx).file().unwrap().full_path(cx)
6977                                );
6978                                let offset = rng.lock().gen_range(0..=buffer.read(cx).len());
6979                                project.completions(&buffer, offset, cx)
6980                            });
6981                            let completions = cx.background().spawn(async move {
6982                                completions
6983                                    .await
6984                                    .map_err(|err| anyhow!("completions request failed: {:?}", err))
6985                            });
6986                            if rng.lock().gen_bool(0.3) {
6987                                log::info!("{}: detaching completions request", guest_username);
6988                                cx.update(|cx| completions.detach_and_log_err(cx));
6989                            } else {
6990                                completions.await?;
6991                            }
6992                        }
6993                        20..=29 => {
6994                            let code_actions = project.update(cx, |project, cx| {
6995                                log::info!(
6996                                    "{}: requesting code actions for buffer {} ({:?})",
6997                                    guest_username,
6998                                    buffer.read(cx).remote_id(),
6999                                    buffer.read(cx).file().unwrap().full_path(cx)
7000                                );
7001                                let range = buffer.read(cx).random_byte_range(0, &mut *rng.lock());
7002                                project.code_actions(&buffer, range, cx)
7003                            });
7004                            let code_actions = cx.background().spawn(async move {
7005                                code_actions.await.map_err(|err| {
7006                                    anyhow!("code actions request failed: {:?}", err)
7007                                })
7008                            });
7009                            if rng.lock().gen_bool(0.3) {
7010                                log::info!("{}: detaching code actions request", guest_username);
7011                                cx.update(|cx| code_actions.detach_and_log_err(cx));
7012                            } else {
7013                                code_actions.await?;
7014                            }
7015                        }
7016                        30..=39 if buffer.read_with(cx, |buffer, _| buffer.is_dirty()) => {
7017                            let (requested_version, save) = buffer.update(cx, |buffer, cx| {
7018                                log::info!(
7019                                    "{}: saving buffer {} ({:?})",
7020                                    guest_username,
7021                                    buffer.remote_id(),
7022                                    buffer.file().unwrap().full_path(cx)
7023                                );
7024                                (buffer.version(), buffer.save(cx))
7025                            });
7026                            let save = cx.background().spawn(async move {
7027                                let (saved_version, _) = save
7028                                    .await
7029                                    .map_err(|err| anyhow!("save request failed: {:?}", err))?;
7030                                assert!(saved_version.observed_all(&requested_version));
7031                                Ok::<_, anyhow::Error>(())
7032                            });
7033                            if rng.lock().gen_bool(0.3) {
7034                                log::info!("{}: detaching save request", guest_username);
7035                                cx.update(|cx| save.detach_and_log_err(cx));
7036                            } else {
7037                                save.await?;
7038                            }
7039                        }
7040                        40..=44 => {
7041                            let prepare_rename = project.update(cx, |project, cx| {
7042                                log::info!(
7043                                    "{}: preparing rename for buffer {} ({:?})",
7044                                    guest_username,
7045                                    buffer.read(cx).remote_id(),
7046                                    buffer.read(cx).file().unwrap().full_path(cx)
7047                                );
7048                                let offset = rng.lock().gen_range(0..=buffer.read(cx).len());
7049                                project.prepare_rename(buffer, offset, cx)
7050                            });
7051                            let prepare_rename = cx.background().spawn(async move {
7052                                prepare_rename.await.map_err(|err| {
7053                                    anyhow!("prepare rename request failed: {:?}", err)
7054                                })
7055                            });
7056                            if rng.lock().gen_bool(0.3) {
7057                                log::info!("{}: detaching prepare rename request", guest_username);
7058                                cx.update(|cx| prepare_rename.detach_and_log_err(cx));
7059                            } else {
7060                                prepare_rename.await?;
7061                            }
7062                        }
7063                        45..=49 => {
7064                            let definitions = project.update(cx, |project, cx| {
7065                                log::info!(
7066                                    "{}: requesting definitions for buffer {} ({:?})",
7067                                    guest_username,
7068                                    buffer.read(cx).remote_id(),
7069                                    buffer.read(cx).file().unwrap().full_path(cx)
7070                                );
7071                                let offset = rng.lock().gen_range(0..=buffer.read(cx).len());
7072                                project.definition(&buffer, offset, cx)
7073                            });
7074                            let definitions = cx.background().spawn(async move {
7075                                definitions
7076                                    .await
7077                                    .map_err(|err| anyhow!("definitions request failed: {:?}", err))
7078                            });
7079                            if rng.lock().gen_bool(0.3) {
7080                                log::info!("{}: detaching definitions request", guest_username);
7081                                cx.update(|cx| definitions.detach_and_log_err(cx));
7082                            } else {
7083                                client
7084                                    .buffers
7085                                    .extend(definitions.await?.into_iter().map(|loc| loc.buffer));
7086                            }
7087                        }
7088                        50..=54 => {
7089                            let highlights = project.update(cx, |project, cx| {
7090                                log::info!(
7091                                    "{}: requesting highlights for buffer {} ({:?})",
7092                                    guest_username,
7093                                    buffer.read(cx).remote_id(),
7094                                    buffer.read(cx).file().unwrap().full_path(cx)
7095                                );
7096                                let offset = rng.lock().gen_range(0..=buffer.read(cx).len());
7097                                project.document_highlights(&buffer, offset, cx)
7098                            });
7099                            let highlights = cx.background().spawn(async move {
7100                                highlights
7101                                    .await
7102                                    .map_err(|err| anyhow!("highlights request failed: {:?}", err))
7103                            });
7104                            if rng.lock().gen_bool(0.3) {
7105                                log::info!("{}: detaching highlights request", guest_username);
7106                                cx.update(|cx| highlights.detach_and_log_err(cx));
7107                            } else {
7108                                highlights.await?;
7109                            }
7110                        }
7111                        55..=59 => {
7112                            let search = project.update(cx, |project, cx| {
7113                                let query = rng.lock().gen_range('a'..='z');
7114                                log::info!("{}: project-wide search {:?}", guest_username, query);
7115                                project.search(SearchQuery::text(query, false, false), cx)
7116                            });
7117                            let search = cx.background().spawn(async move {
7118                                search
7119                                    .await
7120                                    .map_err(|err| anyhow!("search request failed: {:?}", err))
7121                            });
7122                            if rng.lock().gen_bool(0.3) {
7123                                log::info!("{}: detaching search request", guest_username);
7124                                cx.update(|cx| search.detach_and_log_err(cx));
7125                            } else {
7126                                client.buffers.extend(search.await?.into_keys());
7127                            }
7128                        }
7129                        60..=69 => {
7130                            let worktree = project
7131                                .read_with(cx, |project, cx| {
7132                                    project
7133                                        .worktrees(&cx)
7134                                        .filter(|worktree| {
7135                                            let worktree = worktree.read(cx);
7136                                            worktree.is_visible()
7137                                                && worktree.entries(false).any(|e| e.is_file())
7138                                                && worktree
7139                                                    .root_entry()
7140                                                    .map_or(false, |e| e.is_dir())
7141                                        })
7142                                        .choose(&mut *rng.lock())
7143                                })
7144                                .unwrap();
7145                            let (worktree_id, worktree_root_name) = worktree
7146                                .read_with(cx, |worktree, _| {
7147                                    (worktree.id(), worktree.root_name().to_string())
7148                                });
7149
7150                            let mut new_name = String::new();
7151                            for _ in 0..10 {
7152                                let letter = rng.lock().gen_range('a'..='z');
7153                                new_name.push(letter);
7154                            }
7155                            let mut new_path = PathBuf::new();
7156                            new_path.push(new_name);
7157                            new_path.set_extension("rs");
7158                            log::info!(
7159                                "{}: creating {:?} in worktree {} ({})",
7160                                guest_username,
7161                                new_path,
7162                                worktree_id,
7163                                worktree_root_name,
7164                            );
7165                            project
7166                                .update(cx, |project, cx| {
7167                                    project.create_entry((worktree_id, new_path), false, cx)
7168                                })
7169                                .unwrap()
7170                                .await?;
7171                        }
7172                        _ => {
7173                            buffer.update(cx, |buffer, cx| {
7174                                log::info!(
7175                                    "{}: updating buffer {} ({:?})",
7176                                    guest_username,
7177                                    buffer.remote_id(),
7178                                    buffer.file().unwrap().full_path(cx)
7179                                );
7180                                if rng.lock().gen_bool(0.7) {
7181                                    buffer.randomly_edit(&mut *rng.lock(), 5, cx);
7182                                } else {
7183                                    buffer.randomly_undo_redo(&mut *rng.lock(), cx);
7184                                }
7185                            });
7186                        }
7187                    }
7188                    cx.background().simulate_random_delay().await;
7189                }
7190                Ok(())
7191            }
7192
7193            let result = simulate_guest_internal(
7194                &mut self,
7195                &guest_username,
7196                project.clone(),
7197                op_start_signal,
7198                rng,
7199                &mut cx,
7200            )
7201            .await;
7202            log::info!("{}: done", guest_username);
7203
7204            self.project = Some(project);
7205            (self, cx, result.err())
7206        }
7207    }
7208
7209    impl Drop for TestClient {
7210        fn drop(&mut self) {
7211            self.client.tear_down();
7212        }
7213    }
7214
7215    impl Executor for Arc<gpui::executor::Background> {
7216        type Sleep = gpui::executor::Timer;
7217
7218        fn spawn_detached<F: 'static + Send + Future<Output = ()>>(&self, future: F) {
7219            self.spawn(future).detach();
7220        }
7221
7222        fn sleep(&self, duration: Duration) -> Self::Sleep {
7223            self.as_ref().timer(duration)
7224        }
7225    }
7226
7227    fn channel_messages(channel: &Channel) -> Vec<(String, String, bool)> {
7228        channel
7229            .messages()
7230            .cursor::<()>()
7231            .map(|m| {
7232                (
7233                    m.sender.github_login.clone(),
7234                    m.body.clone(),
7235                    m.is_pending(),
7236                )
7237            })
7238            .collect()
7239    }
7240
7241    struct EmptyView;
7242
7243    impl gpui::Entity for EmptyView {
7244        type Event = ();
7245    }
7246
7247    impl gpui::View for EmptyView {
7248        fn ui_name() -> &'static str {
7249            "empty view"
7250        }
7251
7252        fn render(&mut self, _: &mut gpui::RenderContext<Self>) -> gpui::ElementBox {
7253            gpui::Element::boxed(gpui::elements::Empty::new())
7254        }
7255    }
7256}