rpc.rs

   1mod store;
   2
   3use crate::{
   4    auth,
   5    db::{ChannelId, MessageId, UserId},
   6    AppState, Result,
   7};
   8use anyhow::anyhow;
   9use async_tungstenite::tungstenite::{
  10    protocol::CloseFrame as TungsteniteCloseFrame, Message as TungsteniteMessage,
  11};
  12use axum::{
  13    body::Body,
  14    extract::{
  15        ws::{CloseFrame as AxumCloseFrame, Message as AxumMessage},
  16        ConnectInfo, WebSocketUpgrade,
  17    },
  18    headers::{Header, HeaderName},
  19    http::StatusCode,
  20    middleware,
  21    response::{IntoResponse, Response},
  22    routing::get,
  23    Extension, Router, TypedHeader,
  24};
  25use collections::{HashMap, HashSet};
  26use futures::{channel::mpsc, future::BoxFuture, FutureExt, SinkExt, StreamExt, TryStreamExt};
  27use lazy_static::lazy_static;
  28use rpc::{
  29    proto::{self, AnyTypedEnvelope, EntityMessage, EnvelopedMessage, RequestMessage},
  30    Connection, ConnectionId, Peer, TypedEnvelope,
  31};
  32use std::{
  33    any::TypeId,
  34    future::Future,
  35    marker::PhantomData,
  36    net::SocketAddr,
  37    ops::{Deref, DerefMut},
  38    rc::Rc,
  39    sync::Arc,
  40    time::Duration,
  41};
  42use store::{Store, Worktree};
  43use time::OffsetDateTime;
  44use tokio::{
  45    sync::{RwLock, RwLockReadGuard, RwLockWriteGuard},
  46    time::Sleep,
  47};
  48use tower::ServiceBuilder;
  49use tracing::{info_span, instrument, Instrument};
  50
  51type MessageHandler =
  52    Box<dyn Send + Sync + Fn(Arc<Server>, Box<dyn AnyTypedEnvelope>) -> BoxFuture<'static, ()>>;
  53
  54pub struct Server {
  55    peer: Arc<Peer>,
  56    store: RwLock<Store>,
  57    app_state: Arc<AppState>,
  58    handlers: HashMap<TypeId, MessageHandler>,
  59    notifications: Option<mpsc::UnboundedSender<()>>,
  60}
  61
  62pub trait Executor: Send + Clone {
  63    type Sleep: Send + Future;
  64    fn spawn_detached<F: 'static + Send + Future<Output = ()>>(&self, future: F);
  65    fn sleep(&self, duration: Duration) -> Self::Sleep;
  66}
  67
  68#[derive(Clone)]
  69pub struct RealExecutor;
  70
  71const MESSAGE_COUNT_PER_PAGE: usize = 100;
  72const MAX_MESSAGE_LEN: usize = 1024;
  73
  74struct StoreReadGuard<'a> {
  75    guard: RwLockReadGuard<'a, Store>,
  76    _not_send: PhantomData<Rc<()>>,
  77}
  78
  79struct StoreWriteGuard<'a> {
  80    guard: RwLockWriteGuard<'a, Store>,
  81    _not_send: PhantomData<Rc<()>>,
  82}
  83
  84impl Server {
  85    pub fn new(
  86        app_state: Arc<AppState>,
  87        notifications: Option<mpsc::UnboundedSender<()>>,
  88    ) -> Arc<Self> {
  89        let mut server = Self {
  90            peer: Peer::new(),
  91            app_state,
  92            store: Default::default(),
  93            handlers: Default::default(),
  94            notifications,
  95        };
  96
  97        server
  98            .add_request_handler(Server::ping)
  99            .add_request_handler(Server::register_project)
 100            .add_message_handler(Server::unregister_project)
 101            .add_request_handler(Server::share_project)
 102            .add_message_handler(Server::unshare_project)
 103            .add_sync_request_handler(Server::join_project)
 104            .add_message_handler(Server::leave_project)
 105            .add_request_handler(Server::register_worktree)
 106            .add_message_handler(Server::unregister_worktree)
 107            .add_request_handler(Server::update_worktree)
 108            .add_message_handler(Server::start_language_server)
 109            .add_message_handler(Server::update_language_server)
 110            .add_message_handler(Server::update_diagnostic_summary)
 111            .add_request_handler(Server::forward_project_request::<proto::GetDefinition>)
 112            .add_request_handler(Server::forward_project_request::<proto::GetReferences>)
 113            .add_request_handler(Server::forward_project_request::<proto::SearchProject>)
 114            .add_request_handler(Server::forward_project_request::<proto::GetDocumentHighlights>)
 115            .add_request_handler(Server::forward_project_request::<proto::GetProjectSymbols>)
 116            .add_request_handler(Server::forward_project_request::<proto::OpenBufferForSymbol>)
 117            .add_request_handler(Server::forward_project_request::<proto::OpenBufferById>)
 118            .add_request_handler(Server::forward_project_request::<proto::OpenBufferByPath>)
 119            .add_request_handler(Server::forward_project_request::<proto::GetCompletions>)
 120            .add_request_handler(
 121                Server::forward_project_request::<proto::ApplyCompletionAdditionalEdits>,
 122            )
 123            .add_request_handler(Server::forward_project_request::<proto::GetCodeActions>)
 124            .add_request_handler(Server::forward_project_request::<proto::ApplyCodeAction>)
 125            .add_request_handler(Server::forward_project_request::<proto::PrepareRename>)
 126            .add_request_handler(Server::forward_project_request::<proto::PerformRename>)
 127            .add_request_handler(Server::forward_project_request::<proto::ReloadBuffers>)
 128            .add_request_handler(Server::forward_project_request::<proto::FormatBuffers>)
 129            .add_request_handler(Server::forward_project_request::<proto::CreateProjectEntry>)
 130            .add_request_handler(Server::forward_project_request::<proto::RenameProjectEntry>)
 131            .add_request_handler(Server::forward_project_request::<proto::DeleteProjectEntry>)
 132            .add_request_handler(Server::update_buffer)
 133            .add_message_handler(Server::update_buffer_file)
 134            .add_message_handler(Server::buffer_reloaded)
 135            .add_message_handler(Server::buffer_saved)
 136            .add_request_handler(Server::save_buffer)
 137            .add_request_handler(Server::get_channels)
 138            .add_request_handler(Server::get_users)
 139            .add_request_handler(Server::fuzzy_search_users)
 140            .add_request_handler(Server::join_channel)
 141            .add_message_handler(Server::leave_channel)
 142            .add_request_handler(Server::send_channel_message)
 143            .add_request_handler(Server::follow)
 144            .add_message_handler(Server::unfollow)
 145            .add_message_handler(Server::update_followers)
 146            .add_request_handler(Server::get_channel_messages);
 147
 148        Arc::new(server)
 149    }
 150
 151    fn add_message_handler<F, Fut, M>(&mut self, handler: F) -> &mut Self
 152    where
 153        F: 'static + Send + Sync + Fn(Arc<Self>, TypedEnvelope<M>) -> Fut,
 154        Fut: 'static + Send + Future<Output = Result<()>>,
 155        M: EnvelopedMessage,
 156    {
 157        let prev_handler = self.handlers.insert(
 158            TypeId::of::<M>(),
 159            Box::new(move |server, envelope| {
 160                let envelope = envelope.into_any().downcast::<TypedEnvelope<M>>().unwrap();
 161                let span = info_span!(
 162                    "handle message",
 163                    payload_type = envelope.payload_type_name(),
 164                    payload = format!("{:?}", envelope.payload).as_str(),
 165                );
 166                let future = (handler)(server, *envelope);
 167                async move {
 168                    if let Err(error) = future.await {
 169                        tracing::error!(%error, "error handling message");
 170                    }
 171                }
 172                .instrument(span)
 173                .boxed()
 174            }),
 175        );
 176        if prev_handler.is_some() {
 177            panic!("registered a handler for the same message twice");
 178        }
 179        self
 180    }
 181
 182    fn add_request_handler<F, Fut, M>(&mut self, handler: F) -> &mut Self
 183    where
 184        F: 'static + Send + Sync + Fn(Arc<Self>, TypedEnvelope<M>) -> Fut,
 185        Fut: 'static + Send + Future<Output = Result<M::Response>>,
 186        M: RequestMessage,
 187    {
 188        self.add_message_handler(move |server, envelope| {
 189            let receipt = envelope.receipt();
 190            let response = (handler)(server.clone(), envelope);
 191            async move {
 192                match response.await {
 193                    Ok(response) => {
 194                        server.peer.respond(receipt, response)?;
 195                        Ok(())
 196                    }
 197                    Err(error) => {
 198                        server.peer.respond_with_error(
 199                            receipt,
 200                            proto::Error {
 201                                message: error.to_string(),
 202                            },
 203                        )?;
 204                        Err(error)
 205                    }
 206                }
 207            }
 208        })
 209    }
 210
 211    /// Handle a request while holding a lock to the store. This is useful when we're registering
 212    /// a connection but we want to respond on the connection before anybody else can send on it.
 213    fn add_sync_request_handler<F, M>(&mut self, handler: F) -> &mut Self
 214    where
 215        F: 'static
 216            + Send
 217            + Sync
 218            + Fn(Arc<Self>, &mut Store, TypedEnvelope<M>) -> Result<M::Response>,
 219        M: RequestMessage,
 220    {
 221        let handler = Arc::new(handler);
 222        self.add_message_handler(move |server, envelope| {
 223            let receipt = envelope.receipt();
 224            let handler = handler.clone();
 225            async move {
 226                let mut store = server.state_mut().await;
 227                let response = (handler)(server.clone(), &mut *store, envelope);
 228                match response {
 229                    Ok(response) => {
 230                        server.peer.respond(receipt, response)?;
 231                        Ok(())
 232                    }
 233                    Err(error) => {
 234                        server.peer.respond_with_error(
 235                            receipt,
 236                            proto::Error {
 237                                message: error.to_string(),
 238                            },
 239                        )?;
 240                        Err(error)
 241                    }
 242                }
 243            }
 244        })
 245    }
 246
 247    pub fn handle_connection<E: Executor>(
 248        self: &Arc<Self>,
 249        connection: Connection,
 250        address: String,
 251        user_id: UserId,
 252        mut send_connection_id: Option<mpsc::Sender<ConnectionId>>,
 253        executor: E,
 254    ) -> impl Future<Output = ()> {
 255        let mut this = self.clone();
 256        let span = info_span!("handle connection", %user_id, %address);
 257        async move {
 258            let (connection_id, handle_io, mut incoming_rx) = this
 259                .peer
 260                .add_connection(connection, {
 261                    let executor = executor.clone();
 262                    move |duration| {
 263                        let timer = executor.sleep(duration);
 264                        async move {
 265                            timer.await;
 266                        }
 267                    }
 268                })
 269                .await;
 270
 271            tracing::info!(%user_id, %connection_id, %address, "connection opened");
 272
 273            if let Some(send_connection_id) = send_connection_id.as_mut() {
 274                let _ = send_connection_id.send(connection_id).await;
 275            }
 276
 277            {
 278                let mut state = this.state_mut().await;
 279                state.add_connection(connection_id, user_id);
 280                this.update_contacts_for_users(&*state, &[user_id]);
 281            }
 282
 283            let handle_io = handle_io.fuse();
 284            futures::pin_mut!(handle_io);
 285            loop {
 286                let next_message = incoming_rx.next().fuse();
 287                futures::pin_mut!(next_message);
 288                futures::select_biased! {
 289                    result = handle_io => {
 290                        if let Err(error) = result {
 291                            tracing::error!(%error, "error handling I/O");
 292                        }
 293                        break;
 294                    }
 295                    message = next_message => {
 296                        if let Some(message) = message {
 297                            let type_name = message.payload_type_name();
 298                            let span = tracing::info_span!("receive message", %user_id, %connection_id, %address, type_name);
 299                            async {
 300                                if let Some(handler) = this.handlers.get(&message.payload_type_id()) {
 301                                    let notifications = this.notifications.clone();
 302                                    let is_background = message.is_background();
 303                                    let handle_message = (handler)(this.clone(), message);
 304                                    let handle_message = async move {
 305                                        handle_message.await;
 306                                        if let Some(mut notifications) = notifications {
 307                                            let _ = notifications.send(()).await;
 308                                        }
 309                                    };
 310                                    if is_background {
 311                                        executor.spawn_detached(handle_message);
 312                                    } else {
 313                                        handle_message.await;
 314                                    }
 315                                } else {
 316                                    tracing::error!("no message handler");
 317                                }
 318                            }.instrument(span).await;
 319                        } else {
 320                            tracing::info!(%user_id, %connection_id, %address, "connection closed");
 321                            break;
 322                        }
 323                    }
 324                }
 325            }
 326
 327            if let Err(error) = this.sign_out(connection_id).await {
 328                tracing::error!(%error, "error signing out");
 329            }
 330        }.instrument(span)
 331    }
 332
 333    async fn sign_out(self: &mut Arc<Self>, connection_id: ConnectionId) -> Result<()> {
 334        self.peer.disconnect(connection_id);
 335        let mut state = self.state_mut().await;
 336        let removed_connection = state.remove_connection(connection_id)?;
 337
 338        for (project_id, project) in removed_connection.hosted_projects {
 339            if let Some(share) = project.share {
 340                broadcast(
 341                    connection_id,
 342                    share.guests.keys().copied().collect(),
 343                    |conn_id| {
 344                        self.peer
 345                            .send(conn_id, proto::UnshareProject { project_id })
 346                    },
 347                );
 348            }
 349        }
 350
 351        for (project_id, peer_ids) in removed_connection.guest_project_ids {
 352            broadcast(connection_id, peer_ids, |conn_id| {
 353                self.peer.send(
 354                    conn_id,
 355                    proto::RemoveProjectCollaborator {
 356                        project_id,
 357                        peer_id: connection_id.0,
 358                    },
 359                )
 360            });
 361        }
 362
 363        self.update_contacts_for_users(&*state, removed_connection.contact_ids.iter());
 364        Ok(())
 365    }
 366
 367    async fn ping(self: Arc<Server>, _: TypedEnvelope<proto::Ping>) -> Result<proto::Ack> {
 368        Ok(proto::Ack {})
 369    }
 370
 371    async fn register_project(
 372        self: Arc<Server>,
 373        request: TypedEnvelope<proto::RegisterProject>,
 374    ) -> Result<proto::RegisterProjectResponse> {
 375        let project_id = {
 376            let mut state = self.state_mut().await;
 377            let user_id = state.user_id_for_connection(request.sender_id)?;
 378            state.register_project(request.sender_id, user_id)
 379        };
 380        Ok(proto::RegisterProjectResponse { project_id })
 381    }
 382
 383    async fn unregister_project(
 384        self: Arc<Server>,
 385        request: TypedEnvelope<proto::UnregisterProject>,
 386    ) -> Result<()> {
 387        let mut state = self.state_mut().await;
 388        let project = state.unregister_project(request.payload.project_id, request.sender_id)?;
 389        self.update_contacts_for_users(&*state, &project.authorized_user_ids());
 390        Ok(())
 391    }
 392
 393    async fn share_project(
 394        self: Arc<Server>,
 395        request: TypedEnvelope<proto::ShareProject>,
 396    ) -> Result<proto::Ack> {
 397        let mut state = self.state_mut().await;
 398        let project = state.share_project(request.payload.project_id, request.sender_id)?;
 399        self.update_contacts_for_users(&mut *state, &project.authorized_user_ids);
 400        Ok(proto::Ack {})
 401    }
 402
 403    async fn unshare_project(
 404        self: Arc<Server>,
 405        request: TypedEnvelope<proto::UnshareProject>,
 406    ) -> Result<()> {
 407        let project_id = request.payload.project_id;
 408        let mut state = self.state_mut().await;
 409        let project = state.unshare_project(project_id, request.sender_id)?;
 410        broadcast(request.sender_id, project.connection_ids, |conn_id| {
 411            self.peer
 412                .send(conn_id, proto::UnshareProject { project_id })
 413        });
 414        self.update_contacts_for_users(&mut *state, &project.authorized_user_ids);
 415        Ok(())
 416    }
 417
 418    fn join_project(
 419        self: Arc<Server>,
 420        state: &mut Store,
 421        request: TypedEnvelope<proto::JoinProject>,
 422    ) -> Result<proto::JoinProjectResponse> {
 423        let project_id = request.payload.project_id;
 424
 425        let user_id = state.user_id_for_connection(request.sender_id)?;
 426        let (response, connection_ids, contact_user_ids) = state
 427            .join_project(request.sender_id, user_id, project_id)
 428            .and_then(|joined| {
 429                let share = joined.project.share()?;
 430                let peer_count = share.guests.len();
 431                let mut collaborators = Vec::with_capacity(peer_count);
 432                collaborators.push(proto::Collaborator {
 433                    peer_id: joined.project.host_connection_id.0,
 434                    replica_id: 0,
 435                    user_id: joined.project.host_user_id.to_proto(),
 436                });
 437                let worktrees = share
 438                    .worktrees
 439                    .iter()
 440                    .filter_map(|(id, shared_worktree)| {
 441                        let worktree = joined.project.worktrees.get(&id)?;
 442                        Some(proto::Worktree {
 443                            id: *id,
 444                            root_name: worktree.root_name.clone(),
 445                            entries: shared_worktree.entries.values().cloned().collect(),
 446                            diagnostic_summaries: shared_worktree
 447                                .diagnostic_summaries
 448                                .values()
 449                                .cloned()
 450                                .collect(),
 451                            visible: worktree.visible,
 452                            scan_id: shared_worktree.scan_id,
 453                        })
 454                    })
 455                    .collect();
 456                for (peer_conn_id, (peer_replica_id, peer_user_id)) in &share.guests {
 457                    if *peer_conn_id != request.sender_id {
 458                        collaborators.push(proto::Collaborator {
 459                            peer_id: peer_conn_id.0,
 460                            replica_id: *peer_replica_id as u32,
 461                            user_id: peer_user_id.to_proto(),
 462                        });
 463                    }
 464                }
 465                let response = proto::JoinProjectResponse {
 466                    worktrees,
 467                    replica_id: joined.replica_id as u32,
 468                    collaborators,
 469                    language_servers: joined.project.language_servers.clone(),
 470                };
 471                let connection_ids = joined.project.connection_ids();
 472                let contact_user_ids = joined.project.authorized_user_ids();
 473                Ok((response, connection_ids, contact_user_ids))
 474            })?;
 475
 476        broadcast(request.sender_id, connection_ids, |conn_id| {
 477            self.peer.send(
 478                conn_id,
 479                proto::AddProjectCollaborator {
 480                    project_id,
 481                    collaborator: Some(proto::Collaborator {
 482                        peer_id: request.sender_id.0,
 483                        replica_id: response.replica_id,
 484                        user_id: user_id.to_proto(),
 485                    }),
 486                },
 487            )
 488        });
 489        self.update_contacts_for_users(state, &contact_user_ids);
 490        Ok(response)
 491    }
 492
 493    async fn leave_project(
 494        self: Arc<Server>,
 495        request: TypedEnvelope<proto::LeaveProject>,
 496    ) -> Result<()> {
 497        let sender_id = request.sender_id;
 498        let project_id = request.payload.project_id;
 499        let mut state = self.state_mut().await;
 500        let worktree = state.leave_project(sender_id, project_id)?;
 501        broadcast(sender_id, worktree.connection_ids, |conn_id| {
 502            self.peer.send(
 503                conn_id,
 504                proto::RemoveProjectCollaborator {
 505                    project_id,
 506                    peer_id: sender_id.0,
 507                },
 508            )
 509        });
 510        self.update_contacts_for_users(&*state, &worktree.authorized_user_ids);
 511        Ok(())
 512    }
 513
 514    async fn register_worktree(
 515        self: Arc<Server>,
 516        request: TypedEnvelope<proto::RegisterWorktree>,
 517    ) -> Result<proto::Ack> {
 518        let mut contact_user_ids = HashSet::default();
 519        for github_login in &request.payload.authorized_logins {
 520            let contact_user_id = self.app_state.db.create_user(github_login, false).await?;
 521            contact_user_ids.insert(contact_user_id);
 522        }
 523
 524        let mut state = self.state_mut().await;
 525        let host_user_id = state.user_id_for_connection(request.sender_id)?;
 526        contact_user_ids.insert(host_user_id);
 527
 528        let contact_user_ids = contact_user_ids.into_iter().collect::<Vec<_>>();
 529        let guest_connection_ids = state
 530            .read_project(request.payload.project_id, request.sender_id)?
 531            .guest_connection_ids();
 532        state.register_worktree(
 533            request.payload.project_id,
 534            request.payload.worktree_id,
 535            request.sender_id,
 536            Worktree {
 537                authorized_user_ids: contact_user_ids.clone(),
 538                root_name: request.payload.root_name.clone(),
 539                visible: request.payload.visible,
 540            },
 541        )?;
 542
 543        broadcast(request.sender_id, guest_connection_ids, |connection_id| {
 544            self.peer
 545                .forward_send(request.sender_id, connection_id, request.payload.clone())
 546        });
 547        self.update_contacts_for_users(&*state, &contact_user_ids);
 548        Ok(proto::Ack {})
 549    }
 550
 551    async fn unregister_worktree(
 552        self: Arc<Server>,
 553        request: TypedEnvelope<proto::UnregisterWorktree>,
 554    ) -> Result<()> {
 555        let project_id = request.payload.project_id;
 556        let worktree_id = request.payload.worktree_id;
 557        let mut state = self.state_mut().await;
 558        let (worktree, guest_connection_ids) =
 559            state.unregister_worktree(project_id, worktree_id, request.sender_id)?;
 560        broadcast(request.sender_id, guest_connection_ids, |conn_id| {
 561            self.peer.send(
 562                conn_id,
 563                proto::UnregisterWorktree {
 564                    project_id,
 565                    worktree_id,
 566                },
 567            )
 568        });
 569        self.update_contacts_for_users(&*state, &worktree.authorized_user_ids);
 570        Ok(())
 571    }
 572
 573    async fn update_worktree(
 574        self: Arc<Server>,
 575        request: TypedEnvelope<proto::UpdateWorktree>,
 576    ) -> Result<proto::Ack> {
 577        let connection_ids = self.state_mut().await.update_worktree(
 578            request.sender_id,
 579            request.payload.project_id,
 580            request.payload.worktree_id,
 581            &request.payload.removed_entries,
 582            &request.payload.updated_entries,
 583            request.payload.scan_id,
 584        )?;
 585
 586        broadcast(request.sender_id, connection_ids, |connection_id| {
 587            self.peer
 588                .forward_send(request.sender_id, connection_id, request.payload.clone())
 589        });
 590
 591        Ok(proto::Ack {})
 592    }
 593
 594    async fn update_diagnostic_summary(
 595        self: Arc<Server>,
 596        request: TypedEnvelope<proto::UpdateDiagnosticSummary>,
 597    ) -> Result<()> {
 598        let summary = request
 599            .payload
 600            .summary
 601            .clone()
 602            .ok_or_else(|| anyhow!("invalid summary"))?;
 603        let receiver_ids = self.state_mut().await.update_diagnostic_summary(
 604            request.payload.project_id,
 605            request.payload.worktree_id,
 606            request.sender_id,
 607            summary,
 608        )?;
 609
 610        broadcast(request.sender_id, receiver_ids, |connection_id| {
 611            self.peer
 612                .forward_send(request.sender_id, connection_id, request.payload.clone())
 613        });
 614        Ok(())
 615    }
 616
 617    async fn start_language_server(
 618        self: Arc<Server>,
 619        request: TypedEnvelope<proto::StartLanguageServer>,
 620    ) -> Result<()> {
 621        let receiver_ids = self.state_mut().await.start_language_server(
 622            request.payload.project_id,
 623            request.sender_id,
 624            request
 625                .payload
 626                .server
 627                .clone()
 628                .ok_or_else(|| anyhow!("invalid language server"))?,
 629        )?;
 630        broadcast(request.sender_id, receiver_ids, |connection_id| {
 631            self.peer
 632                .forward_send(request.sender_id, connection_id, request.payload.clone())
 633        });
 634        Ok(())
 635    }
 636
 637    async fn update_language_server(
 638        self: Arc<Server>,
 639        request: TypedEnvelope<proto::UpdateLanguageServer>,
 640    ) -> Result<()> {
 641        let receiver_ids = self
 642            .state()
 643            .await
 644            .project_connection_ids(request.payload.project_id, request.sender_id)?;
 645        broadcast(request.sender_id, receiver_ids, |connection_id| {
 646            self.peer
 647                .forward_send(request.sender_id, connection_id, request.payload.clone())
 648        });
 649        Ok(())
 650    }
 651
 652    async fn forward_project_request<T>(
 653        self: Arc<Server>,
 654        request: TypedEnvelope<T>,
 655    ) -> Result<T::Response>
 656    where
 657        T: EntityMessage + RequestMessage,
 658    {
 659        let host_connection_id = self
 660            .state()
 661            .await
 662            .read_project(request.payload.remote_entity_id(), request.sender_id)?
 663            .host_connection_id;
 664        Ok(self
 665            .peer
 666            .forward_request(request.sender_id, host_connection_id, request.payload)
 667            .await?)
 668    }
 669
 670    async fn save_buffer(
 671        self: Arc<Server>,
 672        request: TypedEnvelope<proto::SaveBuffer>,
 673    ) -> Result<proto::BufferSaved> {
 674        let host = self
 675            .state()
 676            .await
 677            .read_project(request.payload.project_id, request.sender_id)?
 678            .host_connection_id;
 679        let response = self
 680            .peer
 681            .forward_request(request.sender_id, host, request.payload.clone())
 682            .await?;
 683
 684        let mut guests = self
 685            .state()
 686            .await
 687            .read_project(request.payload.project_id, request.sender_id)?
 688            .connection_ids();
 689        guests.retain(|guest_connection_id| *guest_connection_id != request.sender_id);
 690        broadcast(host, guests, |conn_id| {
 691            self.peer.forward_send(host, conn_id, response.clone())
 692        });
 693
 694        Ok(response)
 695    }
 696
 697    async fn update_buffer(
 698        self: Arc<Server>,
 699        request: TypedEnvelope<proto::UpdateBuffer>,
 700    ) -> Result<proto::Ack> {
 701        let receiver_ids = self
 702            .state()
 703            .await
 704            .project_connection_ids(request.payload.project_id, request.sender_id)?;
 705        broadcast(request.sender_id, receiver_ids, |connection_id| {
 706            self.peer
 707                .forward_send(request.sender_id, connection_id, request.payload.clone())
 708        });
 709        Ok(proto::Ack {})
 710    }
 711
 712    async fn update_buffer_file(
 713        self: Arc<Server>,
 714        request: TypedEnvelope<proto::UpdateBufferFile>,
 715    ) -> Result<()> {
 716        let receiver_ids = self
 717            .state()
 718            .await
 719            .project_connection_ids(request.payload.project_id, request.sender_id)?;
 720        broadcast(request.sender_id, receiver_ids, |connection_id| {
 721            self.peer
 722                .forward_send(request.sender_id, connection_id, request.payload.clone())
 723        });
 724        Ok(())
 725    }
 726
 727    async fn buffer_reloaded(
 728        self: Arc<Server>,
 729        request: TypedEnvelope<proto::BufferReloaded>,
 730    ) -> Result<()> {
 731        let receiver_ids = self
 732            .state()
 733            .await
 734            .project_connection_ids(request.payload.project_id, request.sender_id)?;
 735        broadcast(request.sender_id, receiver_ids, |connection_id| {
 736            self.peer
 737                .forward_send(request.sender_id, connection_id, request.payload.clone())
 738        });
 739        Ok(())
 740    }
 741
 742    async fn buffer_saved(
 743        self: Arc<Server>,
 744        request: TypedEnvelope<proto::BufferSaved>,
 745    ) -> Result<()> {
 746        let receiver_ids = self
 747            .state()
 748            .await
 749            .project_connection_ids(request.payload.project_id, request.sender_id)?;
 750        broadcast(request.sender_id, receiver_ids, |connection_id| {
 751            self.peer
 752                .forward_send(request.sender_id, connection_id, request.payload.clone())
 753        });
 754        Ok(())
 755    }
 756
 757    async fn follow(
 758        self: Arc<Self>,
 759        request: TypedEnvelope<proto::Follow>,
 760    ) -> Result<proto::FollowResponse> {
 761        let leader_id = ConnectionId(request.payload.leader_id);
 762        let follower_id = request.sender_id;
 763        if !self
 764            .state()
 765            .await
 766            .project_connection_ids(request.payload.project_id, follower_id)?
 767            .contains(&leader_id)
 768        {
 769            Err(anyhow!("no such peer"))?;
 770        }
 771        let mut response = self
 772            .peer
 773            .forward_request(request.sender_id, leader_id, request.payload)
 774            .await?;
 775        response
 776            .views
 777            .retain(|view| view.leader_id != Some(follower_id.0));
 778        Ok(response)
 779    }
 780
 781    async fn unfollow(self: Arc<Self>, request: TypedEnvelope<proto::Unfollow>) -> Result<()> {
 782        let leader_id = ConnectionId(request.payload.leader_id);
 783        if !self
 784            .state()
 785            .await
 786            .project_connection_ids(request.payload.project_id, request.sender_id)?
 787            .contains(&leader_id)
 788        {
 789            Err(anyhow!("no such peer"))?;
 790        }
 791        self.peer
 792            .forward_send(request.sender_id, leader_id, request.payload)?;
 793        Ok(())
 794    }
 795
 796    async fn update_followers(
 797        self: Arc<Self>,
 798        request: TypedEnvelope<proto::UpdateFollowers>,
 799    ) -> Result<()> {
 800        let connection_ids = self
 801            .state()
 802            .await
 803            .project_connection_ids(request.payload.project_id, request.sender_id)?;
 804        let leader_id = request
 805            .payload
 806            .variant
 807            .as_ref()
 808            .and_then(|variant| match variant {
 809                proto::update_followers::Variant::CreateView(payload) => payload.leader_id,
 810                proto::update_followers::Variant::UpdateView(payload) => payload.leader_id,
 811                proto::update_followers::Variant::UpdateActiveView(payload) => payload.leader_id,
 812            });
 813        for follower_id in &request.payload.follower_ids {
 814            let follower_id = ConnectionId(*follower_id);
 815            if connection_ids.contains(&follower_id) && Some(follower_id.0) != leader_id {
 816                self.peer
 817                    .forward_send(request.sender_id, follower_id, request.payload.clone())?;
 818            }
 819        }
 820        Ok(())
 821    }
 822
 823    async fn get_channels(
 824        self: Arc<Server>,
 825        request: TypedEnvelope<proto::GetChannels>,
 826    ) -> Result<proto::GetChannelsResponse> {
 827        let user_id = self
 828            .state()
 829            .await
 830            .user_id_for_connection(request.sender_id)?;
 831        let channels = self.app_state.db.get_accessible_channels(user_id).await?;
 832        Ok(proto::GetChannelsResponse {
 833            channels: channels
 834                .into_iter()
 835                .map(|chan| proto::Channel {
 836                    id: chan.id.to_proto(),
 837                    name: chan.name,
 838                })
 839                .collect(),
 840        })
 841    }
 842
 843    async fn get_users(
 844        self: Arc<Server>,
 845        request: TypedEnvelope<proto::GetUsers>,
 846    ) -> Result<proto::UsersResponse> {
 847        let user_ids = request
 848            .payload
 849            .user_ids
 850            .into_iter()
 851            .map(UserId::from_proto)
 852            .collect();
 853        let users = self
 854            .app_state
 855            .db
 856            .get_users_by_ids(user_ids)
 857            .await?
 858            .into_iter()
 859            .map(|user| proto::User {
 860                id: user.id.to_proto(),
 861                avatar_url: format!("https://github.com/{}.png?size=128", user.github_login),
 862                github_login: user.github_login,
 863            })
 864            .collect();
 865        Ok(proto::UsersResponse { users })
 866    }
 867
 868    async fn fuzzy_search_users(
 869        self: Arc<Server>,
 870        request: TypedEnvelope<proto::FuzzySearchUsers>,
 871    ) -> Result<proto::UsersResponse> {
 872        let query = request.payload.query;
 873        let db = &self.app_state.db;
 874        let users = match query.len() {
 875            0 => vec![],
 876            1 | 2 => db
 877                .get_user_by_github_login(&query)
 878                .await?
 879                .into_iter()
 880                .collect(),
 881            _ => db.fuzzy_search_users(&query, 10).await?,
 882        };
 883        let users = users
 884            .into_iter()
 885            .map(|user| proto::User {
 886                id: user.id.to_proto(),
 887                avatar_url: format!("https://github.com/{}.png?size=128", user.github_login),
 888                github_login: user.github_login,
 889            })
 890            .collect();
 891        Ok(proto::UsersResponse { users })
 892    }
 893
 894    #[instrument(skip(self, state, user_ids))]
 895    fn update_contacts_for_users<'a>(
 896        self: &Arc<Self>,
 897        state: &Store,
 898        user_ids: impl IntoIterator<Item = &'a UserId>,
 899    ) {
 900        for user_id in user_ids {
 901            let contacts = state.contacts_for_user(*user_id);
 902            for connection_id in state.connection_ids_for_user(*user_id) {
 903                self.peer
 904                    .send(
 905                        connection_id,
 906                        proto::UpdateContacts {
 907                            contacts: contacts.clone(),
 908                        },
 909                    )
 910                    .trace_err();
 911            }
 912        }
 913    }
 914
 915    async fn join_channel(
 916        self: Arc<Self>,
 917        request: TypedEnvelope<proto::JoinChannel>,
 918    ) -> Result<proto::JoinChannelResponse> {
 919        let user_id = self
 920            .state()
 921            .await
 922            .user_id_for_connection(request.sender_id)?;
 923        let channel_id = ChannelId::from_proto(request.payload.channel_id);
 924        if !self
 925            .app_state
 926            .db
 927            .can_user_access_channel(user_id, channel_id)
 928            .await?
 929        {
 930            Err(anyhow!("access denied"))?;
 931        }
 932
 933        self.state_mut()
 934            .await
 935            .join_channel(request.sender_id, channel_id);
 936        let messages = self
 937            .app_state
 938            .db
 939            .get_channel_messages(channel_id, MESSAGE_COUNT_PER_PAGE, None)
 940            .await?
 941            .into_iter()
 942            .map(|msg| proto::ChannelMessage {
 943                id: msg.id.to_proto(),
 944                body: msg.body,
 945                timestamp: msg.sent_at.unix_timestamp() as u64,
 946                sender_id: msg.sender_id.to_proto(),
 947                nonce: Some(msg.nonce.as_u128().into()),
 948            })
 949            .collect::<Vec<_>>();
 950        Ok(proto::JoinChannelResponse {
 951            done: messages.len() < MESSAGE_COUNT_PER_PAGE,
 952            messages,
 953        })
 954    }
 955
 956    async fn leave_channel(
 957        self: Arc<Self>,
 958        request: TypedEnvelope<proto::LeaveChannel>,
 959    ) -> Result<()> {
 960        let user_id = self
 961            .state()
 962            .await
 963            .user_id_for_connection(request.sender_id)?;
 964        let channel_id = ChannelId::from_proto(request.payload.channel_id);
 965        if !self
 966            .app_state
 967            .db
 968            .can_user_access_channel(user_id, channel_id)
 969            .await?
 970        {
 971            Err(anyhow!("access denied"))?;
 972        }
 973
 974        self.state_mut()
 975            .await
 976            .leave_channel(request.sender_id, channel_id);
 977
 978        Ok(())
 979    }
 980
 981    async fn send_channel_message(
 982        self: Arc<Self>,
 983        request: TypedEnvelope<proto::SendChannelMessage>,
 984    ) -> Result<proto::SendChannelMessageResponse> {
 985        let channel_id = ChannelId::from_proto(request.payload.channel_id);
 986        let user_id;
 987        let connection_ids;
 988        {
 989            let state = self.state().await;
 990            user_id = state.user_id_for_connection(request.sender_id)?;
 991            connection_ids = state.channel_connection_ids(channel_id)?;
 992        }
 993
 994        // Validate the message body.
 995        let body = request.payload.body.trim().to_string();
 996        if body.len() > MAX_MESSAGE_LEN {
 997            return Err(anyhow!("message is too long"))?;
 998        }
 999        if body.is_empty() {
1000            return Err(anyhow!("message can't be blank"))?;
1001        }
1002
1003        let timestamp = OffsetDateTime::now_utc();
1004        let nonce = request
1005            .payload
1006            .nonce
1007            .ok_or_else(|| anyhow!("nonce can't be blank"))?;
1008
1009        let message_id = self
1010            .app_state
1011            .db
1012            .create_channel_message(channel_id, user_id, &body, timestamp, nonce.clone().into())
1013            .await?
1014            .to_proto();
1015        let message = proto::ChannelMessage {
1016            sender_id: user_id.to_proto(),
1017            id: message_id,
1018            body,
1019            timestamp: timestamp.unix_timestamp() as u64,
1020            nonce: Some(nonce),
1021        };
1022        broadcast(request.sender_id, connection_ids, |conn_id| {
1023            self.peer.send(
1024                conn_id,
1025                proto::ChannelMessageSent {
1026                    channel_id: channel_id.to_proto(),
1027                    message: Some(message.clone()),
1028                },
1029            )
1030        });
1031        Ok(proto::SendChannelMessageResponse {
1032            message: Some(message),
1033        })
1034    }
1035
1036    async fn get_channel_messages(
1037        self: Arc<Self>,
1038        request: TypedEnvelope<proto::GetChannelMessages>,
1039    ) -> Result<proto::GetChannelMessagesResponse> {
1040        let user_id = self
1041            .state()
1042            .await
1043            .user_id_for_connection(request.sender_id)?;
1044        let channel_id = ChannelId::from_proto(request.payload.channel_id);
1045        if !self
1046            .app_state
1047            .db
1048            .can_user_access_channel(user_id, channel_id)
1049            .await?
1050        {
1051            Err(anyhow!("access denied"))?;
1052        }
1053
1054        let messages = self
1055            .app_state
1056            .db
1057            .get_channel_messages(
1058                channel_id,
1059                MESSAGE_COUNT_PER_PAGE,
1060                Some(MessageId::from_proto(request.payload.before_message_id)),
1061            )
1062            .await?
1063            .into_iter()
1064            .map(|msg| proto::ChannelMessage {
1065                id: msg.id.to_proto(),
1066                body: msg.body,
1067                timestamp: msg.sent_at.unix_timestamp() as u64,
1068                sender_id: msg.sender_id.to_proto(),
1069                nonce: Some(msg.nonce.as_u128().into()),
1070            })
1071            .collect::<Vec<_>>();
1072
1073        Ok(proto::GetChannelMessagesResponse {
1074            done: messages.len() < MESSAGE_COUNT_PER_PAGE,
1075            messages,
1076        })
1077    }
1078
1079    async fn state<'a>(self: &'a Arc<Self>) -> StoreReadGuard<'a> {
1080        #[cfg(test)]
1081        tokio::task::yield_now().await;
1082        let guard = self.store.read().await;
1083        #[cfg(test)]
1084        tokio::task::yield_now().await;
1085        StoreReadGuard {
1086            guard,
1087            _not_send: PhantomData,
1088        }
1089    }
1090
1091    async fn state_mut<'a>(self: &'a Arc<Self>) -> StoreWriteGuard<'a> {
1092        #[cfg(test)]
1093        tokio::task::yield_now().await;
1094        let guard = self.store.write().await;
1095        #[cfg(test)]
1096        tokio::task::yield_now().await;
1097        StoreWriteGuard {
1098            guard,
1099            _not_send: PhantomData,
1100        }
1101    }
1102}
1103
1104impl<'a> Deref for StoreReadGuard<'a> {
1105    type Target = Store;
1106
1107    fn deref(&self) -> &Self::Target {
1108        &*self.guard
1109    }
1110}
1111
1112impl<'a> Deref for StoreWriteGuard<'a> {
1113    type Target = Store;
1114
1115    fn deref(&self) -> &Self::Target {
1116        &*self.guard
1117    }
1118}
1119
1120impl<'a> DerefMut for StoreWriteGuard<'a> {
1121    fn deref_mut(&mut self) -> &mut Self::Target {
1122        &mut *self.guard
1123    }
1124}
1125
1126impl<'a> Drop for StoreWriteGuard<'a> {
1127    fn drop(&mut self) {
1128        #[cfg(test)]
1129        self.check_invariants();
1130
1131        let metrics = self.metrics();
1132        tracing::info!(
1133            connections = metrics.connections,
1134            registered_projects = metrics.registered_projects,
1135            shared_projects = metrics.shared_projects,
1136            "metrics"
1137        );
1138    }
1139}
1140
1141impl Executor for RealExecutor {
1142    type Sleep = Sleep;
1143
1144    fn spawn_detached<F: 'static + Send + Future<Output = ()>>(&self, future: F) {
1145        tokio::task::spawn(future);
1146    }
1147
1148    fn sleep(&self, duration: Duration) -> Self::Sleep {
1149        tokio::time::sleep(duration)
1150    }
1151}
1152
1153#[instrument(skip(f))]
1154fn broadcast<F>(sender_id: ConnectionId, receiver_ids: Vec<ConnectionId>, mut f: F)
1155where
1156    F: FnMut(ConnectionId) -> anyhow::Result<()>,
1157{
1158    for receiver_id in receiver_ids {
1159        if receiver_id != sender_id {
1160            f(receiver_id).trace_err();
1161        }
1162    }
1163}
1164
1165lazy_static! {
1166    static ref ZED_PROTOCOL_VERSION: HeaderName = HeaderName::from_static("x-zed-protocol-version");
1167}
1168
1169pub struct ProtocolVersion(u32);
1170
1171impl Header for ProtocolVersion {
1172    fn name() -> &'static HeaderName {
1173        &ZED_PROTOCOL_VERSION
1174    }
1175
1176    fn decode<'i, I>(values: &mut I) -> Result<Self, axum::headers::Error>
1177    where
1178        Self: Sized,
1179        I: Iterator<Item = &'i axum::http::HeaderValue>,
1180    {
1181        let version = values
1182            .next()
1183            .ok_or_else(|| axum::headers::Error::invalid())?
1184            .to_str()
1185            .map_err(|_| axum::headers::Error::invalid())?
1186            .parse()
1187            .map_err(|_| axum::headers::Error::invalid())?;
1188        Ok(Self(version))
1189    }
1190
1191    fn encode<E: Extend<axum::http::HeaderValue>>(&self, values: &mut E) {
1192        values.extend([self.0.to_string().parse().unwrap()]);
1193    }
1194}
1195
1196pub fn routes(app_state: Arc<AppState>) -> Router<Body> {
1197    let server = Server::new(app_state.clone(), None);
1198    Router::new()
1199        .route("/rpc", get(handle_websocket_request))
1200        .layer(
1201            ServiceBuilder::new()
1202                .layer(Extension(app_state))
1203                .layer(middleware::from_fn(auth::validate_header))
1204                .layer(Extension(server)),
1205        )
1206}
1207
1208pub async fn handle_websocket_request(
1209    TypedHeader(ProtocolVersion(protocol_version)): TypedHeader<ProtocolVersion>,
1210    ConnectInfo(socket_address): ConnectInfo<SocketAddr>,
1211    Extension(server): Extension<Arc<Server>>,
1212    Extension(user_id): Extension<UserId>,
1213    ws: WebSocketUpgrade,
1214) -> Response {
1215    if protocol_version != rpc::PROTOCOL_VERSION {
1216        return (
1217            StatusCode::UPGRADE_REQUIRED,
1218            "client must be upgraded".to_string(),
1219        )
1220            .into_response();
1221    }
1222    let socket_address = socket_address.to_string();
1223    ws.on_upgrade(move |socket| {
1224        let socket = socket
1225            .map_ok(to_tungstenite_message)
1226            .err_into()
1227            .with(|message| async move { Ok(to_axum_message(message)) });
1228        let connection = Connection::new(Box::pin(socket));
1229        server.handle_connection(connection, socket_address, user_id, None, RealExecutor)
1230    })
1231}
1232
1233fn to_axum_message(message: TungsteniteMessage) -> AxumMessage {
1234    match message {
1235        TungsteniteMessage::Text(payload) => AxumMessage::Text(payload),
1236        TungsteniteMessage::Binary(payload) => AxumMessage::Binary(payload),
1237        TungsteniteMessage::Ping(payload) => AxumMessage::Ping(payload),
1238        TungsteniteMessage::Pong(payload) => AxumMessage::Pong(payload),
1239        TungsteniteMessage::Close(frame) => AxumMessage::Close(frame.map(|frame| AxumCloseFrame {
1240            code: frame.code.into(),
1241            reason: frame.reason,
1242        })),
1243    }
1244}
1245
1246fn to_tungstenite_message(message: AxumMessage) -> TungsteniteMessage {
1247    match message {
1248        AxumMessage::Text(payload) => TungsteniteMessage::Text(payload),
1249        AxumMessage::Binary(payload) => TungsteniteMessage::Binary(payload),
1250        AxumMessage::Ping(payload) => TungsteniteMessage::Ping(payload),
1251        AxumMessage::Pong(payload) => TungsteniteMessage::Pong(payload),
1252        AxumMessage::Close(frame) => {
1253            TungsteniteMessage::Close(frame.map(|frame| TungsteniteCloseFrame {
1254                code: frame.code.into(),
1255                reason: frame.reason,
1256            }))
1257        }
1258    }
1259}
1260
1261pub trait ResultExt {
1262    type Ok;
1263
1264    fn trace_err(self) -> Option<Self::Ok>;
1265}
1266
1267impl<T, E> ResultExt for Result<T, E>
1268where
1269    E: std::fmt::Debug,
1270{
1271    type Ok = T;
1272
1273    fn trace_err(self) -> Option<T> {
1274        match self {
1275            Ok(value) => Some(value),
1276            Err(error) => {
1277                tracing::error!("{:?}", error);
1278                None
1279            }
1280        }
1281    }
1282}
1283
1284#[cfg(test)]
1285mod tests {
1286    use super::*;
1287    use crate::{
1288        db::{tests::TestDb, UserId},
1289        AppState,
1290    };
1291    use ::rpc::Peer;
1292    use client::{
1293        self, test::FakeHttpClient, Channel, ChannelDetails, ChannelList, Client, Credentials,
1294        EstablishConnectionError, UserStore, RECEIVE_TIMEOUT,
1295    };
1296    use collections::BTreeMap;
1297    use editor::{
1298        self, ConfirmCodeAction, ConfirmCompletion, ConfirmRename, Editor, Input, Redo, Rename,
1299        ToOffset, ToggleCodeActions, Undo,
1300    };
1301    use gpui::{
1302        executor::{self, Deterministic},
1303        geometry::vector::vec2f,
1304        ModelHandle, TestAppContext, ViewHandle,
1305    };
1306    use language::{
1307        range_to_lsp, tree_sitter_rust, Diagnostic, DiagnosticEntry, FakeLspAdapter, Language,
1308        LanguageConfig, LanguageRegistry, OffsetRangeExt, Point, Rope,
1309    };
1310    use lsp::{self, FakeLanguageServer};
1311    use parking_lot::Mutex;
1312    use project::{
1313        fs::{FakeFs, Fs as _},
1314        search::SearchQuery,
1315        worktree::WorktreeHandle,
1316        DiagnosticSummary, Project, ProjectPath, WorktreeId,
1317    };
1318    use rand::prelude::*;
1319    use rpc::PeerId;
1320    use serde_json::json;
1321    use settings::Settings;
1322    use sqlx::types::time::OffsetDateTime;
1323    use std::{
1324        env,
1325        ops::Deref,
1326        path::{Path, PathBuf},
1327        rc::Rc,
1328        sync::{
1329            atomic::{AtomicBool, Ordering::SeqCst},
1330            Arc,
1331        },
1332        time::Duration,
1333    };
1334    use theme::ThemeRegistry;
1335    use workspace::{Item, SplitDirection, ToggleFollow, Workspace, WorkspaceParams};
1336
1337    #[cfg(test)]
1338    #[ctor::ctor]
1339    fn init_logger() {
1340        if std::env::var("RUST_LOG").is_ok() {
1341            env_logger::init();
1342        }
1343    }
1344
1345    #[gpui::test(iterations = 10)]
1346    async fn test_share_project(cx_a: &mut TestAppContext, cx_b: &mut TestAppContext) {
1347        let (window_b, _) = cx_b.add_window(|_| EmptyView);
1348        let lang_registry = Arc::new(LanguageRegistry::test());
1349        let fs = FakeFs::new(cx_a.background());
1350        cx_a.foreground().forbid_parking();
1351
1352        // Connect to a server as 2 clients.
1353        let mut server = TestServer::start(cx_a.foreground(), cx_a.background()).await;
1354        let client_a = server.create_client(cx_a, "user_a").await;
1355        let client_b = server.create_client(cx_b, "user_b").await;
1356
1357        // Share a project as client A
1358        fs.insert_tree(
1359            "/a",
1360            json!({
1361                ".zed.toml": r#"collaborators = ["user_b"]"#,
1362                "a.txt": "a-contents",
1363                "b.txt": "b-contents",
1364            }),
1365        )
1366        .await;
1367        let project_a = cx_a.update(|cx| {
1368            Project::local(
1369                client_a.clone(),
1370                client_a.user_store.clone(),
1371                lang_registry.clone(),
1372                fs.clone(),
1373                cx,
1374            )
1375        });
1376        let (worktree_a, _) = project_a
1377            .update(cx_a, |p, cx| {
1378                p.find_or_create_local_worktree("/a", true, cx)
1379            })
1380            .await
1381            .unwrap();
1382        let worktree_id = worktree_a.read_with(cx_a, |tree, _| tree.id());
1383        worktree_a
1384            .read_with(cx_a, |tree, _| tree.as_local().unwrap().scan_complete())
1385            .await;
1386        let project_id = project_a.update(cx_a, |p, _| p.next_remote_id()).await;
1387        project_a.update(cx_a, |p, cx| p.share(cx)).await.unwrap();
1388
1389        // Join that project as client B
1390        let project_b = Project::remote(
1391            project_id,
1392            client_b.clone(),
1393            client_b.user_store.clone(),
1394            lang_registry.clone(),
1395            fs.clone(),
1396            &mut cx_b.to_async(),
1397        )
1398        .await
1399        .unwrap();
1400
1401        let replica_id_b = project_b.read_with(cx_b, |project, _| {
1402            assert_eq!(
1403                project
1404                    .collaborators()
1405                    .get(&client_a.peer_id)
1406                    .unwrap()
1407                    .user
1408                    .github_login,
1409                "user_a"
1410            );
1411            project.replica_id()
1412        });
1413        project_a
1414            .condition(&cx_a, |tree, _| {
1415                tree.collaborators()
1416                    .get(&client_b.peer_id)
1417                    .map_or(false, |collaborator| {
1418                        collaborator.replica_id == replica_id_b
1419                            && collaborator.user.github_login == "user_b"
1420                    })
1421            })
1422            .await;
1423
1424        // Open the same file as client B and client A.
1425        let buffer_b = project_b
1426            .update(cx_b, |p, cx| p.open_buffer((worktree_id, "b.txt"), cx))
1427            .await
1428            .unwrap();
1429        buffer_b.read_with(cx_b, |buf, _| assert_eq!(buf.text(), "b-contents"));
1430        project_a.read_with(cx_a, |project, cx| {
1431            assert!(project.has_open_buffer((worktree_id, "b.txt"), cx))
1432        });
1433        let buffer_a = project_a
1434            .update(cx_a, |p, cx| p.open_buffer((worktree_id, "b.txt"), cx))
1435            .await
1436            .unwrap();
1437
1438        let editor_b = cx_b.add_view(window_b, |cx| Editor::for_buffer(buffer_b, None, cx));
1439
1440        // TODO
1441        // // Create a selection set as client B and see that selection set as client A.
1442        // buffer_a
1443        //     .condition(&cx_a, |buffer, _| buffer.selection_sets().count() == 1)
1444        //     .await;
1445
1446        // Edit the buffer as client B and see that edit as client A.
1447        editor_b.update(cx_b, |editor, cx| {
1448            editor.handle_input(&Input("ok, ".into()), cx)
1449        });
1450        buffer_a
1451            .condition(&cx_a, |buffer, _| buffer.text() == "ok, b-contents")
1452            .await;
1453
1454        // TODO
1455        // // Remove the selection set as client B, see those selections disappear as client A.
1456        cx_b.update(move |_| drop(editor_b));
1457        // buffer_a
1458        //     .condition(&cx_a, |buffer, _| buffer.selection_sets().count() == 0)
1459        //     .await;
1460
1461        // Dropping the client B's project removes client B from client A's collaborators.
1462        cx_b.update(move |_| drop(project_b));
1463        project_a
1464            .condition(&cx_a, |project, _| project.collaborators().is_empty())
1465            .await;
1466    }
1467
1468    #[gpui::test(iterations = 10)]
1469    async fn test_unshare_project(cx_a: &mut TestAppContext, cx_b: &mut TestAppContext) {
1470        let lang_registry = Arc::new(LanguageRegistry::test());
1471        let fs = FakeFs::new(cx_a.background());
1472        cx_a.foreground().forbid_parking();
1473
1474        // Connect to a server as 2 clients.
1475        let mut server = TestServer::start(cx_a.foreground(), cx_a.background()).await;
1476        let client_a = server.create_client(cx_a, "user_a").await;
1477        let client_b = server.create_client(cx_b, "user_b").await;
1478
1479        // Share a project as client A
1480        fs.insert_tree(
1481            "/a",
1482            json!({
1483                ".zed.toml": r#"collaborators = ["user_b"]"#,
1484                "a.txt": "a-contents",
1485                "b.txt": "b-contents",
1486            }),
1487        )
1488        .await;
1489        let project_a = cx_a.update(|cx| {
1490            Project::local(
1491                client_a.clone(),
1492                client_a.user_store.clone(),
1493                lang_registry.clone(),
1494                fs.clone(),
1495                cx,
1496            )
1497        });
1498        let (worktree_a, _) = project_a
1499            .update(cx_a, |p, cx| {
1500                p.find_or_create_local_worktree("/a", true, cx)
1501            })
1502            .await
1503            .unwrap();
1504        worktree_a
1505            .read_with(cx_a, |tree, _| tree.as_local().unwrap().scan_complete())
1506            .await;
1507        let project_id = project_a.update(cx_a, |p, _| p.next_remote_id()).await;
1508        let worktree_id = worktree_a.read_with(cx_a, |tree, _| tree.id());
1509        project_a.update(cx_a, |p, cx| p.share(cx)).await.unwrap();
1510        assert!(worktree_a.read_with(cx_a, |tree, _| tree.as_local().unwrap().is_shared()));
1511
1512        // Join that project as client B
1513        let project_b = Project::remote(
1514            project_id,
1515            client_b.clone(),
1516            client_b.user_store.clone(),
1517            lang_registry.clone(),
1518            fs.clone(),
1519            &mut cx_b.to_async(),
1520        )
1521        .await
1522        .unwrap();
1523        project_b
1524            .update(cx_b, |p, cx| p.open_buffer((worktree_id, "a.txt"), cx))
1525            .await
1526            .unwrap();
1527
1528        // Unshare the project as client A
1529        project_a.update(cx_a, |project, cx| project.unshare(cx));
1530        project_b
1531            .condition(cx_b, |project, _| project.is_read_only())
1532            .await;
1533        assert!(worktree_a.read_with(cx_a, |tree, _| !tree.as_local().unwrap().is_shared()));
1534        cx_b.update(|_| {
1535            drop(project_b);
1536        });
1537
1538        // Share the project again and ensure guests can still join.
1539        project_a
1540            .update(cx_a, |project, cx| project.share(cx))
1541            .await
1542            .unwrap();
1543        assert!(worktree_a.read_with(cx_a, |tree, _| tree.as_local().unwrap().is_shared()));
1544
1545        let project_b2 = Project::remote(
1546            project_id,
1547            client_b.clone(),
1548            client_b.user_store.clone(),
1549            lang_registry.clone(),
1550            fs.clone(),
1551            &mut cx_b.to_async(),
1552        )
1553        .await
1554        .unwrap();
1555        project_b2
1556            .update(cx_b, |p, cx| p.open_buffer((worktree_id, "a.txt"), cx))
1557            .await
1558            .unwrap();
1559    }
1560
1561    #[gpui::test(iterations = 10)]
1562    async fn test_host_disconnect(cx_a: &mut TestAppContext, cx_b: &mut TestAppContext) {
1563        let lang_registry = Arc::new(LanguageRegistry::test());
1564        let fs = FakeFs::new(cx_a.background());
1565        cx_a.foreground().forbid_parking();
1566
1567        // Connect to a server as 2 clients.
1568        let mut server = TestServer::start(cx_a.foreground(), cx_a.background()).await;
1569        let client_a = server.create_client(cx_a, "user_a").await;
1570        let client_b = server.create_client(cx_b, "user_b").await;
1571
1572        // Share a project as client A
1573        fs.insert_tree(
1574            "/a",
1575            json!({
1576                ".zed.toml": r#"collaborators = ["user_b"]"#,
1577                "a.txt": "a-contents",
1578                "b.txt": "b-contents",
1579            }),
1580        )
1581        .await;
1582        let project_a = cx_a.update(|cx| {
1583            Project::local(
1584                client_a.clone(),
1585                client_a.user_store.clone(),
1586                lang_registry.clone(),
1587                fs.clone(),
1588                cx,
1589            )
1590        });
1591        let (worktree_a, _) = project_a
1592            .update(cx_a, |p, cx| {
1593                p.find_or_create_local_worktree("/a", true, cx)
1594            })
1595            .await
1596            .unwrap();
1597        worktree_a
1598            .read_with(cx_a, |tree, _| tree.as_local().unwrap().scan_complete())
1599            .await;
1600        let project_id = project_a.update(cx_a, |p, _| p.next_remote_id()).await;
1601        let worktree_id = worktree_a.read_with(cx_a, |tree, _| tree.id());
1602        project_a.update(cx_a, |p, cx| p.share(cx)).await.unwrap();
1603        assert!(worktree_a.read_with(cx_a, |tree, _| tree.as_local().unwrap().is_shared()));
1604
1605        // Join that project as client B
1606        let project_b = Project::remote(
1607            project_id,
1608            client_b.clone(),
1609            client_b.user_store.clone(),
1610            lang_registry.clone(),
1611            fs.clone(),
1612            &mut cx_b.to_async(),
1613        )
1614        .await
1615        .unwrap();
1616        project_b
1617            .update(cx_b, |p, cx| p.open_buffer((worktree_id, "a.txt"), cx))
1618            .await
1619            .unwrap();
1620
1621        // Drop client A's connection. Collaborators should disappear and the project should not be shown as shared.
1622        server.disconnect_client(client_a.current_user_id(cx_a));
1623        cx_a.foreground().advance_clock(rpc::RECEIVE_TIMEOUT);
1624        project_a
1625            .condition(cx_a, |project, _| project.collaborators().is_empty())
1626            .await;
1627        project_a.read_with(cx_a, |project, _| assert!(!project.is_shared()));
1628        project_b
1629            .condition(cx_b, |project, _| project.is_read_only())
1630            .await;
1631        assert!(worktree_a.read_with(cx_a, |tree, _| !tree.as_local().unwrap().is_shared()));
1632        cx_b.update(|_| {
1633            drop(project_b);
1634        });
1635
1636        // Await reconnection
1637        let project_id = project_a.update(cx_a, |p, _| p.next_remote_id()).await;
1638
1639        // Share the project again and ensure guests can still join.
1640        project_a
1641            .update(cx_a, |project, cx| project.share(cx))
1642            .await
1643            .unwrap();
1644        assert!(worktree_a.read_with(cx_a, |tree, _| tree.as_local().unwrap().is_shared()));
1645
1646        let project_b2 = Project::remote(
1647            project_id,
1648            client_b.clone(),
1649            client_b.user_store.clone(),
1650            lang_registry.clone(),
1651            fs.clone(),
1652            &mut cx_b.to_async(),
1653        )
1654        .await
1655        .unwrap();
1656        project_b2
1657            .update(cx_b, |p, cx| p.open_buffer((worktree_id, "a.txt"), cx))
1658            .await
1659            .unwrap();
1660    }
1661
1662    #[gpui::test(iterations = 10)]
1663    async fn test_propagate_saves_and_fs_changes(
1664        cx_a: &mut TestAppContext,
1665        cx_b: &mut TestAppContext,
1666        cx_c: &mut TestAppContext,
1667    ) {
1668        let lang_registry = Arc::new(LanguageRegistry::test());
1669        let fs = FakeFs::new(cx_a.background());
1670        cx_a.foreground().forbid_parking();
1671
1672        // Connect to a server as 3 clients.
1673        let mut server = TestServer::start(cx_a.foreground(), cx_a.background()).await;
1674        let client_a = server.create_client(cx_a, "user_a").await;
1675        let client_b = server.create_client(cx_b, "user_b").await;
1676        let client_c = server.create_client(cx_c, "user_c").await;
1677
1678        // Share a worktree as client A.
1679        fs.insert_tree(
1680            "/a",
1681            json!({
1682                ".zed.toml": r#"collaborators = ["user_b", "user_c"]"#,
1683                "file1": "",
1684                "file2": ""
1685            }),
1686        )
1687        .await;
1688        let project_a = cx_a.update(|cx| {
1689            Project::local(
1690                client_a.clone(),
1691                client_a.user_store.clone(),
1692                lang_registry.clone(),
1693                fs.clone(),
1694                cx,
1695            )
1696        });
1697        let (worktree_a, _) = project_a
1698            .update(cx_a, |p, cx| {
1699                p.find_or_create_local_worktree("/a", true, cx)
1700            })
1701            .await
1702            .unwrap();
1703        worktree_a
1704            .read_with(cx_a, |tree, _| tree.as_local().unwrap().scan_complete())
1705            .await;
1706        let project_id = project_a.update(cx_a, |p, _| p.next_remote_id()).await;
1707        let worktree_id = worktree_a.read_with(cx_a, |tree, _| tree.id());
1708        project_a.update(cx_a, |p, cx| p.share(cx)).await.unwrap();
1709
1710        // Join that worktree as clients B and C.
1711        let project_b = Project::remote(
1712            project_id,
1713            client_b.clone(),
1714            client_b.user_store.clone(),
1715            lang_registry.clone(),
1716            fs.clone(),
1717            &mut cx_b.to_async(),
1718        )
1719        .await
1720        .unwrap();
1721        let project_c = Project::remote(
1722            project_id,
1723            client_c.clone(),
1724            client_c.user_store.clone(),
1725            lang_registry.clone(),
1726            fs.clone(),
1727            &mut cx_c.to_async(),
1728        )
1729        .await
1730        .unwrap();
1731        let worktree_b = project_b.read_with(cx_b, |p, cx| p.worktrees(cx).next().unwrap());
1732        let worktree_c = project_c.read_with(cx_c, |p, cx| p.worktrees(cx).next().unwrap());
1733
1734        // Open and edit a buffer as both guests B and C.
1735        let buffer_b = project_b
1736            .update(cx_b, |p, cx| p.open_buffer((worktree_id, "file1"), cx))
1737            .await
1738            .unwrap();
1739        let buffer_c = project_c
1740            .update(cx_c, |p, cx| p.open_buffer((worktree_id, "file1"), cx))
1741            .await
1742            .unwrap();
1743        buffer_b.update(cx_b, |buf, cx| buf.edit([(0..0, "i-am-b, ")], cx));
1744        buffer_c.update(cx_c, |buf, cx| buf.edit([(0..0, "i-am-c, ")], cx));
1745
1746        // Open and edit that buffer as the host.
1747        let buffer_a = project_a
1748            .update(cx_a, |p, cx| p.open_buffer((worktree_id, "file1"), cx))
1749            .await
1750            .unwrap();
1751
1752        buffer_a
1753            .condition(cx_a, |buf, _| buf.text() == "i-am-c, i-am-b, ")
1754            .await;
1755        buffer_a.update(cx_a, |buf, cx| {
1756            buf.edit([(buf.len()..buf.len(), "i-am-a")], cx)
1757        });
1758
1759        // Wait for edits to propagate
1760        buffer_a
1761            .condition(cx_a, |buf, _| buf.text() == "i-am-c, i-am-b, i-am-a")
1762            .await;
1763        buffer_b
1764            .condition(cx_b, |buf, _| buf.text() == "i-am-c, i-am-b, i-am-a")
1765            .await;
1766        buffer_c
1767            .condition(cx_c, |buf, _| buf.text() == "i-am-c, i-am-b, i-am-a")
1768            .await;
1769
1770        // Edit the buffer as the host and concurrently save as guest B.
1771        let save_b = buffer_b.update(cx_b, |buf, cx| buf.save(cx));
1772        buffer_a.update(cx_a, |buf, cx| buf.edit([(0..0, "hi-a, ")], cx));
1773        save_b.await.unwrap();
1774        assert_eq!(
1775            fs.load("/a/file1".as_ref()).await.unwrap(),
1776            "hi-a, i-am-c, i-am-b, i-am-a"
1777        );
1778        buffer_a.read_with(cx_a, |buf, _| assert!(!buf.is_dirty()));
1779        buffer_b.read_with(cx_b, |buf, _| assert!(!buf.is_dirty()));
1780        buffer_c.condition(cx_c, |buf, _| !buf.is_dirty()).await;
1781
1782        worktree_a.flush_fs_events(cx_a).await;
1783
1784        // Make changes on host's file system, see those changes on guest worktrees.
1785        fs.rename(
1786            "/a/file1".as_ref(),
1787            "/a/file1-renamed".as_ref(),
1788            Default::default(),
1789        )
1790        .await
1791        .unwrap();
1792
1793        fs.rename("/a/file2".as_ref(), "/a/file3".as_ref(), Default::default())
1794            .await
1795            .unwrap();
1796        fs.insert_file(Path::new("/a/file4"), "4".into()).await;
1797
1798        worktree_a
1799            .condition(&cx_a, |tree, _| {
1800                tree.paths()
1801                    .map(|p| p.to_string_lossy())
1802                    .collect::<Vec<_>>()
1803                    == [".zed.toml", "file1-renamed", "file3", "file4"]
1804            })
1805            .await;
1806        worktree_b
1807            .condition(&cx_b, |tree, _| {
1808                tree.paths()
1809                    .map(|p| p.to_string_lossy())
1810                    .collect::<Vec<_>>()
1811                    == [".zed.toml", "file1-renamed", "file3", "file4"]
1812            })
1813            .await;
1814        worktree_c
1815            .condition(&cx_c, |tree, _| {
1816                tree.paths()
1817                    .map(|p| p.to_string_lossy())
1818                    .collect::<Vec<_>>()
1819                    == [".zed.toml", "file1-renamed", "file3", "file4"]
1820            })
1821            .await;
1822
1823        // Ensure buffer files are updated as well.
1824        buffer_a
1825            .condition(&cx_a, |buf, _| {
1826                buf.file().unwrap().path().to_str() == Some("file1-renamed")
1827            })
1828            .await;
1829        buffer_b
1830            .condition(&cx_b, |buf, _| {
1831                buf.file().unwrap().path().to_str() == Some("file1-renamed")
1832            })
1833            .await;
1834        buffer_c
1835            .condition(&cx_c, |buf, _| {
1836                buf.file().unwrap().path().to_str() == Some("file1-renamed")
1837            })
1838            .await;
1839    }
1840
1841    #[gpui::test(iterations = 10)]
1842    async fn test_fs_operations(
1843        executor: Arc<Deterministic>,
1844        cx_a: &mut TestAppContext,
1845        cx_b: &mut TestAppContext,
1846    ) {
1847        executor.forbid_parking();
1848        let fs = FakeFs::new(cx_a.background());
1849
1850        // Connect to a server as 2 clients.
1851        let mut server = TestServer::start(cx_a.foreground(), cx_a.background()).await;
1852        let mut client_a = server.create_client(cx_a, "user_a").await;
1853        let mut client_b = server.create_client(cx_b, "user_b").await;
1854
1855        // Share a project as client A
1856        fs.insert_tree(
1857            "/dir",
1858            json!({
1859                ".zed.toml": r#"collaborators = ["user_b"]"#,
1860                "a.txt": "a-contents",
1861                "b.txt": "b-contents",
1862            }),
1863        )
1864        .await;
1865
1866        let (project_a, worktree_id) = client_a.build_local_project(fs, "/dir", cx_a).await;
1867        let project_id = project_a.read_with(cx_a, |project, _| project.remote_id().unwrap());
1868        project_a
1869            .update(cx_a, |project, cx| project.share(cx))
1870            .await
1871            .unwrap();
1872
1873        let project_b = client_b.build_remote_project(project_id, cx_b).await;
1874
1875        let worktree_a =
1876            project_a.read_with(cx_a, |project, cx| project.worktrees(cx).next().unwrap());
1877        let worktree_b =
1878            project_b.read_with(cx_b, |project, cx| project.worktrees(cx).next().unwrap());
1879
1880        let entry = project_b
1881            .update(cx_b, |project, cx| {
1882                project
1883                    .create_entry((worktree_id, "c.txt"), false, cx)
1884                    .unwrap()
1885            })
1886            .await
1887            .unwrap();
1888        worktree_a.read_with(cx_a, |worktree, _| {
1889            assert_eq!(
1890                worktree
1891                    .paths()
1892                    .map(|p| p.to_string_lossy())
1893                    .collect::<Vec<_>>(),
1894                [".zed.toml", "a.txt", "b.txt", "c.txt"]
1895            );
1896        });
1897        worktree_b.read_with(cx_b, |worktree, _| {
1898            assert_eq!(
1899                worktree
1900                    .paths()
1901                    .map(|p| p.to_string_lossy())
1902                    .collect::<Vec<_>>(),
1903                [".zed.toml", "a.txt", "b.txt", "c.txt"]
1904            );
1905        });
1906
1907        project_b
1908            .update(cx_b, |project, cx| {
1909                project.rename_entry(entry.id, Path::new("d.txt"), cx)
1910            })
1911            .unwrap()
1912            .await
1913            .unwrap();
1914        worktree_a.read_with(cx_a, |worktree, _| {
1915            assert_eq!(
1916                worktree
1917                    .paths()
1918                    .map(|p| p.to_string_lossy())
1919                    .collect::<Vec<_>>(),
1920                [".zed.toml", "a.txt", "b.txt", "d.txt"]
1921            );
1922        });
1923        worktree_b.read_with(cx_b, |worktree, _| {
1924            assert_eq!(
1925                worktree
1926                    .paths()
1927                    .map(|p| p.to_string_lossy())
1928                    .collect::<Vec<_>>(),
1929                [".zed.toml", "a.txt", "b.txt", "d.txt"]
1930            );
1931        });
1932
1933        let dir_entry = project_b
1934            .update(cx_b, |project, cx| {
1935                project
1936                    .create_entry((worktree_id, "DIR"), true, cx)
1937                    .unwrap()
1938            })
1939            .await
1940            .unwrap();
1941        worktree_a.read_with(cx_a, |worktree, _| {
1942            assert_eq!(
1943                worktree
1944                    .paths()
1945                    .map(|p| p.to_string_lossy())
1946                    .collect::<Vec<_>>(),
1947                [".zed.toml", "DIR", "a.txt", "b.txt", "d.txt"]
1948            );
1949        });
1950        worktree_b.read_with(cx_b, |worktree, _| {
1951            assert_eq!(
1952                worktree
1953                    .paths()
1954                    .map(|p| p.to_string_lossy())
1955                    .collect::<Vec<_>>(),
1956                [".zed.toml", "DIR", "a.txt", "b.txt", "d.txt"]
1957            );
1958        });
1959
1960        project_b
1961            .update(cx_b, |project, cx| {
1962                project.delete_entry(dir_entry.id, cx).unwrap()
1963            })
1964            .await
1965            .unwrap();
1966        worktree_a.read_with(cx_a, |worktree, _| {
1967            assert_eq!(
1968                worktree
1969                    .paths()
1970                    .map(|p| p.to_string_lossy())
1971                    .collect::<Vec<_>>(),
1972                [".zed.toml", "a.txt", "b.txt", "d.txt"]
1973            );
1974        });
1975        worktree_b.read_with(cx_b, |worktree, _| {
1976            assert_eq!(
1977                worktree
1978                    .paths()
1979                    .map(|p| p.to_string_lossy())
1980                    .collect::<Vec<_>>(),
1981                [".zed.toml", "a.txt", "b.txt", "d.txt"]
1982            );
1983        });
1984
1985        project_b
1986            .update(cx_b, |project, cx| {
1987                project.delete_entry(entry.id, cx).unwrap()
1988            })
1989            .await
1990            .unwrap();
1991        worktree_a.read_with(cx_a, |worktree, _| {
1992            assert_eq!(
1993                worktree
1994                    .paths()
1995                    .map(|p| p.to_string_lossy())
1996                    .collect::<Vec<_>>(),
1997                [".zed.toml", "a.txt", "b.txt"]
1998            );
1999        });
2000        worktree_b.read_with(cx_b, |worktree, _| {
2001            assert_eq!(
2002                worktree
2003                    .paths()
2004                    .map(|p| p.to_string_lossy())
2005                    .collect::<Vec<_>>(),
2006                [".zed.toml", "a.txt", "b.txt"]
2007            );
2008        });
2009    }
2010
2011    #[gpui::test(iterations = 10)]
2012    async fn test_buffer_conflict_after_save(cx_a: &mut TestAppContext, cx_b: &mut TestAppContext) {
2013        cx_a.foreground().forbid_parking();
2014        let lang_registry = Arc::new(LanguageRegistry::test());
2015        let fs = FakeFs::new(cx_a.background());
2016
2017        // Connect to a server as 2 clients.
2018        let mut server = TestServer::start(cx_a.foreground(), cx_a.background()).await;
2019        let client_a = server.create_client(cx_a, "user_a").await;
2020        let client_b = server.create_client(cx_b, "user_b").await;
2021
2022        // Share a project as client A
2023        fs.insert_tree(
2024            "/dir",
2025            json!({
2026                ".zed.toml": r#"collaborators = ["user_b", "user_c"]"#,
2027                "a.txt": "a-contents",
2028            }),
2029        )
2030        .await;
2031
2032        let project_a = cx_a.update(|cx| {
2033            Project::local(
2034                client_a.clone(),
2035                client_a.user_store.clone(),
2036                lang_registry.clone(),
2037                fs.clone(),
2038                cx,
2039            )
2040        });
2041        let (worktree_a, _) = project_a
2042            .update(cx_a, |p, cx| {
2043                p.find_or_create_local_worktree("/dir", true, cx)
2044            })
2045            .await
2046            .unwrap();
2047        worktree_a
2048            .read_with(cx_a, |tree, _| tree.as_local().unwrap().scan_complete())
2049            .await;
2050        let project_id = project_a.update(cx_a, |p, _| p.next_remote_id()).await;
2051        let worktree_id = worktree_a.read_with(cx_a, |tree, _| tree.id());
2052        project_a.update(cx_a, |p, cx| p.share(cx)).await.unwrap();
2053
2054        // Join that project as client B
2055        let project_b = Project::remote(
2056            project_id,
2057            client_b.clone(),
2058            client_b.user_store.clone(),
2059            lang_registry.clone(),
2060            fs.clone(),
2061            &mut cx_b.to_async(),
2062        )
2063        .await
2064        .unwrap();
2065
2066        // Open a buffer as client B
2067        let buffer_b = project_b
2068            .update(cx_b, |p, cx| p.open_buffer((worktree_id, "a.txt"), cx))
2069            .await
2070            .unwrap();
2071
2072        buffer_b.update(cx_b, |buf, cx| buf.edit([(0..0, "world ")], cx));
2073        buffer_b.read_with(cx_b, |buf, _| {
2074            assert!(buf.is_dirty());
2075            assert!(!buf.has_conflict());
2076        });
2077
2078        buffer_b.update(cx_b, |buf, cx| buf.save(cx)).await.unwrap();
2079        buffer_b
2080            .condition(&cx_b, |buffer_b, _| !buffer_b.is_dirty())
2081            .await;
2082        buffer_b.read_with(cx_b, |buf, _| {
2083            assert!(!buf.has_conflict());
2084        });
2085
2086        buffer_b.update(cx_b, |buf, cx| buf.edit([(0..0, "hello ")], cx));
2087        buffer_b.read_with(cx_b, |buf, _| {
2088            assert!(buf.is_dirty());
2089            assert!(!buf.has_conflict());
2090        });
2091    }
2092
2093    #[gpui::test(iterations = 10)]
2094    async fn test_buffer_reloading(cx_a: &mut TestAppContext, cx_b: &mut TestAppContext) {
2095        cx_a.foreground().forbid_parking();
2096        let lang_registry = Arc::new(LanguageRegistry::test());
2097        let fs = FakeFs::new(cx_a.background());
2098
2099        // Connect to a server as 2 clients.
2100        let mut server = TestServer::start(cx_a.foreground(), cx_a.background()).await;
2101        let client_a = server.create_client(cx_a, "user_a").await;
2102        let client_b = server.create_client(cx_b, "user_b").await;
2103
2104        // Share a project as client A
2105        fs.insert_tree(
2106            "/dir",
2107            json!({
2108                ".zed.toml": r#"collaborators = ["user_b", "user_c"]"#,
2109                "a.txt": "a-contents",
2110            }),
2111        )
2112        .await;
2113
2114        let project_a = cx_a.update(|cx| {
2115            Project::local(
2116                client_a.clone(),
2117                client_a.user_store.clone(),
2118                lang_registry.clone(),
2119                fs.clone(),
2120                cx,
2121            )
2122        });
2123        let (worktree_a, _) = project_a
2124            .update(cx_a, |p, cx| {
2125                p.find_or_create_local_worktree("/dir", true, cx)
2126            })
2127            .await
2128            .unwrap();
2129        worktree_a
2130            .read_with(cx_a, |tree, _| tree.as_local().unwrap().scan_complete())
2131            .await;
2132        let project_id = project_a.update(cx_a, |p, _| p.next_remote_id()).await;
2133        let worktree_id = worktree_a.read_with(cx_a, |tree, _| tree.id());
2134        project_a.update(cx_a, |p, cx| p.share(cx)).await.unwrap();
2135
2136        // Join that project as client B
2137        let project_b = Project::remote(
2138            project_id,
2139            client_b.clone(),
2140            client_b.user_store.clone(),
2141            lang_registry.clone(),
2142            fs.clone(),
2143            &mut cx_b.to_async(),
2144        )
2145        .await
2146        .unwrap();
2147        let _worktree_b = project_b.update(cx_b, |p, cx| p.worktrees(cx).next().unwrap());
2148
2149        // Open a buffer as client B
2150        let buffer_b = project_b
2151            .update(cx_b, |p, cx| p.open_buffer((worktree_id, "a.txt"), cx))
2152            .await
2153            .unwrap();
2154        buffer_b.read_with(cx_b, |buf, _| {
2155            assert!(!buf.is_dirty());
2156            assert!(!buf.has_conflict());
2157        });
2158
2159        fs.save(Path::new("/dir/a.txt"), &"new contents".into())
2160            .await
2161            .unwrap();
2162        buffer_b
2163            .condition(&cx_b, |buf, _| {
2164                buf.text() == "new contents" && !buf.is_dirty()
2165            })
2166            .await;
2167        buffer_b.read_with(cx_b, |buf, _| {
2168            assert!(!buf.has_conflict());
2169        });
2170    }
2171
2172    #[gpui::test(iterations = 10)]
2173    async fn test_editing_while_guest_opens_buffer(
2174        cx_a: &mut TestAppContext,
2175        cx_b: &mut TestAppContext,
2176    ) {
2177        cx_a.foreground().forbid_parking();
2178        let lang_registry = Arc::new(LanguageRegistry::test());
2179        let fs = FakeFs::new(cx_a.background());
2180
2181        // Connect to a server as 2 clients.
2182        let mut server = TestServer::start(cx_a.foreground(), cx_a.background()).await;
2183        let client_a = server.create_client(cx_a, "user_a").await;
2184        let client_b = server.create_client(cx_b, "user_b").await;
2185
2186        // Share a project as client A
2187        fs.insert_tree(
2188            "/dir",
2189            json!({
2190                ".zed.toml": r#"collaborators = ["user_b"]"#,
2191                "a.txt": "a-contents",
2192            }),
2193        )
2194        .await;
2195        let project_a = cx_a.update(|cx| {
2196            Project::local(
2197                client_a.clone(),
2198                client_a.user_store.clone(),
2199                lang_registry.clone(),
2200                fs.clone(),
2201                cx,
2202            )
2203        });
2204        let (worktree_a, _) = project_a
2205            .update(cx_a, |p, cx| {
2206                p.find_or_create_local_worktree("/dir", true, cx)
2207            })
2208            .await
2209            .unwrap();
2210        worktree_a
2211            .read_with(cx_a, |tree, _| tree.as_local().unwrap().scan_complete())
2212            .await;
2213        let project_id = project_a.update(cx_a, |p, _| p.next_remote_id()).await;
2214        let worktree_id = worktree_a.read_with(cx_a, |tree, _| tree.id());
2215        project_a.update(cx_a, |p, cx| p.share(cx)).await.unwrap();
2216
2217        // Join that project as client B
2218        let project_b = Project::remote(
2219            project_id,
2220            client_b.clone(),
2221            client_b.user_store.clone(),
2222            lang_registry.clone(),
2223            fs.clone(),
2224            &mut cx_b.to_async(),
2225        )
2226        .await
2227        .unwrap();
2228
2229        // Open a buffer as client A
2230        let buffer_a = project_a
2231            .update(cx_a, |p, cx| p.open_buffer((worktree_id, "a.txt"), cx))
2232            .await
2233            .unwrap();
2234
2235        // Start opening the same buffer as client B
2236        let buffer_b = cx_b
2237            .background()
2238            .spawn(project_b.update(cx_b, |p, cx| p.open_buffer((worktree_id, "a.txt"), cx)));
2239
2240        // Edit the buffer as client A while client B is still opening it.
2241        cx_b.background().simulate_random_delay().await;
2242        buffer_a.update(cx_a, |buf, cx| buf.edit([(0..0, "X")], cx));
2243        cx_b.background().simulate_random_delay().await;
2244        buffer_a.update(cx_a, |buf, cx| buf.edit([(1..1, "Y")], cx));
2245
2246        let text = buffer_a.read_with(cx_a, |buf, _| buf.text());
2247        let buffer_b = buffer_b.await.unwrap();
2248        buffer_b.condition(&cx_b, |buf, _| buf.text() == text).await;
2249    }
2250
2251    #[gpui::test(iterations = 10)]
2252    async fn test_leaving_worktree_while_opening_buffer(
2253        cx_a: &mut TestAppContext,
2254        cx_b: &mut TestAppContext,
2255    ) {
2256        cx_a.foreground().forbid_parking();
2257        let lang_registry = Arc::new(LanguageRegistry::test());
2258        let fs = FakeFs::new(cx_a.background());
2259
2260        // Connect to a server as 2 clients.
2261        let mut server = TestServer::start(cx_a.foreground(), cx_a.background()).await;
2262        let client_a = server.create_client(cx_a, "user_a").await;
2263        let client_b = server.create_client(cx_b, "user_b").await;
2264
2265        // Share a project as client A
2266        fs.insert_tree(
2267            "/dir",
2268            json!({
2269                ".zed.toml": r#"collaborators = ["user_b"]"#,
2270                "a.txt": "a-contents",
2271            }),
2272        )
2273        .await;
2274        let project_a = cx_a.update(|cx| {
2275            Project::local(
2276                client_a.clone(),
2277                client_a.user_store.clone(),
2278                lang_registry.clone(),
2279                fs.clone(),
2280                cx,
2281            )
2282        });
2283        let (worktree_a, _) = project_a
2284            .update(cx_a, |p, cx| {
2285                p.find_or_create_local_worktree("/dir", true, cx)
2286            })
2287            .await
2288            .unwrap();
2289        worktree_a
2290            .read_with(cx_a, |tree, _| tree.as_local().unwrap().scan_complete())
2291            .await;
2292        let project_id = project_a.update(cx_a, |p, _| p.next_remote_id()).await;
2293        let worktree_id = worktree_a.read_with(cx_a, |tree, _| tree.id());
2294        project_a.update(cx_a, |p, cx| p.share(cx)).await.unwrap();
2295
2296        // Join that project as client B
2297        let project_b = Project::remote(
2298            project_id,
2299            client_b.clone(),
2300            client_b.user_store.clone(),
2301            lang_registry.clone(),
2302            fs.clone(),
2303            &mut cx_b.to_async(),
2304        )
2305        .await
2306        .unwrap();
2307
2308        // See that a guest has joined as client A.
2309        project_a
2310            .condition(&cx_a, |p, _| p.collaborators().len() == 1)
2311            .await;
2312
2313        // Begin opening a buffer as client B, but leave the project before the open completes.
2314        let buffer_b = cx_b
2315            .background()
2316            .spawn(project_b.update(cx_b, |p, cx| p.open_buffer((worktree_id, "a.txt"), cx)));
2317        cx_b.update(|_| drop(project_b));
2318        drop(buffer_b);
2319
2320        // See that the guest has left.
2321        project_a
2322            .condition(&cx_a, |p, _| p.collaborators().len() == 0)
2323            .await;
2324    }
2325
2326    #[gpui::test(iterations = 10)]
2327    async fn test_leaving_project(cx_a: &mut TestAppContext, cx_b: &mut TestAppContext) {
2328        cx_a.foreground().forbid_parking();
2329        let lang_registry = Arc::new(LanguageRegistry::test());
2330        let fs = FakeFs::new(cx_a.background());
2331
2332        // Connect to a server as 2 clients.
2333        let mut server = TestServer::start(cx_a.foreground(), cx_a.background()).await;
2334        let client_a = server.create_client(cx_a, "user_a").await;
2335        let client_b = server.create_client(cx_b, "user_b").await;
2336
2337        // Share a project as client A
2338        fs.insert_tree(
2339            "/a",
2340            json!({
2341                ".zed.toml": r#"collaborators = ["user_b"]"#,
2342                "a.txt": "a-contents",
2343                "b.txt": "b-contents",
2344            }),
2345        )
2346        .await;
2347        let project_a = cx_a.update(|cx| {
2348            Project::local(
2349                client_a.clone(),
2350                client_a.user_store.clone(),
2351                lang_registry.clone(),
2352                fs.clone(),
2353                cx,
2354            )
2355        });
2356        let (worktree_a, _) = project_a
2357            .update(cx_a, |p, cx| {
2358                p.find_or_create_local_worktree("/a", true, cx)
2359            })
2360            .await
2361            .unwrap();
2362        worktree_a
2363            .read_with(cx_a, |tree, _| tree.as_local().unwrap().scan_complete())
2364            .await;
2365        let project_id = project_a
2366            .update(cx_a, |project, _| project.next_remote_id())
2367            .await;
2368        project_a
2369            .update(cx_a, |project, cx| project.share(cx))
2370            .await
2371            .unwrap();
2372
2373        // Join that project as client B
2374        let _project_b = Project::remote(
2375            project_id,
2376            client_b.clone(),
2377            client_b.user_store.clone(),
2378            lang_registry.clone(),
2379            fs.clone(),
2380            &mut cx_b.to_async(),
2381        )
2382        .await
2383        .unwrap();
2384
2385        // Client A sees that a guest has joined.
2386        project_a
2387            .condition(cx_a, |p, _| p.collaborators().len() == 1)
2388            .await;
2389
2390        // Drop client B's connection and ensure client A observes client B leaving the project.
2391        client_b.disconnect(&cx_b.to_async()).unwrap();
2392        project_a
2393            .condition(cx_a, |p, _| p.collaborators().len() == 0)
2394            .await;
2395
2396        // Rejoin the project as client B
2397        let _project_b = Project::remote(
2398            project_id,
2399            client_b.clone(),
2400            client_b.user_store.clone(),
2401            lang_registry.clone(),
2402            fs.clone(),
2403            &mut cx_b.to_async(),
2404        )
2405        .await
2406        .unwrap();
2407
2408        // Client A sees that a guest has re-joined.
2409        project_a
2410            .condition(cx_a, |p, _| p.collaborators().len() == 1)
2411            .await;
2412
2413        // Simulate connection loss for client B and ensure client A observes client B leaving the project.
2414        client_b.wait_for_current_user(cx_b).await;
2415        server.disconnect_client(client_b.current_user_id(cx_b));
2416        cx_a.foreground().advance_clock(Duration::from_secs(3));
2417        project_a
2418            .condition(cx_a, |p, _| p.collaborators().len() == 0)
2419            .await;
2420    }
2421
2422    #[gpui::test(iterations = 10)]
2423    async fn test_collaborating_with_diagnostics(
2424        cx_a: &mut TestAppContext,
2425        cx_b: &mut TestAppContext,
2426    ) {
2427        cx_a.foreground().forbid_parking();
2428        let lang_registry = Arc::new(LanguageRegistry::test());
2429        let fs = FakeFs::new(cx_a.background());
2430
2431        // Set up a fake language server.
2432        let mut language = Language::new(
2433            LanguageConfig {
2434                name: "Rust".into(),
2435                path_suffixes: vec!["rs".to_string()],
2436                ..Default::default()
2437            },
2438            Some(tree_sitter_rust::language()),
2439        );
2440        let mut fake_language_servers = language.set_fake_lsp_adapter(Default::default());
2441        lang_registry.add(Arc::new(language));
2442
2443        // Connect to a server as 2 clients.
2444        let mut server = TestServer::start(cx_a.foreground(), cx_a.background()).await;
2445        let client_a = server.create_client(cx_a, "user_a").await;
2446        let client_b = server.create_client(cx_b, "user_b").await;
2447
2448        // Share a project as client A
2449        fs.insert_tree(
2450            "/a",
2451            json!({
2452                ".zed.toml": r#"collaborators = ["user_b"]"#,
2453                "a.rs": "let one = two",
2454                "other.rs": "",
2455            }),
2456        )
2457        .await;
2458        let project_a = cx_a.update(|cx| {
2459            Project::local(
2460                client_a.clone(),
2461                client_a.user_store.clone(),
2462                lang_registry.clone(),
2463                fs.clone(),
2464                cx,
2465            )
2466        });
2467        let (worktree_a, _) = project_a
2468            .update(cx_a, |p, cx| {
2469                p.find_or_create_local_worktree("/a", true, cx)
2470            })
2471            .await
2472            .unwrap();
2473        worktree_a
2474            .read_with(cx_a, |tree, _| tree.as_local().unwrap().scan_complete())
2475            .await;
2476        let project_id = project_a.update(cx_a, |p, _| p.next_remote_id()).await;
2477        let worktree_id = worktree_a.read_with(cx_a, |tree, _| tree.id());
2478        project_a.update(cx_a, |p, cx| p.share(cx)).await.unwrap();
2479
2480        // Cause the language server to start.
2481        let _ = cx_a
2482            .background()
2483            .spawn(project_a.update(cx_a, |project, cx| {
2484                project.open_buffer(
2485                    ProjectPath {
2486                        worktree_id,
2487                        path: Path::new("other.rs").into(),
2488                    },
2489                    cx,
2490                )
2491            }))
2492            .await
2493            .unwrap();
2494
2495        // Simulate a language server reporting errors for a file.
2496        let mut fake_language_server = fake_language_servers.next().await.unwrap();
2497        fake_language_server
2498            .receive_notification::<lsp::notification::DidOpenTextDocument>()
2499            .await;
2500        fake_language_server.notify::<lsp::notification::PublishDiagnostics>(
2501            lsp::PublishDiagnosticsParams {
2502                uri: lsp::Url::from_file_path("/a/a.rs").unwrap(),
2503                version: None,
2504                diagnostics: vec![lsp::Diagnostic {
2505                    severity: Some(lsp::DiagnosticSeverity::ERROR),
2506                    range: lsp::Range::new(lsp::Position::new(0, 4), lsp::Position::new(0, 7)),
2507                    message: "message 1".to_string(),
2508                    ..Default::default()
2509                }],
2510            },
2511        );
2512
2513        // Wait for server to see the diagnostics update.
2514        server
2515            .condition(|store| {
2516                let worktree = store
2517                    .project(project_id)
2518                    .unwrap()
2519                    .share
2520                    .as_ref()
2521                    .unwrap()
2522                    .worktrees
2523                    .get(&worktree_id.to_proto())
2524                    .unwrap();
2525
2526                !worktree.diagnostic_summaries.is_empty()
2527            })
2528            .await;
2529
2530        // Join the worktree as client B.
2531        let project_b = Project::remote(
2532            project_id,
2533            client_b.clone(),
2534            client_b.user_store.clone(),
2535            lang_registry.clone(),
2536            fs.clone(),
2537            &mut cx_b.to_async(),
2538        )
2539        .await
2540        .unwrap();
2541
2542        project_b.read_with(cx_b, |project, cx| {
2543            assert_eq!(
2544                project.diagnostic_summaries(cx).collect::<Vec<_>>(),
2545                &[(
2546                    ProjectPath {
2547                        worktree_id,
2548                        path: Arc::from(Path::new("a.rs")),
2549                    },
2550                    DiagnosticSummary {
2551                        error_count: 1,
2552                        warning_count: 0,
2553                        ..Default::default()
2554                    },
2555                )]
2556            )
2557        });
2558
2559        // Simulate a language server reporting more errors for a file.
2560        fake_language_server.notify::<lsp::notification::PublishDiagnostics>(
2561            lsp::PublishDiagnosticsParams {
2562                uri: lsp::Url::from_file_path("/a/a.rs").unwrap(),
2563                version: None,
2564                diagnostics: vec![
2565                    lsp::Diagnostic {
2566                        severity: Some(lsp::DiagnosticSeverity::ERROR),
2567                        range: lsp::Range::new(lsp::Position::new(0, 4), lsp::Position::new(0, 7)),
2568                        message: "message 1".to_string(),
2569                        ..Default::default()
2570                    },
2571                    lsp::Diagnostic {
2572                        severity: Some(lsp::DiagnosticSeverity::WARNING),
2573                        range: lsp::Range::new(
2574                            lsp::Position::new(0, 10),
2575                            lsp::Position::new(0, 13),
2576                        ),
2577                        message: "message 2".to_string(),
2578                        ..Default::default()
2579                    },
2580                ],
2581            },
2582        );
2583
2584        // Client b gets the updated summaries
2585        project_b
2586            .condition(&cx_b, |project, cx| {
2587                project.diagnostic_summaries(cx).collect::<Vec<_>>()
2588                    == &[(
2589                        ProjectPath {
2590                            worktree_id,
2591                            path: Arc::from(Path::new("a.rs")),
2592                        },
2593                        DiagnosticSummary {
2594                            error_count: 1,
2595                            warning_count: 1,
2596                            ..Default::default()
2597                        },
2598                    )]
2599            })
2600            .await;
2601
2602        // Open the file with the errors on client B. They should be present.
2603        let buffer_b = cx_b
2604            .background()
2605            .spawn(project_b.update(cx_b, |p, cx| p.open_buffer((worktree_id, "a.rs"), cx)))
2606            .await
2607            .unwrap();
2608
2609        buffer_b.read_with(cx_b, |buffer, _| {
2610            assert_eq!(
2611                buffer
2612                    .snapshot()
2613                    .diagnostics_in_range::<_, Point>(0..buffer.len(), false)
2614                    .map(|entry| entry)
2615                    .collect::<Vec<_>>(),
2616                &[
2617                    DiagnosticEntry {
2618                        range: Point::new(0, 4)..Point::new(0, 7),
2619                        diagnostic: Diagnostic {
2620                            group_id: 0,
2621                            message: "message 1".to_string(),
2622                            severity: lsp::DiagnosticSeverity::ERROR,
2623                            is_primary: true,
2624                            ..Default::default()
2625                        }
2626                    },
2627                    DiagnosticEntry {
2628                        range: Point::new(0, 10)..Point::new(0, 13),
2629                        diagnostic: Diagnostic {
2630                            group_id: 1,
2631                            severity: lsp::DiagnosticSeverity::WARNING,
2632                            message: "message 2".to_string(),
2633                            is_primary: true,
2634                            ..Default::default()
2635                        }
2636                    }
2637                ]
2638            );
2639        });
2640
2641        // Simulate a language server reporting no errors for a file.
2642        fake_language_server.notify::<lsp::notification::PublishDiagnostics>(
2643            lsp::PublishDiagnosticsParams {
2644                uri: lsp::Url::from_file_path("/a/a.rs").unwrap(),
2645                version: None,
2646                diagnostics: vec![],
2647            },
2648        );
2649        project_a
2650            .condition(cx_a, |project, cx| {
2651                project.diagnostic_summaries(cx).collect::<Vec<_>>() == &[]
2652            })
2653            .await;
2654        project_b
2655            .condition(cx_b, |project, cx| {
2656                project.diagnostic_summaries(cx).collect::<Vec<_>>() == &[]
2657            })
2658            .await;
2659    }
2660
2661    #[gpui::test(iterations = 10)]
2662    async fn test_collaborating_with_completion(
2663        cx_a: &mut TestAppContext,
2664        cx_b: &mut TestAppContext,
2665    ) {
2666        cx_a.foreground().forbid_parking();
2667        let lang_registry = Arc::new(LanguageRegistry::test());
2668        let fs = FakeFs::new(cx_a.background());
2669
2670        // Set up a fake language server.
2671        let mut language = Language::new(
2672            LanguageConfig {
2673                name: "Rust".into(),
2674                path_suffixes: vec!["rs".to_string()],
2675                ..Default::default()
2676            },
2677            Some(tree_sitter_rust::language()),
2678        );
2679        let mut fake_language_servers = language.set_fake_lsp_adapter(FakeLspAdapter {
2680            capabilities: lsp::ServerCapabilities {
2681                completion_provider: Some(lsp::CompletionOptions {
2682                    trigger_characters: Some(vec![".".to_string()]),
2683                    ..Default::default()
2684                }),
2685                ..Default::default()
2686            },
2687            ..Default::default()
2688        });
2689        lang_registry.add(Arc::new(language));
2690
2691        // Connect to a server as 2 clients.
2692        let mut server = TestServer::start(cx_a.foreground(), cx_a.background()).await;
2693        let client_a = server.create_client(cx_a, "user_a").await;
2694        let client_b = server.create_client(cx_b, "user_b").await;
2695
2696        // Share a project as client A
2697        fs.insert_tree(
2698            "/a",
2699            json!({
2700                ".zed.toml": r#"collaborators = ["user_b"]"#,
2701                "main.rs": "fn main() { a }",
2702                "other.rs": "",
2703            }),
2704        )
2705        .await;
2706        let project_a = cx_a.update(|cx| {
2707            Project::local(
2708                client_a.clone(),
2709                client_a.user_store.clone(),
2710                lang_registry.clone(),
2711                fs.clone(),
2712                cx,
2713            )
2714        });
2715        let (worktree_a, _) = project_a
2716            .update(cx_a, |p, cx| {
2717                p.find_or_create_local_worktree("/a", true, cx)
2718            })
2719            .await
2720            .unwrap();
2721        worktree_a
2722            .read_with(cx_a, |tree, _| tree.as_local().unwrap().scan_complete())
2723            .await;
2724        let project_id = project_a.update(cx_a, |p, _| p.next_remote_id()).await;
2725        let worktree_id = worktree_a.read_with(cx_a, |tree, _| tree.id());
2726        project_a.update(cx_a, |p, cx| p.share(cx)).await.unwrap();
2727
2728        // Join the worktree as client B.
2729        let project_b = Project::remote(
2730            project_id,
2731            client_b.clone(),
2732            client_b.user_store.clone(),
2733            lang_registry.clone(),
2734            fs.clone(),
2735            &mut cx_b.to_async(),
2736        )
2737        .await
2738        .unwrap();
2739
2740        // Open a file in an editor as the guest.
2741        let buffer_b = project_b
2742            .update(cx_b, |p, cx| p.open_buffer((worktree_id, "main.rs"), cx))
2743            .await
2744            .unwrap();
2745        let (window_b, _) = cx_b.add_window(|_| EmptyView);
2746        let editor_b = cx_b.add_view(window_b, |cx| {
2747            Editor::for_buffer(buffer_b.clone(), Some(project_b.clone()), cx)
2748        });
2749
2750        let fake_language_server = fake_language_servers.next().await.unwrap();
2751        buffer_b
2752            .condition(&cx_b, |buffer, _| !buffer.completion_triggers().is_empty())
2753            .await;
2754
2755        // Type a completion trigger character as the guest.
2756        editor_b.update(cx_b, |editor, cx| {
2757            editor.select_ranges([13..13], None, cx);
2758            editor.handle_input(&Input(".".into()), cx);
2759            cx.focus(&editor_b);
2760        });
2761
2762        // Receive a completion request as the host's language server.
2763        // Return some completions from the host's language server.
2764        cx_a.foreground().start_waiting();
2765        fake_language_server
2766            .handle_request::<lsp::request::Completion, _, _>(|params, _| async move {
2767                assert_eq!(
2768                    params.text_document_position.text_document.uri,
2769                    lsp::Url::from_file_path("/a/main.rs").unwrap(),
2770                );
2771                assert_eq!(
2772                    params.text_document_position.position,
2773                    lsp::Position::new(0, 14),
2774                );
2775
2776                Ok(Some(lsp::CompletionResponse::Array(vec![
2777                    lsp::CompletionItem {
2778                        label: "first_method(…)".into(),
2779                        detail: Some("fn(&mut self, B) -> C".into()),
2780                        text_edit: Some(lsp::CompletionTextEdit::Edit(lsp::TextEdit {
2781                            new_text: "first_method($1)".to_string(),
2782                            range: lsp::Range::new(
2783                                lsp::Position::new(0, 14),
2784                                lsp::Position::new(0, 14),
2785                            ),
2786                        })),
2787                        insert_text_format: Some(lsp::InsertTextFormat::SNIPPET),
2788                        ..Default::default()
2789                    },
2790                    lsp::CompletionItem {
2791                        label: "second_method(…)".into(),
2792                        detail: Some("fn(&mut self, C) -> D<E>".into()),
2793                        text_edit: Some(lsp::CompletionTextEdit::Edit(lsp::TextEdit {
2794                            new_text: "second_method()".to_string(),
2795                            range: lsp::Range::new(
2796                                lsp::Position::new(0, 14),
2797                                lsp::Position::new(0, 14),
2798                            ),
2799                        })),
2800                        insert_text_format: Some(lsp::InsertTextFormat::SNIPPET),
2801                        ..Default::default()
2802                    },
2803                ])))
2804            })
2805            .next()
2806            .await
2807            .unwrap();
2808        cx_a.foreground().finish_waiting();
2809
2810        // Open the buffer on the host.
2811        let buffer_a = project_a
2812            .update(cx_a, |p, cx| p.open_buffer((worktree_id, "main.rs"), cx))
2813            .await
2814            .unwrap();
2815        buffer_a
2816            .condition(&cx_a, |buffer, _| buffer.text() == "fn main() { a. }")
2817            .await;
2818
2819        // Confirm a completion on the guest.
2820        editor_b
2821            .condition(&cx_b, |editor, _| editor.context_menu_visible())
2822            .await;
2823        editor_b.update(cx_b, |editor, cx| {
2824            editor.confirm_completion(&ConfirmCompletion { item_ix: Some(0) }, cx);
2825            assert_eq!(editor.text(cx), "fn main() { a.first_method() }");
2826        });
2827
2828        // Return a resolved completion from the host's language server.
2829        // The resolved completion has an additional text edit.
2830        fake_language_server.handle_request::<lsp::request::ResolveCompletionItem, _, _>(
2831            |params, _| async move {
2832                assert_eq!(params.label, "first_method(…)");
2833                Ok(lsp::CompletionItem {
2834                    label: "first_method(…)".into(),
2835                    detail: Some("fn(&mut self, B) -> C".into()),
2836                    text_edit: Some(lsp::CompletionTextEdit::Edit(lsp::TextEdit {
2837                        new_text: "first_method($1)".to_string(),
2838                        range: lsp::Range::new(
2839                            lsp::Position::new(0, 14),
2840                            lsp::Position::new(0, 14),
2841                        ),
2842                    })),
2843                    additional_text_edits: Some(vec![lsp::TextEdit {
2844                        new_text: "use d::SomeTrait;\n".to_string(),
2845                        range: lsp::Range::new(lsp::Position::new(0, 0), lsp::Position::new(0, 0)),
2846                    }]),
2847                    insert_text_format: Some(lsp::InsertTextFormat::SNIPPET),
2848                    ..Default::default()
2849                })
2850            },
2851        );
2852
2853        // The additional edit is applied.
2854        buffer_a
2855            .condition(&cx_a, |buffer, _| {
2856                buffer.text() == "use d::SomeTrait;\nfn main() { a.first_method() }"
2857            })
2858            .await;
2859        buffer_b
2860            .condition(&cx_b, |buffer, _| {
2861                buffer.text() == "use d::SomeTrait;\nfn main() { a.first_method() }"
2862            })
2863            .await;
2864    }
2865
2866    #[gpui::test(iterations = 10)]
2867    async fn test_reloading_buffer_manually(cx_a: &mut TestAppContext, cx_b: &mut TestAppContext) {
2868        cx_a.foreground().forbid_parking();
2869        let lang_registry = Arc::new(LanguageRegistry::test());
2870        let fs = FakeFs::new(cx_a.background());
2871
2872        // Connect to a server as 2 clients.
2873        let mut server = TestServer::start(cx_a.foreground(), cx_a.background()).await;
2874        let client_a = server.create_client(cx_a, "user_a").await;
2875        let client_b = server.create_client(cx_b, "user_b").await;
2876
2877        // Share a project as client A
2878        fs.insert_tree(
2879            "/a",
2880            json!({
2881                ".zed.toml": r#"collaborators = ["user_b"]"#,
2882                "a.rs": "let one = 1;",
2883            }),
2884        )
2885        .await;
2886        let project_a = cx_a.update(|cx| {
2887            Project::local(
2888                client_a.clone(),
2889                client_a.user_store.clone(),
2890                lang_registry.clone(),
2891                fs.clone(),
2892                cx,
2893            )
2894        });
2895        let (worktree_a, _) = project_a
2896            .update(cx_a, |p, cx| {
2897                p.find_or_create_local_worktree("/a", true, cx)
2898            })
2899            .await
2900            .unwrap();
2901        worktree_a
2902            .read_with(cx_a, |tree, _| tree.as_local().unwrap().scan_complete())
2903            .await;
2904        let project_id = project_a.update(cx_a, |p, _| p.next_remote_id()).await;
2905        let worktree_id = worktree_a.read_with(cx_a, |tree, _| tree.id());
2906        project_a.update(cx_a, |p, cx| p.share(cx)).await.unwrap();
2907        let buffer_a = project_a
2908            .update(cx_a, |p, cx| p.open_buffer((worktree_id, "a.rs"), cx))
2909            .await
2910            .unwrap();
2911
2912        // Join the worktree as client B.
2913        let project_b = Project::remote(
2914            project_id,
2915            client_b.clone(),
2916            client_b.user_store.clone(),
2917            lang_registry.clone(),
2918            fs.clone(),
2919            &mut cx_b.to_async(),
2920        )
2921        .await
2922        .unwrap();
2923
2924        let buffer_b = cx_b
2925            .background()
2926            .spawn(project_b.update(cx_b, |p, cx| p.open_buffer((worktree_id, "a.rs"), cx)))
2927            .await
2928            .unwrap();
2929        buffer_b.update(cx_b, |buffer, cx| {
2930            buffer.edit([(4..7, "six")], cx);
2931            buffer.edit([(10..11, "6")], cx);
2932            assert_eq!(buffer.text(), "let six = 6;");
2933            assert!(buffer.is_dirty());
2934            assert!(!buffer.has_conflict());
2935        });
2936        buffer_a
2937            .condition(cx_a, |buffer, _| buffer.text() == "let six = 6;")
2938            .await;
2939
2940        fs.save(Path::new("/a/a.rs"), &Rope::from("let seven = 7;"))
2941            .await
2942            .unwrap();
2943        buffer_a
2944            .condition(cx_a, |buffer, _| buffer.has_conflict())
2945            .await;
2946        buffer_b
2947            .condition(cx_b, |buffer, _| buffer.has_conflict())
2948            .await;
2949
2950        project_b
2951            .update(cx_b, |project, cx| {
2952                project.reload_buffers(HashSet::from_iter([buffer_b.clone()]), true, cx)
2953            })
2954            .await
2955            .unwrap();
2956        buffer_a.read_with(cx_a, |buffer, _| {
2957            assert_eq!(buffer.text(), "let seven = 7;");
2958            assert!(!buffer.is_dirty());
2959            assert!(!buffer.has_conflict());
2960        });
2961        buffer_b.read_with(cx_b, |buffer, _| {
2962            assert_eq!(buffer.text(), "let seven = 7;");
2963            assert!(!buffer.is_dirty());
2964            assert!(!buffer.has_conflict());
2965        });
2966
2967        buffer_a.update(cx_a, |buffer, cx| {
2968            // Undoing on the host is a no-op when the reload was initiated by the guest.
2969            buffer.undo(cx);
2970            assert_eq!(buffer.text(), "let seven = 7;");
2971            assert!(!buffer.is_dirty());
2972            assert!(!buffer.has_conflict());
2973        });
2974        buffer_b.update(cx_b, |buffer, cx| {
2975            // Undoing on the guest rolls back the buffer to before it was reloaded but the conflict gets cleared.
2976            buffer.undo(cx);
2977            assert_eq!(buffer.text(), "let six = 6;");
2978            assert!(buffer.is_dirty());
2979            assert!(!buffer.has_conflict());
2980        });
2981    }
2982
2983    #[gpui::test(iterations = 10)]
2984    async fn test_formatting_buffer(cx_a: &mut TestAppContext, cx_b: &mut TestAppContext) {
2985        cx_a.foreground().forbid_parking();
2986        let lang_registry = Arc::new(LanguageRegistry::test());
2987        let fs = FakeFs::new(cx_a.background());
2988
2989        // Set up a fake language server.
2990        let mut language = Language::new(
2991            LanguageConfig {
2992                name: "Rust".into(),
2993                path_suffixes: vec!["rs".to_string()],
2994                ..Default::default()
2995            },
2996            Some(tree_sitter_rust::language()),
2997        );
2998        let mut fake_language_servers = language.set_fake_lsp_adapter(Default::default());
2999        lang_registry.add(Arc::new(language));
3000
3001        // Connect to a server as 2 clients.
3002        let mut server = TestServer::start(cx_a.foreground(), cx_a.background()).await;
3003        let client_a = server.create_client(cx_a, "user_a").await;
3004        let client_b = server.create_client(cx_b, "user_b").await;
3005
3006        // Share a project as client A
3007        fs.insert_tree(
3008            "/a",
3009            json!({
3010                ".zed.toml": r#"collaborators = ["user_b"]"#,
3011                "a.rs": "let one = two",
3012            }),
3013        )
3014        .await;
3015        let project_a = cx_a.update(|cx| {
3016            Project::local(
3017                client_a.clone(),
3018                client_a.user_store.clone(),
3019                lang_registry.clone(),
3020                fs.clone(),
3021                cx,
3022            )
3023        });
3024        let (worktree_a, _) = project_a
3025            .update(cx_a, |p, cx| {
3026                p.find_or_create_local_worktree("/a", true, cx)
3027            })
3028            .await
3029            .unwrap();
3030        worktree_a
3031            .read_with(cx_a, |tree, _| tree.as_local().unwrap().scan_complete())
3032            .await;
3033        let project_id = project_a.update(cx_a, |p, _| p.next_remote_id()).await;
3034        let worktree_id = worktree_a.read_with(cx_a, |tree, _| tree.id());
3035        project_a.update(cx_a, |p, cx| p.share(cx)).await.unwrap();
3036
3037        // Join the worktree as client B.
3038        let project_b = Project::remote(
3039            project_id,
3040            client_b.clone(),
3041            client_b.user_store.clone(),
3042            lang_registry.clone(),
3043            fs.clone(),
3044            &mut cx_b.to_async(),
3045        )
3046        .await
3047        .unwrap();
3048
3049        let buffer_b = cx_b
3050            .background()
3051            .spawn(project_b.update(cx_b, |p, cx| p.open_buffer((worktree_id, "a.rs"), cx)))
3052            .await
3053            .unwrap();
3054
3055        let fake_language_server = fake_language_servers.next().await.unwrap();
3056        fake_language_server.handle_request::<lsp::request::Formatting, _, _>(|_, _| async move {
3057            Ok(Some(vec![
3058                lsp::TextEdit {
3059                    range: lsp::Range::new(lsp::Position::new(0, 4), lsp::Position::new(0, 4)),
3060                    new_text: "h".to_string(),
3061                },
3062                lsp::TextEdit {
3063                    range: lsp::Range::new(lsp::Position::new(0, 7), lsp::Position::new(0, 7)),
3064                    new_text: "y".to_string(),
3065                },
3066            ]))
3067        });
3068
3069        project_b
3070            .update(cx_b, |project, cx| {
3071                project.format(HashSet::from_iter([buffer_b.clone()]), true, cx)
3072            })
3073            .await
3074            .unwrap();
3075        assert_eq!(
3076            buffer_b.read_with(cx_b, |buffer, _| buffer.text()),
3077            "let honey = two"
3078        );
3079    }
3080
3081    #[gpui::test(iterations = 10)]
3082    async fn test_definition(cx_a: &mut TestAppContext, cx_b: &mut TestAppContext) {
3083        cx_a.foreground().forbid_parking();
3084        let lang_registry = Arc::new(LanguageRegistry::test());
3085        let fs = FakeFs::new(cx_a.background());
3086        fs.insert_tree(
3087            "/root-1",
3088            json!({
3089                ".zed.toml": r#"collaborators = ["user_b"]"#,
3090                "a.rs": "const ONE: usize = b::TWO + b::THREE;",
3091            }),
3092        )
3093        .await;
3094        fs.insert_tree(
3095            "/root-2",
3096            json!({
3097                "b.rs": "const TWO: usize = 2;\nconst THREE: usize = 3;",
3098            }),
3099        )
3100        .await;
3101
3102        // Set up a fake language server.
3103        let mut language = Language::new(
3104            LanguageConfig {
3105                name: "Rust".into(),
3106                path_suffixes: vec!["rs".to_string()],
3107                ..Default::default()
3108            },
3109            Some(tree_sitter_rust::language()),
3110        );
3111        let mut fake_language_servers = language.set_fake_lsp_adapter(Default::default());
3112        lang_registry.add(Arc::new(language));
3113
3114        // Connect to a server as 2 clients.
3115        let mut server = TestServer::start(cx_a.foreground(), cx_a.background()).await;
3116        let client_a = server.create_client(cx_a, "user_a").await;
3117        let client_b = server.create_client(cx_b, "user_b").await;
3118
3119        // Share a project as client A
3120        let project_a = cx_a.update(|cx| {
3121            Project::local(
3122                client_a.clone(),
3123                client_a.user_store.clone(),
3124                lang_registry.clone(),
3125                fs.clone(),
3126                cx,
3127            )
3128        });
3129        let (worktree_a, _) = project_a
3130            .update(cx_a, |p, cx| {
3131                p.find_or_create_local_worktree("/root-1", true, cx)
3132            })
3133            .await
3134            .unwrap();
3135        worktree_a
3136            .read_with(cx_a, |tree, _| tree.as_local().unwrap().scan_complete())
3137            .await;
3138        let project_id = project_a.update(cx_a, |p, _| p.next_remote_id()).await;
3139        let worktree_id = worktree_a.read_with(cx_a, |tree, _| tree.id());
3140        project_a.update(cx_a, |p, cx| p.share(cx)).await.unwrap();
3141
3142        // Join the worktree as client B.
3143        let project_b = Project::remote(
3144            project_id,
3145            client_b.clone(),
3146            client_b.user_store.clone(),
3147            lang_registry.clone(),
3148            fs.clone(),
3149            &mut cx_b.to_async(),
3150        )
3151        .await
3152        .unwrap();
3153
3154        // Open the file on client B.
3155        let buffer_b = cx_b
3156            .background()
3157            .spawn(project_b.update(cx_b, |p, cx| p.open_buffer((worktree_id, "a.rs"), cx)))
3158            .await
3159            .unwrap();
3160
3161        // Request the definition of a symbol as the guest.
3162        let fake_language_server = fake_language_servers.next().await.unwrap();
3163        fake_language_server.handle_request::<lsp::request::GotoDefinition, _, _>(
3164            |_, _| async move {
3165                Ok(Some(lsp::GotoDefinitionResponse::Scalar(
3166                    lsp::Location::new(
3167                        lsp::Url::from_file_path("/root-2/b.rs").unwrap(),
3168                        lsp::Range::new(lsp::Position::new(0, 6), lsp::Position::new(0, 9)),
3169                    ),
3170                )))
3171            },
3172        );
3173
3174        let definitions_1 = project_b
3175            .update(cx_b, |p, cx| p.definition(&buffer_b, 23, cx))
3176            .await
3177            .unwrap();
3178        cx_b.read(|cx| {
3179            assert_eq!(definitions_1.len(), 1);
3180            assert_eq!(project_b.read(cx).worktrees(cx).count(), 2);
3181            let target_buffer = definitions_1[0].buffer.read(cx);
3182            assert_eq!(
3183                target_buffer.text(),
3184                "const TWO: usize = 2;\nconst THREE: usize = 3;"
3185            );
3186            assert_eq!(
3187                definitions_1[0].range.to_point(target_buffer),
3188                Point::new(0, 6)..Point::new(0, 9)
3189            );
3190        });
3191
3192        // Try getting more definitions for the same buffer, ensuring the buffer gets reused from
3193        // the previous call to `definition`.
3194        fake_language_server.handle_request::<lsp::request::GotoDefinition, _, _>(
3195            |_, _| async move {
3196                Ok(Some(lsp::GotoDefinitionResponse::Scalar(
3197                    lsp::Location::new(
3198                        lsp::Url::from_file_path("/root-2/b.rs").unwrap(),
3199                        lsp::Range::new(lsp::Position::new(1, 6), lsp::Position::new(1, 11)),
3200                    ),
3201                )))
3202            },
3203        );
3204
3205        let definitions_2 = project_b
3206            .update(cx_b, |p, cx| p.definition(&buffer_b, 33, cx))
3207            .await
3208            .unwrap();
3209        cx_b.read(|cx| {
3210            assert_eq!(definitions_2.len(), 1);
3211            assert_eq!(project_b.read(cx).worktrees(cx).count(), 2);
3212            let target_buffer = definitions_2[0].buffer.read(cx);
3213            assert_eq!(
3214                target_buffer.text(),
3215                "const TWO: usize = 2;\nconst THREE: usize = 3;"
3216            );
3217            assert_eq!(
3218                definitions_2[0].range.to_point(target_buffer),
3219                Point::new(1, 6)..Point::new(1, 11)
3220            );
3221        });
3222        assert_eq!(definitions_1[0].buffer, definitions_2[0].buffer);
3223    }
3224
3225    #[gpui::test(iterations = 10)]
3226    async fn test_references(cx_a: &mut TestAppContext, cx_b: &mut TestAppContext) {
3227        cx_a.foreground().forbid_parking();
3228        let lang_registry = Arc::new(LanguageRegistry::test());
3229        let fs = FakeFs::new(cx_a.background());
3230        fs.insert_tree(
3231            "/root-1",
3232            json!({
3233                ".zed.toml": r#"collaborators = ["user_b"]"#,
3234                "one.rs": "const ONE: usize = 1;",
3235                "two.rs": "const TWO: usize = one::ONE + one::ONE;",
3236            }),
3237        )
3238        .await;
3239        fs.insert_tree(
3240            "/root-2",
3241            json!({
3242                "three.rs": "const THREE: usize = two::TWO + one::ONE;",
3243            }),
3244        )
3245        .await;
3246
3247        // Set up a fake language server.
3248        let mut language = Language::new(
3249            LanguageConfig {
3250                name: "Rust".into(),
3251                path_suffixes: vec!["rs".to_string()],
3252                ..Default::default()
3253            },
3254            Some(tree_sitter_rust::language()),
3255        );
3256        let mut fake_language_servers = language.set_fake_lsp_adapter(Default::default());
3257        lang_registry.add(Arc::new(language));
3258
3259        // Connect to a server as 2 clients.
3260        let mut server = TestServer::start(cx_a.foreground(), cx_a.background()).await;
3261        let client_a = server.create_client(cx_a, "user_a").await;
3262        let client_b = server.create_client(cx_b, "user_b").await;
3263
3264        // Share a project as client A
3265        let project_a = cx_a.update(|cx| {
3266            Project::local(
3267                client_a.clone(),
3268                client_a.user_store.clone(),
3269                lang_registry.clone(),
3270                fs.clone(),
3271                cx,
3272            )
3273        });
3274        let (worktree_a, _) = project_a
3275            .update(cx_a, |p, cx| {
3276                p.find_or_create_local_worktree("/root-1", true, cx)
3277            })
3278            .await
3279            .unwrap();
3280        worktree_a
3281            .read_with(cx_a, |tree, _| tree.as_local().unwrap().scan_complete())
3282            .await;
3283        let project_id = project_a.update(cx_a, |p, _| p.next_remote_id()).await;
3284        let worktree_id = worktree_a.read_with(cx_a, |tree, _| tree.id());
3285        project_a.update(cx_a, |p, cx| p.share(cx)).await.unwrap();
3286
3287        // Join the worktree as client B.
3288        let project_b = Project::remote(
3289            project_id,
3290            client_b.clone(),
3291            client_b.user_store.clone(),
3292            lang_registry.clone(),
3293            fs.clone(),
3294            &mut cx_b.to_async(),
3295        )
3296        .await
3297        .unwrap();
3298
3299        // Open the file on client B.
3300        let buffer_b = cx_b
3301            .background()
3302            .spawn(project_b.update(cx_b, |p, cx| p.open_buffer((worktree_id, "one.rs"), cx)))
3303            .await
3304            .unwrap();
3305
3306        // Request references to a symbol as the guest.
3307        let fake_language_server = fake_language_servers.next().await.unwrap();
3308        fake_language_server.handle_request::<lsp::request::References, _, _>(
3309            |params, _| async move {
3310                assert_eq!(
3311                    params.text_document_position.text_document.uri.as_str(),
3312                    "file:///root-1/one.rs"
3313                );
3314                Ok(Some(vec![
3315                    lsp::Location {
3316                        uri: lsp::Url::from_file_path("/root-1/two.rs").unwrap(),
3317                        range: lsp::Range::new(
3318                            lsp::Position::new(0, 24),
3319                            lsp::Position::new(0, 27),
3320                        ),
3321                    },
3322                    lsp::Location {
3323                        uri: lsp::Url::from_file_path("/root-1/two.rs").unwrap(),
3324                        range: lsp::Range::new(
3325                            lsp::Position::new(0, 35),
3326                            lsp::Position::new(0, 38),
3327                        ),
3328                    },
3329                    lsp::Location {
3330                        uri: lsp::Url::from_file_path("/root-2/three.rs").unwrap(),
3331                        range: lsp::Range::new(
3332                            lsp::Position::new(0, 37),
3333                            lsp::Position::new(0, 40),
3334                        ),
3335                    },
3336                ]))
3337            },
3338        );
3339
3340        let references = project_b
3341            .update(cx_b, |p, cx| p.references(&buffer_b, 7, cx))
3342            .await
3343            .unwrap();
3344        cx_b.read(|cx| {
3345            assert_eq!(references.len(), 3);
3346            assert_eq!(project_b.read(cx).worktrees(cx).count(), 2);
3347
3348            let two_buffer = references[0].buffer.read(cx);
3349            let three_buffer = references[2].buffer.read(cx);
3350            assert_eq!(
3351                two_buffer.file().unwrap().path().as_ref(),
3352                Path::new("two.rs")
3353            );
3354            assert_eq!(references[1].buffer, references[0].buffer);
3355            assert_eq!(
3356                three_buffer.file().unwrap().full_path(cx),
3357                Path::new("three.rs")
3358            );
3359
3360            assert_eq!(references[0].range.to_offset(&two_buffer), 24..27);
3361            assert_eq!(references[1].range.to_offset(&two_buffer), 35..38);
3362            assert_eq!(references[2].range.to_offset(&three_buffer), 37..40);
3363        });
3364    }
3365
3366    #[gpui::test(iterations = 10)]
3367    async fn test_project_search(cx_a: &mut TestAppContext, cx_b: &mut TestAppContext) {
3368        cx_a.foreground().forbid_parking();
3369        let lang_registry = Arc::new(LanguageRegistry::test());
3370        let fs = FakeFs::new(cx_a.background());
3371        fs.insert_tree(
3372            "/root-1",
3373            json!({
3374                ".zed.toml": r#"collaborators = ["user_b"]"#,
3375                "a": "hello world",
3376                "b": "goodnight moon",
3377                "c": "a world of goo",
3378                "d": "world champion of clown world",
3379            }),
3380        )
3381        .await;
3382        fs.insert_tree(
3383            "/root-2",
3384            json!({
3385                "e": "disney world is fun",
3386            }),
3387        )
3388        .await;
3389
3390        // Connect to a server as 2 clients.
3391        let mut server = TestServer::start(cx_a.foreground(), cx_a.background()).await;
3392        let client_a = server.create_client(cx_a, "user_a").await;
3393        let client_b = server.create_client(cx_b, "user_b").await;
3394
3395        // Share a project as client A
3396        let project_a = cx_a.update(|cx| {
3397            Project::local(
3398                client_a.clone(),
3399                client_a.user_store.clone(),
3400                lang_registry.clone(),
3401                fs.clone(),
3402                cx,
3403            )
3404        });
3405        let project_id = project_a.update(cx_a, |p, _| p.next_remote_id()).await;
3406
3407        let (worktree_1, _) = project_a
3408            .update(cx_a, |p, cx| {
3409                p.find_or_create_local_worktree("/root-1", true, cx)
3410            })
3411            .await
3412            .unwrap();
3413        worktree_1
3414            .read_with(cx_a, |tree, _| tree.as_local().unwrap().scan_complete())
3415            .await;
3416        let (worktree_2, _) = project_a
3417            .update(cx_a, |p, cx| {
3418                p.find_or_create_local_worktree("/root-2", true, cx)
3419            })
3420            .await
3421            .unwrap();
3422        worktree_2
3423            .read_with(cx_a, |tree, _| tree.as_local().unwrap().scan_complete())
3424            .await;
3425
3426        project_a.update(cx_a, |p, cx| p.share(cx)).await.unwrap();
3427
3428        // Join the worktree as client B.
3429        let project_b = Project::remote(
3430            project_id,
3431            client_b.clone(),
3432            client_b.user_store.clone(),
3433            lang_registry.clone(),
3434            fs.clone(),
3435            &mut cx_b.to_async(),
3436        )
3437        .await
3438        .unwrap();
3439
3440        let results = project_b
3441            .update(cx_b, |project, cx| {
3442                project.search(SearchQuery::text("world", false, false), cx)
3443            })
3444            .await
3445            .unwrap();
3446
3447        let mut ranges_by_path = results
3448            .into_iter()
3449            .map(|(buffer, ranges)| {
3450                buffer.read_with(cx_b, |buffer, cx| {
3451                    let path = buffer.file().unwrap().full_path(cx);
3452                    let offset_ranges = ranges
3453                        .into_iter()
3454                        .map(|range| range.to_offset(buffer))
3455                        .collect::<Vec<_>>();
3456                    (path, offset_ranges)
3457                })
3458            })
3459            .collect::<Vec<_>>();
3460        ranges_by_path.sort_by_key(|(path, _)| path.clone());
3461
3462        assert_eq!(
3463            ranges_by_path,
3464            &[
3465                (PathBuf::from("root-1/a"), vec![6..11]),
3466                (PathBuf::from("root-1/c"), vec![2..7]),
3467                (PathBuf::from("root-1/d"), vec![0..5, 24..29]),
3468                (PathBuf::from("root-2/e"), vec![7..12]),
3469            ]
3470        );
3471    }
3472
3473    #[gpui::test(iterations = 10)]
3474    async fn test_document_highlights(cx_a: &mut TestAppContext, cx_b: &mut TestAppContext) {
3475        cx_a.foreground().forbid_parking();
3476        let lang_registry = Arc::new(LanguageRegistry::test());
3477        let fs = FakeFs::new(cx_a.background());
3478        fs.insert_tree(
3479            "/root-1",
3480            json!({
3481                ".zed.toml": r#"collaborators = ["user_b"]"#,
3482                "main.rs": "fn double(number: i32) -> i32 { number + number }",
3483            }),
3484        )
3485        .await;
3486
3487        // Set up a fake language server.
3488        let mut language = Language::new(
3489            LanguageConfig {
3490                name: "Rust".into(),
3491                path_suffixes: vec!["rs".to_string()],
3492                ..Default::default()
3493            },
3494            Some(tree_sitter_rust::language()),
3495        );
3496        let mut fake_language_servers = language.set_fake_lsp_adapter(Default::default());
3497        lang_registry.add(Arc::new(language));
3498
3499        // Connect to a server as 2 clients.
3500        let mut server = TestServer::start(cx_a.foreground(), cx_a.background()).await;
3501        let client_a = server.create_client(cx_a, "user_a").await;
3502        let client_b = server.create_client(cx_b, "user_b").await;
3503
3504        // Share a project as client A
3505        let project_a = cx_a.update(|cx| {
3506            Project::local(
3507                client_a.clone(),
3508                client_a.user_store.clone(),
3509                lang_registry.clone(),
3510                fs.clone(),
3511                cx,
3512            )
3513        });
3514        let (worktree_a, _) = project_a
3515            .update(cx_a, |p, cx| {
3516                p.find_or_create_local_worktree("/root-1", true, cx)
3517            })
3518            .await
3519            .unwrap();
3520        worktree_a
3521            .read_with(cx_a, |tree, _| tree.as_local().unwrap().scan_complete())
3522            .await;
3523        let project_id = project_a.update(cx_a, |p, _| p.next_remote_id()).await;
3524        let worktree_id = worktree_a.read_with(cx_a, |tree, _| tree.id());
3525        project_a.update(cx_a, |p, cx| p.share(cx)).await.unwrap();
3526
3527        // Join the worktree as client B.
3528        let project_b = Project::remote(
3529            project_id,
3530            client_b.clone(),
3531            client_b.user_store.clone(),
3532            lang_registry.clone(),
3533            fs.clone(),
3534            &mut cx_b.to_async(),
3535        )
3536        .await
3537        .unwrap();
3538
3539        // Open the file on client B.
3540        let buffer_b = cx_b
3541            .background()
3542            .spawn(project_b.update(cx_b, |p, cx| p.open_buffer((worktree_id, "main.rs"), cx)))
3543            .await
3544            .unwrap();
3545
3546        // Request document highlights as the guest.
3547        let fake_language_server = fake_language_servers.next().await.unwrap();
3548        fake_language_server.handle_request::<lsp::request::DocumentHighlightRequest, _, _>(
3549            |params, _| async move {
3550                assert_eq!(
3551                    params
3552                        .text_document_position_params
3553                        .text_document
3554                        .uri
3555                        .as_str(),
3556                    "file:///root-1/main.rs"
3557                );
3558                assert_eq!(
3559                    params.text_document_position_params.position,
3560                    lsp::Position::new(0, 34)
3561                );
3562                Ok(Some(vec![
3563                    lsp::DocumentHighlight {
3564                        kind: Some(lsp::DocumentHighlightKind::WRITE),
3565                        range: lsp::Range::new(
3566                            lsp::Position::new(0, 10),
3567                            lsp::Position::new(0, 16),
3568                        ),
3569                    },
3570                    lsp::DocumentHighlight {
3571                        kind: Some(lsp::DocumentHighlightKind::READ),
3572                        range: lsp::Range::new(
3573                            lsp::Position::new(0, 32),
3574                            lsp::Position::new(0, 38),
3575                        ),
3576                    },
3577                    lsp::DocumentHighlight {
3578                        kind: Some(lsp::DocumentHighlightKind::READ),
3579                        range: lsp::Range::new(
3580                            lsp::Position::new(0, 41),
3581                            lsp::Position::new(0, 47),
3582                        ),
3583                    },
3584                ]))
3585            },
3586        );
3587
3588        let highlights = project_b
3589            .update(cx_b, |p, cx| p.document_highlights(&buffer_b, 34, cx))
3590            .await
3591            .unwrap();
3592        buffer_b.read_with(cx_b, |buffer, _| {
3593            let snapshot = buffer.snapshot();
3594
3595            let highlights = highlights
3596                .into_iter()
3597                .map(|highlight| (highlight.kind, highlight.range.to_offset(&snapshot)))
3598                .collect::<Vec<_>>();
3599            assert_eq!(
3600                highlights,
3601                &[
3602                    (lsp::DocumentHighlightKind::WRITE, 10..16),
3603                    (lsp::DocumentHighlightKind::READ, 32..38),
3604                    (lsp::DocumentHighlightKind::READ, 41..47)
3605                ]
3606            )
3607        });
3608    }
3609
3610    #[gpui::test(iterations = 10)]
3611    async fn test_project_symbols(cx_a: &mut TestAppContext, cx_b: &mut TestAppContext) {
3612        cx_a.foreground().forbid_parking();
3613        let lang_registry = Arc::new(LanguageRegistry::test());
3614        let fs = FakeFs::new(cx_a.background());
3615        fs.insert_tree(
3616            "/code",
3617            json!({
3618                "crate-1": {
3619                    ".zed.toml": r#"collaborators = ["user_b"]"#,
3620                    "one.rs": "const ONE: usize = 1;",
3621                },
3622                "crate-2": {
3623                    "two.rs": "const TWO: usize = 2; const THREE: usize = 3;",
3624                },
3625                "private": {
3626                    "passwords.txt": "the-password",
3627                }
3628            }),
3629        )
3630        .await;
3631
3632        // Set up a fake language server.
3633        let mut language = Language::new(
3634            LanguageConfig {
3635                name: "Rust".into(),
3636                path_suffixes: vec!["rs".to_string()],
3637                ..Default::default()
3638            },
3639            Some(tree_sitter_rust::language()),
3640        );
3641        let mut fake_language_servers = language.set_fake_lsp_adapter(Default::default());
3642        lang_registry.add(Arc::new(language));
3643
3644        // Connect to a server as 2 clients.
3645        let mut server = TestServer::start(cx_a.foreground(), cx_a.background()).await;
3646        let client_a = server.create_client(cx_a, "user_a").await;
3647        let client_b = server.create_client(cx_b, "user_b").await;
3648
3649        // Share a project as client A
3650        let project_a = cx_a.update(|cx| {
3651            Project::local(
3652                client_a.clone(),
3653                client_a.user_store.clone(),
3654                lang_registry.clone(),
3655                fs.clone(),
3656                cx,
3657            )
3658        });
3659        let (worktree_a, _) = project_a
3660            .update(cx_a, |p, cx| {
3661                p.find_or_create_local_worktree("/code/crate-1", true, cx)
3662            })
3663            .await
3664            .unwrap();
3665        worktree_a
3666            .read_with(cx_a, |tree, _| tree.as_local().unwrap().scan_complete())
3667            .await;
3668        let project_id = project_a.update(cx_a, |p, _| p.next_remote_id()).await;
3669        let worktree_id = worktree_a.read_with(cx_a, |tree, _| tree.id());
3670        project_a.update(cx_a, |p, cx| p.share(cx)).await.unwrap();
3671
3672        // Join the worktree as client B.
3673        let project_b = Project::remote(
3674            project_id,
3675            client_b.clone(),
3676            client_b.user_store.clone(),
3677            lang_registry.clone(),
3678            fs.clone(),
3679            &mut cx_b.to_async(),
3680        )
3681        .await
3682        .unwrap();
3683
3684        // Cause the language server to start.
3685        let _buffer = cx_b
3686            .background()
3687            .spawn(project_b.update(cx_b, |p, cx| p.open_buffer((worktree_id, "one.rs"), cx)))
3688            .await
3689            .unwrap();
3690
3691        let fake_language_server = fake_language_servers.next().await.unwrap();
3692        fake_language_server.handle_request::<lsp::request::WorkspaceSymbol, _, _>(
3693            |_, _| async move {
3694                #[allow(deprecated)]
3695                Ok(Some(vec![lsp::SymbolInformation {
3696                    name: "TWO".into(),
3697                    location: lsp::Location {
3698                        uri: lsp::Url::from_file_path("/code/crate-2/two.rs").unwrap(),
3699                        range: lsp::Range::new(lsp::Position::new(0, 6), lsp::Position::new(0, 9)),
3700                    },
3701                    kind: lsp::SymbolKind::CONSTANT,
3702                    tags: None,
3703                    container_name: None,
3704                    deprecated: None,
3705                }]))
3706            },
3707        );
3708
3709        // Request the definition of a symbol as the guest.
3710        let symbols = project_b
3711            .update(cx_b, |p, cx| p.symbols("two", cx))
3712            .await
3713            .unwrap();
3714        assert_eq!(symbols.len(), 1);
3715        assert_eq!(symbols[0].name, "TWO");
3716
3717        // Open one of the returned symbols.
3718        let buffer_b_2 = project_b
3719            .update(cx_b, |project, cx| {
3720                project.open_buffer_for_symbol(&symbols[0], cx)
3721            })
3722            .await
3723            .unwrap();
3724        buffer_b_2.read_with(cx_b, |buffer, _| {
3725            assert_eq!(
3726                buffer.file().unwrap().path().as_ref(),
3727                Path::new("../crate-2/two.rs")
3728            );
3729        });
3730
3731        // Attempt to craft a symbol and violate host's privacy by opening an arbitrary file.
3732        let mut fake_symbol = symbols[0].clone();
3733        fake_symbol.path = Path::new("/code/secrets").into();
3734        let error = project_b
3735            .update(cx_b, |project, cx| {
3736                project.open_buffer_for_symbol(&fake_symbol, cx)
3737            })
3738            .await
3739            .unwrap_err();
3740        assert!(error.to_string().contains("invalid symbol signature"));
3741    }
3742
3743    #[gpui::test(iterations = 10)]
3744    async fn test_open_buffer_while_getting_definition_pointing_to_it(
3745        cx_a: &mut TestAppContext,
3746        cx_b: &mut TestAppContext,
3747        mut rng: StdRng,
3748    ) {
3749        cx_a.foreground().forbid_parking();
3750        let lang_registry = Arc::new(LanguageRegistry::test());
3751        let fs = FakeFs::new(cx_a.background());
3752        fs.insert_tree(
3753            "/root",
3754            json!({
3755                ".zed.toml": r#"collaborators = ["user_b"]"#,
3756                "a.rs": "const ONE: usize = b::TWO;",
3757                "b.rs": "const TWO: usize = 2",
3758            }),
3759        )
3760        .await;
3761
3762        // Set up a fake language server.
3763        let mut language = Language::new(
3764            LanguageConfig {
3765                name: "Rust".into(),
3766                path_suffixes: vec!["rs".to_string()],
3767                ..Default::default()
3768            },
3769            Some(tree_sitter_rust::language()),
3770        );
3771        let mut fake_language_servers = language.set_fake_lsp_adapter(Default::default());
3772        lang_registry.add(Arc::new(language));
3773
3774        // Connect to a server as 2 clients.
3775        let mut server = TestServer::start(cx_a.foreground(), cx_a.background()).await;
3776        let client_a = server.create_client(cx_a, "user_a").await;
3777        let client_b = server.create_client(cx_b, "user_b").await;
3778
3779        // Share a project as client A
3780        let project_a = cx_a.update(|cx| {
3781            Project::local(
3782                client_a.clone(),
3783                client_a.user_store.clone(),
3784                lang_registry.clone(),
3785                fs.clone(),
3786                cx,
3787            )
3788        });
3789
3790        let (worktree_a, _) = project_a
3791            .update(cx_a, |p, cx| {
3792                p.find_or_create_local_worktree("/root", true, cx)
3793            })
3794            .await
3795            .unwrap();
3796        worktree_a
3797            .read_with(cx_a, |tree, _| tree.as_local().unwrap().scan_complete())
3798            .await;
3799        let project_id = project_a.update(cx_a, |p, _| p.next_remote_id()).await;
3800        let worktree_id = worktree_a.read_with(cx_a, |tree, _| tree.id());
3801        project_a.update(cx_a, |p, cx| p.share(cx)).await.unwrap();
3802
3803        // Join the worktree as client B.
3804        let project_b = Project::remote(
3805            project_id,
3806            client_b.clone(),
3807            client_b.user_store.clone(),
3808            lang_registry.clone(),
3809            fs.clone(),
3810            &mut cx_b.to_async(),
3811        )
3812        .await
3813        .unwrap();
3814
3815        let buffer_b1 = cx_b
3816            .background()
3817            .spawn(project_b.update(cx_b, |p, cx| p.open_buffer((worktree_id, "a.rs"), cx)))
3818            .await
3819            .unwrap();
3820
3821        let fake_language_server = fake_language_servers.next().await.unwrap();
3822        fake_language_server.handle_request::<lsp::request::GotoDefinition, _, _>(
3823            |_, _| async move {
3824                Ok(Some(lsp::GotoDefinitionResponse::Scalar(
3825                    lsp::Location::new(
3826                        lsp::Url::from_file_path("/root/b.rs").unwrap(),
3827                        lsp::Range::new(lsp::Position::new(0, 6), lsp::Position::new(0, 9)),
3828                    ),
3829                )))
3830            },
3831        );
3832
3833        let definitions;
3834        let buffer_b2;
3835        if rng.gen() {
3836            definitions = project_b.update(cx_b, |p, cx| p.definition(&buffer_b1, 23, cx));
3837            buffer_b2 = project_b.update(cx_b, |p, cx| p.open_buffer((worktree_id, "b.rs"), cx));
3838        } else {
3839            buffer_b2 = project_b.update(cx_b, |p, cx| p.open_buffer((worktree_id, "b.rs"), cx));
3840            definitions = project_b.update(cx_b, |p, cx| p.definition(&buffer_b1, 23, cx));
3841        }
3842
3843        let buffer_b2 = buffer_b2.await.unwrap();
3844        let definitions = definitions.await.unwrap();
3845        assert_eq!(definitions.len(), 1);
3846        assert_eq!(definitions[0].buffer, buffer_b2);
3847    }
3848
3849    #[gpui::test(iterations = 10)]
3850    async fn test_collaborating_with_code_actions(
3851        cx_a: &mut TestAppContext,
3852        cx_b: &mut TestAppContext,
3853    ) {
3854        cx_a.foreground().forbid_parking();
3855        let lang_registry = Arc::new(LanguageRegistry::test());
3856        let fs = FakeFs::new(cx_a.background());
3857        cx_b.update(|cx| editor::init(cx));
3858
3859        // Set up a fake language server.
3860        let mut language = Language::new(
3861            LanguageConfig {
3862                name: "Rust".into(),
3863                path_suffixes: vec!["rs".to_string()],
3864                ..Default::default()
3865            },
3866            Some(tree_sitter_rust::language()),
3867        );
3868        let mut fake_language_servers = language.set_fake_lsp_adapter(Default::default());
3869        lang_registry.add(Arc::new(language));
3870
3871        // Connect to a server as 2 clients.
3872        let mut server = TestServer::start(cx_a.foreground(), cx_a.background()).await;
3873        let client_a = server.create_client(cx_a, "user_a").await;
3874        let client_b = server.create_client(cx_b, "user_b").await;
3875
3876        // Share a project as client A
3877        fs.insert_tree(
3878            "/a",
3879            json!({
3880                ".zed.toml": r#"collaborators = ["user_b"]"#,
3881                "main.rs": "mod other;\nfn main() { let foo = other::foo(); }",
3882                "other.rs": "pub fn foo() -> usize { 4 }",
3883            }),
3884        )
3885        .await;
3886        let project_a = cx_a.update(|cx| {
3887            Project::local(
3888                client_a.clone(),
3889                client_a.user_store.clone(),
3890                lang_registry.clone(),
3891                fs.clone(),
3892                cx,
3893            )
3894        });
3895        let (worktree_a, _) = project_a
3896            .update(cx_a, |p, cx| {
3897                p.find_or_create_local_worktree("/a", true, cx)
3898            })
3899            .await
3900            .unwrap();
3901        worktree_a
3902            .read_with(cx_a, |tree, _| tree.as_local().unwrap().scan_complete())
3903            .await;
3904        let project_id = project_a.update(cx_a, |p, _| p.next_remote_id()).await;
3905        let worktree_id = worktree_a.read_with(cx_a, |tree, _| tree.id());
3906        project_a.update(cx_a, |p, cx| p.share(cx)).await.unwrap();
3907
3908        // Join the worktree as client B.
3909        let project_b = Project::remote(
3910            project_id,
3911            client_b.clone(),
3912            client_b.user_store.clone(),
3913            lang_registry.clone(),
3914            fs.clone(),
3915            &mut cx_b.to_async(),
3916        )
3917        .await
3918        .unwrap();
3919        let mut params = cx_b.update(WorkspaceParams::test);
3920        params.languages = lang_registry.clone();
3921        params.client = client_b.client.clone();
3922        params.user_store = client_b.user_store.clone();
3923        params.project = project_b;
3924
3925        let (_window_b, workspace_b) = cx_b.add_window(|cx| Workspace::new(&params, cx));
3926        let editor_b = workspace_b
3927            .update(cx_b, |workspace, cx| {
3928                workspace.open_path((worktree_id, "main.rs"), true, cx)
3929            })
3930            .await
3931            .unwrap()
3932            .downcast::<Editor>()
3933            .unwrap();
3934
3935        let mut fake_language_server = fake_language_servers.next().await.unwrap();
3936        fake_language_server
3937            .handle_request::<lsp::request::CodeActionRequest, _, _>(|params, _| async move {
3938                assert_eq!(
3939                    params.text_document.uri,
3940                    lsp::Url::from_file_path("/a/main.rs").unwrap(),
3941                );
3942                assert_eq!(params.range.start, lsp::Position::new(0, 0));
3943                assert_eq!(params.range.end, lsp::Position::new(0, 0));
3944                Ok(None)
3945            })
3946            .next()
3947            .await;
3948
3949        // Move cursor to a location that contains code actions.
3950        editor_b.update(cx_b, |editor, cx| {
3951            editor.select_ranges([Point::new(1, 31)..Point::new(1, 31)], None, cx);
3952            cx.focus(&editor_b);
3953        });
3954
3955        fake_language_server
3956            .handle_request::<lsp::request::CodeActionRequest, _, _>(|params, _| async move {
3957                assert_eq!(
3958                    params.text_document.uri,
3959                    lsp::Url::from_file_path("/a/main.rs").unwrap(),
3960                );
3961                assert_eq!(params.range.start, lsp::Position::new(1, 31));
3962                assert_eq!(params.range.end, lsp::Position::new(1, 31));
3963
3964                Ok(Some(vec![lsp::CodeActionOrCommand::CodeAction(
3965                    lsp::CodeAction {
3966                        title: "Inline into all callers".to_string(),
3967                        edit: Some(lsp::WorkspaceEdit {
3968                            changes: Some(
3969                                [
3970                                    (
3971                                        lsp::Url::from_file_path("/a/main.rs").unwrap(),
3972                                        vec![lsp::TextEdit::new(
3973                                            lsp::Range::new(
3974                                                lsp::Position::new(1, 22),
3975                                                lsp::Position::new(1, 34),
3976                                            ),
3977                                            "4".to_string(),
3978                                        )],
3979                                    ),
3980                                    (
3981                                        lsp::Url::from_file_path("/a/other.rs").unwrap(),
3982                                        vec![lsp::TextEdit::new(
3983                                            lsp::Range::new(
3984                                                lsp::Position::new(0, 0),
3985                                                lsp::Position::new(0, 27),
3986                                            ),
3987                                            "".to_string(),
3988                                        )],
3989                                    ),
3990                                ]
3991                                .into_iter()
3992                                .collect(),
3993                            ),
3994                            ..Default::default()
3995                        }),
3996                        data: Some(json!({
3997                            "codeActionParams": {
3998                                "range": {
3999                                    "start": {"line": 1, "column": 31},
4000                                    "end": {"line": 1, "column": 31},
4001                                }
4002                            }
4003                        })),
4004                        ..Default::default()
4005                    },
4006                )]))
4007            })
4008            .next()
4009            .await;
4010
4011        // Toggle code actions and wait for them to display.
4012        editor_b.update(cx_b, |editor, cx| {
4013            editor.toggle_code_actions(
4014                &ToggleCodeActions {
4015                    deployed_from_indicator: false,
4016                },
4017                cx,
4018            );
4019        });
4020        editor_b
4021            .condition(&cx_b, |editor, _| editor.context_menu_visible())
4022            .await;
4023
4024        fake_language_server.remove_request_handler::<lsp::request::CodeActionRequest>();
4025
4026        // Confirming the code action will trigger a resolve request.
4027        let confirm_action = workspace_b
4028            .update(cx_b, |workspace, cx| {
4029                Editor::confirm_code_action(workspace, &ConfirmCodeAction { item_ix: Some(0) }, cx)
4030            })
4031            .unwrap();
4032        fake_language_server.handle_request::<lsp::request::CodeActionResolveRequest, _, _>(
4033            |_, _| async move {
4034                Ok(lsp::CodeAction {
4035                    title: "Inline into all callers".to_string(),
4036                    edit: Some(lsp::WorkspaceEdit {
4037                        changes: Some(
4038                            [
4039                                (
4040                                    lsp::Url::from_file_path("/a/main.rs").unwrap(),
4041                                    vec![lsp::TextEdit::new(
4042                                        lsp::Range::new(
4043                                            lsp::Position::new(1, 22),
4044                                            lsp::Position::new(1, 34),
4045                                        ),
4046                                        "4".to_string(),
4047                                    )],
4048                                ),
4049                                (
4050                                    lsp::Url::from_file_path("/a/other.rs").unwrap(),
4051                                    vec![lsp::TextEdit::new(
4052                                        lsp::Range::new(
4053                                            lsp::Position::new(0, 0),
4054                                            lsp::Position::new(0, 27),
4055                                        ),
4056                                        "".to_string(),
4057                                    )],
4058                                ),
4059                            ]
4060                            .into_iter()
4061                            .collect(),
4062                        ),
4063                        ..Default::default()
4064                    }),
4065                    ..Default::default()
4066                })
4067            },
4068        );
4069
4070        // After the action is confirmed, an editor containing both modified files is opened.
4071        confirm_action.await.unwrap();
4072        let code_action_editor = workspace_b.read_with(cx_b, |workspace, cx| {
4073            workspace
4074                .active_item(cx)
4075                .unwrap()
4076                .downcast::<Editor>()
4077                .unwrap()
4078        });
4079        code_action_editor.update(cx_b, |editor, cx| {
4080            assert_eq!(editor.text(cx), "mod other;\nfn main() { let foo = 4; }\n");
4081            editor.undo(&Undo, cx);
4082            assert_eq!(
4083                editor.text(cx),
4084                "mod other;\nfn main() { let foo = other::foo(); }\npub fn foo() -> usize { 4 }"
4085            );
4086            editor.redo(&Redo, cx);
4087            assert_eq!(editor.text(cx), "mod other;\nfn main() { let foo = 4; }\n");
4088        });
4089    }
4090
4091    #[gpui::test(iterations = 10)]
4092    async fn test_collaborating_with_renames(cx_a: &mut TestAppContext, cx_b: &mut TestAppContext) {
4093        cx_a.foreground().forbid_parking();
4094        let lang_registry = Arc::new(LanguageRegistry::test());
4095        let fs = FakeFs::new(cx_a.background());
4096        cx_b.update(|cx| editor::init(cx));
4097
4098        // Set up a fake language server.
4099        let mut language = Language::new(
4100            LanguageConfig {
4101                name: "Rust".into(),
4102                path_suffixes: vec!["rs".to_string()],
4103                ..Default::default()
4104            },
4105            Some(tree_sitter_rust::language()),
4106        );
4107        let mut fake_language_servers = language.set_fake_lsp_adapter(FakeLspAdapter {
4108            capabilities: lsp::ServerCapabilities {
4109                rename_provider: Some(lsp::OneOf::Right(lsp::RenameOptions {
4110                    prepare_provider: Some(true),
4111                    work_done_progress_options: Default::default(),
4112                })),
4113                ..Default::default()
4114            },
4115            ..Default::default()
4116        });
4117        lang_registry.add(Arc::new(language));
4118
4119        // Connect to a server as 2 clients.
4120        let mut server = TestServer::start(cx_a.foreground(), cx_a.background()).await;
4121        let client_a = server.create_client(cx_a, "user_a").await;
4122        let client_b = server.create_client(cx_b, "user_b").await;
4123
4124        // Share a project as client A
4125        fs.insert_tree(
4126            "/dir",
4127            json!({
4128                ".zed.toml": r#"collaborators = ["user_b"]"#,
4129                "one.rs": "const ONE: usize = 1;",
4130                "two.rs": "const TWO: usize = one::ONE + one::ONE;"
4131            }),
4132        )
4133        .await;
4134        let project_a = cx_a.update(|cx| {
4135            Project::local(
4136                client_a.clone(),
4137                client_a.user_store.clone(),
4138                lang_registry.clone(),
4139                fs.clone(),
4140                cx,
4141            )
4142        });
4143        let (worktree_a, _) = project_a
4144            .update(cx_a, |p, cx| {
4145                p.find_or_create_local_worktree("/dir", true, cx)
4146            })
4147            .await
4148            .unwrap();
4149        worktree_a
4150            .read_with(cx_a, |tree, _| tree.as_local().unwrap().scan_complete())
4151            .await;
4152        let project_id = project_a.update(cx_a, |p, _| p.next_remote_id()).await;
4153        let worktree_id = worktree_a.read_with(cx_a, |tree, _| tree.id());
4154        project_a.update(cx_a, |p, cx| p.share(cx)).await.unwrap();
4155
4156        // Join the worktree as client B.
4157        let project_b = Project::remote(
4158            project_id,
4159            client_b.clone(),
4160            client_b.user_store.clone(),
4161            lang_registry.clone(),
4162            fs.clone(),
4163            &mut cx_b.to_async(),
4164        )
4165        .await
4166        .unwrap();
4167        let mut params = cx_b.update(WorkspaceParams::test);
4168        params.languages = lang_registry.clone();
4169        params.client = client_b.client.clone();
4170        params.user_store = client_b.user_store.clone();
4171        params.project = project_b;
4172
4173        let (_window_b, workspace_b) = cx_b.add_window(|cx| Workspace::new(&params, cx));
4174        let editor_b = workspace_b
4175            .update(cx_b, |workspace, cx| {
4176                workspace.open_path((worktree_id, "one.rs"), true, cx)
4177            })
4178            .await
4179            .unwrap()
4180            .downcast::<Editor>()
4181            .unwrap();
4182        let fake_language_server = fake_language_servers.next().await.unwrap();
4183
4184        // Move cursor to a location that can be renamed.
4185        let prepare_rename = editor_b.update(cx_b, |editor, cx| {
4186            editor.select_ranges([7..7], None, cx);
4187            editor.rename(&Rename, cx).unwrap()
4188        });
4189
4190        fake_language_server
4191            .handle_request::<lsp::request::PrepareRenameRequest, _, _>(|params, _| async move {
4192                assert_eq!(params.text_document.uri.as_str(), "file:///dir/one.rs");
4193                assert_eq!(params.position, lsp::Position::new(0, 7));
4194                Ok(Some(lsp::PrepareRenameResponse::Range(lsp::Range::new(
4195                    lsp::Position::new(0, 6),
4196                    lsp::Position::new(0, 9),
4197                ))))
4198            })
4199            .next()
4200            .await
4201            .unwrap();
4202        prepare_rename.await.unwrap();
4203        editor_b.update(cx_b, |editor, cx| {
4204            let rename = editor.pending_rename().unwrap();
4205            let buffer = editor.buffer().read(cx).snapshot(cx);
4206            assert_eq!(
4207                rename.range.start.to_offset(&buffer)..rename.range.end.to_offset(&buffer),
4208                6..9
4209            );
4210            rename.editor.update(cx, |rename_editor, cx| {
4211                rename_editor.buffer().update(cx, |rename_buffer, cx| {
4212                    rename_buffer.edit([(0..3, "THREE")], cx);
4213                });
4214            });
4215        });
4216
4217        let confirm_rename = workspace_b.update(cx_b, |workspace, cx| {
4218            Editor::confirm_rename(workspace, &ConfirmRename, cx).unwrap()
4219        });
4220        fake_language_server
4221            .handle_request::<lsp::request::Rename, _, _>(|params, _| async move {
4222                assert_eq!(
4223                    params.text_document_position.text_document.uri.as_str(),
4224                    "file:///dir/one.rs"
4225                );
4226                assert_eq!(
4227                    params.text_document_position.position,
4228                    lsp::Position::new(0, 6)
4229                );
4230                assert_eq!(params.new_name, "THREE");
4231                Ok(Some(lsp::WorkspaceEdit {
4232                    changes: Some(
4233                        [
4234                            (
4235                                lsp::Url::from_file_path("/dir/one.rs").unwrap(),
4236                                vec![lsp::TextEdit::new(
4237                                    lsp::Range::new(
4238                                        lsp::Position::new(0, 6),
4239                                        lsp::Position::new(0, 9),
4240                                    ),
4241                                    "THREE".to_string(),
4242                                )],
4243                            ),
4244                            (
4245                                lsp::Url::from_file_path("/dir/two.rs").unwrap(),
4246                                vec![
4247                                    lsp::TextEdit::new(
4248                                        lsp::Range::new(
4249                                            lsp::Position::new(0, 24),
4250                                            lsp::Position::new(0, 27),
4251                                        ),
4252                                        "THREE".to_string(),
4253                                    ),
4254                                    lsp::TextEdit::new(
4255                                        lsp::Range::new(
4256                                            lsp::Position::new(0, 35),
4257                                            lsp::Position::new(0, 38),
4258                                        ),
4259                                        "THREE".to_string(),
4260                                    ),
4261                                ],
4262                            ),
4263                        ]
4264                        .into_iter()
4265                        .collect(),
4266                    ),
4267                    ..Default::default()
4268                }))
4269            })
4270            .next()
4271            .await
4272            .unwrap();
4273        confirm_rename.await.unwrap();
4274
4275        let rename_editor = workspace_b.read_with(cx_b, |workspace, cx| {
4276            workspace
4277                .active_item(cx)
4278                .unwrap()
4279                .downcast::<Editor>()
4280                .unwrap()
4281        });
4282        rename_editor.update(cx_b, |editor, cx| {
4283            assert_eq!(
4284                editor.text(cx),
4285                "const THREE: usize = 1;\nconst TWO: usize = one::THREE + one::THREE;"
4286            );
4287            editor.undo(&Undo, cx);
4288            assert_eq!(
4289                editor.text(cx),
4290                "const ONE: usize = 1;\nconst TWO: usize = one::ONE + one::ONE;"
4291            );
4292            editor.redo(&Redo, cx);
4293            assert_eq!(
4294                editor.text(cx),
4295                "const THREE: usize = 1;\nconst TWO: usize = one::THREE + one::THREE;"
4296            );
4297        });
4298
4299        // Ensure temporary rename edits cannot be undone/redone.
4300        editor_b.update(cx_b, |editor, cx| {
4301            editor.undo(&Undo, cx);
4302            assert_eq!(editor.text(cx), "const ONE: usize = 1;");
4303            editor.undo(&Undo, cx);
4304            assert_eq!(editor.text(cx), "const ONE: usize = 1;");
4305            editor.redo(&Redo, cx);
4306            assert_eq!(editor.text(cx), "const THREE: usize = 1;");
4307        })
4308    }
4309
4310    #[gpui::test(iterations = 10)]
4311    async fn test_basic_chat(cx_a: &mut TestAppContext, cx_b: &mut TestAppContext) {
4312        cx_a.foreground().forbid_parking();
4313
4314        // Connect to a server as 2 clients.
4315        let mut server = TestServer::start(cx_a.foreground(), cx_a.background()).await;
4316        let client_a = server.create_client(cx_a, "user_a").await;
4317        let client_b = server.create_client(cx_b, "user_b").await;
4318
4319        // Create an org that includes these 2 users.
4320        let db = &server.app_state.db;
4321        let org_id = db.create_org("Test Org", "test-org").await.unwrap();
4322        db.add_org_member(org_id, client_a.current_user_id(&cx_a), false)
4323            .await
4324            .unwrap();
4325        db.add_org_member(org_id, client_b.current_user_id(&cx_b), false)
4326            .await
4327            .unwrap();
4328
4329        // Create a channel that includes all the users.
4330        let channel_id = db.create_org_channel(org_id, "test-channel").await.unwrap();
4331        db.add_channel_member(channel_id, client_a.current_user_id(&cx_a), false)
4332            .await
4333            .unwrap();
4334        db.add_channel_member(channel_id, client_b.current_user_id(&cx_b), false)
4335            .await
4336            .unwrap();
4337        db.create_channel_message(
4338            channel_id,
4339            client_b.current_user_id(&cx_b),
4340            "hello A, it's B.",
4341            OffsetDateTime::now_utc(),
4342            1,
4343        )
4344        .await
4345        .unwrap();
4346
4347        let channels_a = cx_a
4348            .add_model(|cx| ChannelList::new(client_a.user_store.clone(), client_a.clone(), cx));
4349        channels_a
4350            .condition(cx_a, |list, _| list.available_channels().is_some())
4351            .await;
4352        channels_a.read_with(cx_a, |list, _| {
4353            assert_eq!(
4354                list.available_channels().unwrap(),
4355                &[ChannelDetails {
4356                    id: channel_id.to_proto(),
4357                    name: "test-channel".to_string()
4358                }]
4359            )
4360        });
4361        let channel_a = channels_a.update(cx_a, |this, cx| {
4362            this.get_channel(channel_id.to_proto(), cx).unwrap()
4363        });
4364        channel_a.read_with(cx_a, |channel, _| assert!(channel.messages().is_empty()));
4365        channel_a
4366            .condition(&cx_a, |channel, _| {
4367                channel_messages(channel)
4368                    == [("user_b".to_string(), "hello A, it's B.".to_string(), false)]
4369            })
4370            .await;
4371
4372        let channels_b = cx_b
4373            .add_model(|cx| ChannelList::new(client_b.user_store.clone(), client_b.clone(), cx));
4374        channels_b
4375            .condition(cx_b, |list, _| list.available_channels().is_some())
4376            .await;
4377        channels_b.read_with(cx_b, |list, _| {
4378            assert_eq!(
4379                list.available_channels().unwrap(),
4380                &[ChannelDetails {
4381                    id: channel_id.to_proto(),
4382                    name: "test-channel".to_string()
4383                }]
4384            )
4385        });
4386
4387        let channel_b = channels_b.update(cx_b, |this, cx| {
4388            this.get_channel(channel_id.to_proto(), cx).unwrap()
4389        });
4390        channel_b.read_with(cx_b, |channel, _| assert!(channel.messages().is_empty()));
4391        channel_b
4392            .condition(&cx_b, |channel, _| {
4393                channel_messages(channel)
4394                    == [("user_b".to_string(), "hello A, it's B.".to_string(), false)]
4395            })
4396            .await;
4397
4398        channel_a
4399            .update(cx_a, |channel, cx| {
4400                channel
4401                    .send_message("oh, hi B.".to_string(), cx)
4402                    .unwrap()
4403                    .detach();
4404                let task = channel.send_message("sup".to_string(), cx).unwrap();
4405                assert_eq!(
4406                    channel_messages(channel),
4407                    &[
4408                        ("user_b".to_string(), "hello A, it's B.".to_string(), false),
4409                        ("user_a".to_string(), "oh, hi B.".to_string(), true),
4410                        ("user_a".to_string(), "sup".to_string(), true)
4411                    ]
4412                );
4413                task
4414            })
4415            .await
4416            .unwrap();
4417
4418        channel_b
4419            .condition(&cx_b, |channel, _| {
4420                channel_messages(channel)
4421                    == [
4422                        ("user_b".to_string(), "hello A, it's B.".to_string(), false),
4423                        ("user_a".to_string(), "oh, hi B.".to_string(), false),
4424                        ("user_a".to_string(), "sup".to_string(), false),
4425                    ]
4426            })
4427            .await;
4428
4429        assert_eq!(
4430            server
4431                .state()
4432                .await
4433                .channel(channel_id)
4434                .unwrap()
4435                .connection_ids
4436                .len(),
4437            2
4438        );
4439        cx_b.update(|_| drop(channel_b));
4440        server
4441            .condition(|state| state.channel(channel_id).unwrap().connection_ids.len() == 1)
4442            .await;
4443
4444        cx_a.update(|_| drop(channel_a));
4445        server
4446            .condition(|state| state.channel(channel_id).is_none())
4447            .await;
4448    }
4449
4450    #[gpui::test(iterations = 10)]
4451    async fn test_chat_message_validation(cx_a: &mut TestAppContext) {
4452        cx_a.foreground().forbid_parking();
4453
4454        let mut server = TestServer::start(cx_a.foreground(), cx_a.background()).await;
4455        let client_a = server.create_client(cx_a, "user_a").await;
4456
4457        let db = &server.app_state.db;
4458        let org_id = db.create_org("Test Org", "test-org").await.unwrap();
4459        let channel_id = db.create_org_channel(org_id, "test-channel").await.unwrap();
4460        db.add_org_member(org_id, client_a.current_user_id(&cx_a), false)
4461            .await
4462            .unwrap();
4463        db.add_channel_member(channel_id, client_a.current_user_id(&cx_a), false)
4464            .await
4465            .unwrap();
4466
4467        let channels_a = cx_a
4468            .add_model(|cx| ChannelList::new(client_a.user_store.clone(), client_a.clone(), cx));
4469        channels_a
4470            .condition(cx_a, |list, _| list.available_channels().is_some())
4471            .await;
4472        let channel_a = channels_a.update(cx_a, |this, cx| {
4473            this.get_channel(channel_id.to_proto(), cx).unwrap()
4474        });
4475
4476        // Messages aren't allowed to be too long.
4477        channel_a
4478            .update(cx_a, |channel, cx| {
4479                let long_body = "this is long.\n".repeat(1024);
4480                channel.send_message(long_body, cx).unwrap()
4481            })
4482            .await
4483            .unwrap_err();
4484
4485        // Messages aren't allowed to be blank.
4486        channel_a.update(cx_a, |channel, cx| {
4487            channel.send_message(String::new(), cx).unwrap_err()
4488        });
4489
4490        // Leading and trailing whitespace are trimmed.
4491        channel_a
4492            .update(cx_a, |channel, cx| {
4493                channel
4494                    .send_message("\n surrounded by whitespace  \n".to_string(), cx)
4495                    .unwrap()
4496            })
4497            .await
4498            .unwrap();
4499        assert_eq!(
4500            db.get_channel_messages(channel_id, 10, None)
4501                .await
4502                .unwrap()
4503                .iter()
4504                .map(|m| &m.body)
4505                .collect::<Vec<_>>(),
4506            &["surrounded by whitespace"]
4507        );
4508    }
4509
4510    #[gpui::test(iterations = 10)]
4511    async fn test_chat_reconnection(cx_a: &mut TestAppContext, cx_b: &mut TestAppContext) {
4512        cx_a.foreground().forbid_parking();
4513
4514        // Connect to a server as 2 clients.
4515        let mut server = TestServer::start(cx_a.foreground(), cx_a.background()).await;
4516        let client_a = server.create_client(cx_a, "user_a").await;
4517        let client_b = server.create_client(cx_b, "user_b").await;
4518        let mut status_b = client_b.status();
4519
4520        // Create an org that includes these 2 users.
4521        let db = &server.app_state.db;
4522        let org_id = db.create_org("Test Org", "test-org").await.unwrap();
4523        db.add_org_member(org_id, client_a.current_user_id(&cx_a), false)
4524            .await
4525            .unwrap();
4526        db.add_org_member(org_id, client_b.current_user_id(&cx_b), false)
4527            .await
4528            .unwrap();
4529
4530        // Create a channel that includes all the users.
4531        let channel_id = db.create_org_channel(org_id, "test-channel").await.unwrap();
4532        db.add_channel_member(channel_id, client_a.current_user_id(&cx_a), false)
4533            .await
4534            .unwrap();
4535        db.add_channel_member(channel_id, client_b.current_user_id(&cx_b), false)
4536            .await
4537            .unwrap();
4538        db.create_channel_message(
4539            channel_id,
4540            client_b.current_user_id(&cx_b),
4541            "hello A, it's B.",
4542            OffsetDateTime::now_utc(),
4543            2,
4544        )
4545        .await
4546        .unwrap();
4547
4548        let channels_a = cx_a
4549            .add_model(|cx| ChannelList::new(client_a.user_store.clone(), client_a.clone(), cx));
4550        channels_a
4551            .condition(cx_a, |list, _| list.available_channels().is_some())
4552            .await;
4553
4554        channels_a.read_with(cx_a, |list, _| {
4555            assert_eq!(
4556                list.available_channels().unwrap(),
4557                &[ChannelDetails {
4558                    id: channel_id.to_proto(),
4559                    name: "test-channel".to_string()
4560                }]
4561            )
4562        });
4563        let channel_a = channels_a.update(cx_a, |this, cx| {
4564            this.get_channel(channel_id.to_proto(), cx).unwrap()
4565        });
4566        channel_a.read_with(cx_a, |channel, _| assert!(channel.messages().is_empty()));
4567        channel_a
4568            .condition(&cx_a, |channel, _| {
4569                channel_messages(channel)
4570                    == [("user_b".to_string(), "hello A, it's B.".to_string(), false)]
4571            })
4572            .await;
4573
4574        let channels_b = cx_b
4575            .add_model(|cx| ChannelList::new(client_b.user_store.clone(), client_b.clone(), cx));
4576        channels_b
4577            .condition(cx_b, |list, _| list.available_channels().is_some())
4578            .await;
4579        channels_b.read_with(cx_b, |list, _| {
4580            assert_eq!(
4581                list.available_channels().unwrap(),
4582                &[ChannelDetails {
4583                    id: channel_id.to_proto(),
4584                    name: "test-channel".to_string()
4585                }]
4586            )
4587        });
4588
4589        let channel_b = channels_b.update(cx_b, |this, cx| {
4590            this.get_channel(channel_id.to_proto(), cx).unwrap()
4591        });
4592        channel_b.read_with(cx_b, |channel, _| assert!(channel.messages().is_empty()));
4593        channel_b
4594            .condition(&cx_b, |channel, _| {
4595                channel_messages(channel)
4596                    == [("user_b".to_string(), "hello A, it's B.".to_string(), false)]
4597            })
4598            .await;
4599
4600        // Disconnect client B, ensuring we can still access its cached channel data.
4601        server.forbid_connections();
4602        server.disconnect_client(client_b.current_user_id(&cx_b));
4603        cx_b.foreground().advance_clock(Duration::from_secs(3));
4604        while !matches!(
4605            status_b.next().await,
4606            Some(client::Status::ReconnectionError { .. })
4607        ) {}
4608
4609        channels_b.read_with(cx_b, |channels, _| {
4610            assert_eq!(
4611                channels.available_channels().unwrap(),
4612                [ChannelDetails {
4613                    id: channel_id.to_proto(),
4614                    name: "test-channel".to_string()
4615                }]
4616            )
4617        });
4618        channel_b.read_with(cx_b, |channel, _| {
4619            assert_eq!(
4620                channel_messages(channel),
4621                [("user_b".to_string(), "hello A, it's B.".to_string(), false)]
4622            )
4623        });
4624
4625        // Send a message from client B while it is disconnected.
4626        channel_b
4627            .update(cx_b, |channel, cx| {
4628                let task = channel
4629                    .send_message("can you see this?".to_string(), cx)
4630                    .unwrap();
4631                assert_eq!(
4632                    channel_messages(channel),
4633                    &[
4634                        ("user_b".to_string(), "hello A, it's B.".to_string(), false),
4635                        ("user_b".to_string(), "can you see this?".to_string(), true)
4636                    ]
4637                );
4638                task
4639            })
4640            .await
4641            .unwrap_err();
4642
4643        // Send a message from client A while B is disconnected.
4644        channel_a
4645            .update(cx_a, |channel, cx| {
4646                channel
4647                    .send_message("oh, hi B.".to_string(), cx)
4648                    .unwrap()
4649                    .detach();
4650                let task = channel.send_message("sup".to_string(), cx).unwrap();
4651                assert_eq!(
4652                    channel_messages(channel),
4653                    &[
4654                        ("user_b".to_string(), "hello A, it's B.".to_string(), false),
4655                        ("user_a".to_string(), "oh, hi B.".to_string(), true),
4656                        ("user_a".to_string(), "sup".to_string(), true)
4657                    ]
4658                );
4659                task
4660            })
4661            .await
4662            .unwrap();
4663
4664        // Give client B a chance to reconnect.
4665        server.allow_connections();
4666        cx_b.foreground().advance_clock(Duration::from_secs(10));
4667
4668        // Verify that B sees the new messages upon reconnection, as well as the message client B
4669        // sent while offline.
4670        channel_b
4671            .condition(&cx_b, |channel, _| {
4672                channel_messages(channel)
4673                    == [
4674                        ("user_b".to_string(), "hello A, it's B.".to_string(), false),
4675                        ("user_a".to_string(), "oh, hi B.".to_string(), false),
4676                        ("user_a".to_string(), "sup".to_string(), false),
4677                        ("user_b".to_string(), "can you see this?".to_string(), false),
4678                    ]
4679            })
4680            .await;
4681
4682        // Ensure client A and B can communicate normally after reconnection.
4683        channel_a
4684            .update(cx_a, |channel, cx| {
4685                channel.send_message("you online?".to_string(), cx).unwrap()
4686            })
4687            .await
4688            .unwrap();
4689        channel_b
4690            .condition(&cx_b, |channel, _| {
4691                channel_messages(channel)
4692                    == [
4693                        ("user_b".to_string(), "hello A, it's B.".to_string(), false),
4694                        ("user_a".to_string(), "oh, hi B.".to_string(), false),
4695                        ("user_a".to_string(), "sup".to_string(), false),
4696                        ("user_b".to_string(), "can you see this?".to_string(), false),
4697                        ("user_a".to_string(), "you online?".to_string(), false),
4698                    ]
4699            })
4700            .await;
4701
4702        channel_b
4703            .update(cx_b, |channel, cx| {
4704                channel.send_message("yep".to_string(), cx).unwrap()
4705            })
4706            .await
4707            .unwrap();
4708        channel_a
4709            .condition(&cx_a, |channel, _| {
4710                channel_messages(channel)
4711                    == [
4712                        ("user_b".to_string(), "hello A, it's B.".to_string(), false),
4713                        ("user_a".to_string(), "oh, hi B.".to_string(), false),
4714                        ("user_a".to_string(), "sup".to_string(), false),
4715                        ("user_b".to_string(), "can you see this?".to_string(), false),
4716                        ("user_a".to_string(), "you online?".to_string(), false),
4717                        ("user_b".to_string(), "yep".to_string(), false),
4718                    ]
4719            })
4720            .await;
4721    }
4722
4723    #[gpui::test(iterations = 10)]
4724    async fn test_contacts(
4725        cx_a: &mut TestAppContext,
4726        cx_b: &mut TestAppContext,
4727        cx_c: &mut TestAppContext,
4728    ) {
4729        cx_a.foreground().forbid_parking();
4730        let lang_registry = Arc::new(LanguageRegistry::test());
4731        let fs = FakeFs::new(cx_a.background());
4732
4733        // Connect to a server as 3 clients.
4734        let mut server = TestServer::start(cx_a.foreground(), cx_a.background()).await;
4735        let client_a = server.create_client(cx_a, "user_a").await;
4736        let client_b = server.create_client(cx_b, "user_b").await;
4737        let client_c = server.create_client(cx_c, "user_c").await;
4738
4739        // Share a worktree as client A.
4740        fs.insert_tree(
4741            "/a",
4742            json!({
4743                ".zed.toml": r#"collaborators = ["user_b", "user_c"]"#,
4744            }),
4745        )
4746        .await;
4747
4748        let project_a = cx_a.update(|cx| {
4749            Project::local(
4750                client_a.clone(),
4751                client_a.user_store.clone(),
4752                lang_registry.clone(),
4753                fs.clone(),
4754                cx,
4755            )
4756        });
4757        let (worktree_a, _) = project_a
4758            .update(cx_a, |p, cx| {
4759                p.find_or_create_local_worktree("/a", true, cx)
4760            })
4761            .await
4762            .unwrap();
4763        worktree_a
4764            .read_with(cx_a, |tree, _| tree.as_local().unwrap().scan_complete())
4765            .await;
4766
4767        client_a
4768            .user_store
4769            .condition(&cx_a, |user_store, _| {
4770                contacts(user_store) == vec![("user_a", vec![("a", false, vec![])])]
4771            })
4772            .await;
4773        client_b
4774            .user_store
4775            .condition(&cx_b, |user_store, _| {
4776                contacts(user_store) == vec![("user_a", vec![("a", false, vec![])])]
4777            })
4778            .await;
4779        client_c
4780            .user_store
4781            .condition(&cx_c, |user_store, _| {
4782                contacts(user_store) == vec![("user_a", vec![("a", false, vec![])])]
4783            })
4784            .await;
4785
4786        let project_id = project_a
4787            .update(cx_a, |project, _| project.next_remote_id())
4788            .await;
4789        project_a
4790            .update(cx_a, |project, cx| project.share(cx))
4791            .await
4792            .unwrap();
4793        client_a
4794            .user_store
4795            .condition(&cx_a, |user_store, _| {
4796                contacts(user_store) == vec![("user_a", vec![("a", true, vec![])])]
4797            })
4798            .await;
4799        client_b
4800            .user_store
4801            .condition(&cx_b, |user_store, _| {
4802                contacts(user_store) == vec![("user_a", vec![("a", true, vec![])])]
4803            })
4804            .await;
4805        client_c
4806            .user_store
4807            .condition(&cx_c, |user_store, _| {
4808                contacts(user_store) == vec![("user_a", vec![("a", true, vec![])])]
4809            })
4810            .await;
4811
4812        let _project_b = Project::remote(
4813            project_id,
4814            client_b.clone(),
4815            client_b.user_store.clone(),
4816            lang_registry.clone(),
4817            fs.clone(),
4818            &mut cx_b.to_async(),
4819        )
4820        .await
4821        .unwrap();
4822
4823        client_a
4824            .user_store
4825            .condition(&cx_a, |user_store, _| {
4826                contacts(user_store) == vec![("user_a", vec![("a", true, vec!["user_b"])])]
4827            })
4828            .await;
4829        client_b
4830            .user_store
4831            .condition(&cx_b, |user_store, _| {
4832                contacts(user_store) == vec![("user_a", vec![("a", true, vec!["user_b"])])]
4833            })
4834            .await;
4835        client_c
4836            .user_store
4837            .condition(&cx_c, |user_store, _| {
4838                contacts(user_store) == vec![("user_a", vec![("a", true, vec!["user_b"])])]
4839            })
4840            .await;
4841
4842        project_a
4843            .condition(&cx_a, |project, _| {
4844                project.collaborators().contains_key(&client_b.peer_id)
4845            })
4846            .await;
4847
4848        cx_a.update(move |_| drop(project_a));
4849        client_a
4850            .user_store
4851            .condition(&cx_a, |user_store, _| contacts(user_store) == vec![])
4852            .await;
4853        client_b
4854            .user_store
4855            .condition(&cx_b, |user_store, _| contacts(user_store) == vec![])
4856            .await;
4857        client_c
4858            .user_store
4859            .condition(&cx_c, |user_store, _| contacts(user_store) == vec![])
4860            .await;
4861
4862        fn contacts(user_store: &UserStore) -> Vec<(&str, Vec<(&str, bool, Vec<&str>)>)> {
4863            user_store
4864                .contacts()
4865                .iter()
4866                .map(|contact| {
4867                    let worktrees = contact
4868                        .projects
4869                        .iter()
4870                        .map(|p| {
4871                            (
4872                                p.worktree_root_names[0].as_str(),
4873                                p.is_shared,
4874                                p.guests.iter().map(|p| p.github_login.as_str()).collect(),
4875                            )
4876                        })
4877                        .collect();
4878                    (contact.user.github_login.as_str(), worktrees)
4879                })
4880                .collect()
4881        }
4882    }
4883
4884    #[gpui::test(iterations = 10)]
4885    async fn test_following(cx_a: &mut TestAppContext, cx_b: &mut TestAppContext) {
4886        cx_a.foreground().forbid_parking();
4887        let fs = FakeFs::new(cx_a.background());
4888
4889        // 2 clients connect to a server.
4890        let mut server = TestServer::start(cx_a.foreground(), cx_a.background()).await;
4891        let mut client_a = server.create_client(cx_a, "user_a").await;
4892        let mut client_b = server.create_client(cx_b, "user_b").await;
4893        cx_a.update(editor::init);
4894        cx_b.update(editor::init);
4895
4896        // Client A shares a project.
4897        fs.insert_tree(
4898            "/a",
4899            json!({
4900                ".zed.toml": r#"collaborators = ["user_b"]"#,
4901                "1.txt": "one",
4902                "2.txt": "two",
4903                "3.txt": "three",
4904            }),
4905        )
4906        .await;
4907        let (project_a, worktree_id) = client_a.build_local_project(fs.clone(), "/a", cx_a).await;
4908        project_a
4909            .update(cx_a, |project, cx| project.share(cx))
4910            .await
4911            .unwrap();
4912
4913        // Client B joins the project.
4914        let project_b = client_b
4915            .build_remote_project(
4916                project_a
4917                    .read_with(cx_a, |project, _| project.remote_id())
4918                    .unwrap(),
4919                cx_b,
4920            )
4921            .await;
4922
4923        // Client A opens some editors.
4924        let workspace_a = client_a.build_workspace(&project_a, cx_a);
4925        let pane_a = workspace_a.read_with(cx_a, |workspace, _| workspace.active_pane().clone());
4926        let editor_a1 = workspace_a
4927            .update(cx_a, |workspace, cx| {
4928                workspace.open_path((worktree_id, "1.txt"), true, cx)
4929            })
4930            .await
4931            .unwrap()
4932            .downcast::<Editor>()
4933            .unwrap();
4934        let editor_a2 = workspace_a
4935            .update(cx_a, |workspace, cx| {
4936                workspace.open_path((worktree_id, "2.txt"), true, cx)
4937            })
4938            .await
4939            .unwrap()
4940            .downcast::<Editor>()
4941            .unwrap();
4942
4943        // Client B opens an editor.
4944        let workspace_b = client_b.build_workspace(&project_b, cx_b);
4945        let editor_b1 = workspace_b
4946            .update(cx_b, |workspace, cx| {
4947                workspace.open_path((worktree_id, "1.txt"), true, cx)
4948            })
4949            .await
4950            .unwrap()
4951            .downcast::<Editor>()
4952            .unwrap();
4953
4954        let client_a_id = project_b.read_with(cx_b, |project, _| {
4955            project.collaborators().values().next().unwrap().peer_id
4956        });
4957        let client_b_id = project_a.read_with(cx_a, |project, _| {
4958            project.collaborators().values().next().unwrap().peer_id
4959        });
4960
4961        // When client B starts following client A, all visible view states are replicated to client B.
4962        editor_a1.update(cx_a, |editor, cx| editor.select_ranges([0..1], None, cx));
4963        editor_a2.update(cx_a, |editor, cx| editor.select_ranges([2..3], None, cx));
4964        workspace_b
4965            .update(cx_b, |workspace, cx| {
4966                workspace
4967                    .toggle_follow(&ToggleFollow(client_a_id), cx)
4968                    .unwrap()
4969            })
4970            .await
4971            .unwrap();
4972        let editor_b2 = workspace_b.read_with(cx_b, |workspace, cx| {
4973            workspace
4974                .active_item(cx)
4975                .unwrap()
4976                .downcast::<Editor>()
4977                .unwrap()
4978        });
4979        assert!(cx_b.read(|cx| editor_b2.is_focused(cx)));
4980        assert_eq!(
4981            editor_b2.read_with(cx_b, |editor, cx| editor.project_path(cx)),
4982            Some((worktree_id, "2.txt").into())
4983        );
4984        assert_eq!(
4985            editor_b2.read_with(cx_b, |editor, cx| editor.selected_ranges(cx)),
4986            vec![2..3]
4987        );
4988        assert_eq!(
4989            editor_b1.read_with(cx_b, |editor, cx| editor.selected_ranges(cx)),
4990            vec![0..1]
4991        );
4992
4993        // When client A activates a different editor, client B does so as well.
4994        workspace_a.update(cx_a, |workspace, cx| {
4995            workspace.activate_item(&editor_a1, cx)
4996        });
4997        workspace_b
4998            .condition(cx_b, |workspace, cx| {
4999                workspace.active_item(cx).unwrap().id() == editor_b1.id()
5000            })
5001            .await;
5002
5003        // When client A navigates back and forth, client B does so as well.
5004        workspace_a
5005            .update(cx_a, |workspace, cx| {
5006                workspace::Pane::go_back(workspace, None, cx)
5007            })
5008            .await;
5009        workspace_b
5010            .condition(cx_b, |workspace, cx| {
5011                workspace.active_item(cx).unwrap().id() == editor_b2.id()
5012            })
5013            .await;
5014
5015        workspace_a
5016            .update(cx_a, |workspace, cx| {
5017                workspace::Pane::go_forward(workspace, None, cx)
5018            })
5019            .await;
5020        workspace_b
5021            .condition(cx_b, |workspace, cx| {
5022                workspace.active_item(cx).unwrap().id() == editor_b1.id()
5023            })
5024            .await;
5025
5026        // Changes to client A's editor are reflected on client B.
5027        editor_a1.update(cx_a, |editor, cx| {
5028            editor.select_ranges([1..1, 2..2], None, cx);
5029        });
5030        editor_b1
5031            .condition(cx_b, |editor, cx| {
5032                editor.selected_ranges(cx) == vec![1..1, 2..2]
5033            })
5034            .await;
5035
5036        editor_a1.update(cx_a, |editor, cx| editor.set_text("TWO", cx));
5037        editor_b1
5038            .condition(cx_b, |editor, cx| editor.text(cx) == "TWO")
5039            .await;
5040
5041        editor_a1.update(cx_a, |editor, cx| {
5042            editor.select_ranges([3..3], None, cx);
5043            editor.set_scroll_position(vec2f(0., 100.), cx);
5044        });
5045        editor_b1
5046            .condition(cx_b, |editor, cx| editor.selected_ranges(cx) == vec![3..3])
5047            .await;
5048
5049        // After unfollowing, client B stops receiving updates from client A.
5050        workspace_b.update(cx_b, |workspace, cx| {
5051            workspace.unfollow(&workspace.active_pane().clone(), cx)
5052        });
5053        workspace_a.update(cx_a, |workspace, cx| {
5054            workspace.activate_item(&editor_a2, cx)
5055        });
5056        cx_a.foreground().run_until_parked();
5057        assert_eq!(
5058            workspace_b.read_with(cx_b, |workspace, cx| workspace
5059                .active_item(cx)
5060                .unwrap()
5061                .id()),
5062            editor_b1.id()
5063        );
5064
5065        // Client A starts following client B.
5066        workspace_a
5067            .update(cx_a, |workspace, cx| {
5068                workspace
5069                    .toggle_follow(&ToggleFollow(client_b_id), cx)
5070                    .unwrap()
5071            })
5072            .await
5073            .unwrap();
5074        assert_eq!(
5075            workspace_a.read_with(cx_a, |workspace, _| workspace.leader_for_pane(&pane_a)),
5076            Some(client_b_id)
5077        );
5078        assert_eq!(
5079            workspace_a.read_with(cx_a, |workspace, cx| workspace
5080                .active_item(cx)
5081                .unwrap()
5082                .id()),
5083            editor_a1.id()
5084        );
5085
5086        // Following interrupts when client B disconnects.
5087        client_b.disconnect(&cx_b.to_async()).unwrap();
5088        cx_a.foreground().run_until_parked();
5089        assert_eq!(
5090            workspace_a.read_with(cx_a, |workspace, _| workspace.leader_for_pane(&pane_a)),
5091            None
5092        );
5093    }
5094
5095    #[gpui::test(iterations = 10)]
5096    async fn test_peers_following_each_other(cx_a: &mut TestAppContext, cx_b: &mut TestAppContext) {
5097        cx_a.foreground().forbid_parking();
5098        let fs = FakeFs::new(cx_a.background());
5099
5100        // 2 clients connect to a server.
5101        let mut server = TestServer::start(cx_a.foreground(), cx_a.background()).await;
5102        let mut client_a = server.create_client(cx_a, "user_a").await;
5103        let mut client_b = server.create_client(cx_b, "user_b").await;
5104        cx_a.update(editor::init);
5105        cx_b.update(editor::init);
5106
5107        // Client A shares a project.
5108        fs.insert_tree(
5109            "/a",
5110            json!({
5111                ".zed.toml": r#"collaborators = ["user_b"]"#,
5112                "1.txt": "one",
5113                "2.txt": "two",
5114                "3.txt": "three",
5115                "4.txt": "four",
5116            }),
5117        )
5118        .await;
5119        let (project_a, worktree_id) = client_a.build_local_project(fs.clone(), "/a", cx_a).await;
5120        project_a
5121            .update(cx_a, |project, cx| project.share(cx))
5122            .await
5123            .unwrap();
5124
5125        // Client B joins the project.
5126        let project_b = client_b
5127            .build_remote_project(
5128                project_a
5129                    .read_with(cx_a, |project, _| project.remote_id())
5130                    .unwrap(),
5131                cx_b,
5132            )
5133            .await;
5134
5135        // Client A opens some editors.
5136        let workspace_a = client_a.build_workspace(&project_a, cx_a);
5137        let pane_a1 = workspace_a.read_with(cx_a, |workspace, _| workspace.active_pane().clone());
5138        let _editor_a1 = workspace_a
5139            .update(cx_a, |workspace, cx| {
5140                workspace.open_path((worktree_id, "1.txt"), true, cx)
5141            })
5142            .await
5143            .unwrap()
5144            .downcast::<Editor>()
5145            .unwrap();
5146
5147        // Client B opens an editor.
5148        let workspace_b = client_b.build_workspace(&project_b, cx_b);
5149        let pane_b1 = workspace_b.read_with(cx_b, |workspace, _| workspace.active_pane().clone());
5150        let _editor_b1 = workspace_b
5151            .update(cx_b, |workspace, cx| {
5152                workspace.open_path((worktree_id, "2.txt"), true, cx)
5153            })
5154            .await
5155            .unwrap()
5156            .downcast::<Editor>()
5157            .unwrap();
5158
5159        // Clients A and B follow each other in split panes
5160        workspace_a
5161            .update(cx_a, |workspace, cx| {
5162                workspace.split_pane(workspace.active_pane().clone(), SplitDirection::Right, cx);
5163                assert_ne!(*workspace.active_pane(), pane_a1);
5164                let leader_id = *project_a.read(cx).collaborators().keys().next().unwrap();
5165                workspace
5166                    .toggle_follow(&workspace::ToggleFollow(leader_id), cx)
5167                    .unwrap()
5168            })
5169            .await
5170            .unwrap();
5171        workspace_b
5172            .update(cx_b, |workspace, cx| {
5173                workspace.split_pane(workspace.active_pane().clone(), SplitDirection::Right, cx);
5174                assert_ne!(*workspace.active_pane(), pane_b1);
5175                let leader_id = *project_b.read(cx).collaborators().keys().next().unwrap();
5176                workspace
5177                    .toggle_follow(&workspace::ToggleFollow(leader_id), cx)
5178                    .unwrap()
5179            })
5180            .await
5181            .unwrap();
5182
5183        workspace_a
5184            .update(cx_a, |workspace, cx| {
5185                workspace.activate_next_pane(cx);
5186                assert_eq!(*workspace.active_pane(), pane_a1);
5187                workspace.open_path((worktree_id, "3.txt"), true, cx)
5188            })
5189            .await
5190            .unwrap();
5191        workspace_b
5192            .update(cx_b, |workspace, cx| {
5193                workspace.activate_next_pane(cx);
5194                assert_eq!(*workspace.active_pane(), pane_b1);
5195                workspace.open_path((worktree_id, "4.txt"), true, cx)
5196            })
5197            .await
5198            .unwrap();
5199        cx_a.foreground().run_until_parked();
5200
5201        // Ensure leader updates don't change the active pane of followers
5202        workspace_a.read_with(cx_a, |workspace, _| {
5203            assert_eq!(*workspace.active_pane(), pane_a1);
5204        });
5205        workspace_b.read_with(cx_b, |workspace, _| {
5206            assert_eq!(*workspace.active_pane(), pane_b1);
5207        });
5208
5209        // Ensure peers following each other doesn't cause an infinite loop.
5210        assert_eq!(
5211            workspace_a.read_with(cx_a, |workspace, cx| workspace
5212                .active_item(cx)
5213                .unwrap()
5214                .project_path(cx)),
5215            Some((worktree_id, "3.txt").into())
5216        );
5217        workspace_a.update(cx_a, |workspace, cx| {
5218            assert_eq!(
5219                workspace.active_item(cx).unwrap().project_path(cx),
5220                Some((worktree_id, "3.txt").into())
5221            );
5222            workspace.activate_next_pane(cx);
5223            assert_eq!(
5224                workspace.active_item(cx).unwrap().project_path(cx),
5225                Some((worktree_id, "4.txt").into())
5226            );
5227        });
5228        workspace_b.update(cx_b, |workspace, cx| {
5229            assert_eq!(
5230                workspace.active_item(cx).unwrap().project_path(cx),
5231                Some((worktree_id, "4.txt").into())
5232            );
5233            workspace.activate_next_pane(cx);
5234            assert_eq!(
5235                workspace.active_item(cx).unwrap().project_path(cx),
5236                Some((worktree_id, "3.txt").into())
5237            );
5238        });
5239    }
5240
5241    #[gpui::test(iterations = 10)]
5242    async fn test_auto_unfollowing(cx_a: &mut TestAppContext, cx_b: &mut TestAppContext) {
5243        cx_a.foreground().forbid_parking();
5244        let fs = FakeFs::new(cx_a.background());
5245
5246        // 2 clients connect to a server.
5247        let mut server = TestServer::start(cx_a.foreground(), cx_a.background()).await;
5248        let mut client_a = server.create_client(cx_a, "user_a").await;
5249        let mut client_b = server.create_client(cx_b, "user_b").await;
5250        cx_a.update(editor::init);
5251        cx_b.update(editor::init);
5252
5253        // Client A shares a project.
5254        fs.insert_tree(
5255            "/a",
5256            json!({
5257                ".zed.toml": r#"collaborators = ["user_b"]"#,
5258                "1.txt": "one",
5259                "2.txt": "two",
5260                "3.txt": "three",
5261            }),
5262        )
5263        .await;
5264        let (project_a, worktree_id) = client_a.build_local_project(fs.clone(), "/a", cx_a).await;
5265        project_a
5266            .update(cx_a, |project, cx| project.share(cx))
5267            .await
5268            .unwrap();
5269
5270        // Client B joins the project.
5271        let project_b = client_b
5272            .build_remote_project(
5273                project_a
5274                    .read_with(cx_a, |project, _| project.remote_id())
5275                    .unwrap(),
5276                cx_b,
5277            )
5278            .await;
5279
5280        // Client A opens some editors.
5281        let workspace_a = client_a.build_workspace(&project_a, cx_a);
5282        let _editor_a1 = workspace_a
5283            .update(cx_a, |workspace, cx| {
5284                workspace.open_path((worktree_id, "1.txt"), true, cx)
5285            })
5286            .await
5287            .unwrap()
5288            .downcast::<Editor>()
5289            .unwrap();
5290
5291        // Client B starts following client A.
5292        let workspace_b = client_b.build_workspace(&project_b, cx_b);
5293        let pane_b = workspace_b.read_with(cx_b, |workspace, _| workspace.active_pane().clone());
5294        let leader_id = project_b.read_with(cx_b, |project, _| {
5295            project.collaborators().values().next().unwrap().peer_id
5296        });
5297        workspace_b
5298            .update(cx_b, |workspace, cx| {
5299                workspace
5300                    .toggle_follow(&ToggleFollow(leader_id), cx)
5301                    .unwrap()
5302            })
5303            .await
5304            .unwrap();
5305        assert_eq!(
5306            workspace_b.read_with(cx_b, |workspace, _| workspace.leader_for_pane(&pane_b)),
5307            Some(leader_id)
5308        );
5309        let editor_b2 = workspace_b.read_with(cx_b, |workspace, cx| {
5310            workspace
5311                .active_item(cx)
5312                .unwrap()
5313                .downcast::<Editor>()
5314                .unwrap()
5315        });
5316
5317        // When client B moves, it automatically stops following client A.
5318        editor_b2.update(cx_b, |editor, cx| editor.move_right(&editor::MoveRight, cx));
5319        assert_eq!(
5320            workspace_b.read_with(cx_b, |workspace, _| workspace.leader_for_pane(&pane_b)),
5321            None
5322        );
5323
5324        workspace_b
5325            .update(cx_b, |workspace, cx| {
5326                workspace
5327                    .toggle_follow(&ToggleFollow(leader_id), cx)
5328                    .unwrap()
5329            })
5330            .await
5331            .unwrap();
5332        assert_eq!(
5333            workspace_b.read_with(cx_b, |workspace, _| workspace.leader_for_pane(&pane_b)),
5334            Some(leader_id)
5335        );
5336
5337        // When client B edits, it automatically stops following client A.
5338        editor_b2.update(cx_b, |editor, cx| editor.insert("X", cx));
5339        assert_eq!(
5340            workspace_b.read_with(cx_b, |workspace, _| workspace.leader_for_pane(&pane_b)),
5341            None
5342        );
5343
5344        workspace_b
5345            .update(cx_b, |workspace, cx| {
5346                workspace
5347                    .toggle_follow(&ToggleFollow(leader_id), cx)
5348                    .unwrap()
5349            })
5350            .await
5351            .unwrap();
5352        assert_eq!(
5353            workspace_b.read_with(cx_b, |workspace, _| workspace.leader_for_pane(&pane_b)),
5354            Some(leader_id)
5355        );
5356
5357        // When client B scrolls, it automatically stops following client A.
5358        editor_b2.update(cx_b, |editor, cx| {
5359            editor.set_scroll_position(vec2f(0., 3.), cx)
5360        });
5361        assert_eq!(
5362            workspace_b.read_with(cx_b, |workspace, _| workspace.leader_for_pane(&pane_b)),
5363            None
5364        );
5365
5366        workspace_b
5367            .update(cx_b, |workspace, cx| {
5368                workspace
5369                    .toggle_follow(&ToggleFollow(leader_id), cx)
5370                    .unwrap()
5371            })
5372            .await
5373            .unwrap();
5374        assert_eq!(
5375            workspace_b.read_with(cx_b, |workspace, _| workspace.leader_for_pane(&pane_b)),
5376            Some(leader_id)
5377        );
5378
5379        // When client B activates a different pane, it continues following client A in the original pane.
5380        workspace_b.update(cx_b, |workspace, cx| {
5381            workspace.split_pane(pane_b.clone(), SplitDirection::Right, cx)
5382        });
5383        assert_eq!(
5384            workspace_b.read_with(cx_b, |workspace, _| workspace.leader_for_pane(&pane_b)),
5385            Some(leader_id)
5386        );
5387
5388        workspace_b.update(cx_b, |workspace, cx| workspace.activate_next_pane(cx));
5389        assert_eq!(
5390            workspace_b.read_with(cx_b, |workspace, _| workspace.leader_for_pane(&pane_b)),
5391            Some(leader_id)
5392        );
5393
5394        // When client B activates a different item in the original pane, it automatically stops following client A.
5395        workspace_b
5396            .update(cx_b, |workspace, cx| {
5397                workspace.open_path((worktree_id, "2.txt"), true, cx)
5398            })
5399            .await
5400            .unwrap();
5401        assert_eq!(
5402            workspace_b.read_with(cx_b, |workspace, _| workspace.leader_for_pane(&pane_b)),
5403            None
5404        );
5405    }
5406
5407    #[gpui::test(iterations = 100)]
5408    async fn test_random_collaboration(
5409        cx: &mut TestAppContext,
5410        deterministic: Arc<Deterministic>,
5411        rng: StdRng,
5412    ) {
5413        cx.foreground().forbid_parking();
5414        let max_peers = env::var("MAX_PEERS")
5415            .map(|i| i.parse().expect("invalid `MAX_PEERS` variable"))
5416            .unwrap_or(5);
5417        assert!(max_peers <= 5);
5418
5419        let max_operations = env::var("OPERATIONS")
5420            .map(|i| i.parse().expect("invalid `OPERATIONS` variable"))
5421            .unwrap_or(10);
5422
5423        let rng = Arc::new(Mutex::new(rng));
5424
5425        let guest_lang_registry = Arc::new(LanguageRegistry::test());
5426        let host_language_registry = Arc::new(LanguageRegistry::test());
5427
5428        let fs = FakeFs::new(cx.background());
5429        fs.insert_tree(
5430            "/_collab",
5431            json!({
5432                ".zed.toml": r#"collaborators = ["guest-1", "guest-2", "guest-3", "guest-4"]"#
5433            }),
5434        )
5435        .await;
5436
5437        let mut server = TestServer::start(cx.foreground(), cx.background()).await;
5438        let mut clients = Vec::new();
5439        let mut user_ids = Vec::new();
5440        let mut op_start_signals = Vec::new();
5441        let files = Arc::new(Mutex::new(Vec::new()));
5442
5443        let mut next_entity_id = 100000;
5444        let mut host_cx = TestAppContext::new(
5445            cx.foreground_platform(),
5446            cx.platform(),
5447            deterministic.build_foreground(next_entity_id),
5448            deterministic.build_background(),
5449            cx.font_cache(),
5450            cx.leak_detector(),
5451            next_entity_id,
5452        );
5453        let host = server.create_client(&mut host_cx, "host").await;
5454        let host_project = host_cx.update(|cx| {
5455            Project::local(
5456                host.client.clone(),
5457                host.user_store.clone(),
5458                host_language_registry.clone(),
5459                fs.clone(),
5460                cx,
5461            )
5462        });
5463        let host_project_id = host_project
5464            .update(&mut host_cx, |p, _| p.next_remote_id())
5465            .await;
5466
5467        let (collab_worktree, _) = host_project
5468            .update(&mut host_cx, |project, cx| {
5469                project.find_or_create_local_worktree("/_collab", true, cx)
5470            })
5471            .await
5472            .unwrap();
5473        collab_worktree
5474            .read_with(&host_cx, |tree, _| tree.as_local().unwrap().scan_complete())
5475            .await;
5476        host_project
5477            .update(&mut host_cx, |project, cx| project.share(cx))
5478            .await
5479            .unwrap();
5480
5481        // Set up fake language servers.
5482        let mut language = Language::new(
5483            LanguageConfig {
5484                name: "Rust".into(),
5485                path_suffixes: vec!["rs".to_string()],
5486                ..Default::default()
5487            },
5488            None,
5489        );
5490        let _fake_servers = language.set_fake_lsp_adapter(FakeLspAdapter {
5491            name: "the-fake-language-server",
5492            capabilities: lsp::LanguageServer::full_capabilities(),
5493            initializer: Some(Box::new({
5494                let rng = rng.clone();
5495                let files = files.clone();
5496                let project = host_project.downgrade();
5497                move |fake_server: &mut FakeLanguageServer| {
5498                    fake_server.handle_request::<lsp::request::Completion, _, _>(
5499                        |_, _| async move {
5500                            Ok(Some(lsp::CompletionResponse::Array(vec![
5501                                lsp::CompletionItem {
5502                                    text_edit: Some(lsp::CompletionTextEdit::Edit(lsp::TextEdit {
5503                                        range: lsp::Range::new(
5504                                            lsp::Position::new(0, 0),
5505                                            lsp::Position::new(0, 0),
5506                                        ),
5507                                        new_text: "the-new-text".to_string(),
5508                                    })),
5509                                    ..Default::default()
5510                                },
5511                            ])))
5512                        },
5513                    );
5514
5515                    fake_server.handle_request::<lsp::request::CodeActionRequest, _, _>(
5516                        |_, _| async move {
5517                            Ok(Some(vec![lsp::CodeActionOrCommand::CodeAction(
5518                                lsp::CodeAction {
5519                                    title: "the-code-action".to_string(),
5520                                    ..Default::default()
5521                                },
5522                            )]))
5523                        },
5524                    );
5525
5526                    fake_server.handle_request::<lsp::request::PrepareRenameRequest, _, _>(
5527                        |params, _| async move {
5528                            Ok(Some(lsp::PrepareRenameResponse::Range(lsp::Range::new(
5529                                params.position,
5530                                params.position,
5531                            ))))
5532                        },
5533                    );
5534
5535                    fake_server.handle_request::<lsp::request::GotoDefinition, _, _>({
5536                        let files = files.clone();
5537                        let rng = rng.clone();
5538                        move |_, _| {
5539                            let files = files.clone();
5540                            let rng = rng.clone();
5541                            async move {
5542                                let files = files.lock();
5543                                let mut rng = rng.lock();
5544                                let count = rng.gen_range::<usize, _>(1..3);
5545                                let files = (0..count)
5546                                    .map(|_| files.choose(&mut *rng).unwrap())
5547                                    .collect::<Vec<_>>();
5548                                log::info!("LSP: Returning definitions in files {:?}", &files);
5549                                Ok(Some(lsp::GotoDefinitionResponse::Array(
5550                                    files
5551                                        .into_iter()
5552                                        .map(|file| lsp::Location {
5553                                            uri: lsp::Url::from_file_path(file).unwrap(),
5554                                            range: Default::default(),
5555                                        })
5556                                        .collect(),
5557                                )))
5558                            }
5559                        }
5560                    });
5561
5562                    fake_server.handle_request::<lsp::request::DocumentHighlightRequest, _, _>({
5563                        let rng = rng.clone();
5564                        let project = project.clone();
5565                        move |params, mut cx| {
5566                            let highlights = if let Some(project) = project.upgrade(&cx) {
5567                                project.update(&mut cx, |project, cx| {
5568                                    let path = params
5569                                        .text_document_position_params
5570                                        .text_document
5571                                        .uri
5572                                        .to_file_path()
5573                                        .unwrap();
5574                                    let (worktree, relative_path) =
5575                                        project.find_local_worktree(&path, cx)?;
5576                                    let project_path =
5577                                        ProjectPath::from((worktree.read(cx).id(), relative_path));
5578                                    let buffer =
5579                                        project.get_open_buffer(&project_path, cx)?.read(cx);
5580
5581                                    let mut highlights = Vec::new();
5582                                    let highlight_count = rng.lock().gen_range(1..=5);
5583                                    let mut prev_end = 0;
5584                                    for _ in 0..highlight_count {
5585                                        let range =
5586                                            buffer.random_byte_range(prev_end, &mut *rng.lock());
5587
5588                                        highlights.push(lsp::DocumentHighlight {
5589                                            range: range_to_lsp(range.to_point_utf16(buffer)),
5590                                            kind: Some(lsp::DocumentHighlightKind::READ),
5591                                        });
5592                                        prev_end = range.end;
5593                                    }
5594                                    Some(highlights)
5595                                })
5596                            } else {
5597                                None
5598                            };
5599                            async move { Ok(highlights) }
5600                        }
5601                    });
5602                }
5603            })),
5604            ..Default::default()
5605        });
5606        host_language_registry.add(Arc::new(language));
5607
5608        let op_start_signal = futures::channel::mpsc::unbounded();
5609        user_ids.push(host.current_user_id(&host_cx));
5610        op_start_signals.push(op_start_signal.0);
5611        clients.push(host_cx.foreground().spawn(host.simulate_host(
5612            host_project,
5613            files,
5614            op_start_signal.1,
5615            rng.clone(),
5616            host_cx,
5617        )));
5618
5619        let disconnect_host_at = if rng.lock().gen_bool(0.2) {
5620            rng.lock().gen_range(0..max_operations)
5621        } else {
5622            max_operations
5623        };
5624        let mut available_guests = vec![
5625            "guest-1".to_string(),
5626            "guest-2".to_string(),
5627            "guest-3".to_string(),
5628            "guest-4".to_string(),
5629        ];
5630        let mut operations = 0;
5631        while operations < max_operations {
5632            if operations == disconnect_host_at {
5633                server.disconnect_client(user_ids[0]);
5634                cx.foreground().advance_clock(RECEIVE_TIMEOUT);
5635                drop(op_start_signals);
5636                let mut clients = futures::future::join_all(clients).await;
5637                cx.foreground().run_until_parked();
5638
5639                let (host, mut host_cx, host_err) = clients.remove(0);
5640                if let Some(host_err) = host_err {
5641                    log::error!("host error - {}", host_err);
5642                }
5643                host.project
5644                    .as_ref()
5645                    .unwrap()
5646                    .read_with(&host_cx, |project, _| assert!(!project.is_shared()));
5647                for (guest, mut guest_cx, guest_err) in clients {
5648                    if let Some(guest_err) = guest_err {
5649                        log::error!("{} error - {}", guest.username, guest_err);
5650                    }
5651                    let contacts = server
5652                        .store
5653                        .read()
5654                        .await
5655                        .contacts_for_user(guest.current_user_id(&guest_cx));
5656                    assert!(!contacts
5657                        .iter()
5658                        .flat_map(|contact| &contact.projects)
5659                        .any(|project| project.id == host_project_id));
5660                    guest
5661                        .project
5662                        .as_ref()
5663                        .unwrap()
5664                        .read_with(&guest_cx, |project, _| assert!(project.is_read_only()));
5665                    guest_cx.update(|_| drop(guest));
5666                }
5667                host_cx.update(|_| drop(host));
5668
5669                return;
5670            }
5671
5672            let distribution = rng.lock().gen_range(0..100);
5673            match distribution {
5674                0..=19 if !available_guests.is_empty() => {
5675                    let guest_ix = rng.lock().gen_range(0..available_guests.len());
5676                    let guest_username = available_guests.remove(guest_ix);
5677                    log::info!("Adding new connection for {}", guest_username);
5678                    next_entity_id += 100000;
5679                    let mut guest_cx = TestAppContext::new(
5680                        cx.foreground_platform(),
5681                        cx.platform(),
5682                        deterministic.build_foreground(next_entity_id),
5683                        deterministic.build_background(),
5684                        cx.font_cache(),
5685                        cx.leak_detector(),
5686                        next_entity_id,
5687                    );
5688                    let guest = server.create_client(&mut guest_cx, &guest_username).await;
5689                    let guest_project = Project::remote(
5690                        host_project_id,
5691                        guest.client.clone(),
5692                        guest.user_store.clone(),
5693                        guest_lang_registry.clone(),
5694                        FakeFs::new(cx.background()),
5695                        &mut guest_cx.to_async(),
5696                    )
5697                    .await
5698                    .unwrap();
5699                    let op_start_signal = futures::channel::mpsc::unbounded();
5700                    user_ids.push(guest.current_user_id(&guest_cx));
5701                    op_start_signals.push(op_start_signal.0);
5702                    clients.push(guest_cx.foreground().spawn(guest.simulate_guest(
5703                        guest_username.clone(),
5704                        guest_project,
5705                        op_start_signal.1,
5706                        rng.clone(),
5707                        guest_cx,
5708                    )));
5709
5710                    log::info!("Added connection for {}", guest_username);
5711                    operations += 1;
5712                }
5713                20..=29 if clients.len() > 1 => {
5714                    log::info!("Removing guest");
5715                    let guest_ix = rng.lock().gen_range(1..clients.len());
5716                    let removed_guest_id = user_ids.remove(guest_ix);
5717                    let guest = clients.remove(guest_ix);
5718                    op_start_signals.remove(guest_ix);
5719                    server.disconnect_client(removed_guest_id);
5720                    cx.foreground().advance_clock(RECEIVE_TIMEOUT);
5721                    let (guest, mut guest_cx, guest_err) = guest.await;
5722                    if let Some(guest_err) = guest_err {
5723                        log::error!("{} error - {}", guest.username, guest_err);
5724                    }
5725                    guest
5726                        .project
5727                        .as_ref()
5728                        .unwrap()
5729                        .read_with(&guest_cx, |project, _| assert!(project.is_read_only()));
5730                    for user_id in &user_ids {
5731                        for contact in server.store.read().await.contacts_for_user(*user_id) {
5732                            assert_ne!(
5733                                contact.user_id, removed_guest_id.0 as u64,
5734                                "removed guest is still a contact of another peer"
5735                            );
5736                            for project in contact.projects {
5737                                for project_guest_id in project.guests {
5738                                    assert_ne!(
5739                                        project_guest_id, removed_guest_id.0 as u64,
5740                                        "removed guest appears as still participating on a project"
5741                                    );
5742                                }
5743                            }
5744                        }
5745                    }
5746
5747                    log::info!("{} removed", guest.username);
5748                    available_guests.push(guest.username.clone());
5749                    guest_cx.update(|_| drop(guest));
5750
5751                    operations += 1;
5752                }
5753                _ => {
5754                    while operations < max_operations && rng.lock().gen_bool(0.7) {
5755                        op_start_signals
5756                            .choose(&mut *rng.lock())
5757                            .unwrap()
5758                            .unbounded_send(())
5759                            .unwrap();
5760                        operations += 1;
5761                    }
5762
5763                    if rng.lock().gen_bool(0.8) {
5764                        cx.foreground().run_until_parked();
5765                    }
5766                }
5767            }
5768        }
5769
5770        drop(op_start_signals);
5771        let mut clients = futures::future::join_all(clients).await;
5772        cx.foreground().run_until_parked();
5773
5774        let (host_client, mut host_cx, host_err) = clients.remove(0);
5775        if let Some(host_err) = host_err {
5776            panic!("host error - {}", host_err);
5777        }
5778        let host_project = host_client.project.as_ref().unwrap();
5779        let host_worktree_snapshots = host_project.read_with(&host_cx, |project, cx| {
5780            project
5781                .worktrees(cx)
5782                .map(|worktree| {
5783                    let snapshot = worktree.read(cx).snapshot();
5784                    (snapshot.id(), snapshot)
5785                })
5786                .collect::<BTreeMap<_, _>>()
5787        });
5788
5789        host_client
5790            .project
5791            .as_ref()
5792            .unwrap()
5793            .read_with(&host_cx, |project, cx| project.check_invariants(cx));
5794
5795        for (guest_client, mut guest_cx, guest_err) in clients.into_iter() {
5796            if let Some(guest_err) = guest_err {
5797                panic!("{} error - {}", guest_client.username, guest_err);
5798            }
5799            let worktree_snapshots =
5800                guest_client
5801                    .project
5802                    .as_ref()
5803                    .unwrap()
5804                    .read_with(&guest_cx, |project, cx| {
5805                        project
5806                            .worktrees(cx)
5807                            .map(|worktree| {
5808                                let worktree = worktree.read(cx);
5809                                (worktree.id(), worktree.snapshot())
5810                            })
5811                            .collect::<BTreeMap<_, _>>()
5812                    });
5813
5814            assert_eq!(
5815                worktree_snapshots.keys().collect::<Vec<_>>(),
5816                host_worktree_snapshots.keys().collect::<Vec<_>>(),
5817                "{} has different worktrees than the host",
5818                guest_client.username
5819            );
5820            for (id, host_snapshot) in &host_worktree_snapshots {
5821                let guest_snapshot = &worktree_snapshots[id];
5822                assert_eq!(
5823                    guest_snapshot.root_name(),
5824                    host_snapshot.root_name(),
5825                    "{} has different root name than the host for worktree {}",
5826                    guest_client.username,
5827                    id
5828                );
5829                assert_eq!(
5830                    guest_snapshot.entries(false).collect::<Vec<_>>(),
5831                    host_snapshot.entries(false).collect::<Vec<_>>(),
5832                    "{} has different snapshot than the host for worktree {}",
5833                    guest_client.username,
5834                    id
5835                );
5836                assert_eq!(guest_snapshot.scan_id(), host_snapshot.scan_id());
5837            }
5838
5839            guest_client
5840                .project
5841                .as_ref()
5842                .unwrap()
5843                .read_with(&guest_cx, |project, cx| project.check_invariants(cx));
5844
5845            for guest_buffer in &guest_client.buffers {
5846                let buffer_id = guest_buffer.read_with(&guest_cx, |buffer, _| buffer.remote_id());
5847                let host_buffer = host_project.read_with(&host_cx, |project, cx| {
5848                    project.buffer_for_id(buffer_id, cx).expect(&format!(
5849                        "host does not have buffer for guest:{}, peer:{}, id:{}",
5850                        guest_client.username, guest_client.peer_id, buffer_id
5851                    ))
5852                });
5853                let path = host_buffer
5854                    .read_with(&host_cx, |buffer, cx| buffer.file().unwrap().full_path(cx));
5855
5856                assert_eq!(
5857                    guest_buffer.read_with(&guest_cx, |buffer, _| buffer.deferred_ops_len()),
5858                    0,
5859                    "{}, buffer {}, path {:?} has deferred operations",
5860                    guest_client.username,
5861                    buffer_id,
5862                    path,
5863                );
5864                assert_eq!(
5865                    guest_buffer.read_with(&guest_cx, |buffer, _| buffer.text()),
5866                    host_buffer.read_with(&host_cx, |buffer, _| buffer.text()),
5867                    "{}, buffer {}, path {:?}, differs from the host's buffer",
5868                    guest_client.username,
5869                    buffer_id,
5870                    path
5871                );
5872            }
5873
5874            guest_cx.update(|_| drop(guest_client));
5875        }
5876
5877        host_cx.update(|_| drop(host_client));
5878    }
5879
5880    struct TestServer {
5881        peer: Arc<Peer>,
5882        app_state: Arc<AppState>,
5883        server: Arc<Server>,
5884        foreground: Rc<executor::Foreground>,
5885        notifications: mpsc::UnboundedReceiver<()>,
5886        connection_killers: Arc<Mutex<HashMap<UserId, Arc<AtomicBool>>>>,
5887        forbid_connections: Arc<AtomicBool>,
5888        _test_db: TestDb,
5889    }
5890
5891    impl TestServer {
5892        async fn start(
5893            foreground: Rc<executor::Foreground>,
5894            background: Arc<executor::Background>,
5895        ) -> Self {
5896            let test_db = TestDb::fake(background);
5897            let app_state = Self::build_app_state(&test_db).await;
5898            let peer = Peer::new();
5899            let notifications = mpsc::unbounded();
5900            let server = Server::new(app_state.clone(), Some(notifications.0));
5901            Self {
5902                peer,
5903                app_state,
5904                server,
5905                foreground,
5906                notifications: notifications.1,
5907                connection_killers: Default::default(),
5908                forbid_connections: Default::default(),
5909                _test_db: test_db,
5910            }
5911        }
5912
5913        async fn create_client(&mut self, cx: &mut TestAppContext, name: &str) -> TestClient {
5914            cx.update(|cx| {
5915                let settings = Settings::test(cx);
5916                cx.set_global(settings);
5917            });
5918
5919            let http = FakeHttpClient::with_404_response();
5920            let user_id = self.app_state.db.create_user(name, false).await.unwrap();
5921            let client_name = name.to_string();
5922            let mut client = Client::new(http.clone());
5923            let server = self.server.clone();
5924            let connection_killers = self.connection_killers.clone();
5925            let forbid_connections = self.forbid_connections.clone();
5926            let (connection_id_tx, mut connection_id_rx) = mpsc::channel(16);
5927
5928            Arc::get_mut(&mut client)
5929                .unwrap()
5930                .override_authenticate(move |cx| {
5931                    cx.spawn(|_| async move {
5932                        let access_token = "the-token".to_string();
5933                        Ok(Credentials {
5934                            user_id: user_id.0 as u64,
5935                            access_token,
5936                        })
5937                    })
5938                })
5939                .override_establish_connection(move |credentials, cx| {
5940                    assert_eq!(credentials.user_id, user_id.0 as u64);
5941                    assert_eq!(credentials.access_token, "the-token");
5942
5943                    let server = server.clone();
5944                    let connection_killers = connection_killers.clone();
5945                    let forbid_connections = forbid_connections.clone();
5946                    let client_name = client_name.clone();
5947                    let connection_id_tx = connection_id_tx.clone();
5948                    cx.spawn(move |cx| async move {
5949                        if forbid_connections.load(SeqCst) {
5950                            Err(EstablishConnectionError::other(anyhow!(
5951                                "server is forbidding connections"
5952                            )))
5953                        } else {
5954                            let (client_conn, server_conn, killed) =
5955                                Connection::in_memory(cx.background());
5956                            connection_killers.lock().insert(user_id, killed);
5957                            cx.background()
5958                                .spawn(server.handle_connection(
5959                                    server_conn,
5960                                    client_name,
5961                                    user_id,
5962                                    Some(connection_id_tx),
5963                                    cx.background(),
5964                                ))
5965                                .detach();
5966                            Ok(client_conn)
5967                        }
5968                    })
5969                });
5970
5971            client
5972                .authenticate_and_connect(false, &cx.to_async())
5973                .await
5974                .unwrap();
5975
5976            Channel::init(&client);
5977            Project::init(&client);
5978            cx.update(|cx| {
5979                workspace::init(&client, cx);
5980            });
5981
5982            let peer_id = PeerId(connection_id_rx.next().await.unwrap().0);
5983            let user_store = cx.add_model(|cx| UserStore::new(client.clone(), http, cx));
5984
5985            let client = TestClient {
5986                client,
5987                peer_id,
5988                username: name.to_string(),
5989                user_store,
5990                language_registry: Arc::new(LanguageRegistry::test()),
5991                project: Default::default(),
5992                buffers: Default::default(),
5993            };
5994            client.wait_for_current_user(cx).await;
5995            client
5996        }
5997
5998        fn disconnect_client(&self, user_id: UserId) {
5999            self.connection_killers
6000                .lock()
6001                .remove(&user_id)
6002                .unwrap()
6003                .store(true, SeqCst);
6004        }
6005
6006        fn forbid_connections(&self) {
6007            self.forbid_connections.store(true, SeqCst);
6008        }
6009
6010        fn allow_connections(&self) {
6011            self.forbid_connections.store(false, SeqCst);
6012        }
6013
6014        async fn build_app_state(test_db: &TestDb) -> Arc<AppState> {
6015            Arc::new(AppState {
6016                db: test_db.db().clone(),
6017                api_token: Default::default(),
6018            })
6019        }
6020
6021        async fn state<'a>(&'a self) -> RwLockReadGuard<'a, Store> {
6022            self.server.store.read().await
6023        }
6024
6025        async fn condition<F>(&mut self, mut predicate: F)
6026        where
6027            F: FnMut(&Store) -> bool,
6028        {
6029            assert!(
6030                self.foreground.parking_forbidden(),
6031                "you must call forbid_parking to use server conditions so we don't block indefinitely"
6032            );
6033            while !(predicate)(&*self.server.store.read().await) {
6034                self.foreground.start_waiting();
6035                self.notifications.next().await;
6036                self.foreground.finish_waiting();
6037            }
6038        }
6039    }
6040
6041    impl Deref for TestServer {
6042        type Target = Server;
6043
6044        fn deref(&self) -> &Self::Target {
6045            &self.server
6046        }
6047    }
6048
6049    impl Drop for TestServer {
6050        fn drop(&mut self) {
6051            self.peer.reset();
6052        }
6053    }
6054
6055    struct TestClient {
6056        client: Arc<Client>,
6057        username: String,
6058        pub peer_id: PeerId,
6059        pub user_store: ModelHandle<UserStore>,
6060        language_registry: Arc<LanguageRegistry>,
6061        project: Option<ModelHandle<Project>>,
6062        buffers: HashSet<ModelHandle<language::Buffer>>,
6063    }
6064
6065    impl Deref for TestClient {
6066        type Target = Arc<Client>;
6067
6068        fn deref(&self) -> &Self::Target {
6069            &self.client
6070        }
6071    }
6072
6073    impl TestClient {
6074        pub fn current_user_id(&self, cx: &TestAppContext) -> UserId {
6075            UserId::from_proto(
6076                self.user_store
6077                    .read_with(cx, |user_store, _| user_store.current_user().unwrap().id),
6078            )
6079        }
6080
6081        async fn wait_for_current_user(&self, cx: &TestAppContext) {
6082            let mut authed_user = self
6083                .user_store
6084                .read_with(cx, |user_store, _| user_store.watch_current_user());
6085            while authed_user.next().await.unwrap().is_none() {}
6086        }
6087
6088        async fn build_local_project(
6089            &mut self,
6090            fs: Arc<FakeFs>,
6091            root_path: impl AsRef<Path>,
6092            cx: &mut TestAppContext,
6093        ) -> (ModelHandle<Project>, WorktreeId) {
6094            let project = cx.update(|cx| {
6095                Project::local(
6096                    self.client.clone(),
6097                    self.user_store.clone(),
6098                    self.language_registry.clone(),
6099                    fs,
6100                    cx,
6101                )
6102            });
6103            self.project = Some(project.clone());
6104            let (worktree, _) = project
6105                .update(cx, |p, cx| {
6106                    p.find_or_create_local_worktree(root_path, true, cx)
6107                })
6108                .await
6109                .unwrap();
6110            worktree
6111                .read_with(cx, |tree, _| tree.as_local().unwrap().scan_complete())
6112                .await;
6113            project
6114                .update(cx, |project, _| project.next_remote_id())
6115                .await;
6116            (project, worktree.read_with(cx, |tree, _| tree.id()))
6117        }
6118
6119        async fn build_remote_project(
6120            &mut self,
6121            project_id: u64,
6122            cx: &mut TestAppContext,
6123        ) -> ModelHandle<Project> {
6124            let project = Project::remote(
6125                project_id,
6126                self.client.clone(),
6127                self.user_store.clone(),
6128                self.language_registry.clone(),
6129                FakeFs::new(cx.background()),
6130                &mut cx.to_async(),
6131            )
6132            .await
6133            .unwrap();
6134            self.project = Some(project.clone());
6135            project
6136        }
6137
6138        fn build_workspace(
6139            &self,
6140            project: &ModelHandle<Project>,
6141            cx: &mut TestAppContext,
6142        ) -> ViewHandle<Workspace> {
6143            let (window_id, _) = cx.add_window(|_| EmptyView);
6144            cx.add_view(window_id, |cx| {
6145                let fs = project.read(cx).fs().clone();
6146                Workspace::new(
6147                    &WorkspaceParams {
6148                        fs,
6149                        project: project.clone(),
6150                        user_store: self.user_store.clone(),
6151                        languages: self.language_registry.clone(),
6152                        themes: ThemeRegistry::new((), cx.font_cache().clone()),
6153                        channel_list: cx.add_model(|cx| {
6154                            ChannelList::new(self.user_store.clone(), self.client.clone(), cx)
6155                        }),
6156                        client: self.client.clone(),
6157                    },
6158                    cx,
6159                )
6160            })
6161        }
6162
6163        async fn simulate_host(
6164            mut self,
6165            project: ModelHandle<Project>,
6166            files: Arc<Mutex<Vec<PathBuf>>>,
6167            op_start_signal: futures::channel::mpsc::UnboundedReceiver<()>,
6168            rng: Arc<Mutex<StdRng>>,
6169            mut cx: TestAppContext,
6170        ) -> (Self, TestAppContext, Option<anyhow::Error>) {
6171            async fn simulate_host_internal(
6172                client: &mut TestClient,
6173                project: ModelHandle<Project>,
6174                files: Arc<Mutex<Vec<PathBuf>>>,
6175                mut op_start_signal: futures::channel::mpsc::UnboundedReceiver<()>,
6176                rng: Arc<Mutex<StdRng>>,
6177                cx: &mut TestAppContext,
6178            ) -> anyhow::Result<()> {
6179                let fs = project.read_with(cx, |project, _| project.fs().clone());
6180
6181                while op_start_signal.next().await.is_some() {
6182                    let distribution = rng.lock().gen_range::<usize, _>(0..100);
6183                    match distribution {
6184                        0..=20 if !files.lock().is_empty() => {
6185                            let path = files.lock().choose(&mut *rng.lock()).unwrap().clone();
6186                            let mut path = path.as_path();
6187                            while let Some(parent_path) = path.parent() {
6188                                path = parent_path;
6189                                if rng.lock().gen() {
6190                                    break;
6191                                }
6192                            }
6193
6194                            log::info!("Host: find/create local worktree {:?}", path);
6195                            let find_or_create_worktree = project.update(cx, |project, cx| {
6196                                project.find_or_create_local_worktree(path, true, cx)
6197                            });
6198                            if rng.lock().gen() {
6199                                cx.background().spawn(find_or_create_worktree).detach();
6200                            } else {
6201                                find_or_create_worktree.await?;
6202                            }
6203                        }
6204                        10..=80 if !files.lock().is_empty() => {
6205                            let buffer = if client.buffers.is_empty() || rng.lock().gen() {
6206                                let file = files.lock().choose(&mut *rng.lock()).unwrap().clone();
6207                                let (worktree, path) = project
6208                                    .update(cx, |project, cx| {
6209                                        project.find_or_create_local_worktree(
6210                                            file.clone(),
6211                                            true,
6212                                            cx,
6213                                        )
6214                                    })
6215                                    .await?;
6216                                let project_path =
6217                                    worktree.read_with(cx, |worktree, _| (worktree.id(), path));
6218                                log::info!(
6219                                    "Host: opening path {:?}, worktree {}, relative_path {:?}",
6220                                    file,
6221                                    project_path.0,
6222                                    project_path.1
6223                                );
6224                                let buffer = project
6225                                    .update(cx, |project, cx| project.open_buffer(project_path, cx))
6226                                    .await
6227                                    .unwrap();
6228                                client.buffers.insert(buffer.clone());
6229                                buffer
6230                            } else {
6231                                client
6232                                    .buffers
6233                                    .iter()
6234                                    .choose(&mut *rng.lock())
6235                                    .unwrap()
6236                                    .clone()
6237                            };
6238
6239                            if rng.lock().gen_bool(0.1) {
6240                                cx.update(|cx| {
6241                                    log::info!(
6242                                        "Host: dropping buffer {:?}",
6243                                        buffer.read(cx).file().unwrap().full_path(cx)
6244                                    );
6245                                    client.buffers.remove(&buffer);
6246                                    drop(buffer);
6247                                });
6248                            } else {
6249                                buffer.update(cx, |buffer, cx| {
6250                                    log::info!(
6251                                        "Host: updating buffer {:?} ({})",
6252                                        buffer.file().unwrap().full_path(cx),
6253                                        buffer.remote_id()
6254                                    );
6255
6256                                    if rng.lock().gen_bool(0.7) {
6257                                        buffer.randomly_edit(&mut *rng.lock(), 5, cx);
6258                                    } else {
6259                                        buffer.randomly_undo_redo(&mut *rng.lock(), cx);
6260                                    }
6261                                });
6262                            }
6263                        }
6264                        _ => loop {
6265                            let path_component_count = rng.lock().gen_range::<usize, _>(1..=5);
6266                            let mut path = PathBuf::new();
6267                            path.push("/");
6268                            for _ in 0..path_component_count {
6269                                let letter = rng.lock().gen_range(b'a'..=b'z');
6270                                path.push(std::str::from_utf8(&[letter]).unwrap());
6271                            }
6272                            path.set_extension("rs");
6273                            let parent_path = path.parent().unwrap();
6274
6275                            log::info!("Host: creating file {:?}", path,);
6276
6277                            if fs.create_dir(&parent_path).await.is_ok()
6278                                && fs.create_file(&path, Default::default()).await.is_ok()
6279                            {
6280                                files.lock().push(path);
6281                                break;
6282                            } else {
6283                                log::info!("Host: cannot create file");
6284                            }
6285                        },
6286                    }
6287
6288                    cx.background().simulate_random_delay().await;
6289                }
6290
6291                Ok(())
6292            }
6293
6294            let result = simulate_host_internal(
6295                &mut self,
6296                project.clone(),
6297                files,
6298                op_start_signal,
6299                rng,
6300                &mut cx,
6301            )
6302            .await;
6303            log::info!("Host done");
6304            self.project = Some(project);
6305            (self, cx, result.err())
6306        }
6307
6308        pub async fn simulate_guest(
6309            mut self,
6310            guest_username: String,
6311            project: ModelHandle<Project>,
6312            op_start_signal: futures::channel::mpsc::UnboundedReceiver<()>,
6313            rng: Arc<Mutex<StdRng>>,
6314            mut cx: TestAppContext,
6315        ) -> (Self, TestAppContext, Option<anyhow::Error>) {
6316            async fn simulate_guest_internal(
6317                client: &mut TestClient,
6318                guest_username: &str,
6319                project: ModelHandle<Project>,
6320                mut op_start_signal: futures::channel::mpsc::UnboundedReceiver<()>,
6321                rng: Arc<Mutex<StdRng>>,
6322                cx: &mut TestAppContext,
6323            ) -> anyhow::Result<()> {
6324                while op_start_signal.next().await.is_some() {
6325                    let buffer = if client.buffers.is_empty() || rng.lock().gen() {
6326                        let worktree = if let Some(worktree) =
6327                            project.read_with(cx, |project, cx| {
6328                                project
6329                                    .worktrees(&cx)
6330                                    .filter(|worktree| {
6331                                        let worktree = worktree.read(cx);
6332                                        worktree.is_visible()
6333                                            && worktree.entries(false).any(|e| e.is_file())
6334                                    })
6335                                    .choose(&mut *rng.lock())
6336                            }) {
6337                            worktree
6338                        } else {
6339                            cx.background().simulate_random_delay().await;
6340                            continue;
6341                        };
6342
6343                        let (worktree_root_name, project_path) =
6344                            worktree.read_with(cx, |worktree, _| {
6345                                let entry = worktree
6346                                    .entries(false)
6347                                    .filter(|e| e.is_file())
6348                                    .choose(&mut *rng.lock())
6349                                    .unwrap();
6350                                (
6351                                    worktree.root_name().to_string(),
6352                                    (worktree.id(), entry.path.clone()),
6353                                )
6354                            });
6355                        log::info!(
6356                            "{}: opening path {:?} in worktree {} ({})",
6357                            guest_username,
6358                            project_path.1,
6359                            project_path.0,
6360                            worktree_root_name,
6361                        );
6362                        let buffer = project
6363                            .update(cx, |project, cx| {
6364                                project.open_buffer(project_path.clone(), cx)
6365                            })
6366                            .await?;
6367                        log::info!(
6368                            "{}: opened path {:?} in worktree {} ({}) with buffer id {}",
6369                            guest_username,
6370                            project_path.1,
6371                            project_path.0,
6372                            worktree_root_name,
6373                            buffer.read_with(cx, |buffer, _| buffer.remote_id())
6374                        );
6375                        client.buffers.insert(buffer.clone());
6376                        buffer
6377                    } else {
6378                        client
6379                            .buffers
6380                            .iter()
6381                            .choose(&mut *rng.lock())
6382                            .unwrap()
6383                            .clone()
6384                    };
6385
6386                    let choice = rng.lock().gen_range(0..100);
6387                    match choice {
6388                        0..=9 => {
6389                            cx.update(|cx| {
6390                                log::info!(
6391                                    "{}: dropping buffer {:?}",
6392                                    guest_username,
6393                                    buffer.read(cx).file().unwrap().full_path(cx)
6394                                );
6395                                client.buffers.remove(&buffer);
6396                                drop(buffer);
6397                            });
6398                        }
6399                        10..=19 => {
6400                            let completions = project.update(cx, |project, cx| {
6401                                log::info!(
6402                                    "{}: requesting completions for buffer {} ({:?})",
6403                                    guest_username,
6404                                    buffer.read(cx).remote_id(),
6405                                    buffer.read(cx).file().unwrap().full_path(cx)
6406                                );
6407                                let offset = rng.lock().gen_range(0..=buffer.read(cx).len());
6408                                project.completions(&buffer, offset, cx)
6409                            });
6410                            let completions = cx.background().spawn(async move {
6411                                completions
6412                                    .await
6413                                    .map_err(|err| anyhow!("completions request failed: {:?}", err))
6414                            });
6415                            if rng.lock().gen_bool(0.3) {
6416                                log::info!("{}: detaching completions request", guest_username);
6417                                cx.update(|cx| completions.detach_and_log_err(cx));
6418                            } else {
6419                                completions.await?;
6420                            }
6421                        }
6422                        20..=29 => {
6423                            let code_actions = project.update(cx, |project, cx| {
6424                                log::info!(
6425                                    "{}: requesting code actions for buffer {} ({:?})",
6426                                    guest_username,
6427                                    buffer.read(cx).remote_id(),
6428                                    buffer.read(cx).file().unwrap().full_path(cx)
6429                                );
6430                                let range = buffer.read(cx).random_byte_range(0, &mut *rng.lock());
6431                                project.code_actions(&buffer, range, cx)
6432                            });
6433                            let code_actions = cx.background().spawn(async move {
6434                                code_actions.await.map_err(|err| {
6435                                    anyhow!("code actions request failed: {:?}", err)
6436                                })
6437                            });
6438                            if rng.lock().gen_bool(0.3) {
6439                                log::info!("{}: detaching code actions request", guest_username);
6440                                cx.update(|cx| code_actions.detach_and_log_err(cx));
6441                            } else {
6442                                code_actions.await?;
6443                            }
6444                        }
6445                        30..=39 if buffer.read_with(cx, |buffer, _| buffer.is_dirty()) => {
6446                            let (requested_version, save) = buffer.update(cx, |buffer, cx| {
6447                                log::info!(
6448                                    "{}: saving buffer {} ({:?})",
6449                                    guest_username,
6450                                    buffer.remote_id(),
6451                                    buffer.file().unwrap().full_path(cx)
6452                                );
6453                                (buffer.version(), buffer.save(cx))
6454                            });
6455                            let save = cx.background().spawn(async move {
6456                                let (saved_version, _) = save
6457                                    .await
6458                                    .map_err(|err| anyhow!("save request failed: {:?}", err))?;
6459                                assert!(saved_version.observed_all(&requested_version));
6460                                Ok::<_, anyhow::Error>(())
6461                            });
6462                            if rng.lock().gen_bool(0.3) {
6463                                log::info!("{}: detaching save request", guest_username);
6464                                cx.update(|cx| save.detach_and_log_err(cx));
6465                            } else {
6466                                save.await?;
6467                            }
6468                        }
6469                        40..=44 => {
6470                            let prepare_rename = project.update(cx, |project, cx| {
6471                                log::info!(
6472                                    "{}: preparing rename for buffer {} ({:?})",
6473                                    guest_username,
6474                                    buffer.read(cx).remote_id(),
6475                                    buffer.read(cx).file().unwrap().full_path(cx)
6476                                );
6477                                let offset = rng.lock().gen_range(0..=buffer.read(cx).len());
6478                                project.prepare_rename(buffer, offset, cx)
6479                            });
6480                            let prepare_rename = cx.background().spawn(async move {
6481                                prepare_rename.await.map_err(|err| {
6482                                    anyhow!("prepare rename request failed: {:?}", err)
6483                                })
6484                            });
6485                            if rng.lock().gen_bool(0.3) {
6486                                log::info!("{}: detaching prepare rename request", guest_username);
6487                                cx.update(|cx| prepare_rename.detach_and_log_err(cx));
6488                            } else {
6489                                prepare_rename.await?;
6490                            }
6491                        }
6492                        45..=49 => {
6493                            let definitions = project.update(cx, |project, cx| {
6494                                log::info!(
6495                                    "{}: requesting definitions for buffer {} ({:?})",
6496                                    guest_username,
6497                                    buffer.read(cx).remote_id(),
6498                                    buffer.read(cx).file().unwrap().full_path(cx)
6499                                );
6500                                let offset = rng.lock().gen_range(0..=buffer.read(cx).len());
6501                                project.definition(&buffer, offset, cx)
6502                            });
6503                            let definitions = cx.background().spawn(async move {
6504                                definitions
6505                                    .await
6506                                    .map_err(|err| anyhow!("definitions request failed: {:?}", err))
6507                            });
6508                            if rng.lock().gen_bool(0.3) {
6509                                log::info!("{}: detaching definitions request", guest_username);
6510                                cx.update(|cx| definitions.detach_and_log_err(cx));
6511                            } else {
6512                                client
6513                                    .buffers
6514                                    .extend(definitions.await?.into_iter().map(|loc| loc.buffer));
6515                            }
6516                        }
6517                        50..=54 => {
6518                            let highlights = project.update(cx, |project, cx| {
6519                                log::info!(
6520                                    "{}: requesting highlights for buffer {} ({:?})",
6521                                    guest_username,
6522                                    buffer.read(cx).remote_id(),
6523                                    buffer.read(cx).file().unwrap().full_path(cx)
6524                                );
6525                                let offset = rng.lock().gen_range(0..=buffer.read(cx).len());
6526                                project.document_highlights(&buffer, offset, cx)
6527                            });
6528                            let highlights = cx.background().spawn(async move {
6529                                highlights
6530                                    .await
6531                                    .map_err(|err| anyhow!("highlights request failed: {:?}", err))
6532                            });
6533                            if rng.lock().gen_bool(0.3) {
6534                                log::info!("{}: detaching highlights request", guest_username);
6535                                cx.update(|cx| highlights.detach_and_log_err(cx));
6536                            } else {
6537                                highlights.await?;
6538                            }
6539                        }
6540                        55..=59 => {
6541                            let search = project.update(cx, |project, cx| {
6542                                let query = rng.lock().gen_range('a'..='z');
6543                                log::info!("{}: project-wide search {:?}", guest_username, query);
6544                                project.search(SearchQuery::text(query, false, false), cx)
6545                            });
6546                            let search = cx.background().spawn(async move {
6547                                search
6548                                    .await
6549                                    .map_err(|err| anyhow!("search request failed: {:?}", err))
6550                            });
6551                            if rng.lock().gen_bool(0.3) {
6552                                log::info!("{}: detaching search request", guest_username);
6553                                cx.update(|cx| search.detach_and_log_err(cx));
6554                            } else {
6555                                client.buffers.extend(search.await?.into_keys());
6556                            }
6557                        }
6558                        60..=69 => {
6559                            let worktree = project
6560                                .read_with(cx, |project, cx| {
6561                                    project
6562                                        .worktrees(&cx)
6563                                        .filter(|worktree| {
6564                                            let worktree = worktree.read(cx);
6565                                            worktree.is_visible()
6566                                                && worktree.entries(false).any(|e| e.is_file())
6567                                                && worktree
6568                                                    .root_entry()
6569                                                    .map_or(false, |e| e.is_dir())
6570                                        })
6571                                        .choose(&mut *rng.lock())
6572                                })
6573                                .unwrap();
6574                            let (worktree_id, worktree_root_name) = worktree
6575                                .read_with(cx, |worktree, _| {
6576                                    (worktree.id(), worktree.root_name().to_string())
6577                                });
6578
6579                            let mut new_name = String::new();
6580                            for _ in 0..10 {
6581                                let letter = rng.lock().gen_range('a'..='z');
6582                                new_name.push(letter);
6583                            }
6584                            let mut new_path = PathBuf::new();
6585                            new_path.push(new_name);
6586                            new_path.set_extension("rs");
6587                            log::info!(
6588                                "{}: creating {:?} in worktree {} ({})",
6589                                guest_username,
6590                                new_path,
6591                                worktree_id,
6592                                worktree_root_name,
6593                            );
6594                            project
6595                                .update(cx, |project, cx| {
6596                                    project.create_entry((worktree_id, new_path), false, cx)
6597                                })
6598                                .unwrap()
6599                                .await?;
6600                        }
6601                        _ => {
6602                            buffer.update(cx, |buffer, cx| {
6603                                log::info!(
6604                                    "{}: updating buffer {} ({:?})",
6605                                    guest_username,
6606                                    buffer.remote_id(),
6607                                    buffer.file().unwrap().full_path(cx)
6608                                );
6609                                if rng.lock().gen_bool(0.7) {
6610                                    buffer.randomly_edit(&mut *rng.lock(), 5, cx);
6611                                } else {
6612                                    buffer.randomly_undo_redo(&mut *rng.lock(), cx);
6613                                }
6614                            });
6615                        }
6616                    }
6617                    cx.background().simulate_random_delay().await;
6618                }
6619                Ok(())
6620            }
6621
6622            let result = simulate_guest_internal(
6623                &mut self,
6624                &guest_username,
6625                project.clone(),
6626                op_start_signal,
6627                rng,
6628                &mut cx,
6629            )
6630            .await;
6631            log::info!("{}: done", guest_username);
6632
6633            self.project = Some(project);
6634            (self, cx, result.err())
6635        }
6636    }
6637
6638    impl Drop for TestClient {
6639        fn drop(&mut self) {
6640            self.client.tear_down();
6641        }
6642    }
6643
6644    impl Executor for Arc<gpui::executor::Background> {
6645        type Sleep = gpui::executor::Timer;
6646
6647        fn spawn_detached<F: 'static + Send + Future<Output = ()>>(&self, future: F) {
6648            self.spawn(future).detach();
6649        }
6650
6651        fn sleep(&self, duration: Duration) -> Self::Sleep {
6652            self.as_ref().timer(duration)
6653        }
6654    }
6655
6656    fn channel_messages(channel: &Channel) -> Vec<(String, String, bool)> {
6657        channel
6658            .messages()
6659            .cursor::<()>()
6660            .map(|m| {
6661                (
6662                    m.sender.github_login.clone(),
6663                    m.body.clone(),
6664                    m.is_pending(),
6665                )
6666            })
6667            .collect()
6668    }
6669
6670    struct EmptyView;
6671
6672    impl gpui::Entity for EmptyView {
6673        type Event = ();
6674    }
6675
6676    impl gpui::View for EmptyView {
6677        fn ui_name() -> &'static str {
6678            "empty view"
6679        }
6680
6681        fn render(&mut self, _: &mut gpui::RenderContext<Self>) -> gpui::ElementBox {
6682            gpui::Element::boxed(gpui::elements::Empty)
6683        }
6684    }
6685}