rpc.rs

   1mod store;
   2
   3use crate::{
   4    auth,
   5    db::{ChannelId, MessageId, UserId},
   6    AppState, Result,
   7};
   8use anyhow::anyhow;
   9use async_tungstenite::tungstenite::{
  10    protocol::CloseFrame as TungsteniteCloseFrame, Message as TungsteniteMessage,
  11};
  12use axum::{
  13    body::Body,
  14    extract::{
  15        ws::{CloseFrame as AxumCloseFrame, Message as AxumMessage},
  16        ConnectInfo, WebSocketUpgrade,
  17    },
  18    headers::{Header, HeaderName},
  19    http::StatusCode,
  20    middleware,
  21    response::{IntoResponse, Response},
  22    routing::get,
  23    Extension, Router, TypedHeader,
  24};
  25use collections::{HashMap, HashSet};
  26use futures::{channel::mpsc, future::BoxFuture, FutureExt, SinkExt, StreamExt, TryStreamExt};
  27use lazy_static::lazy_static;
  28use rpc::{
  29    proto::{self, AnyTypedEnvelope, EntityMessage, EnvelopedMessage, RequestMessage},
  30    Connection, ConnectionId, Peer, TypedEnvelope,
  31};
  32use std::{
  33    any::TypeId,
  34    future::Future,
  35    marker::PhantomData,
  36    net::SocketAddr,
  37    ops::{Deref, DerefMut},
  38    rc::Rc,
  39    sync::Arc,
  40    time::Duration,
  41};
  42use store::{Store, Worktree};
  43use time::OffsetDateTime;
  44use tokio::{
  45    sync::{RwLock, RwLockReadGuard, RwLockWriteGuard},
  46    time::Sleep,
  47};
  48use tower::ServiceBuilder;
  49use tracing::{info_span, instrument, Instrument};
  50
  51type MessageHandler =
  52    Box<dyn Send + Sync + Fn(Arc<Server>, Box<dyn AnyTypedEnvelope>) -> BoxFuture<'static, ()>>;
  53
/// Shared state of the RPC server; handed around as `Arc<Server>`.
pub struct Server {
    /// RPC peer used to respond to, send, and forward messages between connections.
    peer: Arc<Peer>,
    /// Connection and project bookkeeping, guarded by an async read-write lock.
    store: RwLock<Store>,
    /// Application-wide state; within this part of the file it is used to reach the database.
    app_state: Arc<AppState>,
    /// Registered handlers, keyed by the `TypeId` of the message payload they accept.
    handlers: HashMap<TypeId, MessageHandler>,
    /// Optional channel that is sent a `()` after each message has been handled.
    notifications: Option<mpsc::UnboundedSender<()>>,
}
  61
  62pub trait Executor: Send + Clone {
  63    type Sleep: Send + Future;
  64    fn spawn_detached<F: 'static + Send + Future<Output = ()>>(&self, future: F);
  65    fn sleep(&self, duration: Duration) -> Self::Sleep;
  66}
  67
/// Zero-sized, cloneable executor handle.
///
/// NOTE(review): presumably the production `Executor` implementation; the `impl` is not
/// visible in this part of the file.
#[derive(Clone)]
pub struct RealExecutor;
  70
// NOTE(review): both constants are used outside this part of the file; judging by their names
// they bound channel-message paging and channel-message body length. Confirm at the use sites.
/// Number of messages per page.
const MESSAGE_COUNT_PER_PAGE: usize = 100;
/// Maximum accepted message length.
const MAX_MESSAGE_LEN: usize = 1024;
  73
/// Wrapper around a read guard on the `Store` lock.
struct StoreReadGuard<'a> {
    /// The underlying tokio read guard.
    guard: RwLockReadGuard<'a, Store>,
    /// `Rc` is not `Send`, so this marker makes the wrapper `!Send` as well.
    /// NOTE(review): presumably to keep the guard from being held across an `.await`
    /// in a future that must be `Send` — confirm.
    _not_send: PhantomData<Rc<()>>,
}
  78
/// Wrapper around a write guard on the `Store` lock.
struct StoreWriteGuard<'a> {
    /// The underlying tokio write guard.
    guard: RwLockWriteGuard<'a, Store>,
    /// `Rc` is not `Send`, so this marker makes the wrapper `!Send` as well.
    /// NOTE(review): presumably to keep the guard from being held across an `.await`
    /// in a future that must be `Send` — confirm.
    _not_send: PhantomData<Rc<()>>,
}
  83
  84impl Server {
  85    pub fn new(
  86        app_state: Arc<AppState>,
  87        notifications: Option<mpsc::UnboundedSender<()>>,
  88    ) -> Arc<Self> {
  89        let mut server = Self {
  90            peer: Peer::new(),
  91            app_state,
  92            store: Default::default(),
  93            handlers: Default::default(),
  94            notifications,
  95        };
  96
  97        server
  98            .add_request_handler(Server::ping)
  99            .add_request_handler(Server::register_project)
 100            .add_message_handler(Server::unregister_project)
 101            .add_request_handler(Server::share_project)
 102            .add_message_handler(Server::unshare_project)
 103            .add_sync_request_handler(Server::join_project)
 104            .add_message_handler(Server::leave_project)
 105            .add_request_handler(Server::register_worktree)
 106            .add_message_handler(Server::unregister_worktree)
 107            .add_request_handler(Server::update_worktree)
 108            .add_message_handler(Server::start_language_server)
 109            .add_message_handler(Server::update_language_server)
 110            .add_message_handler(Server::update_diagnostic_summary)
 111            .add_request_handler(Server::forward_project_request::<proto::GetDefinition>)
 112            .add_request_handler(Server::forward_project_request::<proto::GetReferences>)
 113            .add_request_handler(Server::forward_project_request::<proto::SearchProject>)
 114            .add_request_handler(Server::forward_project_request::<proto::GetDocumentHighlights>)
 115            .add_request_handler(Server::forward_project_request::<proto::GetProjectSymbols>)
 116            .add_request_handler(Server::forward_project_request::<proto::OpenBufferForSymbol>)
 117            .add_request_handler(Server::forward_project_request::<proto::OpenBufferById>)
 118            .add_request_handler(Server::forward_project_request::<proto::OpenBufferByPath>)
 119            .add_request_handler(Server::forward_project_request::<proto::GetCompletions>)
 120            .add_request_handler(
 121                Server::forward_project_request::<proto::ApplyCompletionAdditionalEdits>,
 122            )
 123            .add_request_handler(Server::forward_project_request::<proto::GetCodeActions>)
 124            .add_request_handler(Server::forward_project_request::<proto::ApplyCodeAction>)
 125            .add_request_handler(Server::forward_project_request::<proto::PrepareRename>)
 126            .add_request_handler(Server::forward_project_request::<proto::PerformRename>)
 127            .add_request_handler(Server::forward_project_request::<proto::ReloadBuffers>)
 128            .add_request_handler(Server::forward_project_request::<proto::FormatBuffers>)
 129            .add_request_handler(Server::update_buffer)
 130            .add_message_handler(Server::update_buffer_file)
 131            .add_message_handler(Server::buffer_reloaded)
 132            .add_message_handler(Server::buffer_saved)
 133            .add_request_handler(Server::save_buffer)
 134            .add_request_handler(Server::get_channels)
 135            .add_request_handler(Server::get_users)
 136            .add_request_handler(Server::join_channel)
 137            .add_message_handler(Server::leave_channel)
 138            .add_request_handler(Server::send_channel_message)
 139            .add_request_handler(Server::follow)
 140            .add_message_handler(Server::unfollow)
 141            .add_message_handler(Server::update_followers)
 142            .add_request_handler(Server::get_channel_messages);
 143
 144        Arc::new(server)
 145    }
 146
 147    fn add_message_handler<F, Fut, M>(&mut self, handler: F) -> &mut Self
 148    where
 149        F: 'static + Send + Sync + Fn(Arc<Self>, TypedEnvelope<M>) -> Fut,
 150        Fut: 'static + Send + Future<Output = Result<()>>,
 151        M: EnvelopedMessage,
 152    {
 153        let prev_handler = self.handlers.insert(
 154            TypeId::of::<M>(),
 155            Box::new(move |server, envelope| {
 156                let envelope = envelope.into_any().downcast::<TypedEnvelope<M>>().unwrap();
 157                let span = info_span!(
 158                    "handle message",
 159                    payload_type = envelope.payload_type_name(),
 160                    payload = serde_json::to_string_pretty(&envelope.payload)
 161                        .unwrap()
 162                        .as_str(),
 163                );
 164                let future = (handler)(server, *envelope);
 165                async move {
 166                    if let Err(error) = future.await {
 167                        tracing::error!(%error, "error handling message");
 168                    }
 169                }
 170                .instrument(span)
 171                .boxed()
 172            }),
 173        );
 174        if prev_handler.is_some() {
 175            panic!("registered a handler for the same message twice");
 176        }
 177        self
 178    }
 179
 180    fn add_request_handler<F, Fut, M>(&mut self, handler: F) -> &mut Self
 181    where
 182        F: 'static + Send + Sync + Fn(Arc<Self>, TypedEnvelope<M>) -> Fut,
 183        Fut: 'static + Send + Future<Output = Result<M::Response>>,
 184        M: RequestMessage,
 185    {
 186        self.add_message_handler(move |server, envelope| {
 187            let receipt = envelope.receipt();
 188            let response = (handler)(server.clone(), envelope);
 189            async move {
 190                match response.await {
 191                    Ok(response) => {
 192                        server.peer.respond(receipt, response)?;
 193                        Ok(())
 194                    }
 195                    Err(error) => {
 196                        server.peer.respond_with_error(
 197                            receipt,
 198                            proto::Error {
 199                                message: error.to_string(),
 200                            },
 201                        )?;
 202                        Err(error)
 203                    }
 204                }
 205            }
 206        })
 207    }
 208
 209    /// Handle a request while holding a lock to the store. This is useful when we're registering
 210    /// a connection but we want to respond on the connection before anybody else can send on it.
 211    fn add_sync_request_handler<F, M>(&mut self, handler: F) -> &mut Self
 212    where
 213        F: 'static
 214            + Send
 215            + Sync
 216            + Fn(Arc<Self>, &mut Store, TypedEnvelope<M>) -> Result<M::Response>,
 217        M: RequestMessage,
 218    {
 219        let handler = Arc::new(handler);
 220        self.add_message_handler(move |server, envelope| {
 221            let receipt = envelope.receipt();
 222            let handler = handler.clone();
 223            async move {
 224                let mut store = server.state_mut().await;
 225                let response = (handler)(server.clone(), &mut *store, envelope);
 226                match response {
 227                    Ok(response) => {
 228                        server.peer.respond(receipt, response)?;
 229                        Ok(())
 230                    }
 231                    Err(error) => {
 232                        server.peer.respond_with_error(
 233                            receipt,
 234                            proto::Error {
 235                                message: error.to_string(),
 236                            },
 237                        )?;
 238                        Err(error)
 239                    }
 240                }
 241            }
 242        })
 243    }
 244
 245    pub fn handle_connection<E: Executor>(
 246        self: &Arc<Self>,
 247        connection: Connection,
 248        address: String,
 249        user_id: UserId,
 250        mut send_connection_id: Option<mpsc::Sender<ConnectionId>>,
 251        executor: E,
 252    ) -> impl Future<Output = ()> {
 253        let mut this = self.clone();
 254        let span = info_span!("handle connection", %user_id, %address);
 255        async move {
 256            let (connection_id, handle_io, mut incoming_rx) = this
 257                .peer
 258                .add_connection(connection, {
 259                    let executor = executor.clone();
 260                    move |duration| {
 261                        let timer = executor.sleep(duration);
 262                        async move {
 263                            timer.await;
 264                        }
 265                    }
 266                })
 267                .await;
 268
 269            tracing::info!(%user_id, %connection_id, %address, "connection opened");
 270
 271            if let Some(send_connection_id) = send_connection_id.as_mut() {
 272                let _ = send_connection_id.send(connection_id).await;
 273            }
 274
 275            {
 276                let mut state = this.state_mut().await;
 277                state.add_connection(connection_id, user_id);
 278                this.update_contacts_for_users(&*state, &[user_id]);
 279            }
 280
 281            let handle_io = handle_io.fuse();
 282            futures::pin_mut!(handle_io);
 283            loop {
 284                let next_message = incoming_rx.next().fuse();
 285                futures::pin_mut!(next_message);
 286                futures::select_biased! {
 287                    result = handle_io => {
 288                        if let Err(error) = result {
 289                            tracing::error!(%error, "error handling I/O");
 290                        }
 291                        break;
 292                    }
 293                    message = next_message => {
 294                        if let Some(message) = message {
 295                            let type_name = message.payload_type_name();
 296                            let span = tracing::info_span!("receive message", %user_id, %connection_id, %address, type_name);
 297                            async {
 298                                if let Some(handler) = this.handlers.get(&message.payload_type_id()) {
 299                                    let notifications = this.notifications.clone();
 300                                    let is_background = message.is_background();
 301                                    let handle_message = (handler)(this.clone(), message);
 302                                    let handle_message = async move {
 303                                        handle_message.await;
 304                                        if let Some(mut notifications) = notifications {
 305                                            let _ = notifications.send(()).await;
 306                                        }
 307                                    };
 308                                    if is_background {
 309                                        executor.spawn_detached(handle_message);
 310                                    } else {
 311                                        handle_message.await;
 312                                    }
 313                                } else {
 314                                    tracing::error!("no message handler");
 315                                }
 316                            }.instrument(span).await;
 317                        } else {
 318                            tracing::info!(%user_id, %connection_id, %address, "connection closed");
 319                            break;
 320                        }
 321                    }
 322                }
 323            }
 324
 325            if let Err(error) = this.sign_out(connection_id).await {
 326                tracing::error!(%error, "error signing out");
 327            }
 328        }.instrument(span)
 329    }
 330
 331    async fn sign_out(self: &mut Arc<Self>, connection_id: ConnectionId) -> Result<()> {
 332        self.peer.disconnect(connection_id);
 333        let mut state = self.state_mut().await;
 334        let removed_connection = state.remove_connection(connection_id)?;
 335
 336        for (project_id, project) in removed_connection.hosted_projects {
 337            if let Some(share) = project.share {
 338                broadcast(
 339                    connection_id,
 340                    share.guests.keys().copied().collect(),
 341                    |conn_id| {
 342                        self.peer
 343                            .send(conn_id, proto::UnshareProject { project_id })
 344                    },
 345                );
 346            }
 347        }
 348
 349        for (project_id, peer_ids) in removed_connection.guest_project_ids {
 350            broadcast(connection_id, peer_ids, |conn_id| {
 351                self.peer.send(
 352                    conn_id,
 353                    proto::RemoveProjectCollaborator {
 354                        project_id,
 355                        peer_id: connection_id.0,
 356                    },
 357                )
 358            });
 359        }
 360
 361        self.update_contacts_for_users(&*state, removed_connection.contact_ids.iter());
 362        Ok(())
 363    }
 364
 365    async fn ping(self: Arc<Server>, _: TypedEnvelope<proto::Ping>) -> Result<proto::Ack> {
 366        Ok(proto::Ack {})
 367    }
 368
 369    async fn register_project(
 370        self: Arc<Server>,
 371        request: TypedEnvelope<proto::RegisterProject>,
 372    ) -> Result<proto::RegisterProjectResponse> {
 373        let project_id = {
 374            let mut state = self.state_mut().await;
 375            let user_id = state.user_id_for_connection(request.sender_id)?;
 376            state.register_project(request.sender_id, user_id)
 377        };
 378        Ok(proto::RegisterProjectResponse { project_id })
 379    }
 380
 381    async fn unregister_project(
 382        self: Arc<Server>,
 383        request: TypedEnvelope<proto::UnregisterProject>,
 384    ) -> Result<()> {
 385        let mut state = self.state_mut().await;
 386        let project = state.unregister_project(request.payload.project_id, request.sender_id)?;
 387        self.update_contacts_for_users(&*state, &project.authorized_user_ids());
 388        Ok(())
 389    }
 390
 391    async fn share_project(
 392        self: Arc<Server>,
 393        request: TypedEnvelope<proto::ShareProject>,
 394    ) -> Result<proto::Ack> {
 395        let mut state = self.state_mut().await;
 396        let project = state.share_project(request.payload.project_id, request.sender_id)?;
 397        self.update_contacts_for_users(&mut *state, &project.authorized_user_ids);
 398        Ok(proto::Ack {})
 399    }
 400
 401    async fn unshare_project(
 402        self: Arc<Server>,
 403        request: TypedEnvelope<proto::UnshareProject>,
 404    ) -> Result<()> {
 405        let project_id = request.payload.project_id;
 406        let mut state = self.state_mut().await;
 407        let project = state.unshare_project(project_id, request.sender_id)?;
 408        broadcast(request.sender_id, project.connection_ids, |conn_id| {
 409            self.peer
 410                .send(conn_id, proto::UnshareProject { project_id })
 411        });
 412        self.update_contacts_for_users(&mut *state, &project.authorized_user_ids);
 413        Ok(())
 414    }
 415
 416    fn join_project(
 417        self: Arc<Server>,
 418        state: &mut Store,
 419        request: TypedEnvelope<proto::JoinProject>,
 420    ) -> Result<proto::JoinProjectResponse> {
 421        let project_id = request.payload.project_id;
 422
 423        let user_id = state.user_id_for_connection(request.sender_id)?;
 424        let (response, connection_ids, contact_user_ids) = state
 425            .join_project(request.sender_id, user_id, project_id)
 426            .and_then(|joined| {
 427                let share = joined.project.share()?;
 428                let peer_count = share.guests.len();
 429                let mut collaborators = Vec::with_capacity(peer_count);
 430                collaborators.push(proto::Collaborator {
 431                    peer_id: joined.project.host_connection_id.0,
 432                    replica_id: 0,
 433                    user_id: joined.project.host_user_id.to_proto(),
 434                });
 435                let worktrees = share
 436                    .worktrees
 437                    .iter()
 438                    .filter_map(|(id, shared_worktree)| {
 439                        let worktree = joined.project.worktrees.get(&id)?;
 440                        Some(proto::Worktree {
 441                            id: *id,
 442                            root_name: worktree.root_name.clone(),
 443                            entries: shared_worktree.entries.values().cloned().collect(),
 444                            diagnostic_summaries: shared_worktree
 445                                .diagnostic_summaries
 446                                .values()
 447                                .cloned()
 448                                .collect(),
 449                            visible: worktree.visible,
 450                        })
 451                    })
 452                    .collect();
 453                for (peer_conn_id, (peer_replica_id, peer_user_id)) in &share.guests {
 454                    if *peer_conn_id != request.sender_id {
 455                        collaborators.push(proto::Collaborator {
 456                            peer_id: peer_conn_id.0,
 457                            replica_id: *peer_replica_id as u32,
 458                            user_id: peer_user_id.to_proto(),
 459                        });
 460                    }
 461                }
 462                let response = proto::JoinProjectResponse {
 463                    worktrees,
 464                    replica_id: joined.replica_id as u32,
 465                    collaborators,
 466                    language_servers: joined.project.language_servers.clone(),
 467                };
 468                let connection_ids = joined.project.connection_ids();
 469                let contact_user_ids = joined.project.authorized_user_ids();
 470                Ok((response, connection_ids, contact_user_ids))
 471            })?;
 472
 473        broadcast(request.sender_id, connection_ids, |conn_id| {
 474            self.peer.send(
 475                conn_id,
 476                proto::AddProjectCollaborator {
 477                    project_id,
 478                    collaborator: Some(proto::Collaborator {
 479                        peer_id: request.sender_id.0,
 480                        replica_id: response.replica_id,
 481                        user_id: user_id.to_proto(),
 482                    }),
 483                },
 484            )
 485        });
 486        self.update_contacts_for_users(state, &contact_user_ids);
 487        Ok(response)
 488    }
 489
 490    async fn leave_project(
 491        self: Arc<Server>,
 492        request: TypedEnvelope<proto::LeaveProject>,
 493    ) -> Result<()> {
 494        let sender_id = request.sender_id;
 495        let project_id = request.payload.project_id;
 496        let mut state = self.state_mut().await;
 497        let worktree = state.leave_project(sender_id, project_id)?;
 498        broadcast(sender_id, worktree.connection_ids, |conn_id| {
 499            self.peer.send(
 500                conn_id,
 501                proto::RemoveProjectCollaborator {
 502                    project_id,
 503                    peer_id: sender_id.0,
 504                },
 505            )
 506        });
 507        self.update_contacts_for_users(&*state, &worktree.authorized_user_ids);
 508        Ok(())
 509    }
 510
 511    async fn register_worktree(
 512        self: Arc<Server>,
 513        request: TypedEnvelope<proto::RegisterWorktree>,
 514    ) -> Result<proto::Ack> {
 515        let mut contact_user_ids = HashSet::default();
 516        for github_login in &request.payload.authorized_logins {
 517            let contact_user_id = self.app_state.db.create_user(github_login, false).await?;
 518            contact_user_ids.insert(contact_user_id);
 519        }
 520
 521        let mut state = self.state_mut().await;
 522        let host_user_id = state.user_id_for_connection(request.sender_id)?;
 523        contact_user_ids.insert(host_user_id);
 524
 525        let contact_user_ids = contact_user_ids.into_iter().collect::<Vec<_>>();
 526        let guest_connection_ids = state
 527            .read_project(request.payload.project_id, request.sender_id)?
 528            .guest_connection_ids();
 529        state.register_worktree(
 530            request.payload.project_id,
 531            request.payload.worktree_id,
 532            request.sender_id,
 533            Worktree {
 534                authorized_user_ids: contact_user_ids.clone(),
 535                root_name: request.payload.root_name.clone(),
 536                visible: request.payload.visible,
 537            },
 538        )?;
 539
 540        broadcast(request.sender_id, guest_connection_ids, |connection_id| {
 541            self.peer
 542                .forward_send(request.sender_id, connection_id, request.payload.clone())
 543        });
 544        self.update_contacts_for_users(&*state, &contact_user_ids);
 545        Ok(proto::Ack {})
 546    }
 547
 548    async fn unregister_worktree(
 549        self: Arc<Server>,
 550        request: TypedEnvelope<proto::UnregisterWorktree>,
 551    ) -> Result<()> {
 552        let project_id = request.payload.project_id;
 553        let worktree_id = request.payload.worktree_id;
 554        let mut state = self.state_mut().await;
 555        let (worktree, guest_connection_ids) =
 556            state.unregister_worktree(project_id, worktree_id, request.sender_id)?;
 557        broadcast(request.sender_id, guest_connection_ids, |conn_id| {
 558            self.peer.send(
 559                conn_id,
 560                proto::UnregisterWorktree {
 561                    project_id,
 562                    worktree_id,
 563                },
 564            )
 565        });
 566        self.update_contacts_for_users(&*state, &worktree.authorized_user_ids);
 567        Ok(())
 568    }
 569
 570    async fn update_worktree(
 571        self: Arc<Server>,
 572        request: TypedEnvelope<proto::UpdateWorktree>,
 573    ) -> Result<proto::Ack> {
 574        let connection_ids = self.state_mut().await.update_worktree(
 575            request.sender_id,
 576            request.payload.project_id,
 577            request.payload.worktree_id,
 578            &request.payload.removed_entries,
 579            &request.payload.updated_entries,
 580        )?;
 581
 582        broadcast(request.sender_id, connection_ids, |connection_id| {
 583            self.peer
 584                .forward_send(request.sender_id, connection_id, request.payload.clone())
 585        });
 586
 587        Ok(proto::Ack {})
 588    }
 589
 590    async fn update_diagnostic_summary(
 591        self: Arc<Server>,
 592        request: TypedEnvelope<proto::UpdateDiagnosticSummary>,
 593    ) -> Result<()> {
 594        let summary = request
 595            .payload
 596            .summary
 597            .clone()
 598            .ok_or_else(|| anyhow!("invalid summary"))?;
 599        let receiver_ids = self.state_mut().await.update_diagnostic_summary(
 600            request.payload.project_id,
 601            request.payload.worktree_id,
 602            request.sender_id,
 603            summary,
 604        )?;
 605
 606        broadcast(request.sender_id, receiver_ids, |connection_id| {
 607            self.peer
 608                .forward_send(request.sender_id, connection_id, request.payload.clone())
 609        });
 610        Ok(())
 611    }
 612
 613    async fn start_language_server(
 614        self: Arc<Server>,
 615        request: TypedEnvelope<proto::StartLanguageServer>,
 616    ) -> Result<()> {
 617        let receiver_ids = self.state_mut().await.start_language_server(
 618            request.payload.project_id,
 619            request.sender_id,
 620            request
 621                .payload
 622                .server
 623                .clone()
 624                .ok_or_else(|| anyhow!("invalid language server"))?,
 625        )?;
 626        broadcast(request.sender_id, receiver_ids, |connection_id| {
 627            self.peer
 628                .forward_send(request.sender_id, connection_id, request.payload.clone())
 629        });
 630        Ok(())
 631    }
 632
 633    async fn update_language_server(
 634        self: Arc<Server>,
 635        request: TypedEnvelope<proto::UpdateLanguageServer>,
 636    ) -> Result<()> {
 637        let receiver_ids = self
 638            .state()
 639            .await
 640            .project_connection_ids(request.payload.project_id, request.sender_id)?;
 641        broadcast(request.sender_id, receiver_ids, |connection_id| {
 642            self.peer
 643                .forward_send(request.sender_id, connection_id, request.payload.clone())
 644        });
 645        Ok(())
 646    }
 647
 648    async fn forward_project_request<T>(
 649        self: Arc<Server>,
 650        request: TypedEnvelope<T>,
 651    ) -> Result<T::Response>
 652    where
 653        T: EntityMessage + RequestMessage,
 654    {
 655        let host_connection_id = self
 656            .state()
 657            .await
 658            .read_project(request.payload.remote_entity_id(), request.sender_id)?
 659            .host_connection_id;
 660        Ok(self
 661            .peer
 662            .forward_request(request.sender_id, host_connection_id, request.payload)
 663            .await?)
 664    }
 665
 666    async fn save_buffer(
 667        self: Arc<Server>,
 668        request: TypedEnvelope<proto::SaveBuffer>,
 669    ) -> Result<proto::BufferSaved> {
 670        let host = self
 671            .state()
 672            .await
 673            .read_project(request.payload.project_id, request.sender_id)?
 674            .host_connection_id;
 675        let response = self
 676            .peer
 677            .forward_request(request.sender_id, host, request.payload.clone())
 678            .await?;
 679
 680        let mut guests = self
 681            .state()
 682            .await
 683            .read_project(request.payload.project_id, request.sender_id)?
 684            .connection_ids();
 685        guests.retain(|guest_connection_id| *guest_connection_id != request.sender_id);
 686        broadcast(host, guests, |conn_id| {
 687            self.peer.forward_send(host, conn_id, response.clone())
 688        });
 689
 690        Ok(response)
 691    }
 692
 693    async fn update_buffer(
 694        self: Arc<Server>,
 695        request: TypedEnvelope<proto::UpdateBuffer>,
 696    ) -> Result<proto::Ack> {
 697        let receiver_ids = self
 698            .state()
 699            .await
 700            .project_connection_ids(request.payload.project_id, request.sender_id)?;
 701        broadcast(request.sender_id, receiver_ids, |connection_id| {
 702            self.peer
 703                .forward_send(request.sender_id, connection_id, request.payload.clone())
 704        });
 705        Ok(proto::Ack {})
 706    }
 707
 708    async fn update_buffer_file(
 709        self: Arc<Server>,
 710        request: TypedEnvelope<proto::UpdateBufferFile>,
 711    ) -> Result<()> {
 712        let receiver_ids = self
 713            .state()
 714            .await
 715            .project_connection_ids(request.payload.project_id, request.sender_id)?;
 716        broadcast(request.sender_id, receiver_ids, |connection_id| {
 717            self.peer
 718                .forward_send(request.sender_id, connection_id, request.payload.clone())
 719        });
 720        Ok(())
 721    }
 722
 723    async fn buffer_reloaded(
 724        self: Arc<Server>,
 725        request: TypedEnvelope<proto::BufferReloaded>,
 726    ) -> Result<()> {
 727        let receiver_ids = self
 728            .state()
 729            .await
 730            .project_connection_ids(request.payload.project_id, request.sender_id)?;
 731        broadcast(request.sender_id, receiver_ids, |connection_id| {
 732            self.peer
 733                .forward_send(request.sender_id, connection_id, request.payload.clone())
 734        });
 735        Ok(())
 736    }
 737
 738    async fn buffer_saved(
 739        self: Arc<Server>,
 740        request: TypedEnvelope<proto::BufferSaved>,
 741    ) -> Result<()> {
 742        let receiver_ids = self
 743            .state()
 744            .await
 745            .project_connection_ids(request.payload.project_id, request.sender_id)?;
 746        broadcast(request.sender_id, receiver_ids, |connection_id| {
 747            self.peer
 748                .forward_send(request.sender_id, connection_id, request.payload.clone())
 749        });
 750        Ok(())
 751    }
 752
 753    async fn follow(
 754        self: Arc<Self>,
 755        request: TypedEnvelope<proto::Follow>,
 756    ) -> Result<proto::FollowResponse> {
 757        let leader_id = ConnectionId(request.payload.leader_id);
 758        let follower_id = request.sender_id;
 759        if !self
 760            .state()
 761            .await
 762            .project_connection_ids(request.payload.project_id, follower_id)?
 763            .contains(&leader_id)
 764        {
 765            Err(anyhow!("no such peer"))?;
 766        }
 767        let mut response = self
 768            .peer
 769            .forward_request(request.sender_id, leader_id, request.payload)
 770            .await?;
 771        response
 772            .views
 773            .retain(|view| view.leader_id != Some(follower_id.0));
 774        Ok(response)
 775    }
 776
 777    async fn unfollow(self: Arc<Self>, request: TypedEnvelope<proto::Unfollow>) -> Result<()> {
 778        let leader_id = ConnectionId(request.payload.leader_id);
 779        if !self
 780            .state()
 781            .await
 782            .project_connection_ids(request.payload.project_id, request.sender_id)?
 783            .contains(&leader_id)
 784        {
 785            Err(anyhow!("no such peer"))?;
 786        }
 787        self.peer
 788            .forward_send(request.sender_id, leader_id, request.payload)?;
 789        Ok(())
 790    }
 791
 792    async fn update_followers(
 793        self: Arc<Self>,
 794        request: TypedEnvelope<proto::UpdateFollowers>,
 795    ) -> Result<()> {
 796        let connection_ids = self
 797            .state()
 798            .await
 799            .project_connection_ids(request.payload.project_id, request.sender_id)?;
 800        let leader_id = request
 801            .payload
 802            .variant
 803            .as_ref()
 804            .and_then(|variant| match variant {
 805                proto::update_followers::Variant::CreateView(payload) => payload.leader_id,
 806                proto::update_followers::Variant::UpdateView(payload) => payload.leader_id,
 807                proto::update_followers::Variant::UpdateActiveView(payload) => payload.leader_id,
 808            });
 809        for follower_id in &request.payload.follower_ids {
 810            let follower_id = ConnectionId(*follower_id);
 811            if connection_ids.contains(&follower_id) && Some(follower_id.0) != leader_id {
 812                self.peer
 813                    .forward_send(request.sender_id, follower_id, request.payload.clone())?;
 814            }
 815        }
 816        Ok(())
 817    }
 818
 819    async fn get_channels(
 820        self: Arc<Server>,
 821        request: TypedEnvelope<proto::GetChannels>,
 822    ) -> Result<proto::GetChannelsResponse> {
 823        let user_id = self
 824            .state()
 825            .await
 826            .user_id_for_connection(request.sender_id)?;
 827        let channels = self.app_state.db.get_accessible_channels(user_id).await?;
 828        Ok(proto::GetChannelsResponse {
 829            channels: channels
 830                .into_iter()
 831                .map(|chan| proto::Channel {
 832                    id: chan.id.to_proto(),
 833                    name: chan.name,
 834                })
 835                .collect(),
 836        })
 837    }
 838
 839    async fn get_users(
 840        self: Arc<Server>,
 841        request: TypedEnvelope<proto::GetUsers>,
 842    ) -> Result<proto::GetUsersResponse> {
 843        let user_ids = request
 844            .payload
 845            .user_ids
 846            .into_iter()
 847            .map(UserId::from_proto)
 848            .collect();
 849        let users = self
 850            .app_state
 851            .db
 852            .get_users_by_ids(user_ids)
 853            .await?
 854            .into_iter()
 855            .map(|user| proto::User {
 856                id: user.id.to_proto(),
 857                avatar_url: format!("https://github.com/{}.png?size=128", user.github_login),
 858                github_login: user.github_login,
 859            })
 860            .collect();
 861        Ok(proto::GetUsersResponse { users })
 862    }
 863
 864    #[instrument(skip(self, state, user_ids))]
 865    fn update_contacts_for_users<'a>(
 866        self: &Arc<Self>,
 867        state: &Store,
 868        user_ids: impl IntoIterator<Item = &'a UserId>,
 869    ) {
 870        for user_id in user_ids {
 871            let contacts = state.contacts_for_user(*user_id);
 872            for connection_id in state.connection_ids_for_user(*user_id) {
 873                self.peer
 874                    .send(
 875                        connection_id,
 876                        proto::UpdateContacts {
 877                            contacts: contacts.clone(),
 878                        },
 879                    )
 880                    .trace_err();
 881            }
 882        }
 883    }
 884
 885    async fn join_channel(
 886        self: Arc<Self>,
 887        request: TypedEnvelope<proto::JoinChannel>,
 888    ) -> Result<proto::JoinChannelResponse> {
 889        let user_id = self
 890            .state()
 891            .await
 892            .user_id_for_connection(request.sender_id)?;
 893        let channel_id = ChannelId::from_proto(request.payload.channel_id);
 894        if !self
 895            .app_state
 896            .db
 897            .can_user_access_channel(user_id, channel_id)
 898            .await?
 899        {
 900            Err(anyhow!("access denied"))?;
 901        }
 902
 903        self.state_mut()
 904            .await
 905            .join_channel(request.sender_id, channel_id);
 906        let messages = self
 907            .app_state
 908            .db
 909            .get_channel_messages(channel_id, MESSAGE_COUNT_PER_PAGE, None)
 910            .await?
 911            .into_iter()
 912            .map(|msg| proto::ChannelMessage {
 913                id: msg.id.to_proto(),
 914                body: msg.body,
 915                timestamp: msg.sent_at.unix_timestamp() as u64,
 916                sender_id: msg.sender_id.to_proto(),
 917                nonce: Some(msg.nonce.as_u128().into()),
 918            })
 919            .collect::<Vec<_>>();
 920        Ok(proto::JoinChannelResponse {
 921            done: messages.len() < MESSAGE_COUNT_PER_PAGE,
 922            messages,
 923        })
 924    }
 925
 926    async fn leave_channel(
 927        self: Arc<Self>,
 928        request: TypedEnvelope<proto::LeaveChannel>,
 929    ) -> Result<()> {
 930        let user_id = self
 931            .state()
 932            .await
 933            .user_id_for_connection(request.sender_id)?;
 934        let channel_id = ChannelId::from_proto(request.payload.channel_id);
 935        if !self
 936            .app_state
 937            .db
 938            .can_user_access_channel(user_id, channel_id)
 939            .await?
 940        {
 941            Err(anyhow!("access denied"))?;
 942        }
 943
 944        self.state_mut()
 945            .await
 946            .leave_channel(request.sender_id, channel_id);
 947
 948        Ok(())
 949    }
 950
 951    async fn send_channel_message(
 952        self: Arc<Self>,
 953        request: TypedEnvelope<proto::SendChannelMessage>,
 954    ) -> Result<proto::SendChannelMessageResponse> {
 955        let channel_id = ChannelId::from_proto(request.payload.channel_id);
 956        let user_id;
 957        let connection_ids;
 958        {
 959            let state = self.state().await;
 960            user_id = state.user_id_for_connection(request.sender_id)?;
 961            connection_ids = state.channel_connection_ids(channel_id)?;
 962        }
 963
 964        // Validate the message body.
 965        let body = request.payload.body.trim().to_string();
 966        if body.len() > MAX_MESSAGE_LEN {
 967            return Err(anyhow!("message is too long"))?;
 968        }
 969        if body.is_empty() {
 970            return Err(anyhow!("message can't be blank"))?;
 971        }
 972
 973        let timestamp = OffsetDateTime::now_utc();
 974        let nonce = request
 975            .payload
 976            .nonce
 977            .ok_or_else(|| anyhow!("nonce can't be blank"))?;
 978
 979        let message_id = self
 980            .app_state
 981            .db
 982            .create_channel_message(channel_id, user_id, &body, timestamp, nonce.clone().into())
 983            .await?
 984            .to_proto();
 985        let message = proto::ChannelMessage {
 986            sender_id: user_id.to_proto(),
 987            id: message_id,
 988            body,
 989            timestamp: timestamp.unix_timestamp() as u64,
 990            nonce: Some(nonce),
 991        };
 992        broadcast(request.sender_id, connection_ids, |conn_id| {
 993            self.peer.send(
 994                conn_id,
 995                proto::ChannelMessageSent {
 996                    channel_id: channel_id.to_proto(),
 997                    message: Some(message.clone()),
 998                },
 999            )
1000        });
1001        Ok(proto::SendChannelMessageResponse {
1002            message: Some(message),
1003        })
1004    }
1005
1006    async fn get_channel_messages(
1007        self: Arc<Self>,
1008        request: TypedEnvelope<proto::GetChannelMessages>,
1009    ) -> Result<proto::GetChannelMessagesResponse> {
1010        let user_id = self
1011            .state()
1012            .await
1013            .user_id_for_connection(request.sender_id)?;
1014        let channel_id = ChannelId::from_proto(request.payload.channel_id);
1015        if !self
1016            .app_state
1017            .db
1018            .can_user_access_channel(user_id, channel_id)
1019            .await?
1020        {
1021            Err(anyhow!("access denied"))?;
1022        }
1023
1024        let messages = self
1025            .app_state
1026            .db
1027            .get_channel_messages(
1028                channel_id,
1029                MESSAGE_COUNT_PER_PAGE,
1030                Some(MessageId::from_proto(request.payload.before_message_id)),
1031            )
1032            .await?
1033            .into_iter()
1034            .map(|msg| proto::ChannelMessage {
1035                id: msg.id.to_proto(),
1036                body: msg.body,
1037                timestamp: msg.sent_at.unix_timestamp() as u64,
1038                sender_id: msg.sender_id.to_proto(),
1039                nonce: Some(msg.nonce.as_u128().into()),
1040            })
1041            .collect::<Vec<_>>();
1042
1043        Ok(proto::GetChannelMessagesResponse {
1044            done: messages.len() < MESSAGE_COUNT_PER_PAGE,
1045            messages,
1046        })
1047    }
1048
1049    async fn state<'a>(self: &'a Arc<Self>) -> StoreReadGuard<'a> {
1050        #[cfg(test)]
1051        tokio::task::yield_now().await;
1052        let guard = self.store.read().await;
1053        #[cfg(test)]
1054        tokio::task::yield_now().await;
1055        StoreReadGuard {
1056            guard,
1057            _not_send: PhantomData,
1058        }
1059    }
1060
1061    async fn state_mut<'a>(self: &'a Arc<Self>) -> StoreWriteGuard<'a> {
1062        #[cfg(test)]
1063        tokio::task::yield_now().await;
1064        let guard = self.store.write().await;
1065        #[cfg(test)]
1066        tokio::task::yield_now().await;
1067        StoreWriteGuard {
1068            guard,
1069            _not_send: PhantomData,
1070        }
1071    }
1072}
1073
1074impl<'a> Deref for StoreReadGuard<'a> {
1075    type Target = Store;
1076
1077    fn deref(&self) -> &Self::Target {
1078        &*self.guard
1079    }
1080}
1081
1082impl<'a> Deref for StoreWriteGuard<'a> {
1083    type Target = Store;
1084
1085    fn deref(&self) -> &Self::Target {
1086        &*self.guard
1087    }
1088}
1089
1090impl<'a> DerefMut for StoreWriteGuard<'a> {
1091    fn deref_mut(&mut self) -> &mut Self::Target {
1092        &mut *self.guard
1093    }
1094}
1095
1096impl<'a> Drop for StoreWriteGuard<'a> {
1097    fn drop(&mut self) {
1098        #[cfg(test)]
1099        self.check_invariants();
1100
1101        let metrics = self.metrics();
1102        tracing::info!(
1103            connections = metrics.connections,
1104            registered_projects = metrics.registered_projects,
1105            shared_projects = metrics.shared_projects,
1106            "metrics"
1107        );
1108    }
1109}
1110
1111impl Executor for RealExecutor {
1112    type Sleep = Sleep;
1113
1114    fn spawn_detached<F: 'static + Send + Future<Output = ()>>(&self, future: F) {
1115        tokio::task::spawn(future);
1116    }
1117
1118    fn sleep(&self, duration: Duration) -> Self::Sleep {
1119        tokio::time::sleep(duration)
1120    }
1121}
1122
1123#[instrument(skip(f))]
1124fn broadcast<F>(sender_id: ConnectionId, receiver_ids: Vec<ConnectionId>, mut f: F)
1125where
1126    F: FnMut(ConnectionId) -> anyhow::Result<()>,
1127{
1128    for receiver_id in receiver_ids {
1129        if receiver_id != sender_id {
1130            f(receiver_id).trace_err();
1131        }
1132    }
1133}
1134
// Lazily-initialized name of the HTTP header that carries the protocol
// version; `ProtocolVersion::name` returns a reference to it.
lazy_static! {
    static ref ZED_PROTOCOL_VERSION: HeaderName = HeaderName::from_static("x-zed-protocol-version");
}
1138
/// Protocol version number carried by the `x-zed-protocol-version` header.
///
/// A plain wrapper around `u32`, so it is cheap to copy, compare and print.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub struct ProtocolVersion(u32);
1140
1141impl Header for ProtocolVersion {
1142    fn name() -> &'static HeaderName {
1143        &ZED_PROTOCOL_VERSION
1144    }
1145
1146    fn decode<'i, I>(values: &mut I) -> Result<Self, axum::headers::Error>
1147    where
1148        Self: Sized,
1149        I: Iterator<Item = &'i axum::http::HeaderValue>,
1150    {
1151        let version = values
1152            .next()
1153            .ok_or_else(|| axum::headers::Error::invalid())?
1154            .to_str()
1155            .map_err(|_| axum::headers::Error::invalid())?
1156            .parse()
1157            .map_err(|_| axum::headers::Error::invalid())?;
1158        Ok(Self(version))
1159    }
1160
1161    fn encode<E: Extend<axum::http::HeaderValue>>(&self, values: &mut E) {
1162        values.extend([self.0.to_string().parse().unwrap()]);
1163    }
1164}
1165
1166pub fn routes(app_state: Arc<AppState>) -> Router<Body> {
1167    let server = Server::new(app_state.clone(), None);
1168    Router::new()
1169        .route("/rpc", get(handle_websocket_request))
1170        .layer(
1171            ServiceBuilder::new()
1172                .layer(Extension(app_state))
1173                .layer(middleware::from_fn(auth::validate_header))
1174                .layer(Extension(server)),
1175        )
1176}
1177
1178pub async fn handle_websocket_request(
1179    TypedHeader(ProtocolVersion(protocol_version)): TypedHeader<ProtocolVersion>,
1180    ConnectInfo(socket_address): ConnectInfo<SocketAddr>,
1181    Extension(server): Extension<Arc<Server>>,
1182    Extension(user_id): Extension<UserId>,
1183    ws: WebSocketUpgrade,
1184) -> Response {
1185    if protocol_version != rpc::PROTOCOL_VERSION {
1186        return (
1187            StatusCode::UPGRADE_REQUIRED,
1188            "client must be upgraded".to_string(),
1189        )
1190            .into_response();
1191    }
1192    let socket_address = socket_address.to_string();
1193    ws.on_upgrade(move |socket| {
1194        let socket = socket
1195            .map_ok(to_tungstenite_message)
1196            .err_into()
1197            .with(|message| async move { Ok(to_axum_message(message)) });
1198        let connection = Connection::new(Box::pin(socket));
1199        server.handle_connection(connection, socket_address, user_id, None, RealExecutor)
1200    })
1201}
1202
1203fn to_axum_message(message: TungsteniteMessage) -> AxumMessage {
1204    match message {
1205        TungsteniteMessage::Text(payload) => AxumMessage::Text(payload),
1206        TungsteniteMessage::Binary(payload) => AxumMessage::Binary(payload),
1207        TungsteniteMessage::Ping(payload) => AxumMessage::Ping(payload),
1208        TungsteniteMessage::Pong(payload) => AxumMessage::Pong(payload),
1209        TungsteniteMessage::Close(frame) => AxumMessage::Close(frame.map(|frame| AxumCloseFrame {
1210            code: frame.code.into(),
1211            reason: frame.reason,
1212        })),
1213    }
1214}
1215
1216fn to_tungstenite_message(message: AxumMessage) -> TungsteniteMessage {
1217    match message {
1218        AxumMessage::Text(payload) => TungsteniteMessage::Text(payload),
1219        AxumMessage::Binary(payload) => TungsteniteMessage::Binary(payload),
1220        AxumMessage::Ping(payload) => TungsteniteMessage::Ping(payload),
1221        AxumMessage::Pong(payload) => TungsteniteMessage::Pong(payload),
1222        AxumMessage::Close(frame) => {
1223            TungsteniteMessage::Close(frame.map(|frame| TungsteniteCloseFrame {
1224                code: frame.code.into(),
1225                reason: frame.reason,
1226            }))
1227        }
1228    }
1229}
1230
/// Extension trait for turning a fallible value into an `Option`.
///
/// The implementation for `Result` in this file logs the error with
/// `tracing::error!` and yields `None`.
pub trait ResultExt {
    /// The success type produced by [`ResultExt::trace_err`].
    type Ok;

    /// Consumes `self` and returns the success value, if there is one.
    fn trace_err(self) -> Option<Self::Ok>;
}
1236
1237impl<T, E> ResultExt for Result<T, E>
1238where
1239    E: std::fmt::Debug,
1240{
1241    type Ok = T;
1242
1243    fn trace_err(self) -> Option<T> {
1244        match self {
1245            Ok(value) => Some(value),
1246            Err(error) => {
1247                tracing::error!("{:?}", error);
1248                None
1249            }
1250        }
1251    }
1252}
1253
1254#[cfg(test)]
1255mod tests {
1256    use super::*;
1257    use crate::{
1258        db::{tests::TestDb, UserId},
1259        AppState,
1260    };
1261    use ::rpc::Peer;
1262    use client::{
1263        self, test::FakeHttpClient, Channel, ChannelDetails, ChannelList, Client, Credentials,
1264        EstablishConnectionError, UserStore, RECEIVE_TIMEOUT,
1265    };
1266    use collections::BTreeMap;
1267    use editor::{
1268        self, ConfirmCodeAction, ConfirmCompletion, ConfirmRename, Editor, Input, Redo, Rename,
1269        ToOffset, ToggleCodeActions, Undo,
1270    };
1271    use gpui::{
1272        executor::{self, Deterministic},
1273        geometry::vector::vec2f,
1274        ModelHandle, TestAppContext, ViewHandle,
1275    };
1276    use language::{
1277        range_to_lsp, tree_sitter_rust, Diagnostic, DiagnosticEntry, FakeLspAdapter, Language,
1278        LanguageConfig, LanguageRegistry, OffsetRangeExt, Point, Rope,
1279    };
1280    use lsp::{self, FakeLanguageServer};
1281    use parking_lot::Mutex;
1282    use project::{
1283        fs::{FakeFs, Fs as _},
1284        search::SearchQuery,
1285        worktree::WorktreeHandle,
1286        DiagnosticSummary, Project, ProjectPath, WorktreeId,
1287    };
1288    use rand::prelude::*;
1289    use rpc::PeerId;
1290    use serde_json::json;
1291    use settings::Settings;
1292    use sqlx::types::time::OffsetDateTime;
1293    use std::{
1294        env,
1295        ops::Deref,
1296        path::{Path, PathBuf},
1297        rc::Rc,
1298        sync::{
1299            atomic::{AtomicBool, Ordering::SeqCst},
1300            Arc,
1301        },
1302        time::Duration,
1303    };
1304    use theme::ThemeRegistry;
1305    use workspace::{Item, SplitDirection, ToggleFollow, Workspace, WorkspaceParams};
1306
// Runs at load time (via `ctor`) and initializes `env_logger`, but only when
// the `RUST_LOG` environment variable is set to a readable value.
#[cfg(test)]
#[ctor::ctor]
fn init_logger() {
    if std::env::var("RUST_LOG").is_err() {
        return;
    }
    env_logger::init();
}
1314
1315    #[gpui::test(iterations = 10)]
1316    async fn test_share_project(cx_a: &mut TestAppContext, cx_b: &mut TestAppContext) {
1317        let (window_b, _) = cx_b.add_window(|_| EmptyView);
1318        let lang_registry = Arc::new(LanguageRegistry::test());
1319        let fs = FakeFs::new(cx_a.background());
1320        cx_a.foreground().forbid_parking();
1321
1322        // Connect to a server as 2 clients.
1323        let mut server = TestServer::start(cx_a.foreground(), cx_a.background()).await;
1324        let client_a = server.create_client(cx_a, "user_a").await;
1325        let client_b = server.create_client(cx_b, "user_b").await;
1326
1327        // Share a project as client A
1328        fs.insert_tree(
1329            "/a",
1330            json!({
1331                ".zed.toml": r#"collaborators = ["user_b"]"#,
1332                "a.txt": "a-contents",
1333                "b.txt": "b-contents",
1334            }),
1335        )
1336        .await;
1337        let project_a = cx_a.update(|cx| {
1338            Project::local(
1339                client_a.clone(),
1340                client_a.user_store.clone(),
1341                lang_registry.clone(),
1342                fs.clone(),
1343                cx,
1344            )
1345        });
1346        let (worktree_a, _) = project_a
1347            .update(cx_a, |p, cx| {
1348                p.find_or_create_local_worktree("/a", true, cx)
1349            })
1350            .await
1351            .unwrap();
1352        let worktree_id = worktree_a.read_with(cx_a, |tree, _| tree.id());
1353        worktree_a
1354            .read_with(cx_a, |tree, _| tree.as_local().unwrap().scan_complete())
1355            .await;
1356        let project_id = project_a.update(cx_a, |p, _| p.next_remote_id()).await;
1357        project_a.update(cx_a, |p, cx| p.share(cx)).await.unwrap();
1358
1359        // Join that project as client B
1360        let project_b = Project::remote(
1361            project_id,
1362            client_b.clone(),
1363            client_b.user_store.clone(),
1364            lang_registry.clone(),
1365            fs.clone(),
1366            &mut cx_b.to_async(),
1367        )
1368        .await
1369        .unwrap();
1370
1371        let replica_id_b = project_b.read_with(cx_b, |project, _| {
1372            assert_eq!(
1373                project
1374                    .collaborators()
1375                    .get(&client_a.peer_id)
1376                    .unwrap()
1377                    .user
1378                    .github_login,
1379                "user_a"
1380            );
1381            project.replica_id()
1382        });
1383        project_a
1384            .condition(&cx_a, |tree, _| {
1385                tree.collaborators()
1386                    .get(&client_b.peer_id)
1387                    .map_or(false, |collaborator| {
1388                        collaborator.replica_id == replica_id_b
1389                            && collaborator.user.github_login == "user_b"
1390                    })
1391            })
1392            .await;
1393
1394        // Open the same file as client B and client A.
1395        let buffer_b = project_b
1396            .update(cx_b, |p, cx| p.open_buffer((worktree_id, "b.txt"), cx))
1397            .await
1398            .unwrap();
1399        buffer_b.read_with(cx_b, |buf, _| assert_eq!(buf.text(), "b-contents"));
1400        project_a.read_with(cx_a, |project, cx| {
1401            assert!(project.has_open_buffer((worktree_id, "b.txt"), cx))
1402        });
1403        let buffer_a = project_a
1404            .update(cx_a, |p, cx| p.open_buffer((worktree_id, "b.txt"), cx))
1405            .await
1406            .unwrap();
1407
1408        let editor_b = cx_b.add_view(window_b, |cx| Editor::for_buffer(buffer_b, None, cx));
1409
1410        // TODO
1411        // // Create a selection set as client B and see that selection set as client A.
1412        // buffer_a
1413        //     .condition(&cx_a, |buffer, _| buffer.selection_sets().count() == 1)
1414        //     .await;
1415
1416        // Edit the buffer as client B and see that edit as client A.
1417        editor_b.update(cx_b, |editor, cx| {
1418            editor.handle_input(&Input("ok, ".into()), cx)
1419        });
1420        buffer_a
1421            .condition(&cx_a, |buffer, _| buffer.text() == "ok, b-contents")
1422            .await;
1423
1424        // TODO
1425        // // Remove the selection set as client B, see those selections disappear as client A.
1426        cx_b.update(move |_| drop(editor_b));
1427        // buffer_a
1428        //     .condition(&cx_a, |buffer, _| buffer.selection_sets().count() == 0)
1429        //     .await;
1430
1431        // Dropping the client B's project removes client B from client A's collaborators.
1432        cx_b.update(move |_| drop(project_b));
1433        project_a
1434            .condition(&cx_a, |project, _| project.collaborators().is_empty())
1435            .await;
1436    }
1437
1438    #[gpui::test(iterations = 10)]
1439    async fn test_unshare_project(cx_a: &mut TestAppContext, cx_b: &mut TestAppContext) {
1440        let lang_registry = Arc::new(LanguageRegistry::test());
1441        let fs = FakeFs::new(cx_a.background());
1442        cx_a.foreground().forbid_parking();
1443
1444        // Connect to a server as 2 clients.
1445        let mut server = TestServer::start(cx_a.foreground(), cx_a.background()).await;
1446        let client_a = server.create_client(cx_a, "user_a").await;
1447        let client_b = server.create_client(cx_b, "user_b").await;
1448
1449        // Share a project as client A
1450        fs.insert_tree(
1451            "/a",
1452            json!({
1453                ".zed.toml": r#"collaborators = ["user_b"]"#,
1454                "a.txt": "a-contents",
1455                "b.txt": "b-contents",
1456            }),
1457        )
1458        .await;
1459        let project_a = cx_a.update(|cx| {
1460            Project::local(
1461                client_a.clone(),
1462                client_a.user_store.clone(),
1463                lang_registry.clone(),
1464                fs.clone(),
1465                cx,
1466            )
1467        });
1468        let (worktree_a, _) = project_a
1469            .update(cx_a, |p, cx| {
1470                p.find_or_create_local_worktree("/a", true, cx)
1471            })
1472            .await
1473            .unwrap();
1474        worktree_a
1475            .read_with(cx_a, |tree, _| tree.as_local().unwrap().scan_complete())
1476            .await;
1477        let project_id = project_a.update(cx_a, |p, _| p.next_remote_id()).await;
1478        let worktree_id = worktree_a.read_with(cx_a, |tree, _| tree.id());
1479        project_a.update(cx_a, |p, cx| p.share(cx)).await.unwrap();
1480        assert!(worktree_a.read_with(cx_a, |tree, _| tree.as_local().unwrap().is_shared()));
1481
1482        // Join that project as client B
1483        let project_b = Project::remote(
1484            project_id,
1485            client_b.clone(),
1486            client_b.user_store.clone(),
1487            lang_registry.clone(),
1488            fs.clone(),
1489            &mut cx_b.to_async(),
1490        )
1491        .await
1492        .unwrap();
1493        project_b
1494            .update(cx_b, |p, cx| p.open_buffer((worktree_id, "a.txt"), cx))
1495            .await
1496            .unwrap();
1497
1498        // Unshare the project as client A
1499        project_a.update(cx_a, |project, cx| project.unshare(cx));
1500        project_b
1501            .condition(cx_b, |project, _| project.is_read_only())
1502            .await;
1503        assert!(worktree_a.read_with(cx_a, |tree, _| !tree.as_local().unwrap().is_shared()));
1504        cx_b.update(|_| {
1505            drop(project_b);
1506        });
1507
1508        // Share the project again and ensure guests can still join.
1509        project_a
1510            .update(cx_a, |project, cx| project.share(cx))
1511            .await
1512            .unwrap();
1513        assert!(worktree_a.read_with(cx_a, |tree, _| tree.as_local().unwrap().is_shared()));
1514
1515        let project_b2 = Project::remote(
1516            project_id,
1517            client_b.clone(),
1518            client_b.user_store.clone(),
1519            lang_registry.clone(),
1520            fs.clone(),
1521            &mut cx_b.to_async(),
1522        )
1523        .await
1524        .unwrap();
1525        project_b2
1526            .update(cx_b, |p, cx| p.open_buffer((worktree_id, "a.txt"), cx))
1527            .await
1528            .unwrap();
1529    }
1530
1531    #[gpui::test(iterations = 10)]
1532    async fn test_host_disconnect(cx_a: &mut TestAppContext, cx_b: &mut TestAppContext) {
1533        let lang_registry = Arc::new(LanguageRegistry::test());
1534        let fs = FakeFs::new(cx_a.background());
1535        cx_a.foreground().forbid_parking();
1536
1537        // Connect to a server as 2 clients.
1538        let mut server = TestServer::start(cx_a.foreground(), cx_a.background()).await;
1539        let client_a = server.create_client(cx_a, "user_a").await;
1540        let client_b = server.create_client(cx_b, "user_b").await;
1541
1542        // Share a project as client A
1543        fs.insert_tree(
1544            "/a",
1545            json!({
1546                ".zed.toml": r#"collaborators = ["user_b"]"#,
1547                "a.txt": "a-contents",
1548                "b.txt": "b-contents",
1549            }),
1550        )
1551        .await;
1552        let project_a = cx_a.update(|cx| {
1553            Project::local(
1554                client_a.clone(),
1555                client_a.user_store.clone(),
1556                lang_registry.clone(),
1557                fs.clone(),
1558                cx,
1559            )
1560        });
1561        let (worktree_a, _) = project_a
1562            .update(cx_a, |p, cx| {
1563                p.find_or_create_local_worktree("/a", true, cx)
1564            })
1565            .await
1566            .unwrap();
1567        worktree_a
1568            .read_with(cx_a, |tree, _| tree.as_local().unwrap().scan_complete())
1569            .await;
1570        let project_id = project_a.update(cx_a, |p, _| p.next_remote_id()).await;
1571        let worktree_id = worktree_a.read_with(cx_a, |tree, _| tree.id());
1572        project_a.update(cx_a, |p, cx| p.share(cx)).await.unwrap();
1573        assert!(worktree_a.read_with(cx_a, |tree, _| tree.as_local().unwrap().is_shared()));
1574
1575        // Join that project as client B
1576        let project_b = Project::remote(
1577            project_id,
1578            client_b.clone(),
1579            client_b.user_store.clone(),
1580            lang_registry.clone(),
1581            fs.clone(),
1582            &mut cx_b.to_async(),
1583        )
1584        .await
1585        .unwrap();
1586        project_b
1587            .update(cx_b, |p, cx| p.open_buffer((worktree_id, "a.txt"), cx))
1588            .await
1589            .unwrap();
1590
1591        // Drop client A's connection. Collaborators should disappear and the project should not be shown as shared.
1592        server.disconnect_client(client_a.current_user_id(cx_a));
1593        cx_a.foreground().advance_clock(rpc::RECEIVE_TIMEOUT);
1594        project_a
1595            .condition(cx_a, |project, _| project.collaborators().is_empty())
1596            .await;
1597        project_a.read_with(cx_a, |project, _| assert!(!project.is_shared()));
1598        project_b
1599            .condition(cx_b, |project, _| project.is_read_only())
1600            .await;
1601        assert!(worktree_a.read_with(cx_a, |tree, _| !tree.as_local().unwrap().is_shared()));
1602        cx_b.update(|_| {
1603            drop(project_b);
1604        });
1605
1606        // Await reconnection
1607        let project_id = project_a.update(cx_a, |p, _| p.next_remote_id()).await;
1608
1609        // Share the project again and ensure guests can still join.
1610        project_a
1611            .update(cx_a, |project, cx| project.share(cx))
1612            .await
1613            .unwrap();
1614        assert!(worktree_a.read_with(cx_a, |tree, _| tree.as_local().unwrap().is_shared()));
1615
1616        let project_b2 = Project::remote(
1617            project_id,
1618            client_b.clone(),
1619            client_b.user_store.clone(),
1620            lang_registry.clone(),
1621            fs.clone(),
1622            &mut cx_b.to_async(),
1623        )
1624        .await
1625        .unwrap();
1626        project_b2
1627            .update(cx_b, |p, cx| p.open_buffer((worktree_id, "a.txt"), cx))
1628            .await
1629            .unwrap();
1630    }
1631
1632    #[gpui::test(iterations = 10)]
        /// Exercises a host (client A) and two guests (clients B and C) sharing one worktree.
        ///
        /// The test checks, in order, that:
        /// * edits made by both guests and by the host converge to the same text in all three
        ///   buffers,
        /// * a save issued by guest B while the host is editing ends up on the fake file system
        ///   and leaves every replica of the buffer non-dirty,
        /// * renames and a newly inserted file on the host's file system become visible in all
        ///   three worktrees, and the open buffers report the renamed path.
1633    async fn test_propagate_saves_and_fs_changes(
1634        cx_a: &mut TestAppContext,
1635        cx_b: &mut TestAppContext,
1636        cx_c: &mut TestAppContext,
1637    ) {
1638        let lang_registry = Arc::new(LanguageRegistry::test());
1639        let fs = FakeFs::new(cx_a.background());
1640        cx_a.foreground().forbid_parking();
1641
1642        // Connect to a server as 3 clients.
1643        let mut server = TestServer::start(cx_a.foreground(), cx_a.background()).await;
1644        let client_a = server.create_client(cx_a, "user_a").await;
1645        let client_b = server.create_client(cx_b, "user_b").await;
1646        let client_c = server.create_client(cx_c, "user_c").await;
1647
1648        // Share a worktree as client A.
1649        fs.insert_tree(
1650            "/a",
1651            json!({
1652                ".zed.toml": r#"collaborators = ["user_b", "user_c"]"#,
1653                "file1": "",
1654                "file2": ""
1655            }),
1656        )
1657        .await;
1658        let project_a = cx_a.update(|cx| {
1659            Project::local(
1660                client_a.clone(),
1661                client_a.user_store.clone(),
1662                lang_registry.clone(),
1663                fs.clone(),
1664                cx,
1665            )
1666        });
1667        let (worktree_a, _) = project_a
1668            .update(cx_a, |p, cx| {
1669                p.find_or_create_local_worktree("/a", true, cx)
1670            })
1671            .await
1672            .unwrap();
1673        worktree_a
1674            .read_with(cx_a, |tree, _| tree.as_local().unwrap().scan_complete())
1675            .await;
1676        let project_id = project_a.update(cx_a, |p, _| p.next_remote_id()).await;
1677        let worktree_id = worktree_a.read_with(cx_a, |tree, _| tree.id());
1678        project_a.update(cx_a, |p, cx| p.share(cx)).await.unwrap();
1679
1680        // Join that worktree as clients B and C.
1681        let project_b = Project::remote(
1682            project_id,
1683            client_b.clone(),
1684            client_b.user_store.clone(),
1685            lang_registry.clone(),
1686            fs.clone(),
1687            &mut cx_b.to_async(),
1688        )
1689        .await
1690        .unwrap();
1691        let project_c = Project::remote(
1692            project_id,
1693            client_c.clone(),
1694            client_c.user_store.clone(),
1695            lang_registry.clone(),
1696            fs.clone(),
1697            &mut cx_c.to_async(),
1698        )
1699        .await
1700        .unwrap();
1701        let worktree_b = project_b.read_with(cx_b, |p, cx| p.worktrees(cx).next().unwrap());
1702        let worktree_c = project_c.read_with(cx_c, |p, cx| p.worktrees(cx).next().unwrap());
1703
1704        // Open and edit a buffer as both guests B and C.
1705        let buffer_b = project_b
1706            .update(cx_b, |p, cx| p.open_buffer((worktree_id, "file1"), cx))
1707            .await
1708            .unwrap();
1709        let buffer_c = project_c
1710            .update(cx_c, |p, cx| p.open_buffer((worktree_id, "file1"), cx))
1711            .await
1712            .unwrap();
1713        buffer_b.update(cx_b, |buf, cx| buf.edit([(0..0, "i-am-b, ")], cx));
1714        buffer_c.update(cx_c, |buf, cx| buf.edit([(0..0, "i-am-c, ")], cx));
1715
1716        // Open and edit that buffer as the host.
1717        let buffer_a = project_a
1718            .update(cx_a, |p, cx| p.open_buffer((worktree_id, "file1"), cx))
1719            .await
1720            .unwrap();
1721
            // Both guest edits were issued before the host opened the buffer; wait until the
            // host's copy contains them before appending the host's own text.
1722        buffer_a
1723            .condition(cx_a, |buf, _| buf.text() == "i-am-c, i-am-b, ")
1724            .await;
1725        buffer_a.update(cx_a, |buf, cx| {
1726            buf.edit([(buf.len()..buf.len(), "i-am-a")], cx)
1727        });
1728
1729        // Wait for edits to propagate
1730        buffer_a
1731            .condition(cx_a, |buf, _| buf.text() == "i-am-c, i-am-b, i-am-a")
1732            .await;
1733        buffer_b
1734            .condition(cx_b, |buf, _| buf.text() == "i-am-c, i-am-b, i-am-a")
1735            .await;
1736        buffer_c
1737            .condition(cx_c, |buf, _| buf.text() == "i-am-c, i-am-b, i-am-a")
1738            .await;
1739
1740        // Edit the buffer as the host and concurrently save as guest B.
            // The save is started first but only awaited after the host's edit; the assertion
            // below expects the file on disk to already include that "hi-a, " edit.
1741        let save_b = buffer_b.update(cx_b, |buf, cx| buf.save(cx));
1742        buffer_a.update(cx_a, |buf, cx| buf.edit([(0..0, "hi-a, ")], cx));
1743        save_b.await.unwrap();
1744        assert_eq!(
1745            fs.load("/a/file1".as_ref()).await.unwrap(),
1746            "hi-a, i-am-c, i-am-b, i-am-a"
1747        );
1748        buffer_a.read_with(cx_a, |buf, _| assert!(!buf.is_dirty()));
1749        buffer_b.read_with(cx_b, |buf, _| assert!(!buf.is_dirty()));
1750        buffer_c.condition(cx_c, |buf, _| !buf.is_dirty()).await;
1751
            // NOTE(review): presumably drains file-system events produced by the save so they
            // do not interleave with the renames below — confirm against `flush_fs_events`.
1752        worktree_a.flush_fs_events(cx_a).await;
1753
1754        // Make changes on host's file system, see those changes on guest worktrees.
1755        fs.rename(
1756            "/a/file1".as_ref(),
1757            "/a/file1-renamed".as_ref(),
1758            Default::default(),
1759        )
1760        .await
1761        .unwrap();
1762
1763        fs.rename("/a/file2".as_ref(), "/a/file3".as_ref(), Default::default())
1764            .await
1765            .unwrap();
1766        fs.insert_file(Path::new("/a/file4"), "4".into()).await;
1767
1768        worktree_a
1769            .condition(cx_a, |tree, _| {
1770                tree.paths()
1771                    .map(|p| p.to_string_lossy())
1772                    .collect::<Vec<_>>()
1773                    == [".zed.toml", "file1-renamed", "file3", "file4"]
1774            })
1775            .await;
1776        worktree_b
1777            .condition(cx_b, |tree, _| {
1778                tree.paths()
1779                    .map(|p| p.to_string_lossy())
1780                    .collect::<Vec<_>>()
1781                    == [".zed.toml", "file1-renamed", "file3", "file4"]
1782            })
1783            .await;
1784        worktree_c
1785            .condition(cx_c, |tree, _| {
1786                tree.paths()
1787                    .map(|p| p.to_string_lossy())
1788                    .collect::<Vec<_>>()
1789                    == [".zed.toml", "file1-renamed", "file3", "file4"]
1790            })
1791            .await;
1792
1793        // Ensure buffer files are updated as well.
1794        buffer_a
1795            .condition(cx_a, |buf, _| {
1796                buf.file().unwrap().path().to_str() == Some("file1-renamed")
1797            })
1798            .await;
1799        buffer_b
1800            .condition(cx_b, |buf, _| {
1801                buf.file().unwrap().path().to_str() == Some("file1-renamed")
1802            })
1803            .await;
1804        buffer_c
1805            .condition(cx_c, |buf, _| {
1806                buf.file().unwrap().path().to_str() == Some("file1-renamed")
1807            })
1808            .await;
1809    }
1810
1811    #[gpui::test(iterations = 10)]
        /// Checks that a guest's buffer never reports a conflict around a save.
        ///
        /// Client B edits a shared buffer (dirty, no conflict), saves it (becomes clean, no
        /// conflict), then edits again (dirty, still no conflict).
1812    async fn test_buffer_conflict_after_save(cx_a: &mut TestAppContext, cx_b: &mut TestAppContext) {
1813        cx_a.foreground().forbid_parking();
1814        let lang_registry = Arc::new(LanguageRegistry::test());
1815        let fs = FakeFs::new(cx_a.background());
1816
1817        // Connect to a server as 2 clients.
1818        let mut server = TestServer::start(cx_a.foreground(), cx_a.background()).await;
1819        let client_a = server.create_client(cx_a, "user_a").await;
1820        let client_b = server.create_client(cx_b, "user_b").await;
1821
1822        // Share a project as client A
1823        fs.insert_tree(
1824            "/dir",
1825            json!({
1826                ".zed.toml": r#"collaborators = ["user_b", "user_c"]"#,
1827                "a.txt": "a-contents",
1828            }),
1829        )
1830        .await;
1831
1832        let project_a = cx_a.update(|cx| {
1833            Project::local(
1834                client_a.clone(),
1835                client_a.user_store.clone(),
1836                lang_registry.clone(),
1837                fs.clone(),
1838                cx,
1839            )
1840        });
1841        let (worktree_a, _) = project_a
1842            .update(cx_a, |p, cx| {
1843                p.find_or_create_local_worktree("/dir", true, cx)
1844            })
1845            .await
1846            .unwrap();
1847        worktree_a
1848            .read_with(cx_a, |tree, _| tree.as_local().unwrap().scan_complete())
1849            .await;
1850        let project_id = project_a.update(cx_a, |p, _| p.next_remote_id()).await;
1851        let worktree_id = worktree_a.read_with(cx_a, |tree, _| tree.id());
1852        project_a.update(cx_a, |p, cx| p.share(cx)).await.unwrap();
1853
1854        // Join that project as client B
1855        let project_b = Project::remote(
1856            project_id,
1857            client_b.clone(),
1858            client_b.user_store.clone(),
1859            lang_registry.clone(),
1860            fs.clone(),
1861            &mut cx_b.to_async(),
1862        )
1863        .await
1864        .unwrap();
1865
1866        // Open a buffer as client B
1867        let buffer_b = project_b
1868            .update(cx_b, |p, cx| p.open_buffer((worktree_id, "a.txt"), cx))
1869            .await
1870            .unwrap();
1871
            // A fresh local edit makes the buffer dirty but must not count as a conflict.
1872        buffer_b.update(cx_b, |buf, cx| buf.edit([(0..0, "world ")], cx));
1873        buffer_b.read_with(cx_b, |buf, _| {
1874            assert!(buf.is_dirty());
1875            assert!(!buf.has_conflict());
1876        });
1877
            // The test does not assume the dirty flag is cleared by the time the save future
            // resolves; it waits for the flag to clear before checking for a conflict.
1878        buffer_b.update(cx_b, |buf, cx| buf.save(cx)).await.unwrap();
1879        buffer_b
1880            .condition(cx_b, |buffer_b, _| !buffer_b.is_dirty())
1881            .await;
1882        buffer_b.read_with(cx_b, |buf, _| {
1883            assert!(!buf.has_conflict());
1884        });
1885
            // Editing again after the save must dirty the buffer without raising a conflict.
1886        buffer_b.update(cx_b, |buf, cx| buf.edit([(0..0, "hello ")], cx));
1887        buffer_b.read_with(cx_b, |buf, _| {
1888            assert!(buf.is_dirty());
1889            assert!(!buf.has_conflict());
1890        });
1891    }
1892
1893    #[gpui::test(iterations = 10)]
        /// Checks that a guest's clean buffer follows a change written to the host's file system.
        ///
        /// Client B opens `a.txt` (clean, no conflict). The file is then overwritten through the
        /// fake file system, and B's buffer is expected to end up with the new text while
        /// remaining non-dirty and conflict-free.
1894    async fn test_buffer_reloading(cx_a: &mut TestAppContext, cx_b: &mut TestAppContext) {
1895        cx_a.foreground().forbid_parking();
1896        let lang_registry = Arc::new(LanguageRegistry::test());
1897        let fs = FakeFs::new(cx_a.background());
1898
1899        // Connect to a server as 2 clients.
1900        let mut server = TestServer::start(cx_a.foreground(), cx_a.background()).await;
1901        let client_a = server.create_client(cx_a, "user_a").await;
1902        let client_b = server.create_client(cx_b, "user_b").await;
1903
1904        // Share a project as client A
1905        fs.insert_tree(
1906            "/dir",
1907            json!({
1908                ".zed.toml": r#"collaborators = ["user_b", "user_c"]"#,
1909                "a.txt": "a-contents",
1910            }),
1911        )
1912        .await;
1913
1914        let project_a = cx_a.update(|cx| {
1915            Project::local(
1916                client_a.clone(),
1917                client_a.user_store.clone(),
1918                lang_registry.clone(),
1919                fs.clone(),
1920                cx,
1921            )
1922        });
1923        let (worktree_a, _) = project_a
1924            .update(cx_a, |p, cx| {
1925                p.find_or_create_local_worktree("/dir", true, cx)
1926            })
1927            .await
1928            .unwrap();
1929        worktree_a
1930            .read_with(cx_a, |tree, _| tree.as_local().unwrap().scan_complete())
1931            .await;
1932        let project_id = project_a.update(cx_a, |p, _| p.next_remote_id()).await;
1933        let worktree_id = worktree_a.read_with(cx_a, |tree, _| tree.id());
1934        project_a.update(cx_a, |p, cx| p.share(cx)).await.unwrap();
1935
1936        // Join that project as client B
1937        let project_b = Project::remote(
1938            project_id,
1939            client_b.clone(),
1940            client_b.user_store.clone(),
1941            lang_registry.clone(),
1942            fs.clone(),
1943            &mut cx_b.to_async(),
1944        )
1945        .await
1946        .unwrap();
            // NOTE(review): the handle is never read; presumably it is bound only to keep the
            // guest worktree alive for the rest of the test — confirm before removing.
1947        let _worktree_b = project_b.update(cx_b, |p, cx| p.worktrees(cx).next().unwrap());
1948
1949        // Open a buffer as client B
1950        let buffer_b = project_b
1951            .update(cx_b, |p, cx| p.open_buffer((worktree_id, "a.txt"), cx))
1952            .await
1953            .unwrap();
1954        buffer_b.read_with(cx_b, |buf, _| {
1955            assert!(!buf.is_dirty());
1956            assert!(!buf.has_conflict());
1957        });
1958
            // Overwrite the file behind the buffer's back, then wait for the guest's copy to
            // pick up the new text without becoming dirty.
1959        fs.save(Path::new("/dir/a.txt"), &"new contents".into())
1960            .await
1961            .unwrap();
1962        buffer_b
1963            .condition(cx_b, |buf, _| {
1964                buf.text() == "new contents" && !buf.is_dirty()
1965            })
1966            .await;
1967        buffer_b.read_with(cx_b, |buf, _| {
1968            assert!(!buf.has_conflict());
1969        });
1970    }
1971
1972    #[gpui::test(iterations = 10)]
        /// Checks that a guest converges on the host's text when the host edits a buffer while
        /// the guest is still in the middle of opening it.
        ///
        /// Client B's `open_buffer` is spawned on the background executor; the host then inserts
        /// "X" and "Y" with simulated random delays in between. Once the open resolves, B's
        /// buffer must eventually equal the host's text.
1973    async fn test_editing_while_guest_opens_buffer(
1974        cx_a: &mut TestAppContext,
1975        cx_b: &mut TestAppContext,
1976    ) {
1977        cx_a.foreground().forbid_parking();
1978        let lang_registry = Arc::new(LanguageRegistry::test());
1979        let fs = FakeFs::new(cx_a.background());
1980
1981        // Connect to a server as 2 clients.
1982        let mut server = TestServer::start(cx_a.foreground(), cx_a.background()).await;
1983        let client_a = server.create_client(cx_a, "user_a").await;
1984        let client_b = server.create_client(cx_b, "user_b").await;
1985
1986        // Share a project as client A
1987        fs.insert_tree(
1988            "/dir",
1989            json!({
1990                ".zed.toml": r#"collaborators = ["user_b"]"#,
1991                "a.txt": "a-contents",
1992            }),
1993        )
1994        .await;
1995        let project_a = cx_a.update(|cx| {
1996            Project::local(
1997                client_a.clone(),
1998                client_a.user_store.clone(),
1999                lang_registry.clone(),
2000                fs.clone(),
2001                cx,
2002            )
2003        });
2004        let (worktree_a, _) = project_a
2005            .update(cx_a, |p, cx| {
2006                p.find_or_create_local_worktree("/dir", true, cx)
2007            })
2008            .await
2009            .unwrap();
2010        worktree_a
2011            .read_with(cx_a, |tree, _| tree.as_local().unwrap().scan_complete())
2012            .await;
2013        let project_id = project_a.update(cx_a, |p, _| p.next_remote_id()).await;
2014        let worktree_id = worktree_a.read_with(cx_a, |tree, _| tree.id());
2015        project_a.update(cx_a, |p, cx| p.share(cx)).await.unwrap();
2016
2017        // Join that project as client B
2018        let project_b = Project::remote(
2019            project_id,
2020            client_b.clone(),
2021            client_b.user_store.clone(),
2022            lang_registry.clone(),
2023            fs.clone(),
2024            &mut cx_b.to_async(),
2025        )
2026        .await
2027        .unwrap();
2028
2029        // Open a buffer as client A
2030        let buffer_a = project_a
2031            .update(cx_a, |p, cx| p.open_buffer((worktree_id, "a.txt"), cx))
2032            .await
2033            .unwrap();
2034
2035        // Start opening the same buffer as client B
            // The task is deliberately not awaited yet so the host edits below overlap with it.
2036        let buffer_b = cx_b
2037            .background()
2038            .spawn(project_b.update(cx_b, |p, cx| p.open_buffer((worktree_id, "a.txt"), cx)));
2039
2040        // Edit the buffer as client A while client B is still opening it.
2041        cx_b.background().simulate_random_delay().await;
2042        buffer_a.update(cx_a, |buf, cx| buf.edit([(0..0, "X")], cx));
2043        cx_b.background().simulate_random_delay().await;
2044        buffer_a.update(cx_a, |buf, cx| buf.edit([(1..1, "Y")], cx));
2045
            // Capture the host's final text, then wait for the guest's buffer to match it.
2046        let text = buffer_a.read_with(cx_a, |buf, _| buf.text());
2047        let buffer_b = buffer_b.await.unwrap();
2048        buffer_b.condition(cx_b, |buf, _| buf.text() == text).await;
2049    }
2050
2051    #[gpui::test(iterations = 10)]
        /// Checks that the host sees a guest leave even when the guest drops its project while an
        /// `open_buffer` request is still in flight.
        ///
        /// Client B joins, the host waits until it has one collaborator, B spawns an open and
        /// then drops both its project handle and the pending task; the host's collaborator list
        /// must become empty again.
2052    async fn test_leaving_worktree_while_opening_buffer(
2053        cx_a: &mut TestAppContext,
2054        cx_b: &mut TestAppContext,
2055    ) {
2056        cx_a.foreground().forbid_parking();
2057        let lang_registry = Arc::new(LanguageRegistry::test());
2058        let fs = FakeFs::new(cx_a.background());
2059
2060        // Connect to a server as 2 clients.
2061        let mut server = TestServer::start(cx_a.foreground(), cx_a.background()).await;
2062        let client_a = server.create_client(cx_a, "user_a").await;
2063        let client_b = server.create_client(cx_b, "user_b").await;
2064
2065        // Share a project as client A
2066        fs.insert_tree(
2067            "/dir",
2068            json!({
2069                ".zed.toml": r#"collaborators = ["user_b"]"#,
2070                "a.txt": "a-contents",
2071            }),
2072        )
2073        .await;
2074        let project_a = cx_a.update(|cx| {
2075            Project::local(
2076                client_a.clone(),
2077                client_a.user_store.clone(),
2078                lang_registry.clone(),
2079                fs.clone(),
2080                cx,
2081            )
2082        });
2083        let (worktree_a, _) = project_a
2084            .update(cx_a, |p, cx| {
2085                p.find_or_create_local_worktree("/dir", true, cx)
2086            })
2087            .await
2088            .unwrap();
2089        worktree_a
2090            .read_with(cx_a, |tree, _| tree.as_local().unwrap().scan_complete())
2091            .await;
2092        let project_id = project_a.update(cx_a, |p, _| p.next_remote_id()).await;
2093        let worktree_id = worktree_a.read_with(cx_a, |tree, _| tree.id());
2094        project_a.update(cx_a, |p, cx| p.share(cx)).await.unwrap();
2095
2096        // Join that project as client B
2097        let project_b = Project::remote(
2098            project_id,
2099            client_b.clone(),
2100            client_b.user_store.clone(),
2101            lang_registry.clone(),
2102            fs.clone(),
2103            &mut cx_b.to_async(),
2104        )
2105        .await
2106        .unwrap();
2107
2108        // See that a guest has joined as client A.
2109        project_a
2110            .condition(cx_a, |p, _| p.collaborators().len() == 1)
2111            .await;
2112
2113        // Begin opening a buffer as client B, but leave the project before the open completes.
2114        let buffer_b = cx_b
2115            .background()
2116            .spawn(project_b.update(cx_b, |p, cx| p.open_buffer((worktree_id, "a.txt"), cx)));
            // The project handle is dropped inside `cx_b.update`, then the pending open task is
            // dropped without ever being awaited.
2117        cx_b.update(|_| drop(project_b));
2118        drop(buffer_b);
2119
2120        // See that the guest has left.
2121        project_a
2122            .condition(cx_a, |p, _| p.collaborators().is_empty())
2123            .await;
2124    }
2125
2126    #[gpui::test(iterations = 10)]
        /// Checks that the host observes a guest leaving the project in two different ways.
        ///
        /// Client B joins and the host sees one collaborator. B then disconnects explicitly via
        /// `client_b.disconnect` and the host's collaborator list must empty. B rejoins, the host
        /// sees one collaborator again, and finally the server side kills B's connection via
        /// `server.disconnect_client`; after the simulated clock is advanced the host's
        /// collaborator list must empty once more.
2127    async fn test_leaving_project(cx_a: &mut TestAppContext, cx_b: &mut TestAppContext) {
2128        cx_a.foreground().forbid_parking();
2129        let lang_registry = Arc::new(LanguageRegistry::test());
2130        let fs = FakeFs::new(cx_a.background());
2131
2132        // Connect to a server as 2 clients.
2133        let mut server = TestServer::start(cx_a.foreground(), cx_a.background()).await;
2134        let client_a = server.create_client(cx_a, "user_a").await;
2135        let client_b = server.create_client(cx_b, "user_b").await;
2136
2137        // Share a project as client A
2138        fs.insert_tree(
2139            "/a",
2140            json!({
2141                ".zed.toml": r#"collaborators = ["user_b"]"#,
2142                "a.txt": "a-contents",
2143                "b.txt": "b-contents",
2144            }),
2145        )
2146        .await;
2147        let project_a = cx_a.update(|cx| {
2148            Project::local(
2149                client_a.clone(),
2150                client_a.user_store.clone(),
2151                lang_registry.clone(),
2152                fs.clone(),
2153                cx,
2154            )
2155        });
2156        let (worktree_a, _) = project_a
2157            .update(cx_a, |p, cx| {
2158                p.find_or_create_local_worktree("/a", true, cx)
2159            })
2160            .await
2161            .unwrap();
2162        worktree_a
2163            .read_with(cx_a, |tree, _| tree.as_local().unwrap().scan_complete())
2164            .await;
2165        let project_id = project_a
2166            .update(cx_a, |project, _| project.next_remote_id())
2167            .await;
2168        project_a
2169            .update(cx_a, |project, cx| project.share(cx))
2170            .await
2171            .unwrap();
2172
2173        // Join that project as client B
2174        let _project_b = Project::remote(
2175            project_id,
2176            client_b.clone(),
2177            client_b.user_store.clone(),
2178            lang_registry.clone(),
2179            fs.clone(),
2180            &mut cx_b.to_async(),
2181        )
2182        .await
2183        .unwrap();
2184
2185        // Client A sees that a guest has joined.
2186        project_a
2187            .condition(cx_a, |p, _| p.collaborators().len() == 1)
2188            .await;
2189
2190        // Drop client B's connection and ensure client A observes client B leaving the project.
2191        client_b.disconnect(&cx_b.to_async()).unwrap();
2192        project_a
2193            .condition(cx_a, |p, _| p.collaborators().is_empty())
2194            .await;
2195
2196        // Rejoin the project as client B
2197        let _project_b = Project::remote(
2198            project_id,
2199            client_b.clone(),
2200            client_b.user_store.clone(),
2201            lang_registry.clone(),
2202            fs.clone(),
2203            &mut cx_b.to_async(),
2204        )
2205        .await
2206        .unwrap();
2207
2208        // Client A sees that a guest has re-joined.
2209        project_a
2210            .condition(cx_a, |p, _| p.collaborators().len() == 1)
2211            .await;
2212
2213        // Simulate connection loss for client B and ensure client A observes client B leaving the project.
2214        client_b.wait_for_current_user(cx_b).await;
2215        server.disconnect_client(client_b.current_user_id(cx_b));
            // Advance the simulated clock by the RPC receive timeout instead of a hard-coded
            // duration, so the wait tracks the constant if it is ever retuned.
            // NOTE(review): presumably this is the timeout after which the dead connection is
            // noticed — confirm against the `rpc` crate.
2216        cx_a.foreground().advance_clock(rpc::RECEIVE_TIMEOUT);
2217        project_a
2218            .condition(cx_a, |p, _| p.collaborators().is_empty())
2219            .await;
2220    }
2221
2222    #[gpui::test(iterations = 10)]
        /// Checks that diagnostics published by the host's (fake) language server reach a guest.
        ///
        /// Steps:
        /// 1. The host shares a project and opens `other.rs`, which starts the fake language
        ///    server; the server then publishes one error for `a.rs`.
        /// 2. Once the collaboration server's store holds a diagnostic summary for the worktree,
        ///    client B joins and must immediately see a summary of one error for `a.rs`.
        /// 3. The language server publishes an error plus a warning; B's summary must update to
        ///    one error and one warning, and opening `a.rs` on B must show both entries.
        /// 4. The language server publishes an empty diagnostic list; both projects must end up
        ///    with no diagnostic summaries.
2223    async fn test_collaborating_with_diagnostics(
2224        cx_a: &mut TestAppContext,
2225        cx_b: &mut TestAppContext,
2226    ) {
2227        cx_a.foreground().forbid_parking();
2228        let lang_registry = Arc::new(LanguageRegistry::test());
2229        let fs = FakeFs::new(cx_a.background());
2230
2231        // Set up a fake language server.
2232        let mut language = Language::new(
2233            LanguageConfig {
2234                name: "Rust".into(),
2235                path_suffixes: vec!["rs".to_string()],
2236                ..Default::default()
2237            },
2238            Some(tree_sitter_rust::language()),
2239        );
2240        let mut fake_language_servers = language.set_fake_lsp_adapter(Default::default());
2241        lang_registry.add(Arc::new(language));
2242
2243        // Connect to a server as 2 clients.
2244        let mut server = TestServer::start(cx_a.foreground(), cx_a.background()).await;
2245        let client_a = server.create_client(cx_a, "user_a").await;
2246        let client_b = server.create_client(cx_b, "user_b").await;
2247
2248        // Share a project as client A
2249        fs.insert_tree(
2250            "/a",
2251            json!({
2252                ".zed.toml": r#"collaborators = ["user_b"]"#,
2253                "a.rs": "let one = two",
2254                "other.rs": "",
2255            }),
2256        )
2257        .await;
2258        let project_a = cx_a.update(|cx| {
2259            Project::local(
2260                client_a.clone(),
2261                client_a.user_store.clone(),
2262                lang_registry.clone(),
2263                fs.clone(),
2264                cx,
2265            )
2266        });
2267        let (worktree_a, _) = project_a
2268            .update(cx_a, |p, cx| {
2269                p.find_or_create_local_worktree("/a", true, cx)
2270            })
2271            .await
2272            .unwrap();
2273        worktree_a
2274            .read_with(cx_a, |tree, _| tree.as_local().unwrap().scan_complete())
2275            .await;
2276        let project_id = project_a.update(cx_a, |p, _| p.next_remote_id()).await;
2277        let worktree_id = worktree_a.read_with(cx_a, |tree, _| tree.id());
2278        project_a.update(cx_a, |p, cx| p.share(cx)).await.unwrap();
2279
2280        // Cause the language server to start.
2281        let _ = cx_a
2282            .background()
2283            .spawn(project_a.update(cx_a, |project, cx| {
2284                project.open_buffer(
2285                    ProjectPath {
2286                        worktree_id,
2287                        path: Path::new("other.rs").into(),
2288                    },
2289                    cx,
2290                )
2291            }))
2292            .await
2293            .unwrap();
2294
2295        // Simulate a language server reporting errors for a file.
2296        let mut fake_language_server = fake_language_servers.next().await.unwrap();
            // Wait for the open notification before publishing diagnostics.
2297        fake_language_server
2298            .receive_notification::<lsp::notification::DidOpenTextDocument>()
2299            .await;
2300        fake_language_server.notify::<lsp::notification::PublishDiagnostics>(
2301            lsp::PublishDiagnosticsParams {
2302                uri: lsp::Url::from_file_path("/a/a.rs").unwrap(),
2303                version: None,
2304                diagnostics: vec![lsp::Diagnostic {
2305                    severity: Some(lsp::DiagnosticSeverity::ERROR),
2306                    range: lsp::Range::new(lsp::Position::new(0, 4), lsp::Position::new(0, 7)),
2307                    message: "message 1".to_string(),
2308                    ..Default::default()
2309                }],
2310            },
2311        );
2312
2313        // Wait for server to see the diagnostics update.
2314        server
2315            .condition(|store| {
2316                let worktree = store
2317                    .project(project_id)
2318                    .unwrap()
2319                    .share
2320                    .as_ref()
2321                    .unwrap()
2322                    .worktrees
2323                    .get(&worktree_id.to_proto())
2324                    .unwrap();
2325
2326                !worktree.diagnostic_summaries.is_empty()
2327            })
2328            .await;
2329
2330        // Join the worktree as client B.
2331        let project_b = Project::remote(
2332            project_id,
2333            client_b.clone(),
2334            client_b.user_store.clone(),
2335            lang_registry.clone(),
2336            fs.clone(),
2337            &mut cx_b.to_async(),
2338        )
2339        .await
2340        .unwrap();
2341
            // The summary is asserted synchronously: it must already be present right after the
            // join completes.
2342        project_b.read_with(cx_b, |project, cx| {
2343            assert_eq!(
2344                project.diagnostic_summaries(cx).collect::<Vec<_>>(),
2345                &[(
2346                    ProjectPath {
2347                        worktree_id,
2348                        path: Arc::from(Path::new("a.rs")),
2349                    },
2350                    DiagnosticSummary {
2351                        error_count: 1,
2352                        warning_count: 0,
2353                        ..Default::default()
2354                    },
2355                )]
2356            )
2357        });
2358
2359        // Simulate a language server reporting more errors for a file.
2360        fake_language_server.notify::<lsp::notification::PublishDiagnostics>(
2361            lsp::PublishDiagnosticsParams {
2362                uri: lsp::Url::from_file_path("/a/a.rs").unwrap(),
2363                version: None,
2364                diagnostics: vec![
2365                    lsp::Diagnostic {
2366                        severity: Some(lsp::DiagnosticSeverity::ERROR),
2367                        range: lsp::Range::new(lsp::Position::new(0, 4), lsp::Position::new(0, 7)),
2368                        message: "message 1".to_string(),
2369                        ..Default::default()
2370                    },
2371                    lsp::Diagnostic {
2372                        severity: Some(lsp::DiagnosticSeverity::WARNING),
2373                        range: lsp::Range::new(
2374                            lsp::Position::new(0, 10),
2375                            lsp::Position::new(0, 13),
2376                        ),
2377                        message: "message 2".to_string(),
2378                        ..Default::default()
2379                    },
2380                ],
2381            },
2382        );
2383
2384        // Client b gets the updated summaries
2385        project_b
2386            .condition(cx_b, |project, cx| {
2387                project.diagnostic_summaries(cx).collect::<Vec<_>>()
2388                    == &[(
2389                        ProjectPath {
2390                            worktree_id,
2391                            path: Arc::from(Path::new("a.rs")),
2392                        },
2393                        DiagnosticSummary {
2394                            error_count: 1,
2395                            warning_count: 1,
2396                            ..Default::default()
2397                        },
2398                    )]
2399            })
2400            .await;
2401
2402        // Open the file with the errors on client B. They should be present.
2403        let buffer_b = cx_b
2404            .background()
2405            .spawn(project_b.update(cx_b, |p, cx| p.open_buffer((worktree_id, "a.rs"), cx)))
2406            .await
2407            .unwrap();
2408
2409        buffer_b.read_with(cx_b, |buffer, _| {
2410            assert_eq!(
2411                buffer
2412                    .snapshot()
2413                    .diagnostics_in_range::<_, Point>(0..buffer.len(), false)
2415                    .collect::<Vec<_>>(),
2416                &[
2417                    DiagnosticEntry {
2418                        range: Point::new(0, 4)..Point::new(0, 7),
2419                        diagnostic: Diagnostic {
2420                            group_id: 0,
2421                            message: "message 1".to_string(),
2422                            severity: lsp::DiagnosticSeverity::ERROR,
2423                            is_primary: true,
2424                            ..Default::default()
2425                        }
2426                    },
2427                    DiagnosticEntry {
2428                        range: Point::new(0, 10)..Point::new(0, 13),
2429                        diagnostic: Diagnostic {
2430                            group_id: 1,
2431                            severity: lsp::DiagnosticSeverity::WARNING,
2432                            message: "message 2".to_string(),
2433                            is_primary: true,
2434                            ..Default::default()
2435                        }
2436                    }
2437                ]
2438            );
2439        });
2440
2441        // Simulate a language server reporting no errors for a file.
2442        fake_language_server.notify::<lsp::notification::PublishDiagnostics>(
2443            lsp::PublishDiagnosticsParams {
2444                uri: lsp::Url::from_file_path("/a/a.rs").unwrap(),
2445                version: None,
2446                diagnostics: vec![],
2447            },
2448        );
            // Both the host and the guest must end up with no summaries at all.
2449        project_a
2450            .condition(cx_a, |project, cx| {
2451                project.diagnostic_summaries(cx).collect::<Vec<_>>() == &[]
2452            })
2453            .await;
2454        project_b
2455            .condition(cx_b, |project, cx| {
2456                project.diagnostic_summaries(cx).collect::<Vec<_>>() == &[]
2457            })
2458            .await;
2459    }
2460
2461    #[gpui::test(iterations = 10)]
2462    async fn test_collaborating_with_completion(
2463        cx_a: &mut TestAppContext,
2464        cx_b: &mut TestAppContext,
2465    ) {
2466        cx_a.foreground().forbid_parking();
2467        let lang_registry = Arc::new(LanguageRegistry::test());
2468        let fs = FakeFs::new(cx_a.background());
2469
2470        // Set up a fake language server.
2471        let mut language = Language::new(
2472            LanguageConfig {
2473                name: "Rust".into(),
2474                path_suffixes: vec!["rs".to_string()],
2475                ..Default::default()
2476            },
2477            Some(tree_sitter_rust::language()),
2478        );
2479        let mut fake_language_servers = language.set_fake_lsp_adapter(FakeLspAdapter {
2480            capabilities: lsp::ServerCapabilities {
2481                completion_provider: Some(lsp::CompletionOptions {
2482                    trigger_characters: Some(vec![".".to_string()]),
2483                    ..Default::default()
2484                }),
2485                ..Default::default()
2486            },
2487            ..Default::default()
2488        });
2489        lang_registry.add(Arc::new(language));
2490
2491        // Connect to a server as 2 clients.
2492        let mut server = TestServer::start(cx_a.foreground(), cx_a.background()).await;
2493        let client_a = server.create_client(cx_a, "user_a").await;
2494        let client_b = server.create_client(cx_b, "user_b").await;
2495
2496        // Share a project as client A
2497        fs.insert_tree(
2498            "/a",
2499            json!({
2500                ".zed.toml": r#"collaborators = ["user_b"]"#,
2501                "main.rs": "fn main() { a }",
2502                "other.rs": "",
2503            }),
2504        )
2505        .await;
2506        let project_a = cx_a.update(|cx| {
2507            Project::local(
2508                client_a.clone(),
2509                client_a.user_store.clone(),
2510                lang_registry.clone(),
2511                fs.clone(),
2512                cx,
2513            )
2514        });
2515        let (worktree_a, _) = project_a
2516            .update(cx_a, |p, cx| {
2517                p.find_or_create_local_worktree("/a", true, cx)
2518            })
2519            .await
2520            .unwrap();
2521        worktree_a
2522            .read_with(cx_a, |tree, _| tree.as_local().unwrap().scan_complete())
2523            .await;
2524        let project_id = project_a.update(cx_a, |p, _| p.next_remote_id()).await;
2525        let worktree_id = worktree_a.read_with(cx_a, |tree, _| tree.id());
2526        project_a.update(cx_a, |p, cx| p.share(cx)).await.unwrap();
2527
2528        // Join the worktree as client B.
2529        let project_b = Project::remote(
2530            project_id,
2531            client_b.clone(),
2532            client_b.user_store.clone(),
2533            lang_registry.clone(),
2534            fs.clone(),
2535            &mut cx_b.to_async(),
2536        )
2537        .await
2538        .unwrap();
2539
2540        // Open a file in an editor as the guest.
2541        let buffer_b = project_b
2542            .update(cx_b, |p, cx| p.open_buffer((worktree_id, "main.rs"), cx))
2543            .await
2544            .unwrap();
2545        let (window_b, _) = cx_b.add_window(|_| EmptyView);
2546        let editor_b = cx_b.add_view(window_b, |cx| {
2547            Editor::for_buffer(buffer_b.clone(), Some(project_b.clone()), cx)
2548        });
2549
2550        let fake_language_server = fake_language_servers.next().await.unwrap();
2551        buffer_b
2552            .condition(&cx_b, |buffer, _| !buffer.completion_triggers().is_empty())
2553            .await;
2554
2555        // Type a completion trigger character as the guest.
2556        editor_b.update(cx_b, |editor, cx| {
2557            editor.select_ranges([13..13], None, cx);
2558            editor.handle_input(&Input(".".into()), cx);
2559            cx.focus(&editor_b);
2560        });
2561
2562        // Receive a completion request as the host's language server.
2563        // Return some completions from the host's language server.
2564        cx_a.foreground().start_waiting();
2565        fake_language_server
2566            .handle_request::<lsp::request::Completion, _, _>(|params, _| async move {
2567                assert_eq!(
2568                    params.text_document_position.text_document.uri,
2569                    lsp::Url::from_file_path("/a/main.rs").unwrap(),
2570                );
2571                assert_eq!(
2572                    params.text_document_position.position,
2573                    lsp::Position::new(0, 14),
2574                );
2575
2576                Ok(Some(lsp::CompletionResponse::Array(vec![
2577                    lsp::CompletionItem {
2578                        label: "first_method(…)".into(),
2579                        detail: Some("fn(&mut self, B) -> C".into()),
2580                        text_edit: Some(lsp::CompletionTextEdit::Edit(lsp::TextEdit {
2581                            new_text: "first_method($1)".to_string(),
2582                            range: lsp::Range::new(
2583                                lsp::Position::new(0, 14),
2584                                lsp::Position::new(0, 14),
2585                            ),
2586                        })),
2587                        insert_text_format: Some(lsp::InsertTextFormat::SNIPPET),
2588                        ..Default::default()
2589                    },
2590                    lsp::CompletionItem {
2591                        label: "second_method(…)".into(),
2592                        detail: Some("fn(&mut self, C) -> D<E>".into()),
2593                        text_edit: Some(lsp::CompletionTextEdit::Edit(lsp::TextEdit {
2594                            new_text: "second_method()".to_string(),
2595                            range: lsp::Range::new(
2596                                lsp::Position::new(0, 14),
2597                                lsp::Position::new(0, 14),
2598                            ),
2599                        })),
2600                        insert_text_format: Some(lsp::InsertTextFormat::SNIPPET),
2601                        ..Default::default()
2602                    },
2603                ])))
2604            })
2605            .next()
2606            .await
2607            .unwrap();
2608        cx_a.foreground().finish_waiting();
2609
2610        // Open the buffer on the host.
2611        let buffer_a = project_a
2612            .update(cx_a, |p, cx| p.open_buffer((worktree_id, "main.rs"), cx))
2613            .await
2614            .unwrap();
2615        buffer_a
2616            .condition(&cx_a, |buffer, _| buffer.text() == "fn main() { a. }")
2617            .await;
2618
2619        // Confirm a completion on the guest.
2620        editor_b
2621            .condition(&cx_b, |editor, _| editor.context_menu_visible())
2622            .await;
2623        editor_b.update(cx_b, |editor, cx| {
2624            editor.confirm_completion(&ConfirmCompletion { item_ix: Some(0) }, cx);
2625            assert_eq!(editor.text(cx), "fn main() { a.first_method() }");
2626        });
2627
2628        // Return a resolved completion from the host's language server.
2629        // The resolved completion has an additional text edit.
2630        fake_language_server.handle_request::<lsp::request::ResolveCompletionItem, _, _>(
2631            |params, _| async move {
2632                assert_eq!(params.label, "first_method(…)");
2633                Ok(lsp::CompletionItem {
2634                    label: "first_method(…)".into(),
2635                    detail: Some("fn(&mut self, B) -> C".into()),
2636                    text_edit: Some(lsp::CompletionTextEdit::Edit(lsp::TextEdit {
2637                        new_text: "first_method($1)".to_string(),
2638                        range: lsp::Range::new(
2639                            lsp::Position::new(0, 14),
2640                            lsp::Position::new(0, 14),
2641                        ),
2642                    })),
2643                    additional_text_edits: Some(vec![lsp::TextEdit {
2644                        new_text: "use d::SomeTrait;\n".to_string(),
2645                        range: lsp::Range::new(lsp::Position::new(0, 0), lsp::Position::new(0, 0)),
2646                    }]),
2647                    insert_text_format: Some(lsp::InsertTextFormat::SNIPPET),
2648                    ..Default::default()
2649                })
2650            },
2651        );
2652
2653        // The additional edit is applied.
2654        buffer_a
2655            .condition(&cx_a, |buffer, _| {
2656                buffer.text() == "use d::SomeTrait;\nfn main() { a.first_method() }"
2657            })
2658            .await;
2659        buffer_b
2660            .condition(&cx_b, |buffer, _| {
2661                buffer.text() == "use d::SomeTrait;\nfn main() { a.first_method() }"
2662            })
2663            .await;
2664    }
2665
2666    #[gpui::test(iterations = 10)]
2667    async fn test_reloading_buffer_manually(cx_a: &mut TestAppContext, cx_b: &mut TestAppContext) {
2668        cx_a.foreground().forbid_parking();
2669        let lang_registry = Arc::new(LanguageRegistry::test());
2670        let fs = FakeFs::new(cx_a.background());
2671
2672        // Connect to a server as 2 clients.
2673        let mut server = TestServer::start(cx_a.foreground(), cx_a.background()).await;
2674        let client_a = server.create_client(cx_a, "user_a").await;
2675        let client_b = server.create_client(cx_b, "user_b").await;
2676
2677        // Share a project as client A
2678        fs.insert_tree(
2679            "/a",
2680            json!({
2681                ".zed.toml": r#"collaborators = ["user_b"]"#,
2682                "a.rs": "let one = 1;",
2683            }),
2684        )
2685        .await;
2686        let project_a = cx_a.update(|cx| {
2687            Project::local(
2688                client_a.clone(),
2689                client_a.user_store.clone(),
2690                lang_registry.clone(),
2691                fs.clone(),
2692                cx,
2693            )
2694        });
2695        let (worktree_a, _) = project_a
2696            .update(cx_a, |p, cx| {
2697                p.find_or_create_local_worktree("/a", true, cx)
2698            })
2699            .await
2700            .unwrap();
2701        worktree_a
2702            .read_with(cx_a, |tree, _| tree.as_local().unwrap().scan_complete())
2703            .await;
2704        let project_id = project_a.update(cx_a, |p, _| p.next_remote_id()).await;
2705        let worktree_id = worktree_a.read_with(cx_a, |tree, _| tree.id());
2706        project_a.update(cx_a, |p, cx| p.share(cx)).await.unwrap();
2707        let buffer_a = project_a
2708            .update(cx_a, |p, cx| p.open_buffer((worktree_id, "a.rs"), cx))
2709            .await
2710            .unwrap();
2711
2712        // Join the worktree as client B.
2713        let project_b = Project::remote(
2714            project_id,
2715            client_b.clone(),
2716            client_b.user_store.clone(),
2717            lang_registry.clone(),
2718            fs.clone(),
2719            &mut cx_b.to_async(),
2720        )
2721        .await
2722        .unwrap();
2723
2724        let buffer_b = cx_b
2725            .background()
2726            .spawn(project_b.update(cx_b, |p, cx| p.open_buffer((worktree_id, "a.rs"), cx)))
2727            .await
2728            .unwrap();
2729        buffer_b.update(cx_b, |buffer, cx| {
2730            buffer.edit([(4..7, "six")], cx);
2731            buffer.edit([(10..11, "6")], cx);
2732            assert_eq!(buffer.text(), "let six = 6;");
2733            assert!(buffer.is_dirty());
2734            assert!(!buffer.has_conflict());
2735        });
2736        buffer_a
2737            .condition(cx_a, |buffer, _| buffer.text() == "let six = 6;")
2738            .await;
2739
2740        fs.save(Path::new("/a/a.rs"), &Rope::from("let seven = 7;"))
2741            .await
2742            .unwrap();
2743        buffer_a
2744            .condition(cx_a, |buffer, _| buffer.has_conflict())
2745            .await;
2746        buffer_b
2747            .condition(cx_b, |buffer, _| buffer.has_conflict())
2748            .await;
2749
2750        project_b
2751            .update(cx_b, |project, cx| {
2752                project.reload_buffers(HashSet::from_iter([buffer_b.clone()]), true, cx)
2753            })
2754            .await
2755            .unwrap();
2756        buffer_a.read_with(cx_a, |buffer, _| {
2757            assert_eq!(buffer.text(), "let seven = 7;");
2758            assert!(!buffer.is_dirty());
2759            assert!(!buffer.has_conflict());
2760        });
2761        buffer_b.read_with(cx_b, |buffer, _| {
2762            assert_eq!(buffer.text(), "let seven = 7;");
2763            assert!(!buffer.is_dirty());
2764            assert!(!buffer.has_conflict());
2765        });
2766
2767        buffer_a.update(cx_a, |buffer, cx| {
2768            // Undoing on the host is a no-op when the reload was initiated by the guest.
2769            buffer.undo(cx);
2770            assert_eq!(buffer.text(), "let seven = 7;");
2771            assert!(!buffer.is_dirty());
2772            assert!(!buffer.has_conflict());
2773        });
2774        buffer_b.update(cx_b, |buffer, cx| {
2775            // Undoing on the guest rolls back the buffer to before it was reloaded but the conflict gets cleared.
2776            buffer.undo(cx);
2777            assert_eq!(buffer.text(), "let six = 6;");
2778            assert!(buffer.is_dirty());
2779            assert!(!buffer.has_conflict());
2780        });
2781    }
2782
2783    #[gpui::test(iterations = 10)]
2784    async fn test_formatting_buffer(cx_a: &mut TestAppContext, cx_b: &mut TestAppContext) {
2785        cx_a.foreground().forbid_parking();
2786        let lang_registry = Arc::new(LanguageRegistry::test());
2787        let fs = FakeFs::new(cx_a.background());
2788
2789        // Set up a fake language server.
2790        let mut language = Language::new(
2791            LanguageConfig {
2792                name: "Rust".into(),
2793                path_suffixes: vec!["rs".to_string()],
2794                ..Default::default()
2795            },
2796            Some(tree_sitter_rust::language()),
2797        );
2798        let mut fake_language_servers = language.set_fake_lsp_adapter(Default::default());
2799        lang_registry.add(Arc::new(language));
2800
2801        // Connect to a server as 2 clients.
2802        let mut server = TestServer::start(cx_a.foreground(), cx_a.background()).await;
2803        let client_a = server.create_client(cx_a, "user_a").await;
2804        let client_b = server.create_client(cx_b, "user_b").await;
2805
2806        // Share a project as client A
2807        fs.insert_tree(
2808            "/a",
2809            json!({
2810                ".zed.toml": r#"collaborators = ["user_b"]"#,
2811                "a.rs": "let one = two",
2812            }),
2813        )
2814        .await;
2815        let project_a = cx_a.update(|cx| {
2816            Project::local(
2817                client_a.clone(),
2818                client_a.user_store.clone(),
2819                lang_registry.clone(),
2820                fs.clone(),
2821                cx,
2822            )
2823        });
2824        let (worktree_a, _) = project_a
2825            .update(cx_a, |p, cx| {
2826                p.find_or_create_local_worktree("/a", true, cx)
2827            })
2828            .await
2829            .unwrap();
2830        worktree_a
2831            .read_with(cx_a, |tree, _| tree.as_local().unwrap().scan_complete())
2832            .await;
2833        let project_id = project_a.update(cx_a, |p, _| p.next_remote_id()).await;
2834        let worktree_id = worktree_a.read_with(cx_a, |tree, _| tree.id());
2835        project_a.update(cx_a, |p, cx| p.share(cx)).await.unwrap();
2836
2837        // Join the worktree as client B.
2838        let project_b = Project::remote(
2839            project_id,
2840            client_b.clone(),
2841            client_b.user_store.clone(),
2842            lang_registry.clone(),
2843            fs.clone(),
2844            &mut cx_b.to_async(),
2845        )
2846        .await
2847        .unwrap();
2848
2849        let buffer_b = cx_b
2850            .background()
2851            .spawn(project_b.update(cx_b, |p, cx| p.open_buffer((worktree_id, "a.rs"), cx)))
2852            .await
2853            .unwrap();
2854
2855        let fake_language_server = fake_language_servers.next().await.unwrap();
2856        fake_language_server.handle_request::<lsp::request::Formatting, _, _>(|_, _| async move {
2857            Ok(Some(vec![
2858                lsp::TextEdit {
2859                    range: lsp::Range::new(lsp::Position::new(0, 4), lsp::Position::new(0, 4)),
2860                    new_text: "h".to_string(),
2861                },
2862                lsp::TextEdit {
2863                    range: lsp::Range::new(lsp::Position::new(0, 7), lsp::Position::new(0, 7)),
2864                    new_text: "y".to_string(),
2865                },
2866            ]))
2867        });
2868
2869        project_b
2870            .update(cx_b, |project, cx| {
2871                project.format(HashSet::from_iter([buffer_b.clone()]), true, cx)
2872            })
2873            .await
2874            .unwrap();
2875        assert_eq!(
2876            buffer_b.read_with(cx_b, |buffer, _| buffer.text()),
2877            "let honey = two"
2878        );
2879    }
2880
2881    #[gpui::test(iterations = 10)]
2882    async fn test_definition(cx_a: &mut TestAppContext, cx_b: &mut TestAppContext) {
2883        cx_a.foreground().forbid_parking();
2884        let lang_registry = Arc::new(LanguageRegistry::test());
2885        let fs = FakeFs::new(cx_a.background());
2886        fs.insert_tree(
2887            "/root-1",
2888            json!({
2889                ".zed.toml": r#"collaborators = ["user_b"]"#,
2890                "a.rs": "const ONE: usize = b::TWO + b::THREE;",
2891            }),
2892        )
2893        .await;
2894        fs.insert_tree(
2895            "/root-2",
2896            json!({
2897                "b.rs": "const TWO: usize = 2;\nconst THREE: usize = 3;",
2898            }),
2899        )
2900        .await;
2901
2902        // Set up a fake language server.
2903        let mut language = Language::new(
2904            LanguageConfig {
2905                name: "Rust".into(),
2906                path_suffixes: vec!["rs".to_string()],
2907                ..Default::default()
2908            },
2909            Some(tree_sitter_rust::language()),
2910        );
2911        let mut fake_language_servers = language.set_fake_lsp_adapter(Default::default());
2912        lang_registry.add(Arc::new(language));
2913
2914        // Connect to a server as 2 clients.
2915        let mut server = TestServer::start(cx_a.foreground(), cx_a.background()).await;
2916        let client_a = server.create_client(cx_a, "user_a").await;
2917        let client_b = server.create_client(cx_b, "user_b").await;
2918
2919        // Share a project as client A
2920        let project_a = cx_a.update(|cx| {
2921            Project::local(
2922                client_a.clone(),
2923                client_a.user_store.clone(),
2924                lang_registry.clone(),
2925                fs.clone(),
2926                cx,
2927            )
2928        });
2929        let (worktree_a, _) = project_a
2930            .update(cx_a, |p, cx| {
2931                p.find_or_create_local_worktree("/root-1", true, cx)
2932            })
2933            .await
2934            .unwrap();
2935        worktree_a
2936            .read_with(cx_a, |tree, _| tree.as_local().unwrap().scan_complete())
2937            .await;
2938        let project_id = project_a.update(cx_a, |p, _| p.next_remote_id()).await;
2939        let worktree_id = worktree_a.read_with(cx_a, |tree, _| tree.id());
2940        project_a.update(cx_a, |p, cx| p.share(cx)).await.unwrap();
2941
2942        // Join the worktree as client B.
2943        let project_b = Project::remote(
2944            project_id,
2945            client_b.clone(),
2946            client_b.user_store.clone(),
2947            lang_registry.clone(),
2948            fs.clone(),
2949            &mut cx_b.to_async(),
2950        )
2951        .await
2952        .unwrap();
2953
2954        // Open the file on client B.
2955        let buffer_b = cx_b
2956            .background()
2957            .spawn(project_b.update(cx_b, |p, cx| p.open_buffer((worktree_id, "a.rs"), cx)))
2958            .await
2959            .unwrap();
2960
2961        // Request the definition of a symbol as the guest.
2962        let fake_language_server = fake_language_servers.next().await.unwrap();
2963        fake_language_server.handle_request::<lsp::request::GotoDefinition, _, _>(
2964            |_, _| async move {
2965                Ok(Some(lsp::GotoDefinitionResponse::Scalar(
2966                    lsp::Location::new(
2967                        lsp::Url::from_file_path("/root-2/b.rs").unwrap(),
2968                        lsp::Range::new(lsp::Position::new(0, 6), lsp::Position::new(0, 9)),
2969                    ),
2970                )))
2971            },
2972        );
2973
2974        let definitions_1 = project_b
2975            .update(cx_b, |p, cx| p.definition(&buffer_b, 23, cx))
2976            .await
2977            .unwrap();
2978        cx_b.read(|cx| {
2979            assert_eq!(definitions_1.len(), 1);
2980            assert_eq!(project_b.read(cx).worktrees(cx).count(), 2);
2981            let target_buffer = definitions_1[0].buffer.read(cx);
2982            assert_eq!(
2983                target_buffer.text(),
2984                "const TWO: usize = 2;\nconst THREE: usize = 3;"
2985            );
2986            assert_eq!(
2987                definitions_1[0].range.to_point(target_buffer),
2988                Point::new(0, 6)..Point::new(0, 9)
2989            );
2990        });
2991
2992        // Try getting more definitions for the same buffer, ensuring the buffer gets reused from
2993        // the previous call to `definition`.
2994        fake_language_server.handle_request::<lsp::request::GotoDefinition, _, _>(
2995            |_, _| async move {
2996                Ok(Some(lsp::GotoDefinitionResponse::Scalar(
2997                    lsp::Location::new(
2998                        lsp::Url::from_file_path("/root-2/b.rs").unwrap(),
2999                        lsp::Range::new(lsp::Position::new(1, 6), lsp::Position::new(1, 11)),
3000                    ),
3001                )))
3002            },
3003        );
3004
3005        let definitions_2 = project_b
3006            .update(cx_b, |p, cx| p.definition(&buffer_b, 33, cx))
3007            .await
3008            .unwrap();
3009        cx_b.read(|cx| {
3010            assert_eq!(definitions_2.len(), 1);
3011            assert_eq!(project_b.read(cx).worktrees(cx).count(), 2);
3012            let target_buffer = definitions_2[0].buffer.read(cx);
3013            assert_eq!(
3014                target_buffer.text(),
3015                "const TWO: usize = 2;\nconst THREE: usize = 3;"
3016            );
3017            assert_eq!(
3018                definitions_2[0].range.to_point(target_buffer),
3019                Point::new(1, 6)..Point::new(1, 11)
3020            );
3021        });
3022        assert_eq!(definitions_1[0].buffer, definitions_2[0].buffer);
3023    }
3024
3025    #[gpui::test(iterations = 10)]
3026    async fn test_references(cx_a: &mut TestAppContext, cx_b: &mut TestAppContext) {
3027        cx_a.foreground().forbid_parking();
3028        let lang_registry = Arc::new(LanguageRegistry::test());
3029        let fs = FakeFs::new(cx_a.background());
3030        fs.insert_tree(
3031            "/root-1",
3032            json!({
3033                ".zed.toml": r#"collaborators = ["user_b"]"#,
3034                "one.rs": "const ONE: usize = 1;",
3035                "two.rs": "const TWO: usize = one::ONE + one::ONE;",
3036            }),
3037        )
3038        .await;
3039        fs.insert_tree(
3040            "/root-2",
3041            json!({
3042                "three.rs": "const THREE: usize = two::TWO + one::ONE;",
3043            }),
3044        )
3045        .await;
3046
3047        // Set up a fake language server.
3048        let mut language = Language::new(
3049            LanguageConfig {
3050                name: "Rust".into(),
3051                path_suffixes: vec!["rs".to_string()],
3052                ..Default::default()
3053            },
3054            Some(tree_sitter_rust::language()),
3055        );
3056        let mut fake_language_servers = language.set_fake_lsp_adapter(Default::default());
3057        lang_registry.add(Arc::new(language));
3058
3059        // Connect to a server as 2 clients.
3060        let mut server = TestServer::start(cx_a.foreground(), cx_a.background()).await;
3061        let client_a = server.create_client(cx_a, "user_a").await;
3062        let client_b = server.create_client(cx_b, "user_b").await;
3063
3064        // Share a project as client A
3065        let project_a = cx_a.update(|cx| {
3066            Project::local(
3067                client_a.clone(),
3068                client_a.user_store.clone(),
3069                lang_registry.clone(),
3070                fs.clone(),
3071                cx,
3072            )
3073        });
3074        let (worktree_a, _) = project_a
3075            .update(cx_a, |p, cx| {
3076                p.find_or_create_local_worktree("/root-1", true, cx)
3077            })
3078            .await
3079            .unwrap();
3080        worktree_a
3081            .read_with(cx_a, |tree, _| tree.as_local().unwrap().scan_complete())
3082            .await;
3083        let project_id = project_a.update(cx_a, |p, _| p.next_remote_id()).await;
3084        let worktree_id = worktree_a.read_with(cx_a, |tree, _| tree.id());
3085        project_a.update(cx_a, |p, cx| p.share(cx)).await.unwrap();
3086
3087        // Join the worktree as client B.
3088        let project_b = Project::remote(
3089            project_id,
3090            client_b.clone(),
3091            client_b.user_store.clone(),
3092            lang_registry.clone(),
3093            fs.clone(),
3094            &mut cx_b.to_async(),
3095        )
3096        .await
3097        .unwrap();
3098
3099        // Open the file on client B.
3100        let buffer_b = cx_b
3101            .background()
3102            .spawn(project_b.update(cx_b, |p, cx| p.open_buffer((worktree_id, "one.rs"), cx)))
3103            .await
3104            .unwrap();
3105
3106        // Request references to a symbol as the guest.
3107        let fake_language_server = fake_language_servers.next().await.unwrap();
3108        fake_language_server.handle_request::<lsp::request::References, _, _>(
3109            |params, _| async move {
3110                assert_eq!(
3111                    params.text_document_position.text_document.uri.as_str(),
3112                    "file:///root-1/one.rs"
3113                );
3114                Ok(Some(vec![
3115                    lsp::Location {
3116                        uri: lsp::Url::from_file_path("/root-1/two.rs").unwrap(),
3117                        range: lsp::Range::new(
3118                            lsp::Position::new(0, 24),
3119                            lsp::Position::new(0, 27),
3120                        ),
3121                    },
3122                    lsp::Location {
3123                        uri: lsp::Url::from_file_path("/root-1/two.rs").unwrap(),
3124                        range: lsp::Range::new(
3125                            lsp::Position::new(0, 35),
3126                            lsp::Position::new(0, 38),
3127                        ),
3128                    },
3129                    lsp::Location {
3130                        uri: lsp::Url::from_file_path("/root-2/three.rs").unwrap(),
3131                        range: lsp::Range::new(
3132                            lsp::Position::new(0, 37),
3133                            lsp::Position::new(0, 40),
3134                        ),
3135                    },
3136                ]))
3137            },
3138        );
3139
3140        let references = project_b
3141            .update(cx_b, |p, cx| p.references(&buffer_b, 7, cx))
3142            .await
3143            .unwrap();
3144        cx_b.read(|cx| {
3145            assert_eq!(references.len(), 3);
3146            assert_eq!(project_b.read(cx).worktrees(cx).count(), 2);
3147
3148            let two_buffer = references[0].buffer.read(cx);
3149            let three_buffer = references[2].buffer.read(cx);
3150            assert_eq!(
3151                two_buffer.file().unwrap().path().as_ref(),
3152                Path::new("two.rs")
3153            );
3154            assert_eq!(references[1].buffer, references[0].buffer);
3155            assert_eq!(
3156                three_buffer.file().unwrap().full_path(cx),
3157                Path::new("three.rs")
3158            );
3159
3160            assert_eq!(references[0].range.to_offset(&two_buffer), 24..27);
3161            assert_eq!(references[1].range.to_offset(&two_buffer), 35..38);
3162            assert_eq!(references[2].range.to_offset(&three_buffer), 37..40);
3163        });
3164    }
3165
3166    #[gpui::test(iterations = 10)]
        // Checks that a text search issued from a remote (guest) project returns matches
        // from both worktrees that the host added to the shared project.
3167    async fn test_project_search(cx_a: &mut TestAppContext, cx_b: &mut TestAppContext) {
3168        cx_a.foreground().forbid_parking();
3169        let lang_registry = Arc::new(LanguageRegistry::test());
3170        let fs = FakeFs::new(cx_a.background());
            // First root: "a", "c" and "d" contain "world"; "b" does not, so it is absent
            // from the expected results at the end of the test.
3171        fs.insert_tree(
3172            "/root-1",
3173            json!({
3174                ".zed.toml": r#"collaborators = ["user_b"]"#,
3175                "a": "hello world",
3176                "b": "goodnight moon",
3177                "c": "a world of goo",
3178                "d": "world champion of clown world",
3179            }),
3180        )
3181        .await;
            // Second root, added as a separate worktree below.
3182        fs.insert_tree(
3183            "/root-2",
3184            json!({
3185                "e": "disney world is fun",
3186            }),
3187        )
3188        .await;
3189
3190        // Connect to a server as 2 clients.
3191        let mut server = TestServer::start(cx_a.foreground(), cx_a.background()).await;
3192        let client_a = server.create_client(cx_a, "user_a").await;
3193        let client_b = server.create_client(cx_b, "user_b").await;
3194
3195        // Share a project as client A
3196        let project_a = cx_a.update(|cx| {
3197            Project::local(
3198                client_a.clone(),
3199                client_a.user_store.clone(),
3200                lang_registry.clone(),
3201                fs.clone(),
3202                cx,
3203            )
3204        });
3205        let project_id = project_a.update(cx_a, |p, _| p.next_remote_id()).await;
3206
            // Add both roots as worktrees of the host project, awaiting each worktree's
            // `scan_complete()` future before moving on.
3207        let (worktree_1, _) = project_a
3208            .update(cx_a, |p, cx| {
3209                p.find_or_create_local_worktree("/root-1", true, cx)
3210            })
3211            .await
3212            .unwrap();
3213        worktree_1
3214            .read_with(cx_a, |tree, _| tree.as_local().unwrap().scan_complete())
3215            .await;
3216        let (worktree_2, _) = project_a
3217            .update(cx_a, |p, cx| {
3218                p.find_or_create_local_worktree("/root-2", true, cx)
3219            })
3220            .await
3221            .unwrap();
3222        worktree_2
3223            .read_with(cx_a, |tree, _| tree.as_local().unwrap().scan_complete())
3224            .await;
3225
3226        project_a.update(cx_a, |p, cx| p.share(cx)).await.unwrap();
3227
3228        // Join the shared project as client B.
3229        let project_b = Project::remote(
3230            project_id,
3231            client_b.clone(),
3232            client_b.user_store.clone(),
3233            lang_registry.clone(),
3234            fs.clone(),
3235            &mut cx_b.to_async(),
3236        )
3237        .await
3238        .unwrap();
3239
            // Search for "world" from the guest's project.
            // NOTE(review): the two `false` arguments are presumably the whole-word and
            // case-sensitive flags — confirm against `SearchQuery::text`.
3240        let results = project_b
3241            .update(cx_b, |project, cx| {
3242                project.search(SearchQuery::text("world", false, false), cx)
3243            })
3244            .await
3245            .unwrap();
3246
            // Convert each (buffer, ranges) result into (full path, offset ranges) so it can
            // be compared against plain values.
3247        let mut ranges_by_path = results
3248            .into_iter()
3249            .map(|(buffer, ranges)| {
3250                buffer.read_with(cx_b, |buffer, cx| {
3251                    let path = buffer.file().unwrap().full_path(cx);
3252                    let offset_ranges = ranges
3253                        .into_iter()
3254                        .map(|range| range.to_offset(buffer))
3255                        .collect::<Vec<_>>();
3256                    (path, offset_ranges)
3257                })
3258            })
3259            .collect::<Vec<_>>();
            // Sort by path so the assertion does not depend on the order of the results.
3260        ranges_by_path.sort_by_key(|(path, _)| path.clone());
3261
            // Each range is the byte span of one "world" occurrence in the fixture text;
            // "d" contains two occurrences.
3262        assert_eq!(
3263            ranges_by_path,
3264            &[
3265                (PathBuf::from("root-1/a"), vec![6..11]),
3266                (PathBuf::from("root-1/c"), vec![2..7]),
3267                (PathBuf::from("root-1/d"), vec![0..5, 24..29]),
3268                (PathBuf::from("root-2/e"), vec![7..12]),
3269            ]
3270        );
3271    }
3272
3273    #[gpui::test(iterations = 10)]
        // Checks that a guest can request document highlights for a buffer of a shared
        // project: the fake language server must see the expected URI and position, and the
        // highlights it returns must come back to the guest as (kind, offset range) pairs.
3274    async fn test_document_highlights(cx_a: &mut TestAppContext, cx_b: &mut TestAppContext) {
3275        cx_a.foreground().forbid_parking();
3276        let lang_registry = Arc::new(LanguageRegistry::test());
3277        let fs = FakeFs::new(cx_a.background());
            // Single-line file: `number` occurs at columns 10..16 (parameter), 32..38 and
            // 41..47 (uses). These are the ranges the fake server reports below.
3278        fs.insert_tree(
3279            "/root-1",
3280            json!({
3281                ".zed.toml": r#"collaborators = ["user_b"]"#,
3282                "main.rs": "fn double(number: i32) -> i32 { number + number }",
3283            }),
3284        )
3285        .await;
3286
3287        // Set up a fake language server.
3288        let mut language = Language::new(
3289            LanguageConfig {
3290                name: "Rust".into(),
3291                path_suffixes: vec!["rs".to_string()],
3292                ..Default::default()
3293            },
3294            Some(tree_sitter_rust::language()),
3295        );
3296        let mut fake_language_servers = language.set_fake_lsp_adapter(Default::default());
3297        lang_registry.add(Arc::new(language));
3298
3299        // Connect to a server as 2 clients.
3300        let mut server = TestServer::start(cx_a.foreground(), cx_a.background()).await;
3301        let client_a = server.create_client(cx_a, "user_a").await;
3302        let client_b = server.create_client(cx_b, "user_b").await;
3303
3304        // Share a project as client A
3305        let project_a = cx_a.update(|cx| {
3306            Project::local(
3307                client_a.clone(),
3308                client_a.user_store.clone(),
3309                lang_registry.clone(),
3310                fs.clone(),
3311                cx,
3312            )
3313        });
3314        let (worktree_a, _) = project_a
3315            .update(cx_a, |p, cx| {
3316                p.find_or_create_local_worktree("/root-1", true, cx)
3317            })
3318            .await
3319            .unwrap();
3320        worktree_a
3321            .read_with(cx_a, |tree, _| tree.as_local().unwrap().scan_complete())
3322            .await;
3323        let project_id = project_a.update(cx_a, |p, _| p.next_remote_id()).await;
3324        let worktree_id = worktree_a.read_with(cx_a, |tree, _| tree.id());
3325        project_a.update(cx_a, |p, cx| p.share(cx)).await.unwrap();
3326
3327        // Join the shared project as client B.
3328        let project_b = Project::remote(
3329            project_id,
3330            client_b.clone(),
3331            client_b.user_store.clone(),
3332            lang_registry.clone(),
3333            fs.clone(),
3334            &mut cx_b.to_async(),
3335        )
3336        .await
3337        .unwrap();
3338
3339        // Open the file on client B.
3340        let buffer_b = cx_b
3341            .background()
3342            .spawn(project_b.update(cx_b, |p, cx| p.open_buffer((worktree_id, "main.rs"), cx)))
3343            .await
3344            .unwrap();
3345
3346        // Take the next fake language server and install a handler for highlight requests.
            // The handler asserts on the incoming request and replies with three highlights.
3347        let fake_language_server = fake_language_servers.next().await.unwrap();
3348        fake_language_server.handle_request::<lsp::request::DocumentHighlightRequest, _, _>(
3349            |params, _| async move {
3350                assert_eq!(
3351                    params
3352                        .text_document_position_params
3353                        .text_document
3354                        .uri
3355                        .as_str(),
3356                    "file:///root-1/main.rs"
3357                );
                    // The guest asks at offset 34 below; the file has a single line, so the
                    // expected LSP position is line 0, column 34.
3358                assert_eq!(
3359                    params.text_document_position_params.position,
3360                    lsp::Position::new(0, 34)
3361                );
3362                Ok(Some(vec![
3363                    lsp::DocumentHighlight {
3364                        kind: Some(lsp::DocumentHighlightKind::WRITE),
3365                        range: lsp::Range::new(
3366                            lsp::Position::new(0, 10),
3367                            lsp::Position::new(0, 16),
3368                        ),
3369                    },
3370                    lsp::DocumentHighlight {
3371                        kind: Some(lsp::DocumentHighlightKind::READ),
3372                        range: lsp::Range::new(
3373                            lsp::Position::new(0, 32),
3374                            lsp::Position::new(0, 38),
3375                        ),
3376                    },
3377                    lsp::DocumentHighlight {
3378                        kind: Some(lsp::DocumentHighlightKind::READ),
3379                        range: lsp::Range::new(
3380                            lsp::Position::new(0, 41),
3381                            lsp::Position::new(0, 47),
3382                        ),
3383                    },
3384                ]))
3385            },
3386        );
3387
            // Request document highlights as the guest, at offset 34 of the buffer.
3388        let highlights = project_b
3389            .update(cx_b, |p, cx| p.document_highlights(&buffer_b, 34, cx))
3390            .await
3391            .unwrap();
3392        buffer_b.read_with(cx_b, |buffer, _| {
3393            let snapshot = buffer.snapshot();
3394
                // Resolve each highlight's range to offsets in the guest's buffer snapshot
                // and compare with what the fake server returned.
3395            let highlights = highlights
3396                .into_iter()
3397                .map(|highlight| (highlight.kind, highlight.range.to_offset(&snapshot)))
3398                .collect::<Vec<_>>();
3399            assert_eq!(
3400                highlights,
3401                &[
3402                    (lsp::DocumentHighlightKind::WRITE, 10..16),
3403                    (lsp::DocumentHighlightKind::READ, 32..38),
3404                    (lsp::DocumentHighlightKind::READ, 41..47)
3405                ]
3406            )
3407        });
3408    }
3409
3410    #[gpui::test(iterations = 10)]
        // Checks that a guest can query project symbols, open the buffer for a returned
        // symbol, and that opening a symbol whose path was altered by the guest fails with
        // an "invalid symbol signature" error.
3411    async fn test_project_symbols(cx_a: &mut TestAppContext, cx_b: &mut TestAppContext) {
3412        cx_a.foreground().forbid_parking();
3413        let lang_registry = Arc::new(LanguageRegistry::test());
3414        let fs = FakeFs::new(cx_a.background());
            // Only "/code/crate-1" is added as a worktree below; "crate-2" and "private" are
            // siblings of it in the fake filesystem.
3415        fs.insert_tree(
3416            "/code",
3417            json!({
3418                "crate-1": {
3419                    ".zed.toml": r#"collaborators = ["user_b"]"#,
3420                    "one.rs": "const ONE: usize = 1;",
3421                },
3422                "crate-2": {
3423                    "two.rs": "const TWO: usize = 2; const THREE: usize = 3;",
3424                },
3425                "private": {
3426                    "passwords.txt": "the-password",
3427                }
3428            }),
3429        )
3430        .await;
3431
3432        // Set up a fake language server.
3433        let mut language = Language::new(
3434            LanguageConfig {
3435                name: "Rust".into(),
3436                path_suffixes: vec!["rs".to_string()],
3437                ..Default::default()
3438            },
3439            Some(tree_sitter_rust::language()),
3440        );
3441        let mut fake_language_servers = language.set_fake_lsp_adapter(Default::default());
3442        lang_registry.add(Arc::new(language));
3443
3444        // Connect to a server as 2 clients.
3445        let mut server = TestServer::start(cx_a.foreground(), cx_a.background()).await;
3446        let client_a = server.create_client(cx_a, "user_a").await;
3447        let client_b = server.create_client(cx_b, "user_b").await;
3448
3449        // Share a project as client A
3450        let project_a = cx_a.update(|cx| {
3451            Project::local(
3452                client_a.clone(),
3453                client_a.user_store.clone(),
3454                lang_registry.clone(),
3455                fs.clone(),
3456                cx,
3457            )
3458        });
3459        let (worktree_a, _) = project_a
3460            .update(cx_a, |p, cx| {
3461                p.find_or_create_local_worktree("/code/crate-1", true, cx)
3462            })
3463            .await
3464            .unwrap();
3465        worktree_a
3466            .read_with(cx_a, |tree, _| tree.as_local().unwrap().scan_complete())
3467            .await;
3468        let project_id = project_a.update(cx_a, |p, _| p.next_remote_id()).await;
3469        let worktree_id = worktree_a.read_with(cx_a, |tree, _| tree.id());
3470        project_a.update(cx_a, |p, cx| p.share(cx)).await.unwrap();
3471
3472        // Join the shared project as client B.
3473        let project_b = Project::remote(
3474            project_id,
3475            client_b.clone(),
3476            client_b.user_store.clone(),
3477            lang_registry.clone(),
3478            fs.clone(),
3479            &mut cx_b.to_async(),
3480        )
3481        .await
3482        .unwrap();
3483
3484        // Cause the language server to start.
3485        let _buffer = cx_b
3486            .background()
3487            .spawn(project_b.update(cx_b, |p, cx| p.open_buffer((worktree_id, "one.rs"), cx)))
3488            .await
3489            .unwrap();
3490
            // Answer every workspace-symbol request with a single symbol, `TWO`, located in
            // "/code/crate-2/two.rs" at columns 6..9 of line 0.
3491        let fake_language_server = fake_language_servers.next().await.unwrap();
3492        fake_language_server.handle_request::<lsp::request::WorkspaceSymbol, _, _>(
3493            |_, _| async move {
                    // NOTE(review): presumably needed because the `deprecated` field of
                    // `lsp::SymbolInformation` is itself marked deprecated — confirm.
3494                #[allow(deprecated)]
3495                Ok(Some(vec![lsp::SymbolInformation {
3496                    name: "TWO".into(),
3497                    location: lsp::Location {
3498                        uri: lsp::Url::from_file_path("/code/crate-2/two.rs").unwrap(),
3499                        range: lsp::Range::new(lsp::Position::new(0, 6), lsp::Position::new(0, 9)),
3500                    },
3501                    kind: lsp::SymbolKind::CONSTANT,
3502                    tags: None,
3503                    container_name: None,
3504                    deprecated: None,
3505                }]))
3506            },
3507        );
3508
3509        // Request the project symbols matching "two" as the guest.
3510        let symbols = project_b
3511            .update(cx_b, |p, cx| p.symbols("two", cx))
3512            .await
3513            .unwrap();
3514        assert_eq!(symbols.len(), 1);
3515        assert_eq!(symbols[0].name, "TWO");
3516
3517        // Open one of the returned symbols.
3518        let buffer_b_2 = project_b
3519            .update(cx_b, |project, cx| {
3520                project.open_buffer_for_symbol(&symbols[0], cx)
3521            })
3522            .await
3523            .unwrap();
            // The opened file's path is expected to be "../crate-2/two.rs", i.e. it points
            // outside the "crate-1" directory that was added as a worktree.
3524        buffer_b_2.read_with(cx_b, |buffer, _| {
3525            assert_eq!(
3526                buffer.file().unwrap().path().as_ref(),
3527                Path::new("../crate-2/two.rs")
3528            );
3529        });
3530
3531        // Attempt to craft a symbol and violate host's privacy by opening an arbitrary file.
            // NOTE(review): "/code/secrets" does not exist in the fixture tree above (which has
            // "private/passwords.txt"); the assertion below only checks for the signature error.
3532        let mut fake_symbol = symbols[0].clone();
3533        fake_symbol.path = Path::new("/code/secrets").into();
3534        let error = project_b
3535            .update(cx_b, |project, cx| {
3536                project.open_buffer_for_symbol(&fake_symbol, cx)
3537            })
3538            .await
3539            .unwrap_err();
3540        assert!(error.to_string().contains("invalid symbol signature"));
3541    }
3542
3543    #[gpui::test(iterations = 10)]
        // Issues, in a random order, a go-to-definition request whose target is "b.rs" and an
        // explicit open of "b.rs", both from the guest, and checks that the definition's
        // buffer is equal to the explicitly opened buffer.
3544    async fn test_open_buffer_while_getting_definition_pointing_to_it(
3545        cx_a: &mut TestAppContext,
3546        cx_b: &mut TestAppContext,
3547        mut rng: StdRng,
3548    ) {
3549        cx_a.foreground().forbid_parking();
3550        let lang_registry = Arc::new(LanguageRegistry::test());
3551        let fs = FakeFs::new(cx_a.background());
            // In "a.rs", offset 23 (used for the definition request below) falls inside `TWO`.
            // In "b.rs", `TWO` spans columns 6..9, matching the range the fake server returns.
3552        fs.insert_tree(
3553            "/root",
3554            json!({
3555                ".zed.toml": r#"collaborators = ["user_b"]"#,
3556                "a.rs": "const ONE: usize = b::TWO;",
3557                "b.rs": "const TWO: usize = 2",
3558            }),
3559        )
3560        .await;
3561
3562        // Set up a fake language server.
3563        let mut language = Language::new(
3564            LanguageConfig {
3565                name: "Rust".into(),
3566                path_suffixes: vec!["rs".to_string()],
3567                ..Default::default()
3568            },
3569            Some(tree_sitter_rust::language()),
3570        );
3571        let mut fake_language_servers = language.set_fake_lsp_adapter(Default::default());
3572        lang_registry.add(Arc::new(language));
3573
3574        // Connect to a server as 2 clients.
3575        let mut server = TestServer::start(cx_a.foreground(), cx_a.background()).await;
3576        let client_a = server.create_client(cx_a, "user_a").await;
3577        let client_b = server.create_client(cx_b, "user_b").await;
3578
3579        // Share a project as client A
3580        let project_a = cx_a.update(|cx| {
3581            Project::local(
3582                client_a.clone(),
3583                client_a.user_store.clone(),
3584                lang_registry.clone(),
3585                fs.clone(),
3586                cx,
3587            )
3588        });
3589
3590        let (worktree_a, _) = project_a
3591            .update(cx_a, |p, cx| {
3592                p.find_or_create_local_worktree("/root", true, cx)
3593            })
3594            .await
3595            .unwrap();
3596        worktree_a
3597            .read_with(cx_a, |tree, _| tree.as_local().unwrap().scan_complete())
3598            .await;
3599        let project_id = project_a.update(cx_a, |p, _| p.next_remote_id()).await;
3600        let worktree_id = worktree_a.read_with(cx_a, |tree, _| tree.id());
3601        project_a.update(cx_a, |p, cx| p.share(cx)).await.unwrap();
3602
3603        // Join the shared project as client B.
3604        let project_b = Project::remote(
3605            project_id,
3606            client_b.clone(),
3607            client_b.user_store.clone(),
3608            lang_registry.clone(),
3609            fs.clone(),
3610            &mut cx_b.to_async(),
3611        )
3612        .await
3613        .unwrap();
3614
            // Open "a.rs" on the guest; the definition request below is made from this buffer.
3615        let buffer_b1 = cx_b
3616            .background()
3617            .spawn(project_b.update(cx_b, |p, cx| p.open_buffer((worktree_id, "a.rs"), cx)))
3618            .await
3619            .unwrap();
3620
            // Answer every go-to-definition request with a single location in "/root/b.rs".
3621        let fake_language_server = fake_language_servers.next().await.unwrap();
3622        fake_language_server.handle_request::<lsp::request::GotoDefinition, _, _>(
3623            |_, _| async move {
3624                Ok(Some(lsp::GotoDefinitionResponse::Scalar(
3625                    lsp::Location::new(
3626                        lsp::Url::from_file_path("/root/b.rs").unwrap(),
3627                        lsp::Range::new(lsp::Position::new(0, 6), lsp::Position::new(0, 9)),
3628                    ),
3629                )))
3630            },
3631        );
3632
            // Start both operations before awaiting either one. The random boolean decides
            // which of the two is started first.
3633        let definitions;
3634        let buffer_b2;
3635        if rng.gen() {
3636            definitions = project_b.update(cx_b, |p, cx| p.definition(&buffer_b1, 23, cx));
3637            buffer_b2 = project_b.update(cx_b, |p, cx| p.open_buffer((worktree_id, "b.rs"), cx));
3638        } else {
3639            buffer_b2 = project_b.update(cx_b, |p, cx| p.open_buffer((worktree_id, "b.rs"), cx));
3640            definitions = project_b.update(cx_b, |p, cx| p.definition(&buffer_b1, 23, cx));
3641        }
3642
            // Whatever the start order, there must be exactly one definition and its buffer
            // must compare equal to the explicitly opened "b.rs" buffer.
3643        let buffer_b2 = buffer_b2.await.unwrap();
3644        let definitions = definitions.await.unwrap();
3645        assert_eq!(definitions.len(), 1);
3646        assert_eq!(definitions[0].buffer, buffer_b2);
3647    }
3648
3649    #[gpui::test(iterations = 10)]
        // Drives a code action end to end from the guest's editor: code actions are requested
        // at the cursor, the menu is shown, the action is confirmed and resolved, and the
        // resulting edits to two files appear in a new editor that supports undo and redo.
3650    async fn test_collaborating_with_code_actions(
3651        cx_a: &mut TestAppContext,
3652        cx_b: &mut TestAppContext,
3653    ) {
3654        cx_a.foreground().forbid_parking();
3655        let lang_registry = Arc::new(LanguageRegistry::test());
3656        let fs = FakeFs::new(cx_a.background());
            // Client B opens editors in this test, so `editor::init` is run in its context.
3657        cx_b.update(|cx| editor::init(cx));
3658
3659        // Set up a fake language server.
3660        let mut language = Language::new(
3661            LanguageConfig {
3662                name: "Rust".into(),
3663                path_suffixes: vec!["rs".to_string()],
3664                ..Default::default()
3665            },
3666            Some(tree_sitter_rust::language()),
3667        );
3668        let mut fake_language_servers = language.set_fake_lsp_adapter(Default::default());
3669        lang_registry.add(Arc::new(language));
3670
3671        // Connect to a server as 2 clients.
3672        let mut server = TestServer::start(cx_a.foreground(), cx_a.background()).await;
3673        let client_a = server.create_client(cx_a, "user_a").await;
3674        let client_b = server.create_client(cx_b, "user_b").await;
3675
3676        // Share a project as client A
            // In "main.rs", line 1 columns 22..34 hold `other::foo()`; "other.rs" is 27
            // characters long. The text edits returned by the fake server use these spans.
3677        fs.insert_tree(
3678            "/a",
3679            json!({
3680                ".zed.toml": r#"collaborators = ["user_b"]"#,
3681                "main.rs": "mod other;\nfn main() { let foo = other::foo(); }",
3682                "other.rs": "pub fn foo() -> usize { 4 }",
3683            }),
3684        )
3685        .await;
3686        let project_a = cx_a.update(|cx| {
3687            Project::local(
3688                client_a.clone(),
3689                client_a.user_store.clone(),
3690                lang_registry.clone(),
3691                fs.clone(),
3692                cx,
3693            )
3694        });
3695        let (worktree_a, _) = project_a
3696            .update(cx_a, |p, cx| {
3697                p.find_or_create_local_worktree("/a", true, cx)
3698            })
3699            .await
3700            .unwrap();
3701        worktree_a
3702            .read_with(cx_a, |tree, _| tree.as_local().unwrap().scan_complete())
3703            .await;
3704        let project_id = project_a.update(cx_a, |p, _| p.next_remote_id()).await;
3705        let worktree_id = worktree_a.read_with(cx_a, |tree, _| tree.id());
3706        project_a.update(cx_a, |p, cx| p.share(cx)).await.unwrap();
3707
3708        // Join the shared project as client B.
3709        let project_b = Project::remote(
3710            project_id,
3711            client_b.clone(),
3712            client_b.user_store.clone(),
3713            lang_registry.clone(),
3714            fs.clone(),
3715            &mut cx_b.to_async(),
3716        )
3717        .await
3718        .unwrap();
            // Build a workspace for client B around the remote project, starting from the
            // test defaults and overriding the language registry, client and user store.
3719        let mut params = cx_b.update(WorkspaceParams::test);
3720        params.languages = lang_registry.clone();
3721        params.client = client_b.client.clone();
3722        params.user_store = client_b.user_store.clone();
3723        params.project = project_b;
3724
            // Open "main.rs" in the guest's workspace and get the resulting editor.
3725        let (_window_b, workspace_b) = cx_b.add_window(|cx| Workspace::new(&params, cx));
3726        let editor_b = workspace_b
3727            .update(cx_b, |workspace, cx| {
3728                workspace.open_path((worktree_id, "main.rs"), cx)
3729            })
3730            .await
3731            .unwrap()
3732            .downcast::<Editor>()
3733            .unwrap();
3734
            // The first code-action request is expected for an empty range at (0, 0); it is
            // answered with no actions.
            // NOTE(review): `.next().await` presumably waits until one request has been
            // handled by this handler — confirm.
3735        let mut fake_language_server = fake_language_servers.next().await.unwrap();
3736        fake_language_server
3737            .handle_request::<lsp::request::CodeActionRequest, _, _>(|params, _| async move {
3738                assert_eq!(
3739                    params.text_document.uri,
3740                    lsp::Url::from_file_path("/a/main.rs").unwrap(),
3741                );
3742                assert_eq!(params.range.start, lsp::Position::new(0, 0));
3743                assert_eq!(params.range.end, lsp::Position::new(0, 0));
3744                Ok(None)
3745            })
3746            .next()
3747            .await;
3748
3749        // Move cursor to a location that contains code actions.
3750        editor_b.update(cx_b, |editor, cx| {
3751            editor.select_ranges([Point::new(1, 31)..Point::new(1, 31)], None, cx);
3752            cx.focus(&editor_b);
3753        });
3754
            // The next code-action request is expected for the new cursor position (1, 31).
            // It is answered with one action whose workspace edit touches both files:
            // `other::foo()` in "main.rs" becomes "4" and the text of "other.rs" is removed.
3755        fake_language_server
3756            .handle_request::<lsp::request::CodeActionRequest, _, _>(|params, _| async move {
3757                assert_eq!(
3758                    params.text_document.uri,
3759                    lsp::Url::from_file_path("/a/main.rs").unwrap(),
3760                );
3761                assert_eq!(params.range.start, lsp::Position::new(1, 31));
3762                assert_eq!(params.range.end, lsp::Position::new(1, 31));
3763
3764                Ok(Some(vec![lsp::CodeActionOrCommand::CodeAction(
3765                    lsp::CodeAction {
3766                        title: "Inline into all callers".to_string(),
3767                        edit: Some(lsp::WorkspaceEdit {
3768                            changes: Some(
3769                                [
3770                                    (
3771                                        lsp::Url::from_file_path("/a/main.rs").unwrap(),
3772                                        vec![lsp::TextEdit::new(
3773                                            lsp::Range::new(
3774                                                lsp::Position::new(1, 22),
3775                                                lsp::Position::new(1, 34),
3776                                            ),
3777                                            "4".to_string(),
3778                                        )],
3779                                    ),
3780                                    (
3781                                        lsp::Url::from_file_path("/a/other.rs").unwrap(),
3782                                        vec![lsp::TextEdit::new(
3783                                            lsp::Range::new(
3784                                                lsp::Position::new(0, 0),
3785                                                lsp::Position::new(0, 27),
3786                                            ),
3787                                            "".to_string(),
3788                                        )],
3789                                    ),
3790                                ]
3791                                .into_iter()
3792                                .collect(),
3793                            ),
3794                            ..Default::default()
3795                        }),
3796                        data: Some(json!({
3797                            "codeActionParams": {
3798                                "range": {
3799                                    "start": {"line": 1, "column": 31},
3800                                    "end": {"line": 1, "column": 31},
3801                                }
3802                            }
3803                        })),
3804                        ..Default::default()
3805                    },
3806                )]))
3807            })
3808            .next()
3809            .await;
3810
3811        // Toggle code actions and wait for them to display.
3812        editor_b.update(cx_b, |editor, cx| {
3813            editor.toggle_code_actions(
3814                &ToggleCodeActions {
3815                    deployed_from_indicator: false,
3816                },
3817                cx,
3818            );
3819        });
3820        editor_b
3821            .condition(&cx_b, |editor, _| editor.context_menu_visible())
3822            .await;
3823
            // Drop the code-action handler (and the position assertions it contains) before
            // continuing with the confirm/resolve step.
3824        fake_language_server.remove_request_handler::<lsp::request::CodeActionRequest>();
3825
3826        // Confirming the code action will trigger a resolve request.
            // The resolve handler is installed after the confirm task is created but before
            // that task is awaited. It returns the same title and edits, without `data`.
3827        let confirm_action = workspace_b
3828            .update(cx_b, |workspace, cx| {
3829                Editor::confirm_code_action(workspace, &ConfirmCodeAction { item_ix: Some(0) }, cx)
3830            })
3831            .unwrap();
3832        fake_language_server.handle_request::<lsp::request::CodeActionResolveRequest, _, _>(
3833            |_, _| async move {
3834                Ok(lsp::CodeAction {
3835                    title: "Inline into all callers".to_string(),
3836                    edit: Some(lsp::WorkspaceEdit {
3837                        changes: Some(
3838                            [
3839                                (
3840                                    lsp::Url::from_file_path("/a/main.rs").unwrap(),
3841                                    vec![lsp::TextEdit::new(
3842                                        lsp::Range::new(
3843                                            lsp::Position::new(1, 22),
3844                                            lsp::Position::new(1, 34),
3845                                        ),
3846                                        "4".to_string(),
3847                                    )],
3848                                ),
3849                                (
3850                                    lsp::Url::from_file_path("/a/other.rs").unwrap(),
3851                                    vec![lsp::TextEdit::new(
3852                                        lsp::Range::new(
3853                                            lsp::Position::new(0, 0),
3854                                            lsp::Position::new(0, 27),
3855                                        ),
3856                                        "".to_string(),
3857                                    )],
3858                                ),
3859                            ]
3860                            .into_iter()
3861                            .collect(),
3862                        ),
3863                        ..Default::default()
3864                    }),
3865                    ..Default::default()
3866                })
3867            },
3868        );
3869
3870        // After the action is confirmed, an editor containing both modified files is opened.
3871        confirm_action.await.unwrap();
3872        let code_action_editor = workspace_b.read_with(cx_b, |workspace, cx| {
3873            workspace
3874                .active_item(cx)
3875                .unwrap()
3876                .downcast::<Editor>()
3877                .unwrap()
3878        });
            // With the edits applied the editor text is the edited "main.rs" followed by a
            // newline (NOTE(review): presumably the now-empty "other.rs" excerpt — confirm).
            // Undo restores the original text of both files and redo re-applies the edits.
3879        code_action_editor.update(cx_b, |editor, cx| {
3880            assert_eq!(editor.text(cx), "mod other;\nfn main() { let foo = 4; }\n");
3881            editor.undo(&Undo, cx);
3882            assert_eq!(
3883                editor.text(cx),
3884                "mod other;\nfn main() { let foo = other::foo(); }\npub fn foo() -> usize { 4 }"
3885            );
3886            editor.redo(&Redo, cx);
3887            assert_eq!(editor.text(cx), "mod other;\nfn main() { let foo = 4; }\n");
3888        });
3889    }
3890
3891    #[gpui::test(iterations = 10)]
3892    async fn test_collaborating_with_renames(cx_a: &mut TestAppContext, cx_b: &mut TestAppContext) {
3893        cx_a.foreground().forbid_parking();
3894        let lang_registry = Arc::new(LanguageRegistry::test());
3895        let fs = FakeFs::new(cx_a.background());
3896        cx_b.update(|cx| editor::init(cx));
3897
3898        // Set up a fake language server.
3899        let mut language = Language::new(
3900            LanguageConfig {
3901                name: "Rust".into(),
3902                path_suffixes: vec!["rs".to_string()],
3903                ..Default::default()
3904            },
3905            Some(tree_sitter_rust::language()),
3906        );
3907        let mut fake_language_servers = language.set_fake_lsp_adapter(FakeLspAdapter {
3908            capabilities: lsp::ServerCapabilities {
3909                rename_provider: Some(lsp::OneOf::Right(lsp::RenameOptions {
3910                    prepare_provider: Some(true),
3911                    work_done_progress_options: Default::default(),
3912                })),
3913                ..Default::default()
3914            },
3915            ..Default::default()
3916        });
3917        lang_registry.add(Arc::new(language));
3918
3919        // Connect to a server as 2 clients.
3920        let mut server = TestServer::start(cx_a.foreground(), cx_a.background()).await;
3921        let client_a = server.create_client(cx_a, "user_a").await;
3922        let client_b = server.create_client(cx_b, "user_b").await;
3923
3924        // Share a project as client A
3925        fs.insert_tree(
3926            "/dir",
3927            json!({
3928                ".zed.toml": r#"collaborators = ["user_b"]"#,
3929                "one.rs": "const ONE: usize = 1;",
3930                "two.rs": "const TWO: usize = one::ONE + one::ONE;"
3931            }),
3932        )
3933        .await;
3934        let project_a = cx_a.update(|cx| {
3935            Project::local(
3936                client_a.clone(),
3937                client_a.user_store.clone(),
3938                lang_registry.clone(),
3939                fs.clone(),
3940                cx,
3941            )
3942        });
3943        let (worktree_a, _) = project_a
3944            .update(cx_a, |p, cx| {
3945                p.find_or_create_local_worktree("/dir", true, cx)
3946            })
3947            .await
3948            .unwrap();
3949        worktree_a
3950            .read_with(cx_a, |tree, _| tree.as_local().unwrap().scan_complete())
3951            .await;
3952        let project_id = project_a.update(cx_a, |p, _| p.next_remote_id()).await;
3953        let worktree_id = worktree_a.read_with(cx_a, |tree, _| tree.id());
3954        project_a.update(cx_a, |p, cx| p.share(cx)).await.unwrap();
3955
3956        // Join the worktree as client B.
3957        let project_b = Project::remote(
3958            project_id,
3959            client_b.clone(),
3960            client_b.user_store.clone(),
3961            lang_registry.clone(),
3962            fs.clone(),
3963            &mut cx_b.to_async(),
3964        )
3965        .await
3966        .unwrap();
3967        let mut params = cx_b.update(WorkspaceParams::test);
3968        params.languages = lang_registry.clone();
3969        params.client = client_b.client.clone();
3970        params.user_store = client_b.user_store.clone();
3971        params.project = project_b;
3972
3973        let (_window_b, workspace_b) = cx_b.add_window(|cx| Workspace::new(&params, cx));
3974        let editor_b = workspace_b
3975            .update(cx_b, |workspace, cx| {
3976                workspace.open_path((worktree_id, "one.rs"), cx)
3977            })
3978            .await
3979            .unwrap()
3980            .downcast::<Editor>()
3981            .unwrap();
3982        let fake_language_server = fake_language_servers.next().await.unwrap();
3983
3984        // Move cursor to a location that can be renamed.
3985        let prepare_rename = editor_b.update(cx_b, |editor, cx| {
3986            editor.select_ranges([7..7], None, cx);
3987            editor.rename(&Rename, cx).unwrap()
3988        });
3989
3990        fake_language_server
3991            .handle_request::<lsp::request::PrepareRenameRequest, _, _>(|params, _| async move {
3992                assert_eq!(params.text_document.uri.as_str(), "file:///dir/one.rs");
3993                assert_eq!(params.position, lsp::Position::new(0, 7));
3994                Ok(Some(lsp::PrepareRenameResponse::Range(lsp::Range::new(
3995                    lsp::Position::new(0, 6),
3996                    lsp::Position::new(0, 9),
3997                ))))
3998            })
3999            .next()
4000            .await
4001            .unwrap();
4002        prepare_rename.await.unwrap();
4003        editor_b.update(cx_b, |editor, cx| {
4004            let rename = editor.pending_rename().unwrap();
4005            let buffer = editor.buffer().read(cx).snapshot(cx);
4006            assert_eq!(
4007                rename.range.start.to_offset(&buffer)..rename.range.end.to_offset(&buffer),
4008                6..9
4009            );
4010            rename.editor.update(cx, |rename_editor, cx| {
4011                rename_editor.buffer().update(cx, |rename_buffer, cx| {
4012                    rename_buffer.edit([(0..3, "THREE")], cx);
4013                });
4014            });
4015        });
4016
4017        let confirm_rename = workspace_b.update(cx_b, |workspace, cx| {
4018            Editor::confirm_rename(workspace, &ConfirmRename, cx).unwrap()
4019        });
4020        fake_language_server
4021            .handle_request::<lsp::request::Rename, _, _>(|params, _| async move {
4022                assert_eq!(
4023                    params.text_document_position.text_document.uri.as_str(),
4024                    "file:///dir/one.rs"
4025                );
4026                assert_eq!(
4027                    params.text_document_position.position,
4028                    lsp::Position::new(0, 6)
4029                );
4030                assert_eq!(params.new_name, "THREE");
4031                Ok(Some(lsp::WorkspaceEdit {
4032                    changes: Some(
4033                        [
4034                            (
4035                                lsp::Url::from_file_path("/dir/one.rs").unwrap(),
4036                                vec![lsp::TextEdit::new(
4037                                    lsp::Range::new(
4038                                        lsp::Position::new(0, 6),
4039                                        lsp::Position::new(0, 9),
4040                                    ),
4041                                    "THREE".to_string(),
4042                                )],
4043                            ),
4044                            (
4045                                lsp::Url::from_file_path("/dir/two.rs").unwrap(),
4046                                vec![
4047                                    lsp::TextEdit::new(
4048                                        lsp::Range::new(
4049                                            lsp::Position::new(0, 24),
4050                                            lsp::Position::new(0, 27),
4051                                        ),
4052                                        "THREE".to_string(),
4053                                    ),
4054                                    lsp::TextEdit::new(
4055                                        lsp::Range::new(
4056                                            lsp::Position::new(0, 35),
4057                                            lsp::Position::new(0, 38),
4058                                        ),
4059                                        "THREE".to_string(),
4060                                    ),
4061                                ],
4062                            ),
4063                        ]
4064                        .into_iter()
4065                        .collect(),
4066                    ),
4067                    ..Default::default()
4068                }))
4069            })
4070            .next()
4071            .await
4072            .unwrap();
4073        confirm_rename.await.unwrap();
4074
4075        let rename_editor = workspace_b.read_with(cx_b, |workspace, cx| {
4076            workspace
4077                .active_item(cx)
4078                .unwrap()
4079                .downcast::<Editor>()
4080                .unwrap()
4081        });
4082        rename_editor.update(cx_b, |editor, cx| {
4083            assert_eq!(
4084                editor.text(cx),
4085                "const THREE: usize = 1;\nconst TWO: usize = one::THREE + one::THREE;"
4086            );
4087            editor.undo(&Undo, cx);
4088            assert_eq!(
4089                editor.text(cx),
4090                "const ONE: usize = 1;\nconst TWO: usize = one::ONE + one::ONE;"
4091            );
4092            editor.redo(&Redo, cx);
4093            assert_eq!(
4094                editor.text(cx),
4095                "const THREE: usize = 1;\nconst TWO: usize = one::THREE + one::THREE;"
4096            );
4097        });
4098
4099        // Ensure temporary rename edits cannot be undone/redone.
4100        editor_b.update(cx_b, |editor, cx| {
4101            editor.undo(&Undo, cx);
4102            assert_eq!(editor.text(cx), "const ONE: usize = 1;");
4103            editor.undo(&Undo, cx);
4104            assert_eq!(editor.text(cx), "const ONE: usize = 1;");
4105            editor.redo(&Redo, cx);
4106            assert_eq!(editor.text(cx), "const THREE: usize = 1;");
4107        })
4108    }
4109
4110    #[gpui::test(iterations = 10)]
4111    async fn test_basic_chat(cx_a: &mut TestAppContext, cx_b: &mut TestAppContext) {
4112        cx_a.foreground().forbid_parking();
4113
4114        // Connect to a server as 2 clients.
4115        let mut server = TestServer::start(cx_a.foreground(), cx_a.background()).await;
4116        let client_a = server.create_client(cx_a, "user_a").await;
4117        let client_b = server.create_client(cx_b, "user_b").await;
4118
4119        // Create an org that includes these 2 users.
4120        let db = &server.app_state.db;
4121        let org_id = db.create_org("Test Org", "test-org").await.unwrap();
4122        db.add_org_member(org_id, client_a.current_user_id(&cx_a), false)
4123            .await
4124            .unwrap();
4125        db.add_org_member(org_id, client_b.current_user_id(&cx_b), false)
4126            .await
4127            .unwrap();
4128
4129        // Create a channel that includes all the users.
4130        let channel_id = db.create_org_channel(org_id, "test-channel").await.unwrap();
4131        db.add_channel_member(channel_id, client_a.current_user_id(&cx_a), false)
4132            .await
4133            .unwrap();
4134        db.add_channel_member(channel_id, client_b.current_user_id(&cx_b), false)
4135            .await
4136            .unwrap();
4137        db.create_channel_message(
4138            channel_id,
4139            client_b.current_user_id(&cx_b),
4140            "hello A, it's B.",
4141            OffsetDateTime::now_utc(),
4142            1,
4143        )
4144        .await
4145        .unwrap();
4146
4147        let channels_a = cx_a
4148            .add_model(|cx| ChannelList::new(client_a.user_store.clone(), client_a.clone(), cx));
4149        channels_a
4150            .condition(cx_a, |list, _| list.available_channels().is_some())
4151            .await;
4152        channels_a.read_with(cx_a, |list, _| {
4153            assert_eq!(
4154                list.available_channels().unwrap(),
4155                &[ChannelDetails {
4156                    id: channel_id.to_proto(),
4157                    name: "test-channel".to_string()
4158                }]
4159            )
4160        });
4161        let channel_a = channels_a.update(cx_a, |this, cx| {
4162            this.get_channel(channel_id.to_proto(), cx).unwrap()
4163        });
4164        channel_a.read_with(cx_a, |channel, _| assert!(channel.messages().is_empty()));
4165        channel_a
4166            .condition(&cx_a, |channel, _| {
4167                channel_messages(channel)
4168                    == [("user_b".to_string(), "hello A, it's B.".to_string(), false)]
4169            })
4170            .await;
4171
4172        let channels_b = cx_b
4173            .add_model(|cx| ChannelList::new(client_b.user_store.clone(), client_b.clone(), cx));
4174        channels_b
4175            .condition(cx_b, |list, _| list.available_channels().is_some())
4176            .await;
4177        channels_b.read_with(cx_b, |list, _| {
4178            assert_eq!(
4179                list.available_channels().unwrap(),
4180                &[ChannelDetails {
4181                    id: channel_id.to_proto(),
4182                    name: "test-channel".to_string()
4183                }]
4184            )
4185        });
4186
4187        let channel_b = channels_b.update(cx_b, |this, cx| {
4188            this.get_channel(channel_id.to_proto(), cx).unwrap()
4189        });
4190        channel_b.read_with(cx_b, |channel, _| assert!(channel.messages().is_empty()));
4191        channel_b
4192            .condition(&cx_b, |channel, _| {
4193                channel_messages(channel)
4194                    == [("user_b".to_string(), "hello A, it's B.".to_string(), false)]
4195            })
4196            .await;
4197
4198        channel_a
4199            .update(cx_a, |channel, cx| {
4200                channel
4201                    .send_message("oh, hi B.".to_string(), cx)
4202                    .unwrap()
4203                    .detach();
4204                let task = channel.send_message("sup".to_string(), cx).unwrap();
4205                assert_eq!(
4206                    channel_messages(channel),
4207                    &[
4208                        ("user_b".to_string(), "hello A, it's B.".to_string(), false),
4209                        ("user_a".to_string(), "oh, hi B.".to_string(), true),
4210                        ("user_a".to_string(), "sup".to_string(), true)
4211                    ]
4212                );
4213                task
4214            })
4215            .await
4216            .unwrap();
4217
4218        channel_b
4219            .condition(&cx_b, |channel, _| {
4220                channel_messages(channel)
4221                    == [
4222                        ("user_b".to_string(), "hello A, it's B.".to_string(), false),
4223                        ("user_a".to_string(), "oh, hi B.".to_string(), false),
4224                        ("user_a".to_string(), "sup".to_string(), false),
4225                    ]
4226            })
4227            .await;
4228
4229        assert_eq!(
4230            server
4231                .state()
4232                .await
4233                .channel(channel_id)
4234                .unwrap()
4235                .connection_ids
4236                .len(),
4237            2
4238        );
4239        cx_b.update(|_| drop(channel_b));
4240        server
4241            .condition(|state| state.channel(channel_id).unwrap().connection_ids.len() == 1)
4242            .await;
4243
4244        cx_a.update(|_| drop(channel_a));
4245        server
4246            .condition(|state| state.channel(channel_id).is_none())
4247            .await;
4248    }
4249
4250    #[gpui::test(iterations = 10)]
4251    async fn test_chat_message_validation(cx_a: &mut TestAppContext) {
4252        cx_a.foreground().forbid_parking();
4253
4254        let mut server = TestServer::start(cx_a.foreground(), cx_a.background()).await;
4255        let client_a = server.create_client(cx_a, "user_a").await;
4256
4257        let db = &server.app_state.db;
4258        let org_id = db.create_org("Test Org", "test-org").await.unwrap();
4259        let channel_id = db.create_org_channel(org_id, "test-channel").await.unwrap();
4260        db.add_org_member(org_id, client_a.current_user_id(&cx_a), false)
4261            .await
4262            .unwrap();
4263        db.add_channel_member(channel_id, client_a.current_user_id(&cx_a), false)
4264            .await
4265            .unwrap();
4266
4267        let channels_a = cx_a
4268            .add_model(|cx| ChannelList::new(client_a.user_store.clone(), client_a.clone(), cx));
4269        channels_a
4270            .condition(cx_a, |list, _| list.available_channels().is_some())
4271            .await;
4272        let channel_a = channels_a.update(cx_a, |this, cx| {
4273            this.get_channel(channel_id.to_proto(), cx).unwrap()
4274        });
4275
4276        // Messages aren't allowed to be too long.
4277        channel_a
4278            .update(cx_a, |channel, cx| {
4279                let long_body = "this is long.\n".repeat(1024);
4280                channel.send_message(long_body, cx).unwrap()
4281            })
4282            .await
4283            .unwrap_err();
4284
4285        // Messages aren't allowed to be blank.
4286        channel_a.update(cx_a, |channel, cx| {
4287            channel.send_message(String::new(), cx).unwrap_err()
4288        });
4289
4290        // Leading and trailing whitespace are trimmed.
4291        channel_a
4292            .update(cx_a, |channel, cx| {
4293                channel
4294                    .send_message("\n surrounded by whitespace  \n".to_string(), cx)
4295                    .unwrap()
4296            })
4297            .await
4298            .unwrap();
4299        assert_eq!(
4300            db.get_channel_messages(channel_id, 10, None)
4301                .await
4302                .unwrap()
4303                .iter()
4304                .map(|m| &m.body)
4305                .collect::<Vec<_>>(),
4306            &["surrounded by whitespace"]
4307        );
4308    }
4309
4310    #[gpui::test(iterations = 10)]
4311    async fn test_chat_reconnection(cx_a: &mut TestAppContext, cx_b: &mut TestAppContext) {
4312        cx_a.foreground().forbid_parking();
4313
4314        // Connect to a server as 2 clients.
4315        let mut server = TestServer::start(cx_a.foreground(), cx_a.background()).await;
4316        let client_a = server.create_client(cx_a, "user_a").await;
4317        let client_b = server.create_client(cx_b, "user_b").await;
4318        let mut status_b = client_b.status();
4319
4320        // Create an org that includes these 2 users.
4321        let db = &server.app_state.db;
4322        let org_id = db.create_org("Test Org", "test-org").await.unwrap();
4323        db.add_org_member(org_id, client_a.current_user_id(&cx_a), false)
4324            .await
4325            .unwrap();
4326        db.add_org_member(org_id, client_b.current_user_id(&cx_b), false)
4327            .await
4328            .unwrap();
4329
4330        // Create a channel that includes all the users.
4331        let channel_id = db.create_org_channel(org_id, "test-channel").await.unwrap();
4332        db.add_channel_member(channel_id, client_a.current_user_id(&cx_a), false)
4333            .await
4334            .unwrap();
4335        db.add_channel_member(channel_id, client_b.current_user_id(&cx_b), false)
4336            .await
4337            .unwrap();
4338        db.create_channel_message(
4339            channel_id,
4340            client_b.current_user_id(&cx_b),
4341            "hello A, it's B.",
4342            OffsetDateTime::now_utc(),
4343            2,
4344        )
4345        .await
4346        .unwrap();
4347
4348        let channels_a = cx_a
4349            .add_model(|cx| ChannelList::new(client_a.user_store.clone(), client_a.clone(), cx));
4350        channels_a
4351            .condition(cx_a, |list, _| list.available_channels().is_some())
4352            .await;
4353
4354        channels_a.read_with(cx_a, |list, _| {
4355            assert_eq!(
4356                list.available_channels().unwrap(),
4357                &[ChannelDetails {
4358                    id: channel_id.to_proto(),
4359                    name: "test-channel".to_string()
4360                }]
4361            )
4362        });
4363        let channel_a = channels_a.update(cx_a, |this, cx| {
4364            this.get_channel(channel_id.to_proto(), cx).unwrap()
4365        });
4366        channel_a.read_with(cx_a, |channel, _| assert!(channel.messages().is_empty()));
4367        channel_a
4368            .condition(&cx_a, |channel, _| {
4369                channel_messages(channel)
4370                    == [("user_b".to_string(), "hello A, it's B.".to_string(), false)]
4371            })
4372            .await;
4373
4374        let channels_b = cx_b
4375            .add_model(|cx| ChannelList::new(client_b.user_store.clone(), client_b.clone(), cx));
4376        channels_b
4377            .condition(cx_b, |list, _| list.available_channels().is_some())
4378            .await;
4379        channels_b.read_with(cx_b, |list, _| {
4380            assert_eq!(
4381                list.available_channels().unwrap(),
4382                &[ChannelDetails {
4383                    id: channel_id.to_proto(),
4384                    name: "test-channel".to_string()
4385                }]
4386            )
4387        });
4388
4389        let channel_b = channels_b.update(cx_b, |this, cx| {
4390            this.get_channel(channel_id.to_proto(), cx).unwrap()
4391        });
4392        channel_b.read_with(cx_b, |channel, _| assert!(channel.messages().is_empty()));
4393        channel_b
4394            .condition(&cx_b, |channel, _| {
4395                channel_messages(channel)
4396                    == [("user_b".to_string(), "hello A, it's B.".to_string(), false)]
4397            })
4398            .await;
4399
4400        // Disconnect client B, ensuring we can still access its cached channel data.
4401        server.forbid_connections();
4402        server.disconnect_client(client_b.current_user_id(&cx_b));
4403        cx_b.foreground().advance_clock(Duration::from_secs(3));
4404        while !matches!(
4405            status_b.next().await,
4406            Some(client::Status::ReconnectionError { .. })
4407        ) {}
4408
4409        channels_b.read_with(cx_b, |channels, _| {
4410            assert_eq!(
4411                channels.available_channels().unwrap(),
4412                [ChannelDetails {
4413                    id: channel_id.to_proto(),
4414                    name: "test-channel".to_string()
4415                }]
4416            )
4417        });
4418        channel_b.read_with(cx_b, |channel, _| {
4419            assert_eq!(
4420                channel_messages(channel),
4421                [("user_b".to_string(), "hello A, it's B.".to_string(), false)]
4422            )
4423        });
4424
4425        // Send a message from client B while it is disconnected.
4426        channel_b
4427            .update(cx_b, |channel, cx| {
4428                let task = channel
4429                    .send_message("can you see this?".to_string(), cx)
4430                    .unwrap();
4431                assert_eq!(
4432                    channel_messages(channel),
4433                    &[
4434                        ("user_b".to_string(), "hello A, it's B.".to_string(), false),
4435                        ("user_b".to_string(), "can you see this?".to_string(), true)
4436                    ]
4437                );
4438                task
4439            })
4440            .await
4441            .unwrap_err();
4442
4443        // Send a message from client A while B is disconnected.
4444        channel_a
4445            .update(cx_a, |channel, cx| {
4446                channel
4447                    .send_message("oh, hi B.".to_string(), cx)
4448                    .unwrap()
4449                    .detach();
4450                let task = channel.send_message("sup".to_string(), cx).unwrap();
4451                assert_eq!(
4452                    channel_messages(channel),
4453                    &[
4454                        ("user_b".to_string(), "hello A, it's B.".to_string(), false),
4455                        ("user_a".to_string(), "oh, hi B.".to_string(), true),
4456                        ("user_a".to_string(), "sup".to_string(), true)
4457                    ]
4458                );
4459                task
4460            })
4461            .await
4462            .unwrap();
4463
4464        // Give client B a chance to reconnect.
4465        server.allow_connections();
4466        cx_b.foreground().advance_clock(Duration::from_secs(10));
4467
4468        // Verify that B sees the new messages upon reconnection, as well as the message client B
4469        // sent while offline.
4470        channel_b
4471            .condition(&cx_b, |channel, _| {
4472                channel_messages(channel)
4473                    == [
4474                        ("user_b".to_string(), "hello A, it's B.".to_string(), false),
4475                        ("user_a".to_string(), "oh, hi B.".to_string(), false),
4476                        ("user_a".to_string(), "sup".to_string(), false),
4477                        ("user_b".to_string(), "can you see this?".to_string(), false),
4478                    ]
4479            })
4480            .await;
4481
4482        // Ensure client A and B can communicate normally after reconnection.
4483        channel_a
4484            .update(cx_a, |channel, cx| {
4485                channel.send_message("you online?".to_string(), cx).unwrap()
4486            })
4487            .await
4488            .unwrap();
4489        channel_b
4490            .condition(&cx_b, |channel, _| {
4491                channel_messages(channel)
4492                    == [
4493                        ("user_b".to_string(), "hello A, it's B.".to_string(), false),
4494                        ("user_a".to_string(), "oh, hi B.".to_string(), false),
4495                        ("user_a".to_string(), "sup".to_string(), false),
4496                        ("user_b".to_string(), "can you see this?".to_string(), false),
4497                        ("user_a".to_string(), "you online?".to_string(), false),
4498                    ]
4499            })
4500            .await;
4501
4502        channel_b
4503            .update(cx_b, |channel, cx| {
4504                channel.send_message("yep".to_string(), cx).unwrap()
4505            })
4506            .await
4507            .unwrap();
4508        channel_a
4509            .condition(&cx_a, |channel, _| {
4510                channel_messages(channel)
4511                    == [
4512                        ("user_b".to_string(), "hello A, it's B.".to_string(), false),
4513                        ("user_a".to_string(), "oh, hi B.".to_string(), false),
4514                        ("user_a".to_string(), "sup".to_string(), false),
4515                        ("user_b".to_string(), "can you see this?".to_string(), false),
4516                        ("user_a".to_string(), "you online?".to_string(), false),
4517                        ("user_b".to_string(), "yep".to_string(), false),
4518                    ]
4519            })
4520            .await;
4521    }
4522
4523    #[gpui::test(iterations = 10)]
4524    async fn test_contacts(
4525        cx_a: &mut TestAppContext,
4526        cx_b: &mut TestAppContext,
4527        cx_c: &mut TestAppContext,
4528    ) {
4529        cx_a.foreground().forbid_parking();
4530        let lang_registry = Arc::new(LanguageRegistry::test());
4531        let fs = FakeFs::new(cx_a.background());
4532
4533        // Connect to a server as 3 clients.
4534        let mut server = TestServer::start(cx_a.foreground(), cx_a.background()).await;
4535        let client_a = server.create_client(cx_a, "user_a").await;
4536        let client_b = server.create_client(cx_b, "user_b").await;
4537        let client_c = server.create_client(cx_c, "user_c").await;
4538
4539        // Share a worktree as client A.
4540        fs.insert_tree(
4541            "/a",
4542            json!({
4543                ".zed.toml": r#"collaborators = ["user_b", "user_c"]"#,
4544            }),
4545        )
4546        .await;
4547
4548        let project_a = cx_a.update(|cx| {
4549            Project::local(
4550                client_a.clone(),
4551                client_a.user_store.clone(),
4552                lang_registry.clone(),
4553                fs.clone(),
4554                cx,
4555            )
4556        });
4557        let (worktree_a, _) = project_a
4558            .update(cx_a, |p, cx| {
4559                p.find_or_create_local_worktree("/a", true, cx)
4560            })
4561            .await
4562            .unwrap();
4563        worktree_a
4564            .read_with(cx_a, |tree, _| tree.as_local().unwrap().scan_complete())
4565            .await;
4566
4567        client_a
4568            .user_store
4569            .condition(&cx_a, |user_store, _| {
4570                contacts(user_store) == vec![("user_a", vec![("a", false, vec![])])]
4571            })
4572            .await;
4573        client_b
4574            .user_store
4575            .condition(&cx_b, |user_store, _| {
4576                contacts(user_store) == vec![("user_a", vec![("a", false, vec![])])]
4577            })
4578            .await;
4579        client_c
4580            .user_store
4581            .condition(&cx_c, |user_store, _| {
4582                contacts(user_store) == vec![("user_a", vec![("a", false, vec![])])]
4583            })
4584            .await;
4585
4586        let project_id = project_a
4587            .update(cx_a, |project, _| project.next_remote_id())
4588            .await;
4589        project_a
4590            .update(cx_a, |project, cx| project.share(cx))
4591            .await
4592            .unwrap();
4593        client_a
4594            .user_store
4595            .condition(&cx_a, |user_store, _| {
4596                contacts(user_store) == vec![("user_a", vec![("a", true, vec![])])]
4597            })
4598            .await;
4599        client_b
4600            .user_store
4601            .condition(&cx_b, |user_store, _| {
4602                contacts(user_store) == vec![("user_a", vec![("a", true, vec![])])]
4603            })
4604            .await;
4605        client_c
4606            .user_store
4607            .condition(&cx_c, |user_store, _| {
4608                contacts(user_store) == vec![("user_a", vec![("a", true, vec![])])]
4609            })
4610            .await;
4611
4612        let _project_b = Project::remote(
4613            project_id,
4614            client_b.clone(),
4615            client_b.user_store.clone(),
4616            lang_registry.clone(),
4617            fs.clone(),
4618            &mut cx_b.to_async(),
4619        )
4620        .await
4621        .unwrap();
4622
4623        client_a
4624            .user_store
4625            .condition(&cx_a, |user_store, _| {
4626                contacts(user_store) == vec![("user_a", vec![("a", true, vec!["user_b"])])]
4627            })
4628            .await;
4629        client_b
4630            .user_store
4631            .condition(&cx_b, |user_store, _| {
4632                contacts(user_store) == vec![("user_a", vec![("a", true, vec!["user_b"])])]
4633            })
4634            .await;
4635        client_c
4636            .user_store
4637            .condition(&cx_c, |user_store, _| {
4638                contacts(user_store) == vec![("user_a", vec![("a", true, vec!["user_b"])])]
4639            })
4640            .await;
4641
4642        project_a
4643            .condition(&cx_a, |project, _| {
4644                project.collaborators().contains_key(&client_b.peer_id)
4645            })
4646            .await;
4647
4648        cx_a.update(move |_| drop(project_a));
4649        client_a
4650            .user_store
4651            .condition(&cx_a, |user_store, _| contacts(user_store) == vec![])
4652            .await;
4653        client_b
4654            .user_store
4655            .condition(&cx_b, |user_store, _| contacts(user_store) == vec![])
4656            .await;
4657        client_c
4658            .user_store
4659            .condition(&cx_c, |user_store, _| contacts(user_store) == vec![])
4660            .await;
4661
4662        fn contacts(user_store: &UserStore) -> Vec<(&str, Vec<(&str, bool, Vec<&str>)>)> {
4663            user_store
4664                .contacts()
4665                .iter()
4666                .map(|contact| {
4667                    let worktrees = contact
4668                        .projects
4669                        .iter()
4670                        .map(|p| {
4671                            (
4672                                p.worktree_root_names[0].as_str(),
4673                                p.is_shared,
4674                                p.guests.iter().map(|p| p.github_login.as_str()).collect(),
4675                            )
4676                        })
4677                        .collect();
4678                    (contact.user.github_login.as_str(), worktrees)
4679                })
4680                .collect()
4681        }
4682    }
4683
4684    #[gpui::test(iterations = 10)]
        // Exercises following between two clients that share one project: client B follows
        // client A (initial view-state replication, item activation, back/forward navigation,
        // selection and text changes), B then unfollows, A follows B, and finally B disconnects,
        // which must clear A's leader.
4685    async fn test_following(cx_a: &mut TestAppContext, cx_b: &mut TestAppContext) {
            // NOTE(review): presumably makes the test fail instead of blocking if the foreground
            // executor would have to park - confirm against gpui's executor.
4686        cx_a.foreground().forbid_parking();
4687        let fs = FakeFs::new(cx_a.background());
4688
4689        // 2 clients connect to a server.
4690        let mut server = TestServer::start(cx_a.foreground(), cx_a.background()).await;
4691        let mut client_a = server.create_client(cx_a, "user_a").await;
4692        let mut client_b = server.create_client(cx_b, "user_b").await;
4693        cx_a.update(editor::init);
4694        cx_b.update(editor::init);
4695
4696        // Client A shares a project.
4697        fs.insert_tree(
4698            "/a",
4699            json!({
4700                ".zed.toml": r#"collaborators = ["user_b"]"#,
4701                "1.txt": "one",
4702                "2.txt": "two",
4703                "3.txt": "three",
4704            }),
4705        )
4706        .await;
4707        let (project_a, worktree_id) = client_a.build_local_project(fs.clone(), "/a", cx_a).await;
4708        project_a
4709            .update(cx_a, |project, cx| project.share(cx))
4710            .await
4711            .unwrap();
4712
4713        // Client B joins the project.
4714        let project_b = client_b
4715            .build_remote_project(
4716                project_a
4717                    .read_with(cx_a, |project, _| project.remote_id())
4718                    .unwrap(),
4719                cx_b,
4720            )
4721            .await;
4722
4723        // Client A opens some editors.
4724        let workspace_a = client_a.build_workspace(&project_a, cx_a);
            // Keep a handle to A's pane; it is used near the end to query which leader it follows.
4725        let pane_a = workspace_a.read_with(cx_a, |workspace, _| workspace.active_pane().clone());
4726        let editor_a1 = workspace_a
4727            .update(cx_a, |workspace, cx| {
4728                workspace.open_path((worktree_id, "1.txt"), cx)
4729            })
4730            .await
4731            .unwrap()
4732            .downcast::<Editor>()
4733            .unwrap();
4734        let editor_a2 = workspace_a
4735            .update(cx_a, |workspace, cx| {
4736                workspace.open_path((worktree_id, "2.txt"), cx)
4737            })
4738            .await
4739            .unwrap()
4740            .downcast::<Editor>()
4741            .unwrap();
4742
4743        // Client B opens an editor.
4744        let workspace_b = client_b.build_workspace(&project_b, cx_b);
4745        let editor_b1 = workspace_b
4746            .update(cx_b, |workspace, cx| {
4747                workspace.open_path((worktree_id, "1.txt"), cx)
4748            })
4749            .await
4750            .unwrap()
4751            .downcast::<Editor>()
4752            .unwrap();
4753
            // Only two clients are in the project, so the first (and presumably only) collaborator
            // each side sees is the other client.
4754        let client_a_id = project_b.read_with(cx_b, |project, _| {
4755            project.collaborators().values().next().unwrap().peer_id
4756        });
4757        let client_b_id = project_a.read_with(cx_a, |project, _| {
4758            project.collaborators().values().next().unwrap().peer_id
4759        });
4760
4761        // When client B starts following client A, all visible view states are replicated to client B.
4762        editor_a1.update(cx_a, |editor, cx| editor.select_ranges([0..1], None, cx));
4763        editor_a2.update(cx_a, |editor, cx| editor.select_ranges([2..3], None, cx));
4764        workspace_b
4765            .update(cx_b, |workspace, cx| {
4766                workspace
4767                    .toggle_follow(&ToggleFollow(client_a_id), cx)
4768                    .unwrap()
4769            })
4770            .await
4771            .unwrap();
            // Capture B's active item after following. The assertions below show it is a focused
            // editor for 2.txt (the file A opened last) carrying A's 2..3 selection, while B's
            // pre-existing 1.txt editor picked up A's 0..1 selection.
4772        let editor_b2 = workspace_b.read_with(cx_b, |workspace, cx| {
4773            workspace
4774                .active_item(cx)
4775                .unwrap()
4776                .downcast::<Editor>()
4777                .unwrap()
4778        });
4779        assert!(cx_b.read(|cx| editor_b2.is_focused(cx)));
4780        assert_eq!(
4781            editor_b2.read_with(cx_b, |editor, cx| editor.project_path(cx)),
4782            Some((worktree_id, "2.txt").into())
4783        );
4784        assert_eq!(
4785            editor_b2.read_with(cx_b, |editor, cx| editor.selected_ranges(cx)),
4786            vec![2..3]
4787        );
4788        assert_eq!(
4789            editor_b1.read_with(cx_b, |editor, cx| editor.selected_ranges(cx)),
4790            vec![0..1]
4791        );
4792
4793        // When client A activates a different editor, client B does so as well.
4794        workspace_a.update(cx_a, |workspace, cx| {
4795            workspace.activate_item(&editor_a1, cx)
4796        });
4797        workspace_b
4798            .condition(cx_b, |workspace, cx| {
4799                workspace.active_item(cx).unwrap().id() == editor_b1.id()
4800            })
4801            .await;
4802
4803        // When client A navigates back and forth, client B does so as well.
4804        workspace_a
4805            .update(cx_a, |workspace, cx| {
4806                workspace::Pane::go_back(workspace, None, cx)
4807            })
4808            .await;
            // After A goes back, B is expected to end up on editor_b2 (its 2.txt editor).
4809        workspace_b
4810            .condition(cx_b, |workspace, cx| {
4811                workspace.active_item(cx).unwrap().id() == editor_b2.id()
4812            })
4813            .await;
4814
4815        workspace_a
4816            .update(cx_a, |workspace, cx| {
4817                workspace::Pane::go_forward(workspace, None, cx)
4818            })
4819            .await;
            // Going forward again must bring B back to editor_b1 (its 1.txt editor).
4820        workspace_b
4821            .condition(cx_b, |workspace, cx| {
4822                workspace.active_item(cx).unwrap().id() == editor_b1.id()
4823            })
4824            .await;
4825
4826        // Changes to client A's editor are reflected on client B.
4827        editor_a1.update(cx_a, |editor, cx| {
4828            editor.select_ranges([1..1, 2..2], None, cx);
4829        });
4830        editor_b1
4831            .condition(cx_b, |editor, cx| {
4832                editor.selected_ranges(cx) == vec![1..1, 2..2]
4833            })
4834            .await;
4835
4836        editor_a1.update(cx_a, |editor, cx| editor.set_text("TWO", cx));
4837        editor_b1
4838            .condition(cx_b, |editor, cx| editor.text(cx) == "TWO")
4839            .await;
4840
            // The scroll position is changed on A as well, but only the selection change is
            // awaited on B below.
4841        editor_a1.update(cx_a, |editor, cx| {
4842            editor.select_ranges([3..3], None, cx);
4843            editor.set_scroll_position(vec2f(0., 100.), cx);
4844        });
4845        editor_b1
4846            .condition(cx_b, |editor, cx| editor.selected_ranges(cx) == vec![3..3])
4847            .await;
4848
4849        // After unfollowing, client B stops receiving updates from client A.
4850        workspace_b.update(cx_b, |workspace, cx| {
4851            workspace.unfollow(&workspace.active_pane().clone(), cx)
4852        });
4853        workspace_a.update(cx_a, |workspace, cx| {
4854            workspace.activate_item(&editor_a2, cx)
4855        });
            // NOTE(review): run_until_parked presumably drains all pending work, so an update from
            // A would already have arrived here if B were still following - confirm.
4856        cx_a.foreground().run_until_parked();
4857        assert_eq!(
4858            workspace_b.read_with(cx_b, |workspace, cx| workspace
4859                .active_item(cx)
4860                .unwrap()
4861                .id()),
4862            editor_b1.id()
4863        );
4864
4865        // Client A starts following client B.
4866        workspace_a
4867            .update(cx_a, |workspace, cx| {
4868                workspace
4869                    .toggle_follow(&ToggleFollow(client_b_id), cx)
4870                    .unwrap()
4871            })
4872            .await
4873            .unwrap();
4874        assert_eq!(
4875            workspace_a.read_with(cx_a, |workspace, _| workspace.leader_for_pane(&pane_a)),
4876            Some(client_b_id)
4877        );
            // B's active item is its 1.txt editor, so A (which had just activated editor_a2) is
            // expected to switch back to editor_a1.
4878        assert_eq!(
4879            workspace_a.read_with(cx_a, |workspace, cx| workspace
4880                .active_item(cx)
4881                .unwrap()
4882                .id()),
4883            editor_a1.id()
4884        );
4885
4886        // Following interrupts when client B disconnects.
4887        client_b.disconnect(&cx_b.to_async()).unwrap();
4888        cx_a.foreground().run_until_parked();
4889        assert_eq!(
4890            workspace_a.read_with(cx_a, |workspace, _| workspace.leader_for_pane(&pane_a)),
4891            None
4892        );
4893    }
4894
4895    #[gpui::test(iterations = 10)]
        // Checks that two clients can follow each other at the same time: each splits off a
        // second pane that follows the peer, then returns to its first pane and opens a different
        // file. The follower panes must show the peer's file without changing which pane is
        // active, and the mutual following must not loop forever.
4896    async fn test_peers_following_each_other(cx_a: &mut TestAppContext, cx_b: &mut TestAppContext) {
            // NOTE(review): presumably makes the test fail instead of blocking if the foreground
            // executor would have to park - confirm against gpui's executor.
4897        cx_a.foreground().forbid_parking();
4898        let fs = FakeFs::new(cx_a.background());
4899
4900        // 2 clients connect to a server.
4901        let mut server = TestServer::start(cx_a.foreground(), cx_a.background()).await;
4902        let mut client_a = server.create_client(cx_a, "user_a").await;
4903        let mut client_b = server.create_client(cx_b, "user_b").await;
4904        cx_a.update(editor::init);
4905        cx_b.update(editor::init);
4906
4907        // Client A shares a project.
4908        fs.insert_tree(
4909            "/a",
4910            json!({
4911                ".zed.toml": r#"collaborators = ["user_b"]"#,
4912                "1.txt": "one",
4913                "2.txt": "two",
4914                "3.txt": "three",
4915                "4.txt": "four",
4916            }),
4917        )
4918        .await;
4919        let (project_a, worktree_id) = client_a.build_local_project(fs.clone(), "/a", cx_a).await;
4920        project_a
4921            .update(cx_a, |project, cx| project.share(cx))
4922            .await
4923            .unwrap();
4924
4925        // Client B joins the project.
4926        let project_b = client_b
4927            .build_remote_project(
4928                project_a
4929                    .read_with(cx_a, |project, _| project.remote_id())
4930                    .unwrap(),
4931                cx_b,
4932            )
4933            .await;
4934
4935        // Client A opens some editors.
4936        let workspace_a = client_a.build_workspace(&project_a, cx_a);
            // Remember A's initial pane so it can be told apart from the pane created by the split.
4937        let pane_a1 = workspace_a.read_with(cx_a, |workspace, _| workspace.active_pane().clone());
            // The editor handle itself is not used again; only the opened item matters.
4938        let _editor_a1 = workspace_a
4939            .update(cx_a, |workspace, cx| {
4940                workspace.open_path((worktree_id, "1.txt"), cx)
4941            })
4942            .await
4943            .unwrap()
4944            .downcast::<Editor>()
4945            .unwrap();
4946
4947        // Client B opens an editor.
4948        let workspace_b = client_b.build_workspace(&project_b, cx_b);
            // Remember B's initial pane for the same reason as pane_a1 above.
4949        let pane_b1 = workspace_b.read_with(cx_b, |workspace, _| workspace.active_pane().clone());
4950        let _editor_b1 = workspace_b
4951            .update(cx_b, |workspace, cx| {
4952                workspace.open_path((worktree_id, "2.txt"), cx)
4953            })
4954            .await
4955            .unwrap()
4956            .downcast::<Editor>()
4957            .unwrap();
4958
4959        // Clients A and B follow each other in split panes
4960        workspace_a
4961            .update(cx_a, |workspace, cx| {
4962                workspace.split_pane(workspace.active_pane().clone(), SplitDirection::Right, cx);
                    // The split leaves a pane other than pane_a1 active; NOTE(review): toggle_follow
                    // presumably applies to that active pane - confirm.
4963                assert_ne!(*workspace.active_pane(), pane_a1);
                    // NOTE(review): collaborators() appears to be keyed by peer id, which is what
                    // ToggleFollow takes - confirm. The first key is taken as the peer to follow.
4964                let leader_id = *project_a.read(cx).collaborators().keys().next().unwrap();
4965                workspace
4966                    .toggle_follow(&workspace::ToggleFollow(leader_id), cx)
4967                    .unwrap()
4968            })
4969            .await
4970            .unwrap();
4971        workspace_b
4972            .update(cx_b, |workspace, cx| {
4973                workspace.split_pane(workspace.active_pane().clone(), SplitDirection::Right, cx);
4974                assert_ne!(*workspace.active_pane(), pane_b1);
4975                let leader_id = *project_b.read(cx).collaborators().keys().next().unwrap();
4976                workspace
4977                    .toggle_follow(&workspace::ToggleFollow(leader_id), cx)
4978                    .unwrap()
4979            })
4980            .await
4981            .unwrap();
4982
            // Each client switches back to its original pane and opens a file that has not been
            // opened so far in this test (3.txt on A, 4.txt on B).
4983        workspace_a
4984            .update(cx_a, |workspace, cx| {
4985                workspace.activate_next_pane(cx);
4986                assert_eq!(*workspace.active_pane(), pane_a1);
4987                workspace.open_path((worktree_id, "3.txt"), cx)
4988            })
4989            .await
4990            .unwrap();
4991        workspace_b
4992            .update(cx_b, |workspace, cx| {
4993                workspace.activate_next_pane(cx);
4994                assert_eq!(*workspace.active_pane(), pane_b1);
4995                workspace.open_path((worktree_id, "4.txt"), cx)
4996            })
4997            .await
4998            .unwrap();
            // NOTE(review): assumes run_until_parked drains all pending work, so the follow updates
            // caused by the two open_path calls have been applied before the assertions - confirm.
4999        cx_a.foreground().run_until_parked();
5000
5001        // Ensure leader updates don't change the active pane of followers
5002        workspace_a.read_with(cx_a, |workspace, _| {
5003            assert_eq!(*workspace.active_pane(), pane_a1);
5004        });
5005        workspace_b.read_with(cx_b, |workspace, _| {
5006            assert_eq!(*workspace.active_pane(), pane_b1);
5007        });
5008
5009        // Ensure peers following each other doesn't cause an infinite loop.
5010        assert_eq!(
5011            workspace_a.read_with(cx_a, |workspace, cx| workspace
5012                .active_item(cx)
5013                .unwrap()
5014                .project_path(cx)),
5015            Some((worktree_id, "3.txt").into())
5016        );
            // A's original pane shows 3.txt; switching to its other (follower) pane must show
            // 4.txt, the file B opened.
5017        workspace_a.update(cx_a, |workspace, cx| {
5018            assert_eq!(
5019                workspace.active_item(cx).unwrap().project_path(cx),
5020                Some((worktree_id, "3.txt").into())
5021            );
5022            workspace.activate_next_pane(cx);
5023            assert_eq!(
5024                workspace.active_item(cx).unwrap().project_path(cx),
5025                Some((worktree_id, "4.txt").into())
5026            );
5027        });
            // Symmetric check on B: its original pane shows 4.txt and its other (follower) pane
            // shows 3.txt, the file A opened.
5028        workspace_b.update(cx_b, |workspace, cx| {
5029            assert_eq!(
5030                workspace.active_item(cx).unwrap().project_path(cx),
5031                Some((worktree_id, "4.txt").into())
5032            );
5033            workspace.activate_next_pane(cx);
5034            assert_eq!(
5035                workspace.active_item(cx).unwrap().project_path(cx),
5036                Some((worktree_id, "3.txt").into())
5037            );
5038        });
5039    }
5040
5041    #[gpui::test(iterations = 10)]
        // Checks which local actions on follower B cancel its following of leader A: moving the
        // cursor, editing, scrolling, and opening a different item in the following pane all do;
        // splitting the pane or activating another pane does not.
5042    async fn test_auto_unfollowing(cx_a: &mut TestAppContext, cx_b: &mut TestAppContext) {
            // NOTE(review): presumably makes the test fail instead of blocking if the foreground
            // executor would have to park - confirm against gpui's executor.
5043        cx_a.foreground().forbid_parking();
5044        let fs = FakeFs::new(cx_a.background());
5045
5046        // 2 clients connect to a server.
5047        let mut server = TestServer::start(cx_a.foreground(), cx_a.background()).await;
5048        let mut client_a = server.create_client(cx_a, "user_a").await;
5049        let mut client_b = server.create_client(cx_b, "user_b").await;
5050        cx_a.update(editor::init);
5051        cx_b.update(editor::init);
5052
5053        // Client A shares a project.
5054        fs.insert_tree(
5055            "/a",
5056            json!({
5057                ".zed.toml": r#"collaborators = ["user_b"]"#,
5058                "1.txt": "one",
5059                "2.txt": "two",
5060                "3.txt": "three",
5061            }),
5062        )
5063        .await;
5064        let (project_a, worktree_id) = client_a.build_local_project(fs.clone(), "/a", cx_a).await;
5065        project_a
5066            .update(cx_a, |project, cx| project.share(cx))
5067            .await
5068            .unwrap();
5069
5070        // Client B joins the project.
5071        let project_b = client_b
5072            .build_remote_project(
5073                project_a
5074                    .read_with(cx_a, |project, _| project.remote_id())
5075                    .unwrap(),
5076                cx_b,
5077            )
5078            .await;
5079
5080        // Client A opens some editors.
5081        let workspace_a = client_a.build_workspace(&project_a, cx_a);
            // The editor handle itself is not used again; only the opened item matters.
5082        let _editor_a1 = workspace_a
5083            .update(cx_a, |workspace, cx| {
5084                workspace.open_path((worktree_id, "1.txt"), cx)
5085            })
5086            .await
5087            .unwrap()
5088            .downcast::<Editor>()
5089            .unwrap();
5090
5091        // Client B starts following client A.
5092        let workspace_b = client_b.build_workspace(&project_b, cx_b);
            // Track B's pane; every assertion below queries which leader this pane follows.
5093        let pane_b = workspace_b.read_with(cx_b, |workspace, _| workspace.active_pane().clone());
            // A is the only other participant, so the first collaborator is the leader to follow.
5094        let leader_id = project_b.read_with(cx_b, |project, _| {
5095            project.collaborators().values().next().unwrap().peer_id
5096        });
5097        workspace_b
5098            .update(cx_b, |workspace, cx| {
5099                workspace
5100                    .toggle_follow(&ToggleFollow(leader_id), cx)
5101                    .unwrap()
5102            })
5103            .await
5104            .unwrap();
5105        assert_eq!(
5106            workspace_b.read_with(cx_b, |workspace, _| workspace.leader_for_pane(&pane_b)),
5107            Some(leader_id)
5108        );
            // B opened nothing itself, so its active item is the one that appeared by following A.
            // B's local actions below are performed on this editor.
5109        let editor_b2 = workspace_b.read_with(cx_b, |workspace, cx| {
5110            workspace
5111                .active_item(cx)
5112                .unwrap()
5113                .downcast::<Editor>()
5114                .unwrap()
5115        });
5116
5117        // When client B moves, it automatically stops following client A.
5118        editor_b2.update(cx_b, |editor, cx| editor.move_right(&editor::MoveRight, cx));
            // The leader is checked right after the update, with no await in between.
5119        assert_eq!(
5120            workspace_b.read_with(cx_b, |workspace, _| workspace.leader_for_pane(&pane_b)),
5121            None
5122        );
5123
            // Follow A again to set up the next scenario.
5124        workspace_b
5125            .update(cx_b, |workspace, cx| {
5126                workspace
5127                    .toggle_follow(&ToggleFollow(leader_id), cx)
5128                    .unwrap()
5129            })
5130            .await
5131            .unwrap();
5132        assert_eq!(
5133            workspace_b.read_with(cx_b, |workspace, _| workspace.leader_for_pane(&pane_b)),
5134            Some(leader_id)
5135        );
5136
5137        // When client B edits, it automatically stops following client A.
5138        editor_b2.update(cx_b, |editor, cx| editor.insert("X", cx));
5139        assert_eq!(
5140            workspace_b.read_with(cx_b, |workspace, _| workspace.leader_for_pane(&pane_b)),
5141            None
5142        );
5143
            // Follow A again to set up the next scenario.
5144        workspace_b
5145            .update(cx_b, |workspace, cx| {
5146                workspace
5147                    .toggle_follow(&ToggleFollow(leader_id), cx)
5148                    .unwrap()
5149            })
5150            .await
5151            .unwrap();
5152        assert_eq!(
5153            workspace_b.read_with(cx_b, |workspace, _| workspace.leader_for_pane(&pane_b)),
5154            Some(leader_id)
5155        );
5156
5157        // When client B scrolls, it automatically stops following client A.
5158        editor_b2.update(cx_b, |editor, cx| {
5159            editor.set_scroll_position(vec2f(0., 3.), cx)
5160        });
5161        assert_eq!(
5162            workspace_b.read_with(cx_b, |workspace, _| workspace.leader_for_pane(&pane_b)),
5163            None
5164        );
5165
            // Follow A again to set up the remaining pane-related scenarios.
5166        workspace_b
5167            .update(cx_b, |workspace, cx| {
5168                workspace
5169                    .toggle_follow(&ToggleFollow(leader_id), cx)
5170                    .unwrap()
5171            })
5172            .await
5173            .unwrap();
5174        assert_eq!(
5175            workspace_b.read_with(cx_b, |workspace, _| workspace.leader_for_pane(&pane_b)),
5176            Some(leader_id)
5177        );
5178
5179        // When client B activates a different pane, it continues following client A in the original pane.
5180        workspace_b.update(cx_b, |workspace, cx| {
5181            workspace.split_pane(pane_b.clone(), SplitDirection::Right, cx)
5182        });
5183        assert_eq!(
5184            workspace_b.read_with(cx_b, |workspace, _| workspace.leader_for_pane(&pane_b)),
5185            Some(leader_id)
5186        );
5187
5188        workspace_b.update(cx_b, |workspace, cx| workspace.activate_next_pane(cx));
5189        assert_eq!(
5190            workspace_b.read_with(cx_b, |workspace, _| workspace.leader_for_pane(&pane_b)),
5191            Some(leader_id)
5192        );
5193
5194        // When client B activates a different item in the original pane, it automatically stops following client A.
            // NOTE(review): after the split plus activate_next_pane, pane_b is presumably the
            // active pane again, so open_path targets it - confirm.
5195        workspace_b
5196            .update(cx_b, |workspace, cx| {
5197                workspace.open_path((worktree_id, "2.txt"), cx)
5198            })
5199            .await
5200            .unwrap();
5201        assert_eq!(
5202            workspace_b.read_with(cx_b, |workspace, _| workspace.leader_for_pane(&pane_b)),
5203            None
5204        );
5205    }
5206
5207    #[gpui::test(iterations = 100)]
5208    async fn test_random_collaboration(
5209        cx: &mut TestAppContext,
5210        deterministic: Arc<Deterministic>,
5211        rng: StdRng,
5212    ) {
5213        cx.foreground().forbid_parking();
5214        let max_peers = env::var("MAX_PEERS")
5215            .map(|i| i.parse().expect("invalid `MAX_PEERS` variable"))
5216            .unwrap_or(5);
5217        assert!(max_peers <= 5);
5218
5219        let max_operations = env::var("OPERATIONS")
5220            .map(|i| i.parse().expect("invalid `OPERATIONS` variable"))
5221            .unwrap_or(10);
5222
5223        let rng = Arc::new(Mutex::new(rng));
5224
5225        let guest_lang_registry = Arc::new(LanguageRegistry::test());
5226        let host_language_registry = Arc::new(LanguageRegistry::test());
5227
5228        let fs = FakeFs::new(cx.background());
5229        fs.insert_tree(
5230            "/_collab",
5231            json!({
5232                ".zed.toml": r#"collaborators = ["guest-1", "guest-2", "guest-3", "guest-4"]"#
5233            }),
5234        )
5235        .await;
5236
5237        let mut server = TestServer::start(cx.foreground(), cx.background()).await;
5238        let mut clients = Vec::new();
5239        let mut user_ids = Vec::new();
5240        let mut op_start_signals = Vec::new();
5241        let files = Arc::new(Mutex::new(Vec::new()));
5242
5243        let mut next_entity_id = 100000;
5244        let mut host_cx = TestAppContext::new(
5245            cx.foreground_platform(),
5246            cx.platform(),
5247            deterministic.build_foreground(next_entity_id),
5248            deterministic.build_background(),
5249            cx.font_cache(),
5250            cx.leak_detector(),
5251            next_entity_id,
5252        );
5253        let host = server.create_client(&mut host_cx, "host").await;
5254        let host_project = host_cx.update(|cx| {
5255            Project::local(
5256                host.client.clone(),
5257                host.user_store.clone(),
5258                host_language_registry.clone(),
5259                fs.clone(),
5260                cx,
5261            )
5262        });
5263        let host_project_id = host_project
5264            .update(&mut host_cx, |p, _| p.next_remote_id())
5265            .await;
5266
5267        let (collab_worktree, _) = host_project
5268            .update(&mut host_cx, |project, cx| {
5269                project.find_or_create_local_worktree("/_collab", true, cx)
5270            })
5271            .await
5272            .unwrap();
5273        collab_worktree
5274            .read_with(&host_cx, |tree, _| tree.as_local().unwrap().scan_complete())
5275            .await;
5276        host_project
5277            .update(&mut host_cx, |project, cx| project.share(cx))
5278            .await
5279            .unwrap();
5280
5281        // Set up fake language servers.
5282        let mut language = Language::new(
5283            LanguageConfig {
5284                name: "Rust".into(),
5285                path_suffixes: vec!["rs".to_string()],
5286                ..Default::default()
5287            },
5288            None,
5289        );
5290        let _fake_servers = language.set_fake_lsp_adapter(FakeLspAdapter {
5291            name: "the-fake-language-server",
5292            capabilities: lsp::LanguageServer::full_capabilities(),
5293            initializer: Some(Box::new({
5294                let rng = rng.clone();
5295                let files = files.clone();
5296                let project = host_project.downgrade();
5297                move |fake_server: &mut FakeLanguageServer| {
5298                    fake_server.handle_request::<lsp::request::Completion, _, _>(
5299                        |_, _| async move {
5300                            Ok(Some(lsp::CompletionResponse::Array(vec![
5301                                lsp::CompletionItem {
5302                                    text_edit: Some(lsp::CompletionTextEdit::Edit(lsp::TextEdit {
5303                                        range: lsp::Range::new(
5304                                            lsp::Position::new(0, 0),
5305                                            lsp::Position::new(0, 0),
5306                                        ),
5307                                        new_text: "the-new-text".to_string(),
5308                                    })),
5309                                    ..Default::default()
5310                                },
5311                            ])))
5312                        },
5313                    );
5314
5315                    fake_server.handle_request::<lsp::request::CodeActionRequest, _, _>(
5316                        |_, _| async move {
5317                            Ok(Some(vec![lsp::CodeActionOrCommand::CodeAction(
5318                                lsp::CodeAction {
5319                                    title: "the-code-action".to_string(),
5320                                    ..Default::default()
5321                                },
5322                            )]))
5323                        },
5324                    );
5325
5326                    fake_server.handle_request::<lsp::request::PrepareRenameRequest, _, _>(
5327                        |params, _| async move {
5328                            Ok(Some(lsp::PrepareRenameResponse::Range(lsp::Range::new(
5329                                params.position,
5330                                params.position,
5331                            ))))
5332                        },
5333                    );
5334
5335                    fake_server.handle_request::<lsp::request::GotoDefinition, _, _>({
5336                        let files = files.clone();
5337                        let rng = rng.clone();
5338                        move |_, _| {
5339                            let files = files.clone();
5340                            let rng = rng.clone();
5341                            async move {
5342                                let files = files.lock();
5343                                let mut rng = rng.lock();
5344                                let count = rng.gen_range::<usize, _>(1..3);
5345                                let files = (0..count)
5346                                    .map(|_| files.choose(&mut *rng).unwrap())
5347                                    .collect::<Vec<_>>();
5348                                log::info!("LSP: Returning definitions in files {:?}", &files);
5349                                Ok(Some(lsp::GotoDefinitionResponse::Array(
5350                                    files
5351                                        .into_iter()
5352                                        .map(|file| lsp::Location {
5353                                            uri: lsp::Url::from_file_path(file).unwrap(),
5354                                            range: Default::default(),
5355                                        })
5356                                        .collect(),
5357                                )))
5358                            }
5359                        }
5360                    });
5361
5362                    fake_server.handle_request::<lsp::request::DocumentHighlightRequest, _, _>({
5363                        let rng = rng.clone();
5364                        let project = project.clone();
5365                        move |params, mut cx| {
5366                            let highlights = if let Some(project) = project.upgrade(&cx) {
5367                                project.update(&mut cx, |project, cx| {
5368                                    let path = params
5369                                        .text_document_position_params
5370                                        .text_document
5371                                        .uri
5372                                        .to_file_path()
5373                                        .unwrap();
5374                                    let (worktree, relative_path) =
5375                                        project.find_local_worktree(&path, cx)?;
5376                                    let project_path =
5377                                        ProjectPath::from((worktree.read(cx).id(), relative_path));
5378                                    let buffer =
5379                                        project.get_open_buffer(&project_path, cx)?.read(cx);
5380
5381                                    let mut highlights = Vec::new();
5382                                    let highlight_count = rng.lock().gen_range(1..=5);
5383                                    let mut prev_end = 0;
5384                                    for _ in 0..highlight_count {
5385                                        let range =
5386                                            buffer.random_byte_range(prev_end, &mut *rng.lock());
5387
5388                                        highlights.push(lsp::DocumentHighlight {
5389                                            range: range_to_lsp(range.to_point_utf16(buffer)),
5390                                            kind: Some(lsp::DocumentHighlightKind::READ),
5391                                        });
5392                                        prev_end = range.end;
5393                                    }
5394                                    Some(highlights)
5395                                })
5396                            } else {
5397                                None
5398                            };
5399                            async move { Ok(highlights) }
5400                        }
5401                    });
5402                }
5403            })),
5404            ..Default::default()
5405        });
5406        host_language_registry.add(Arc::new(language));
5407
5408        let op_start_signal = futures::channel::mpsc::unbounded();
5409        user_ids.push(host.current_user_id(&host_cx));
5410        op_start_signals.push(op_start_signal.0);
5411        clients.push(host_cx.foreground().spawn(host.simulate_host(
5412            host_project,
5413            files,
5414            op_start_signal.1,
5415            rng.clone(),
5416            host_cx,
5417        )));
5418
5419        let disconnect_host_at = if rng.lock().gen_bool(0.2) {
5420            rng.lock().gen_range(0..max_operations)
5421        } else {
5422            max_operations
5423        };
5424        let mut available_guests = vec![
5425            "guest-1".to_string(),
5426            "guest-2".to_string(),
5427            "guest-3".to_string(),
5428            "guest-4".to_string(),
5429        ];
5430        let mut operations = 0;
5431        while operations < max_operations {
5432            if operations == disconnect_host_at {
5433                server.disconnect_client(user_ids[0]);
5434                cx.foreground().advance_clock(RECEIVE_TIMEOUT);
5435                drop(op_start_signals);
5436                let mut clients = futures::future::join_all(clients).await;
5437                cx.foreground().run_until_parked();
5438
5439                let (host, mut host_cx, host_err) = clients.remove(0);
5440                if let Some(host_err) = host_err {
5441                    log::error!("host error - {}", host_err);
5442                }
5443                host.project
5444                    .as_ref()
5445                    .unwrap()
5446                    .read_with(&host_cx, |project, _| assert!(!project.is_shared()));
5447                for (guest, mut guest_cx, guest_err) in clients {
5448                    if let Some(guest_err) = guest_err {
5449                        log::error!("{} error - {}", guest.username, guest_err);
5450                    }
5451                    let contacts = server
5452                        .store
5453                        .read()
5454                        .await
5455                        .contacts_for_user(guest.current_user_id(&guest_cx));
5456                    assert!(!contacts
5457                        .iter()
5458                        .flat_map(|contact| &contact.projects)
5459                        .any(|project| project.id == host_project_id));
5460                    guest
5461                        .project
5462                        .as_ref()
5463                        .unwrap()
5464                        .read_with(&guest_cx, |project, _| assert!(project.is_read_only()));
5465                    guest_cx.update(|_| drop(guest));
5466                }
5467                host_cx.update(|_| drop(host));
5468
5469                return;
5470            }
5471
5472            let distribution = rng.lock().gen_range(0..100);
5473            match distribution {
5474                0..=19 if !available_guests.is_empty() => {
5475                    let guest_ix = rng.lock().gen_range(0..available_guests.len());
5476                    let guest_username = available_guests.remove(guest_ix);
5477                    log::info!("Adding new connection for {}", guest_username);
5478                    next_entity_id += 100000;
5479                    let mut guest_cx = TestAppContext::new(
5480                        cx.foreground_platform(),
5481                        cx.platform(),
5482                        deterministic.build_foreground(next_entity_id),
5483                        deterministic.build_background(),
5484                        cx.font_cache(),
5485                        cx.leak_detector(),
5486                        next_entity_id,
5487                    );
5488                    let guest = server.create_client(&mut guest_cx, &guest_username).await;
5489                    let guest_project = Project::remote(
5490                        host_project_id,
5491                        guest.client.clone(),
5492                        guest.user_store.clone(),
5493                        guest_lang_registry.clone(),
5494                        FakeFs::new(cx.background()),
5495                        &mut guest_cx.to_async(),
5496                    )
5497                    .await
5498                    .unwrap();
5499                    let op_start_signal = futures::channel::mpsc::unbounded();
5500                    user_ids.push(guest.current_user_id(&guest_cx));
5501                    op_start_signals.push(op_start_signal.0);
5502                    clients.push(guest_cx.foreground().spawn(guest.simulate_guest(
5503                        guest_username.clone(),
5504                        guest_project,
5505                        op_start_signal.1,
5506                        rng.clone(),
5507                        guest_cx,
5508                    )));
5509
5510                    log::info!("Added connection for {}", guest_username);
5511                    operations += 1;
5512                }
5513                20..=29 if clients.len() > 1 => {
5514                    log::info!("Removing guest");
5515                    let guest_ix = rng.lock().gen_range(1..clients.len());
5516                    let removed_guest_id = user_ids.remove(guest_ix);
5517                    let guest = clients.remove(guest_ix);
5518                    op_start_signals.remove(guest_ix);
5519                    server.disconnect_client(removed_guest_id);
5520                    cx.foreground().advance_clock(RECEIVE_TIMEOUT);
5521                    let (guest, mut guest_cx, guest_err) = guest.await;
5522                    if let Some(guest_err) = guest_err {
5523                        log::error!("{} error - {}", guest.username, guest_err);
5524                    }
5525                    guest
5526                        .project
5527                        .as_ref()
5528                        .unwrap()
5529                        .read_with(&guest_cx, |project, _| assert!(project.is_read_only()));
5530                    for user_id in &user_ids {
5531                        for contact in server.store.read().await.contacts_for_user(*user_id) {
5532                            assert_ne!(
5533                                contact.user_id, removed_guest_id.0 as u64,
5534                                "removed guest is still a contact of another peer"
5535                            );
5536                            for project in contact.projects {
5537                                for project_guest_id in project.guests {
5538                                    assert_ne!(
5539                                        project_guest_id, removed_guest_id.0 as u64,
5540                                        "removed guest appears as still participating on a project"
5541                                    );
5542                                }
5543                            }
5544                        }
5545                    }
5546
5547                    log::info!("{} removed", guest.username);
5548                    available_guests.push(guest.username.clone());
5549                    guest_cx.update(|_| drop(guest));
5550
5551                    operations += 1;
5552                }
5553                _ => {
5554                    while operations < max_operations && rng.lock().gen_bool(0.7) {
5555                        op_start_signals
5556                            .choose(&mut *rng.lock())
5557                            .unwrap()
5558                            .unbounded_send(())
5559                            .unwrap();
5560                        operations += 1;
5561                    }
5562
5563                    if rng.lock().gen_bool(0.8) {
5564                        cx.foreground().run_until_parked();
5565                    }
5566                }
5567            }
5568        }
5569
5570        drop(op_start_signals);
5571        let mut clients = futures::future::join_all(clients).await;
5572        cx.foreground().run_until_parked();
5573
5574        let (host_client, mut host_cx, host_err) = clients.remove(0);
5575        if let Some(host_err) = host_err {
5576            panic!("host error - {}", host_err);
5577        }
5578        let host_project = host_client.project.as_ref().unwrap();
5579        let host_worktree_snapshots = host_project.read_with(&host_cx, |project, cx| {
5580            project
5581                .worktrees(cx)
5582                .map(|worktree| {
5583                    let snapshot = worktree.read(cx).snapshot();
5584                    (snapshot.id(), snapshot)
5585                })
5586                .collect::<BTreeMap<_, _>>()
5587        });
5588
5589        host_client
5590            .project
5591            .as_ref()
5592            .unwrap()
5593            .read_with(&host_cx, |project, cx| project.check_invariants(cx));
5594
5595        for (guest_client, mut guest_cx, guest_err) in clients.into_iter() {
5596            if let Some(guest_err) = guest_err {
5597                panic!("{} error - {}", guest_client.username, guest_err);
5598            }
5599            let worktree_snapshots =
5600                guest_client
5601                    .project
5602                    .as_ref()
5603                    .unwrap()
5604                    .read_with(&guest_cx, |project, cx| {
5605                        project
5606                            .worktrees(cx)
5607                            .map(|worktree| {
5608                                let worktree = worktree.read(cx);
5609                                (worktree.id(), worktree.snapshot())
5610                            })
5611                            .collect::<BTreeMap<_, _>>()
5612                    });
5613
5614            assert_eq!(
5615                worktree_snapshots.keys().collect::<Vec<_>>(),
5616                host_worktree_snapshots.keys().collect::<Vec<_>>(),
5617                "{} has different worktrees than the host",
5618                guest_client.username
5619            );
5620            for (id, host_snapshot) in &host_worktree_snapshots {
5621                let guest_snapshot = &worktree_snapshots[id];
5622                assert_eq!(
5623                    guest_snapshot.root_name(),
5624                    host_snapshot.root_name(),
5625                    "{} has different root name than the host for worktree {}",
5626                    guest_client.username,
5627                    id
5628                );
5629                assert_eq!(
5630                    guest_snapshot.entries(false).collect::<Vec<_>>(),
5631                    host_snapshot.entries(false).collect::<Vec<_>>(),
5632                    "{} has different snapshot than the host for worktree {}",
5633                    guest_client.username,
5634                    id
5635                );
5636            }
5637
5638            guest_client
5639                .project
5640                .as_ref()
5641                .unwrap()
5642                .read_with(&guest_cx, |project, cx| project.check_invariants(cx));
5643
5644            for guest_buffer in &guest_client.buffers {
5645                let buffer_id = guest_buffer.read_with(&guest_cx, |buffer, _| buffer.remote_id());
5646                let host_buffer = host_project.read_with(&host_cx, |project, cx| {
5647                    project.buffer_for_id(buffer_id, cx).expect(&format!(
5648                        "host does not have buffer for guest:{}, peer:{}, id:{}",
5649                        guest_client.username, guest_client.peer_id, buffer_id
5650                    ))
5651                });
5652                let path = host_buffer
5653                    .read_with(&host_cx, |buffer, cx| buffer.file().unwrap().full_path(cx));
5654
5655                assert_eq!(
5656                    guest_buffer.read_with(&guest_cx, |buffer, _| buffer.deferred_ops_len()),
5657                    0,
5658                    "{}, buffer {}, path {:?} has deferred operations",
5659                    guest_client.username,
5660                    buffer_id,
5661                    path,
5662                );
5663                assert_eq!(
5664                    guest_buffer.read_with(&guest_cx, |buffer, _| buffer.text()),
5665                    host_buffer.read_with(&host_cx, |buffer, _| buffer.text()),
5666                    "{}, buffer {}, path {:?}, differs from the host's buffer",
5667                    guest_client.username,
5668                    buffer_id,
5669                    path
5670                );
5671            }
5672
5673            guest_cx.update(|_| drop(guest_client));
5674        }
5675
5676        host_cx.update(|_| drop(host_client));
5677    }
5678
5679    struct TestServer {
5680        peer: Arc<Peer>,
5681        app_state: Arc<AppState>,
5682        server: Arc<Server>,
5683        foreground: Rc<executor::Foreground>,
5684        notifications: mpsc::UnboundedReceiver<()>,
5685        connection_killers: Arc<Mutex<HashMap<UserId, Arc<AtomicBool>>>>,
5686        forbid_connections: Arc<AtomicBool>,
5687        _test_db: TestDb,
5688    }
5689
5690    impl TestServer {
5691        async fn start(
5692            foreground: Rc<executor::Foreground>,
5693            background: Arc<executor::Background>,
5694        ) -> Self {
5695            let test_db = TestDb::fake(background);
5696            let app_state = Self::build_app_state(&test_db).await;
5697            let peer = Peer::new();
5698            let notifications = mpsc::unbounded();
5699            let server = Server::new(app_state.clone(), Some(notifications.0));
5700            Self {
5701                peer,
5702                app_state,
5703                server,
5704                foreground,
5705                notifications: notifications.1,
5706                connection_killers: Default::default(),
5707                forbid_connections: Default::default(),
5708                _test_db: test_db,
5709            }
5710        }
5711
5712        async fn create_client(&mut self, cx: &mut TestAppContext, name: &str) -> TestClient {
5713            cx.update(|cx| {
5714                let settings = Settings::test(cx);
5715                cx.set_global(settings);
5716            });
5717
5718            let http = FakeHttpClient::with_404_response();
5719            let user_id = self.app_state.db.create_user(name, false).await.unwrap();
5720            let client_name = name.to_string();
5721            let mut client = Client::new(http.clone());
5722            let server = self.server.clone();
5723            let connection_killers = self.connection_killers.clone();
5724            let forbid_connections = self.forbid_connections.clone();
5725            let (connection_id_tx, mut connection_id_rx) = mpsc::channel(16);
5726
5727            Arc::get_mut(&mut client)
5728                .unwrap()
5729                .override_authenticate(move |cx| {
5730                    cx.spawn(|_| async move {
5731                        let access_token = "the-token".to_string();
5732                        Ok(Credentials {
5733                            user_id: user_id.0 as u64,
5734                            access_token,
5735                        })
5736                    })
5737                })
5738                .override_establish_connection(move |credentials, cx| {
5739                    assert_eq!(credentials.user_id, user_id.0 as u64);
5740                    assert_eq!(credentials.access_token, "the-token");
5741
5742                    let server = server.clone();
5743                    let connection_killers = connection_killers.clone();
5744                    let forbid_connections = forbid_connections.clone();
5745                    let client_name = client_name.clone();
5746                    let connection_id_tx = connection_id_tx.clone();
5747                    cx.spawn(move |cx| async move {
5748                        if forbid_connections.load(SeqCst) {
5749                            Err(EstablishConnectionError::other(anyhow!(
5750                                "server is forbidding connections"
5751                            )))
5752                        } else {
5753                            let (client_conn, server_conn, killed) =
5754                                Connection::in_memory(cx.background());
5755                            connection_killers.lock().insert(user_id, killed);
5756                            cx.background()
5757                                .spawn(server.handle_connection(
5758                                    server_conn,
5759                                    client_name,
5760                                    user_id,
5761                                    Some(connection_id_tx),
5762                                    cx.background(),
5763                                ))
5764                                .detach();
5765                            Ok(client_conn)
5766                        }
5767                    })
5768                });
5769
5770            client
5771                .authenticate_and_connect(false, &cx.to_async())
5772                .await
5773                .unwrap();
5774
5775            Channel::init(&client);
5776            Project::init(&client);
5777            cx.update(|cx| {
5778                workspace::init(&client, cx);
5779            });
5780
5781            let peer_id = PeerId(connection_id_rx.next().await.unwrap().0);
5782            let user_store = cx.add_model(|cx| UserStore::new(client.clone(), http, cx));
5783
5784            let client = TestClient {
5785                client,
5786                peer_id,
5787                username: name.to_string(),
5788                user_store,
5789                language_registry: Arc::new(LanguageRegistry::test()),
5790                project: Default::default(),
5791                buffers: Default::default(),
5792            };
5793            client.wait_for_current_user(cx).await;
5794            client
5795        }
5796
5797        fn disconnect_client(&self, user_id: UserId) {
5798            self.connection_killers
5799                .lock()
5800                .remove(&user_id)
5801                .unwrap()
5802                .store(true, SeqCst);
5803        }
5804
5805        fn forbid_connections(&self) {
5806            self.forbid_connections.store(true, SeqCst);
5807        }
5808
5809        fn allow_connections(&self) {
5810            self.forbid_connections.store(false, SeqCst);
5811        }
5812
5813        async fn build_app_state(test_db: &TestDb) -> Arc<AppState> {
5814            Arc::new(AppState {
5815                db: test_db.db().clone(),
5816                api_token: Default::default(),
5817            })
5818        }
5819
5820        async fn state<'a>(&'a self) -> RwLockReadGuard<'a, Store> {
5821            self.server.store.read().await
5822        }
5823
5824        async fn condition<F>(&mut self, mut predicate: F)
5825        where
5826            F: FnMut(&Store) -> bool,
5827        {
5828            assert!(
5829                self.foreground.parking_forbidden(),
5830                "you must call forbid_parking to use server conditions so we don't block indefinitely"
5831            );
5832            while !(predicate)(&*self.server.store.read().await) {
5833                self.foreground.start_waiting();
5834                self.notifications.next().await;
5835                self.foreground.finish_waiting();
5836            }
5837        }
5838    }
5839
5840    impl Deref for TestServer {
5841        type Target = Server;
5842
5843        fn deref(&self) -> &Self::Target {
5844            &self.server
5845        }
5846    }
5847
5848    impl Drop for TestServer {
5849        fn drop(&mut self) {
5850            self.peer.reset();
5851        }
5852    }
5853
5854    struct TestClient {
5855        client: Arc<Client>,
5856        username: String,
5857        pub peer_id: PeerId,
5858        pub user_store: ModelHandle<UserStore>,
5859        language_registry: Arc<LanguageRegistry>,
5860        project: Option<ModelHandle<Project>>,
5861        buffers: HashSet<ModelHandle<language::Buffer>>,
5862    }
5863
5864    impl Deref for TestClient {
5865        type Target = Arc<Client>;
5866
5867        fn deref(&self) -> &Self::Target {
5868            &self.client
5869        }
5870    }
5871
5872    impl TestClient {
5873        pub fn current_user_id(&self, cx: &TestAppContext) -> UserId {
5874            UserId::from_proto(
5875                self.user_store
5876                    .read_with(cx, |user_store, _| user_store.current_user().unwrap().id),
5877            )
5878        }
5879
5880        async fn wait_for_current_user(&self, cx: &TestAppContext) {
5881            let mut authed_user = self
5882                .user_store
5883                .read_with(cx, |user_store, _| user_store.watch_current_user());
5884            while authed_user.next().await.unwrap().is_none() {}
5885        }
5886
5887        async fn build_local_project(
5888            &mut self,
5889            fs: Arc<FakeFs>,
5890            root_path: impl AsRef<Path>,
5891            cx: &mut TestAppContext,
5892        ) -> (ModelHandle<Project>, WorktreeId) {
5893            let project = cx.update(|cx| {
5894                Project::local(
5895                    self.client.clone(),
5896                    self.user_store.clone(),
5897                    self.language_registry.clone(),
5898                    fs,
5899                    cx,
5900                )
5901            });
5902            self.project = Some(project.clone());
5903            let (worktree, _) = project
5904                .update(cx, |p, cx| {
5905                    p.find_or_create_local_worktree(root_path, true, cx)
5906                })
5907                .await
5908                .unwrap();
5909            worktree
5910                .read_with(cx, |tree, _| tree.as_local().unwrap().scan_complete())
5911                .await;
5912            project
5913                .update(cx, |project, _| project.next_remote_id())
5914                .await;
5915            (project, worktree.read_with(cx, |tree, _| tree.id()))
5916        }
5917
5918        async fn build_remote_project(
5919            &mut self,
5920            project_id: u64,
5921            cx: &mut TestAppContext,
5922        ) -> ModelHandle<Project> {
5923            let project = Project::remote(
5924                project_id,
5925                self.client.clone(),
5926                self.user_store.clone(),
5927                self.language_registry.clone(),
5928                FakeFs::new(cx.background()),
5929                &mut cx.to_async(),
5930            )
5931            .await
5932            .unwrap();
5933            self.project = Some(project.clone());
5934            project
5935        }
5936
5937        fn build_workspace(
5938            &self,
5939            project: &ModelHandle<Project>,
5940            cx: &mut TestAppContext,
5941        ) -> ViewHandle<Workspace> {
5942            let (window_id, _) = cx.add_window(|_| EmptyView);
5943            cx.add_view(window_id, |cx| {
5944                let fs = project.read(cx).fs().clone();
5945                Workspace::new(
5946                    &WorkspaceParams {
5947                        fs,
5948                        project: project.clone(),
5949                        user_store: self.user_store.clone(),
5950                        languages: self.language_registry.clone(),
5951                        themes: ThemeRegistry::new((), cx.font_cache().clone()),
5952                        channel_list: cx.add_model(|cx| {
5953                            ChannelList::new(self.user_store.clone(), self.client.clone(), cx)
5954                        }),
5955                        client: self.client.clone(),
5956                    },
5957                    cx,
5958                )
5959            })
5960        }
5961
5962        async fn simulate_host(
5963            mut self,
5964            project: ModelHandle<Project>,
5965            files: Arc<Mutex<Vec<PathBuf>>>,
5966            op_start_signal: futures::channel::mpsc::UnboundedReceiver<()>,
5967            rng: Arc<Mutex<StdRng>>,
5968            mut cx: TestAppContext,
5969        ) -> (Self, TestAppContext, Option<anyhow::Error>) {
5970            async fn simulate_host_internal(
5971                client: &mut TestClient,
5972                project: ModelHandle<Project>,
5973                files: Arc<Mutex<Vec<PathBuf>>>,
5974                mut op_start_signal: futures::channel::mpsc::UnboundedReceiver<()>,
5975                rng: Arc<Mutex<StdRng>>,
5976                cx: &mut TestAppContext,
5977            ) -> anyhow::Result<()> {
5978                let fs = project.read_with(cx, |project, _| project.fs().clone());
5979
5980                while op_start_signal.next().await.is_some() {
5981                    let distribution = rng.lock().gen_range::<usize, _>(0..100);
5982                    match distribution {
5983                        0..=20 if !files.lock().is_empty() => {
5984                            let path = files.lock().choose(&mut *rng.lock()).unwrap().clone();
5985                            let mut path = path.as_path();
5986                            while let Some(parent_path) = path.parent() {
5987                                path = parent_path;
5988                                if rng.lock().gen() {
5989                                    break;
5990                                }
5991                            }
5992
5993                            log::info!("Host: find/create local worktree {:?}", path);
5994                            let find_or_create_worktree = project.update(cx, |project, cx| {
5995                                project.find_or_create_local_worktree(path, true, cx)
5996                            });
5997                            if rng.lock().gen() {
5998                                cx.background().spawn(find_or_create_worktree).detach();
5999                            } else {
6000                                find_or_create_worktree.await?;
6001                            }
6002                        }
6003                        10..=80 if !files.lock().is_empty() => {
6004                            let buffer = if client.buffers.is_empty() || rng.lock().gen() {
6005                                let file = files.lock().choose(&mut *rng.lock()).unwrap().clone();
6006                                let (worktree, path) = project
6007                                    .update(cx, |project, cx| {
6008                                        project.find_or_create_local_worktree(
6009                                            file.clone(),
6010                                            true,
6011                                            cx,
6012                                        )
6013                                    })
6014                                    .await?;
6015                                let project_path =
6016                                    worktree.read_with(cx, |worktree, _| (worktree.id(), path));
6017                                log::info!(
6018                                    "Host: opening path {:?}, worktree {}, relative_path {:?}",
6019                                    file,
6020                                    project_path.0,
6021                                    project_path.1
6022                                );
6023                                let buffer = project
6024                                    .update(cx, |project, cx| project.open_buffer(project_path, cx))
6025                                    .await
6026                                    .unwrap();
6027                                client.buffers.insert(buffer.clone());
6028                                buffer
6029                            } else {
6030                                client
6031                                    .buffers
6032                                    .iter()
6033                                    .choose(&mut *rng.lock())
6034                                    .unwrap()
6035                                    .clone()
6036                            };
6037
6038                            if rng.lock().gen_bool(0.1) {
6039                                cx.update(|cx| {
6040                                    log::info!(
6041                                        "Host: dropping buffer {:?}",
6042                                        buffer.read(cx).file().unwrap().full_path(cx)
6043                                    );
6044                                    client.buffers.remove(&buffer);
6045                                    drop(buffer);
6046                                });
6047                            } else {
6048                                buffer.update(cx, |buffer, cx| {
6049                                    log::info!(
6050                                        "Host: updating buffer {:?} ({})",
6051                                        buffer.file().unwrap().full_path(cx),
6052                                        buffer.remote_id()
6053                                    );
6054
6055                                    if rng.lock().gen_bool(0.7) {
6056                                        buffer.randomly_edit(&mut *rng.lock(), 5, cx);
6057                                    } else {
6058                                        buffer.randomly_undo_redo(&mut *rng.lock(), cx);
6059                                    }
6060                                });
6061                            }
6062                        }
6063                        _ => loop {
6064                            let path_component_count = rng.lock().gen_range::<usize, _>(1..=5);
6065                            let mut path = PathBuf::new();
6066                            path.push("/");
6067                            for _ in 0..path_component_count {
6068                                let letter = rng.lock().gen_range(b'a'..=b'z');
6069                                path.push(std::str::from_utf8(&[letter]).unwrap());
6070                            }
6071                            path.set_extension("rs");
6072                            let parent_path = path.parent().unwrap();
6073
6074                            log::info!("Host: creating file {:?}", path,);
6075
6076                            if fs.create_dir(&parent_path).await.is_ok()
6077                                && fs.create_file(&path, Default::default()).await.is_ok()
6078                            {
6079                                files.lock().push(path);
6080                                break;
6081                            } else {
6082                                log::info!("Host: cannot create file");
6083                            }
6084                        },
6085                    }
6086
6087                    cx.background().simulate_random_delay().await;
6088                }
6089
6090                Ok(())
6091            }
6092
6093            let result = simulate_host_internal(
6094                &mut self,
6095                project.clone(),
6096                files,
6097                op_start_signal,
6098                rng,
6099                &mut cx,
6100            )
6101            .await;
6102            log::info!("Host done");
6103            self.project = Some(project);
6104            (self, cx, result.err())
6105        }
6106
6107        pub async fn simulate_guest(
6108            mut self,
6109            guest_username: String,
6110            project: ModelHandle<Project>,
6111            op_start_signal: futures::channel::mpsc::UnboundedReceiver<()>,
6112            rng: Arc<Mutex<StdRng>>,
6113            mut cx: TestAppContext,
6114        ) -> (Self, TestAppContext, Option<anyhow::Error>) {
6115            async fn simulate_guest_internal(
6116                client: &mut TestClient,
6117                guest_username: &str,
6118                project: ModelHandle<Project>,
6119                mut op_start_signal: futures::channel::mpsc::UnboundedReceiver<()>,
6120                rng: Arc<Mutex<StdRng>>,
6121                cx: &mut TestAppContext,
6122            ) -> anyhow::Result<()> {
6123                while op_start_signal.next().await.is_some() {
6124                    let buffer = if client.buffers.is_empty() || rng.lock().gen() {
6125                        let worktree = if let Some(worktree) =
6126                            project.read_with(cx, |project, cx| {
6127                                project
6128                                    .worktrees(&cx)
6129                                    .filter(|worktree| {
6130                                        let worktree = worktree.read(cx);
6131                                        worktree.is_visible()
6132                                            && worktree.entries(false).any(|e| e.is_file())
6133                                    })
6134                                    .choose(&mut *rng.lock())
6135                            }) {
6136                            worktree
6137                        } else {
6138                            cx.background().simulate_random_delay().await;
6139                            continue;
6140                        };
6141
6142                        let (worktree_root_name, project_path) =
6143                            worktree.read_with(cx, |worktree, _| {
6144                                let entry = worktree
6145                                    .entries(false)
6146                                    .filter(|e| e.is_file())
6147                                    .choose(&mut *rng.lock())
6148                                    .unwrap();
6149                                (
6150                                    worktree.root_name().to_string(),
6151                                    (worktree.id(), entry.path.clone()),
6152                                )
6153                            });
6154                        log::info!(
6155                            "{}: opening path {:?} in worktree {} ({})",
6156                            guest_username,
6157                            project_path.1,
6158                            project_path.0,
6159                            worktree_root_name,
6160                        );
6161                        let buffer = project
6162                            .update(cx, |project, cx| {
6163                                project.open_buffer(project_path.clone(), cx)
6164                            })
6165                            .await?;
6166                        log::info!(
6167                            "{}: opened path {:?} in worktree {} ({}) with buffer id {}",
6168                            guest_username,
6169                            project_path.1,
6170                            project_path.0,
6171                            worktree_root_name,
6172                            buffer.read_with(cx, |buffer, _| buffer.remote_id())
6173                        );
6174                        client.buffers.insert(buffer.clone());
6175                        buffer
6176                    } else {
6177                        client
6178                            .buffers
6179                            .iter()
6180                            .choose(&mut *rng.lock())
6181                            .unwrap()
6182                            .clone()
6183                    };
6184
6185                    let choice = rng.lock().gen_range(0..100);
6186                    match choice {
6187                        0..=9 => {
6188                            cx.update(|cx| {
6189                                log::info!(
6190                                    "{}: dropping buffer {:?}",
6191                                    guest_username,
6192                                    buffer.read(cx).file().unwrap().full_path(cx)
6193                                );
6194                                client.buffers.remove(&buffer);
6195                                drop(buffer);
6196                            });
6197                        }
6198                        10..=19 => {
6199                            let completions = project.update(cx, |project, cx| {
6200                                log::info!(
6201                                    "{}: requesting completions for buffer {} ({:?})",
6202                                    guest_username,
6203                                    buffer.read(cx).remote_id(),
6204                                    buffer.read(cx).file().unwrap().full_path(cx)
6205                                );
6206                                let offset = rng.lock().gen_range(0..=buffer.read(cx).len());
6207                                project.completions(&buffer, offset, cx)
6208                            });
6209                            let completions = cx.background().spawn(async move {
6210                                completions
6211                                    .await
6212                                    .map_err(|err| anyhow!("completions request failed: {:?}", err))
6213                            });
6214                            if rng.lock().gen_bool(0.3) {
6215                                log::info!("{}: detaching completions request", guest_username);
6216                                cx.update(|cx| completions.detach_and_log_err(cx));
6217                            } else {
6218                                completions.await?;
6219                            }
6220                        }
6221                        20..=29 => {
6222                            let code_actions = project.update(cx, |project, cx| {
6223                                log::info!(
6224                                    "{}: requesting code actions for buffer {} ({:?})",
6225                                    guest_username,
6226                                    buffer.read(cx).remote_id(),
6227                                    buffer.read(cx).file().unwrap().full_path(cx)
6228                                );
6229                                let range = buffer.read(cx).random_byte_range(0, &mut *rng.lock());
6230                                project.code_actions(&buffer, range, cx)
6231                            });
6232                            let code_actions = cx.background().spawn(async move {
6233                                code_actions.await.map_err(|err| {
6234                                    anyhow!("code actions request failed: {:?}", err)
6235                                })
6236                            });
6237                            if rng.lock().gen_bool(0.3) {
6238                                log::info!("{}: detaching code actions request", guest_username);
6239                                cx.update(|cx| code_actions.detach_and_log_err(cx));
6240                            } else {
6241                                code_actions.await?;
6242                            }
6243                        }
6244                        30..=39 if buffer.read_with(cx, |buffer, _| buffer.is_dirty()) => {
6245                            let (requested_version, save) = buffer.update(cx, |buffer, cx| {
6246                                log::info!(
6247                                    "{}: saving buffer {} ({:?})",
6248                                    guest_username,
6249                                    buffer.remote_id(),
6250                                    buffer.file().unwrap().full_path(cx)
6251                                );
6252                                (buffer.version(), buffer.save(cx))
6253                            });
6254                            let save = cx.background().spawn(async move {
6255                                let (saved_version, _) = save
6256                                    .await
6257                                    .map_err(|err| anyhow!("save request failed: {:?}", err))?;
6258                                assert!(saved_version.observed_all(&requested_version));
6259                                Ok::<_, anyhow::Error>(())
6260                            });
6261                            if rng.lock().gen_bool(0.3) {
6262                                log::info!("{}: detaching save request", guest_username);
6263                                cx.update(|cx| save.detach_and_log_err(cx));
6264                            } else {
6265                                save.await?;
6266                            }
6267                        }
6268                        40..=44 => {
6269                            let prepare_rename = project.update(cx, |project, cx| {
6270                                log::info!(
6271                                    "{}: preparing rename for buffer {} ({:?})",
6272                                    guest_username,
6273                                    buffer.read(cx).remote_id(),
6274                                    buffer.read(cx).file().unwrap().full_path(cx)
6275                                );
6276                                let offset = rng.lock().gen_range(0..=buffer.read(cx).len());
6277                                project.prepare_rename(buffer, offset, cx)
6278                            });
6279                            let prepare_rename = cx.background().spawn(async move {
6280                                prepare_rename.await.map_err(|err| {
6281                                    anyhow!("prepare rename request failed: {:?}", err)
6282                                })
6283                            });
6284                            if rng.lock().gen_bool(0.3) {
6285                                log::info!("{}: detaching prepare rename request", guest_username);
6286                                cx.update(|cx| prepare_rename.detach_and_log_err(cx));
6287                            } else {
6288                                prepare_rename.await?;
6289                            }
6290                        }
6291                        45..=49 => {
6292                            let definitions = project.update(cx, |project, cx| {
6293                                log::info!(
6294                                    "{}: requesting definitions for buffer {} ({:?})",
6295                                    guest_username,
6296                                    buffer.read(cx).remote_id(),
6297                                    buffer.read(cx).file().unwrap().full_path(cx)
6298                                );
6299                                let offset = rng.lock().gen_range(0..=buffer.read(cx).len());
6300                                project.definition(&buffer, offset, cx)
6301                            });
6302                            let definitions = cx.background().spawn(async move {
6303                                definitions
6304                                    .await
6305                                    .map_err(|err| anyhow!("definitions request failed: {:?}", err))
6306                            });
6307                            if rng.lock().gen_bool(0.3) {
6308                                log::info!("{}: detaching definitions request", guest_username);
6309                                cx.update(|cx| definitions.detach_and_log_err(cx));
6310                            } else {
6311                                client
6312                                    .buffers
6313                                    .extend(definitions.await?.into_iter().map(|loc| loc.buffer));
6314                            }
6315                        }
6316                        50..=54 => {
6317                            let highlights = project.update(cx, |project, cx| {
6318                                log::info!(
6319                                    "{}: requesting highlights for buffer {} ({:?})",
6320                                    guest_username,
6321                                    buffer.read(cx).remote_id(),
6322                                    buffer.read(cx).file().unwrap().full_path(cx)
6323                                );
6324                                let offset = rng.lock().gen_range(0..=buffer.read(cx).len());
6325                                project.document_highlights(&buffer, offset, cx)
6326                            });
6327                            let highlights = cx.background().spawn(async move {
6328                                highlights
6329                                    .await
6330                                    .map_err(|err| anyhow!("highlights request failed: {:?}", err))
6331                            });
6332                            if rng.lock().gen_bool(0.3) {
6333                                log::info!("{}: detaching highlights request", guest_username);
6334                                cx.update(|cx| highlights.detach_and_log_err(cx));
6335                            } else {
6336                                highlights.await?;
6337                            }
6338                        }
6339                        55..=59 => {
6340                            let search = project.update(cx, |project, cx| {
6341                                let query = rng.lock().gen_range('a'..='z');
6342                                log::info!("{}: project-wide search {:?}", guest_username, query);
6343                                project.search(SearchQuery::text(query, false, false), cx)
6344                            });
6345                            let search = cx.background().spawn(async move {
6346                                search
6347                                    .await
6348                                    .map_err(|err| anyhow!("search request failed: {:?}", err))
6349                            });
6350                            if rng.lock().gen_bool(0.3) {
6351                                log::info!("{}: detaching search request", guest_username);
6352                                cx.update(|cx| search.detach_and_log_err(cx));
6353                            } else {
6354                                client.buffers.extend(search.await?.into_keys());
6355                            }
6356                        }
6357                        _ => {
6358                            buffer.update(cx, |buffer, cx| {
6359                                log::info!(
6360                                    "{}: updating buffer {} ({:?})",
6361                                    guest_username,
6362                                    buffer.remote_id(),
6363                                    buffer.file().unwrap().full_path(cx)
6364                                );
6365                                if rng.lock().gen_bool(0.7) {
6366                                    buffer.randomly_edit(&mut *rng.lock(), 5, cx);
6367                                } else {
6368                                    buffer.randomly_undo_redo(&mut *rng.lock(), cx);
6369                                }
6370                            });
6371                        }
6372                    }
6373                    cx.background().simulate_random_delay().await;
6374                }
6375                Ok(())
6376            }
6377
6378            let result = simulate_guest_internal(
6379                &mut self,
6380                &guest_username,
6381                project.clone(),
6382                op_start_signal,
6383                rng,
6384                &mut cx,
6385            )
6386            .await;
6387            log::info!("{}: done", guest_username);
6388
6389            self.project = Some(project);
6390            (self, cx, result.err())
6391        }
6392    }
6393
    /// Tears down the wrapped client whenever a `TestClient` is dropped.
    impl Drop for TestClient {
        fn drop(&mut self) {
            // NOTE(review): `tear_down` is defined outside this chunk; presumably it
            // releases the client's connection-related state — confirm at its definition.
            self.client.tear_down();
        }
    }
6399
6400    impl Executor for Arc<gpui::executor::Background> {
6401        type Sleep = gpui::executor::Timer;
6402
6403        fn spawn_detached<F: 'static + Send + Future<Output = ()>>(&self, future: F) {
6404            self.spawn(future).detach();
6405        }
6406
6407        fn sleep(&self, duration: Duration) -> Self::Sleep {
6408            self.as_ref().timer(duration)
6409        }
6410    }
6411
6412    fn channel_messages(channel: &Channel) -> Vec<(String, String, bool)> {
6413        channel
6414            .messages()
6415            .cursor::<()>()
6416            .map(|m| {
6417                (
6418                    m.sender.github_login.clone(),
6419                    m.body.clone(),
6420                    m.is_pending(),
6421                )
6422            })
6423            .collect()
6424    }
6425
    /// Field-less view type; its `gpui::View` implementation renders only
    /// `gpui::elements::Empty`.
    struct EmptyView;
6427
    /// Registers `EmptyView` as a gpui entity whose event type is the unit type `()`.
    impl gpui::Entity for EmptyView {
        type Event = ();
    }
6431
6432    impl gpui::View for EmptyView {
6433        fn ui_name() -> &'static str {
6434            "empty view"
6435        }
6436
6437        fn render(&mut self, _: &mut gpui::RenderContext<Self>) -> gpui::ElementBox {
6438            gpui::Element::boxed(gpui::elements::Empty)
6439        }
6440    }
6441}