rpc.rs

   1mod store;
   2
   3use crate::{
   4    auth,
   5    db::{ChannelId, MessageId, UserId},
   6    AppState, Result,
   7};
   8use anyhow::anyhow;
   9use async_tungstenite::tungstenite::{
  10    protocol::CloseFrame as TungsteniteCloseFrame, Message as TungsteniteMessage,
  11};
  12use axum::{
  13    body::Body,
  14    extract::{
  15        ws::{CloseFrame as AxumCloseFrame, Message as AxumMessage},
  16        ConnectInfo, WebSocketUpgrade,
  17    },
  18    headers::{Header, HeaderName},
  19    http::StatusCode,
  20    middleware,
  21    response::{IntoResponse, Response},
  22    routing::get,
  23    Extension, Router, TypedHeader,
  24};
  25use collections::{HashMap, HashSet};
  26use futures::{channel::mpsc, future::BoxFuture, FutureExt, SinkExt, StreamExt, TryStreamExt};
  27use lazy_static::lazy_static;
  28use rpc::{
  29    proto::{self, AnyTypedEnvelope, EntityMessage, EnvelopedMessage, RequestMessage},
  30    Connection, ConnectionId, Peer, TypedEnvelope,
  31};
  32use std::{
  33    any::TypeId,
  34    future::Future,
  35    marker::PhantomData,
  36    net::SocketAddr,
  37    ops::{Deref, DerefMut},
  38    rc::Rc,
  39    sync::Arc,
  40    time::Duration,
  41};
  42use store::{Store, Worktree};
  43use time::OffsetDateTime;
  44use tokio::{
  45    sync::{RwLock, RwLockReadGuard, RwLockWriteGuard},
  46    time::Sleep,
  47};
  48use tower::ServiceBuilder;
  49use tracing::{info_span, instrument, Instrument};
  50
    /// Type-erased message handler as stored in `Server::handlers`.
    ///
    /// It receives the server and a boxed, dynamically-typed envelope and returns a boxed
    /// future that resolves to `()`.
  51type MessageHandler =
  52    Box<dyn Send + Sync + Fn(Arc<Server>, Box<dyn AnyTypedEnvelope>) -> BoxFuture<'static, ()>>;
  53
    /// RPC server state shared by every connection handler.
  54pub struct Server {
        /// Peer used to send, respond to, and forward messages between connections.
  55    peer: Arc<Peer>,
        /// In-memory store, guarded by an async read/write lock.
  56    store: RwLock<Store>,
        /// Application state; handlers reach the database through it (`app_state.db`).
  57    app_state: Arc<AppState>,
        /// Registered handlers, keyed by the `TypeId` of the message type they accept.
  58    handlers: HashMap<TypeId, MessageHandler>,
        /// Optional channel on which `()` is sent after each message has been handled.
  59    notifications: Option<mpsc::UnboundedSender<()>>,
  60}
  61
    /// Abstraction over the runtime used by `Server::handle_connection` to spawn work and to
    /// create timers.
  62pub trait Executor: Send + Clone {
        /// Future returned by [`Executor::sleep`].
  63    type Sleep: Send + Future;
        /// Runs `future` without handing back a handle to it.
  64    fn spawn_detached<F: 'static + Send + Future<Output = ()>>(&self, future: F);
        /// Returns a future for a sleep of `duration`.
  65    fn sleep(&self, duration: Duration) -> Self::Sleep;
  66}
  67
    /// Unit struct naming an executor.
    ///
    /// NOTE(review): its `Executor` implementation is not visible in this part of the file;
    /// presumably it is the production (non-test) executor -- confirm.
  68#[derive(Clone)]
  69pub struct RealExecutor;
  70
    // NOTE(review): neither constant is used in the visible part of the file. Judging by the
    // names, they bound the page size and the message length of the channel-message handlers --
    // confirm against those handlers (including the unit of `MAX_MESSAGE_LEN`).
  71const MESSAGE_COUNT_PER_PAGE: usize = 100;
  72const MAX_MESSAGE_LEN: usize = 1024;
  73
    /// Wrapper around a read guard on the server's `Store`.
  74struct StoreReadGuard<'a> {
        /// The underlying lock guard.
  75    guard: RwLockReadGuard<'a, Store>,
        /// `Rc` is not `Send`, so this marker makes the wrapper `!Send` as well.
        /// NOTE(review): presumably this keeps the guard from being held across an `.await`
        /// inside a future that must be `Send` -- confirm the intent.
  76    _not_send: PhantomData<Rc<()>>,
  77}
  78
    /// Wrapper around a write guard on the server's `Store`.
  79struct StoreWriteGuard<'a> {
        /// The underlying lock guard.
  80    guard: RwLockWriteGuard<'a, Store>,
        /// `Rc` is not `Send`, so this marker makes the wrapper `!Send` as well.
        /// NOTE(review): presumably this keeps the guard from being held across an `.await`
        /// inside a future that must be `Send` -- confirm the intent.
  81    _not_send: PhantomData<Rc<()>>,
  82}
  83
  84impl Server {
  85    pub fn new(
  86        app_state: Arc<AppState>,
  87        notifications: Option<mpsc::UnboundedSender<()>>,
  88    ) -> Arc<Self> {
  89        let mut server = Self {
  90            peer: Peer::new(),
  91            app_state,
  92            store: Default::default(),
  93            handlers: Default::default(),
  94            notifications,
  95        };
  96
  97        server
  98            .add_request_handler(Server::ping)
  99            .add_request_handler(Server::register_project)
 100            .add_message_handler(Server::unregister_project)
 101            .add_request_handler(Server::share_project)
 102            .add_message_handler(Server::unshare_project)
 103            .add_sync_request_handler(Server::join_project)
 104            .add_message_handler(Server::leave_project)
 105            .add_request_handler(Server::register_worktree)
 106            .add_message_handler(Server::unregister_worktree)
 107            .add_request_handler(Server::update_worktree)
 108            .add_message_handler(Server::start_language_server)
 109            .add_message_handler(Server::update_language_server)
 110            .add_message_handler(Server::update_diagnostic_summary)
 111            .add_request_handler(Server::forward_project_request::<proto::GetDefinition>)
 112            .add_request_handler(Server::forward_project_request::<proto::GetReferences>)
 113            .add_request_handler(Server::forward_project_request::<proto::SearchProject>)
 114            .add_request_handler(Server::forward_project_request::<proto::GetDocumentHighlights>)
 115            .add_request_handler(Server::forward_project_request::<proto::GetProjectSymbols>)
 116            .add_request_handler(Server::forward_project_request::<proto::OpenBufferForSymbol>)
 117            .add_request_handler(Server::forward_project_request::<proto::OpenBufferById>)
 118            .add_request_handler(Server::forward_project_request::<proto::OpenBufferByPath>)
 119            .add_request_handler(Server::forward_project_request::<proto::GetCompletions>)
 120            .add_request_handler(
 121                Server::forward_project_request::<proto::ApplyCompletionAdditionalEdits>,
 122            )
 123            .add_request_handler(Server::forward_project_request::<proto::GetCodeActions>)
 124            .add_request_handler(Server::forward_project_request::<proto::ApplyCodeAction>)
 125            .add_request_handler(Server::forward_project_request::<proto::PrepareRename>)
 126            .add_request_handler(Server::forward_project_request::<proto::PerformRename>)
 127            .add_request_handler(Server::forward_project_request::<proto::ReloadBuffers>)
 128            .add_request_handler(Server::forward_project_request::<proto::FormatBuffers>)
 129            .add_request_handler(Server::forward_project_request::<proto::CreateProjectEntry>)
 130            .add_request_handler(Server::forward_project_request::<proto::RenameProjectEntry>)
 131            .add_request_handler(Server::forward_project_request::<proto::DeleteProjectEntry>)
 132            .add_request_handler(Server::update_buffer)
 133            .add_message_handler(Server::update_buffer_file)
 134            .add_message_handler(Server::buffer_reloaded)
 135            .add_message_handler(Server::buffer_saved)
 136            .add_request_handler(Server::save_buffer)
 137            .add_request_handler(Server::get_channels)
 138            .add_request_handler(Server::get_users)
 139            .add_request_handler(Server::fuzzy_search_users)
 140            .add_request_handler(Server::join_channel)
 141            .add_message_handler(Server::leave_channel)
 142            .add_request_handler(Server::send_channel_message)
 143            .add_request_handler(Server::follow)
 144            .add_message_handler(Server::unfollow)
 145            .add_message_handler(Server::update_followers)
 146            .add_request_handler(Server::get_channel_messages);
 147
 148        Arc::new(server)
 149    }
 150
 151    fn add_message_handler<F, Fut, M>(&mut self, handler: F) -> &mut Self
 152    where
 153        F: 'static + Send + Sync + Fn(Arc<Self>, TypedEnvelope<M>) -> Fut,
 154        Fut: 'static + Send + Future<Output = Result<()>>,
 155        M: EnvelopedMessage,
 156    {
 157        let prev_handler = self.handlers.insert(
 158            TypeId::of::<M>(),
 159            Box::new(move |server, envelope| {
 160                let envelope = envelope.into_any().downcast::<TypedEnvelope<M>>().unwrap();
 161                let span = info_span!(
 162                    "handle message",
 163                    payload_type = envelope.payload_type_name(),
 164                    payload = format!("{:?}", envelope.payload).as_str(),
 165                );
 166                let future = (handler)(server, *envelope);
 167                async move {
 168                    if let Err(error) = future.await {
 169                        tracing::error!(%error, "error handling message");
 170                    }
 171                }
 172                .instrument(span)
 173                .boxed()
 174            }),
 175        );
 176        if prev_handler.is_some() {
 177            panic!("registered a handler for the same message twice");
 178        }
 179        self
 180    }
 181
 182    fn add_request_handler<F, Fut, M>(&mut self, handler: F) -> &mut Self
 183    where
 184        F: 'static + Send + Sync + Fn(Arc<Self>, TypedEnvelope<M>) -> Fut,
 185        Fut: 'static + Send + Future<Output = Result<M::Response>>,
 186        M: RequestMessage,
 187    {
 188        self.add_message_handler(move |server, envelope| {
 189            let receipt = envelope.receipt();
 190            let response = (handler)(server.clone(), envelope);
 191            async move {
 192                match response.await {
 193                    Ok(response) => {
 194                        server.peer.respond(receipt, response)?;
 195                        Ok(())
 196                    }
 197                    Err(error) => {
 198                        server.peer.respond_with_error(
 199                            receipt,
 200                            proto::Error {
 201                                message: error.to_string(),
 202                            },
 203                        )?;
 204                        Err(error)
 205                    }
 206                }
 207            }
 208        })
 209    }
 210
 211    /// Handle a request while holding a lock to the store. This is useful when we're registering
 212    /// a connection but we want to respond on the connection before anybody else can send on it.
 213    fn add_sync_request_handler<F, M>(&mut self, handler: F) -> &mut Self
 214    where
 215        F: 'static
 216            + Send
 217            + Sync
 218            + Fn(Arc<Self>, &mut Store, TypedEnvelope<M>) -> Result<M::Response>,
 219        M: RequestMessage,
 220    {
 221        let handler = Arc::new(handler);
 222        self.add_message_handler(move |server, envelope| {
 223            let receipt = envelope.receipt();
 224            let handler = handler.clone();
 225            async move {
 226                let mut store = server.state_mut().await;
 227                let response = (handler)(server.clone(), &mut *store, envelope);
 228                match response {
 229                    Ok(response) => {
 230                        server.peer.respond(receipt, response)?;
 231                        Ok(())
 232                    }
 233                    Err(error) => {
 234                        server.peer.respond_with_error(
 235                            receipt,
 236                            proto::Error {
 237                                message: error.to_string(),
 238                            },
 239                        )?;
 240                        Err(error)
 241                    }
 242                }
 243            }
 244        })
 245    }
 246
        /// Drives one client connection from registration until sign-out.
        ///
        /// The returned future registers `connection` with the peer, optionally reports the new
        /// connection id on `send_connection_id`, records the connection for `user_id` in the
        /// store, and then dispatches incoming messages to the registered handlers until either
        /// the I/O future finishes or the incoming stream ends. Finally it signs the connection
        /// out. The whole future runs inside a "handle connection" tracing span.
        ///
        /// `address` is only used for tracing. `executor` supplies the timers passed to the peer
        /// and runs handlers of background messages.
 247    pub fn handle_connection<E: Executor>(
 248        self: &Arc<Self>,
 249        connection: Connection,
 250        address: String,
 251        user_id: UserId,
 252        mut send_connection_id: Option<mpsc::Sender<ConnectionId>>,
 253        executor: E,
 254    ) -> impl Future<Output = ()> {
            // `mut` because `sign_out` below takes `&mut Arc<Self>`.
 255        let mut this = self.clone();
 256        let span = info_span!("handle connection", %user_id, %address);
 257        async move {
 258            let (connection_id, handle_io, mut incoming_rx) = this
 259                .peer
 260                .add_connection(connection, {
                        // Timer factory for the peer, backed by the executor's `sleep`.
 261                    let executor = executor.clone();
 262                    move |duration| {
 263                        let timer = executor.sleep(duration);
 264                        async move {
 265                            timer.await;
 266                        }
 267                    }
 268                })
 269                .await;
 270
 271            tracing::info!(%user_id, %connection_id, %address, "connection opened");
 272
                // Report the connection id if the caller asked for it; a failed send is ignored.
 273            if let Some(send_connection_id) = send_connection_id.as_mut() {
 274                let _ = send_connection_id.send(connection_id).await;
 275            }
 276
                // Scoped so that the store lock is released before the message loop starts.
 277            {
 278                let mut state = this.state_mut().await;
 279                state.add_connection(connection_id, user_id);
 280                this.update_contacts_for_users(&*state, &[user_id]);
 281            }
 282
 283            let handle_io = handle_io.fuse();
 284            futures::pin_mut!(handle_io);
 285            loop {
 286                let next_message = incoming_rx.next().fuse();
 287                futures::pin_mut!(next_message);
                    // `select_biased!` polls in declaration order, so I/O completion is checked
                    // before the next incoming message.
 288                futures::select_biased! {
 289                    result = handle_io => {
 290                        if let Err(error) = result {
 291                            tracing::error!(%error, "error handling I/O");
 292                        }
 293                        break;
 294                    }
 295                    message = next_message => {
 296                        if let Some(message) = message {
 297                            let type_name = message.payload_type_name();
 298                            let span = tracing::info_span!("receive message", %user_id, %connection_id, %address, type_name);
 299                            async {
 300                                if let Some(handler) = this.handlers.get(&message.payload_type_id()) {
 301                                    let notifications = this.notifications.clone();
 302                                    let is_background = message.is_background();
 303                                    let handle_message = (handler)(this.clone(), message);
                                        // After the handler finishes, signal the optional
                                        // notifications channel; a failed send is ignored.
 304                                    let handle_message = async move {
 305                                        handle_message.await;
 306                                        if let Some(mut notifications) = notifications {
 307                                            let _ = notifications.send(()).await;
 308                                        }
 309                                    };
                                        // Background messages are spawned on the executor; all
                                        // others are awaited here, so the next message is not
                                        // read until this one has been handled.
 310                                    if is_background {
 311                                        executor.spawn_detached(handle_message);
 312                                    } else {
 313                                        handle_message.await;
 314                                    }
 315                                } else {
 316                                    tracing::error!("no message handler");
 317                                }
 318                            }.instrument(span).await;
 319                        } else {
                                // The incoming stream ended.
 320                            tracing::info!(%user_id, %connection_id, %address, "connection closed");
 321                            break;
 322                        }
 323                    }
 324                }
 325            }
 326
                // Runs on both exits from the loop; a sign-out failure is only logged.
 327            if let Err(error) = this.sign_out(connection_id).await {
 328                tracing::error!(%error, "error signing out");
 329            }
 330        }.instrument(span)
 331    }
 332
 333    async fn sign_out(self: &mut Arc<Self>, connection_id: ConnectionId) -> Result<()> {
 334        self.peer.disconnect(connection_id);
 335        let mut state = self.state_mut().await;
 336        let removed_connection = state.remove_connection(connection_id)?;
 337
 338        for (project_id, project) in removed_connection.hosted_projects {
 339            if let Some(share) = project.share {
 340                broadcast(
 341                    connection_id,
 342                    share.guests.keys().copied().collect(),
 343                    |conn_id| {
 344                        self.peer
 345                            .send(conn_id, proto::UnshareProject { project_id })
 346                    },
 347                );
 348            }
 349        }
 350
 351        for (project_id, peer_ids) in removed_connection.guest_project_ids {
 352            broadcast(connection_id, peer_ids, |conn_id| {
 353                self.peer.send(
 354                    conn_id,
 355                    proto::RemoveProjectCollaborator {
 356                        project_id,
 357                        peer_id: connection_id.0,
 358                    },
 359                )
 360            });
 361        }
 362
 363        self.update_contacts_for_users(&*state, removed_connection.contact_ids.iter());
 364        Ok(())
 365    }
 366
 367    async fn ping(self: Arc<Server>, _: TypedEnvelope<proto::Ping>) -> Result<proto::Ack> {
 368        Ok(proto::Ack {})
 369    }
 370
 371    async fn register_project(
 372        self: Arc<Server>,
 373        request: TypedEnvelope<proto::RegisterProject>,
 374    ) -> Result<proto::RegisterProjectResponse> {
 375        let project_id = {
 376            let mut state = self.state_mut().await;
 377            let user_id = state.user_id_for_connection(request.sender_id)?;
 378            state.register_project(request.sender_id, user_id)
 379        };
 380        Ok(proto::RegisterProjectResponse { project_id })
 381    }
 382
 383    async fn unregister_project(
 384        self: Arc<Server>,
 385        request: TypedEnvelope<proto::UnregisterProject>,
 386    ) -> Result<()> {
 387        let mut state = self.state_mut().await;
 388        let project = state.unregister_project(request.payload.project_id, request.sender_id)?;
 389        self.update_contacts_for_users(&*state, &project.authorized_user_ids());
 390        Ok(())
 391    }
 392
 393    async fn share_project(
 394        self: Arc<Server>,
 395        request: TypedEnvelope<proto::ShareProject>,
 396    ) -> Result<proto::Ack> {
 397        let mut state = self.state_mut().await;
 398        let project = state.share_project(request.payload.project_id, request.sender_id)?;
 399        self.update_contacts_for_users(&mut *state, &project.authorized_user_ids);
 400        Ok(proto::Ack {})
 401    }
 402
 403    async fn unshare_project(
 404        self: Arc<Server>,
 405        request: TypedEnvelope<proto::UnshareProject>,
 406    ) -> Result<()> {
 407        let project_id = request.payload.project_id;
 408        let mut state = self.state_mut().await;
 409        let project = state.unshare_project(project_id, request.sender_id)?;
 410        broadcast(request.sender_id, project.connection_ids, |conn_id| {
 411            self.peer
 412                .send(conn_id, proto::UnshareProject { project_id })
 413        });
 414        self.update_contacts_for_users(&mut *state, &project.authorized_user_ids);
 415        Ok(())
 416    }
 417
 418    fn join_project(
 419        self: Arc<Server>,
 420        state: &mut Store,
 421        request: TypedEnvelope<proto::JoinProject>,
 422    ) -> Result<proto::JoinProjectResponse> {
 423        let project_id = request.payload.project_id;
 424
 425        let user_id = state.user_id_for_connection(request.sender_id)?;
 426        let (response, connection_ids, contact_user_ids) = state
 427            .join_project(request.sender_id, user_id, project_id)
 428            .and_then(|joined| {
 429                let share = joined.project.share()?;
 430                let peer_count = share.guests.len();
 431                let mut collaborators = Vec::with_capacity(peer_count);
 432                collaborators.push(proto::Collaborator {
 433                    peer_id: joined.project.host_connection_id.0,
 434                    replica_id: 0,
 435                    user_id: joined.project.host_user_id.to_proto(),
 436                });
 437                let worktrees = share
 438                    .worktrees
 439                    .iter()
 440                    .filter_map(|(id, shared_worktree)| {
 441                        let worktree = joined.project.worktrees.get(&id)?;
 442                        Some(proto::Worktree {
 443                            id: *id,
 444                            root_name: worktree.root_name.clone(),
 445                            entries: shared_worktree.entries.values().cloned().collect(),
 446                            diagnostic_summaries: shared_worktree
 447                                .diagnostic_summaries
 448                                .values()
 449                                .cloned()
 450                                .collect(),
 451                            visible: worktree.visible,
 452                            scan_id: shared_worktree.scan_id,
 453                        })
 454                    })
 455                    .collect();
 456                for (peer_conn_id, (peer_replica_id, peer_user_id)) in &share.guests {
 457                    if *peer_conn_id != request.sender_id {
 458                        collaborators.push(proto::Collaborator {
 459                            peer_id: peer_conn_id.0,
 460                            replica_id: *peer_replica_id as u32,
 461                            user_id: peer_user_id.to_proto(),
 462                        });
 463                    }
 464                }
 465                let response = proto::JoinProjectResponse {
 466                    worktrees,
 467                    replica_id: joined.replica_id as u32,
 468                    collaborators,
 469                    language_servers: joined.project.language_servers.clone(),
 470                };
 471                let connection_ids = joined.project.connection_ids();
 472                let contact_user_ids = joined.project.authorized_user_ids();
 473                Ok((response, connection_ids, contact_user_ids))
 474            })?;
 475
 476        broadcast(request.sender_id, connection_ids, |conn_id| {
 477            self.peer.send(
 478                conn_id,
 479                proto::AddProjectCollaborator {
 480                    project_id,
 481                    collaborator: Some(proto::Collaborator {
 482                        peer_id: request.sender_id.0,
 483                        replica_id: response.replica_id,
 484                        user_id: user_id.to_proto(),
 485                    }),
 486                },
 487            )
 488        });
 489        self.update_contacts_for_users(state, &contact_user_ids);
 490        Ok(response)
 491    }
 492
 493    async fn leave_project(
 494        self: Arc<Server>,
 495        request: TypedEnvelope<proto::LeaveProject>,
 496    ) -> Result<()> {
 497        let sender_id = request.sender_id;
 498        let project_id = request.payload.project_id;
 499        let mut state = self.state_mut().await;
 500        let worktree = state.leave_project(sender_id, project_id)?;
 501        broadcast(sender_id, worktree.connection_ids, |conn_id| {
 502            self.peer.send(
 503                conn_id,
 504                proto::RemoveProjectCollaborator {
 505                    project_id,
 506                    peer_id: sender_id.0,
 507                },
 508            )
 509        });
 510        self.update_contacts_for_users(&*state, &worktree.authorized_user_ids);
 511        Ok(())
 512    }
 513
 514    async fn register_worktree(
 515        self: Arc<Server>,
 516        request: TypedEnvelope<proto::RegisterWorktree>,
 517    ) -> Result<proto::Ack> {
 518        let mut contact_user_ids = HashSet::default();
 519        for github_login in &request.payload.authorized_logins {
 520            let contact_user_id = self.app_state.db.create_user(github_login, false).await?;
 521            contact_user_ids.insert(contact_user_id);
 522        }
 523
 524        let mut state = self.state_mut().await;
 525        let host_user_id = state.user_id_for_connection(request.sender_id)?;
 526        contact_user_ids.insert(host_user_id);
 527
 528        let contact_user_ids = contact_user_ids.into_iter().collect::<Vec<_>>();
 529        let guest_connection_ids = state
 530            .read_project(request.payload.project_id, request.sender_id)?
 531            .guest_connection_ids();
 532        state.register_worktree(
 533            request.payload.project_id,
 534            request.payload.worktree_id,
 535            request.sender_id,
 536            Worktree {
 537                authorized_user_ids: contact_user_ids.clone(),
 538                root_name: request.payload.root_name.clone(),
 539                visible: request.payload.visible,
 540            },
 541        )?;
 542
 543        broadcast(request.sender_id, guest_connection_ids, |connection_id| {
 544            self.peer
 545                .forward_send(request.sender_id, connection_id, request.payload.clone())
 546        });
 547        self.update_contacts_for_users(&*state, &contact_user_ids);
 548        Ok(proto::Ack {})
 549    }
 550
 551    async fn unregister_worktree(
 552        self: Arc<Server>,
 553        request: TypedEnvelope<proto::UnregisterWorktree>,
 554    ) -> Result<()> {
 555        let project_id = request.payload.project_id;
 556        let worktree_id = request.payload.worktree_id;
 557        let mut state = self.state_mut().await;
 558        let (worktree, guest_connection_ids) =
 559            state.unregister_worktree(project_id, worktree_id, request.sender_id)?;
 560        broadcast(request.sender_id, guest_connection_ids, |conn_id| {
 561            self.peer.send(
 562                conn_id,
 563                proto::UnregisterWorktree {
 564                    project_id,
 565                    worktree_id,
 566                },
 567            )
 568        });
 569        self.update_contacts_for_users(&*state, &worktree.authorized_user_ids);
 570        Ok(())
 571    }
 572
 573    async fn update_worktree(
 574        self: Arc<Server>,
 575        request: TypedEnvelope<proto::UpdateWorktree>,
 576    ) -> Result<proto::Ack> {
 577        let connection_ids = self.state_mut().await.update_worktree(
 578            request.sender_id,
 579            request.payload.project_id,
 580            request.payload.worktree_id,
 581            &request.payload.removed_entries,
 582            &request.payload.updated_entries,
 583            request.payload.scan_id,
 584        )?;
 585
 586        broadcast(request.sender_id, connection_ids, |connection_id| {
 587            self.peer
 588                .forward_send(request.sender_id, connection_id, request.payload.clone())
 589        });
 590
 591        Ok(proto::Ack {})
 592    }
 593
 594    async fn update_diagnostic_summary(
 595        self: Arc<Server>,
 596        request: TypedEnvelope<proto::UpdateDiagnosticSummary>,
 597    ) -> Result<()> {
 598        let summary = request
 599            .payload
 600            .summary
 601            .clone()
 602            .ok_or_else(|| anyhow!("invalid summary"))?;
 603        let receiver_ids = self.state_mut().await.update_diagnostic_summary(
 604            request.payload.project_id,
 605            request.payload.worktree_id,
 606            request.sender_id,
 607            summary,
 608        )?;
 609
 610        broadcast(request.sender_id, receiver_ids, |connection_id| {
 611            self.peer
 612                .forward_send(request.sender_id, connection_id, request.payload.clone())
 613        });
 614        Ok(())
 615    }
 616
 617    async fn start_language_server(
 618        self: Arc<Server>,
 619        request: TypedEnvelope<proto::StartLanguageServer>,
 620    ) -> Result<()> {
 621        let receiver_ids = self.state_mut().await.start_language_server(
 622            request.payload.project_id,
 623            request.sender_id,
 624            request
 625                .payload
 626                .server
 627                .clone()
 628                .ok_or_else(|| anyhow!("invalid language server"))?,
 629        )?;
 630        broadcast(request.sender_id, receiver_ids, |connection_id| {
 631            self.peer
 632                .forward_send(request.sender_id, connection_id, request.payload.clone())
 633        });
 634        Ok(())
 635    }
 636
 637    async fn update_language_server(
 638        self: Arc<Server>,
 639        request: TypedEnvelope<proto::UpdateLanguageServer>,
 640    ) -> Result<()> {
 641        let receiver_ids = self
 642            .state()
 643            .await
 644            .project_connection_ids(request.payload.project_id, request.sender_id)?;
 645        broadcast(request.sender_id, receiver_ids, |connection_id| {
 646            self.peer
 647                .forward_send(request.sender_id, connection_id, request.payload.clone())
 648        });
 649        Ok(())
 650    }
 651
 652    async fn forward_project_request<T>(
 653        self: Arc<Server>,
 654        request: TypedEnvelope<T>,
 655    ) -> Result<T::Response>
 656    where
 657        T: EntityMessage + RequestMessage,
 658    {
 659        let host_connection_id = self
 660            .state()
 661            .await
 662            .read_project(request.payload.remote_entity_id(), request.sender_id)?
 663            .host_connection_id;
 664        Ok(self
 665            .peer
 666            .forward_request(request.sender_id, host_connection_id, request.payload)
 667            .await?)
 668    }
 669
 670    async fn save_buffer(
 671        self: Arc<Server>,
 672        request: TypedEnvelope<proto::SaveBuffer>,
 673    ) -> Result<proto::BufferSaved> {
 674        let host = self
 675            .state()
 676            .await
 677            .read_project(request.payload.project_id, request.sender_id)?
 678            .host_connection_id;
 679        let response = self
 680            .peer
 681            .forward_request(request.sender_id, host, request.payload.clone())
 682            .await?;
 683
 684        let mut guests = self
 685            .state()
 686            .await
 687            .read_project(request.payload.project_id, request.sender_id)?
 688            .connection_ids();
 689        guests.retain(|guest_connection_id| *guest_connection_id != request.sender_id);
 690        broadcast(host, guests, |conn_id| {
 691            self.peer.forward_send(host, conn_id, response.clone())
 692        });
 693
 694        Ok(response)
 695    }
 696
 697    async fn update_buffer(
 698        self: Arc<Server>,
 699        request: TypedEnvelope<proto::UpdateBuffer>,
 700    ) -> Result<proto::Ack> {
 701        let receiver_ids = self
 702            .state()
 703            .await
 704            .project_connection_ids(request.payload.project_id, request.sender_id)?;
 705        broadcast(request.sender_id, receiver_ids, |connection_id| {
 706            self.peer
 707                .forward_send(request.sender_id, connection_id, request.payload.clone())
 708        });
 709        Ok(proto::Ack {})
 710    }
 711
 712    async fn update_buffer_file(
 713        self: Arc<Server>,
 714        request: TypedEnvelope<proto::UpdateBufferFile>,
 715    ) -> Result<()> {
 716        let receiver_ids = self
 717            .state()
 718            .await
 719            .project_connection_ids(request.payload.project_id, request.sender_id)?;
 720        broadcast(request.sender_id, receiver_ids, |connection_id| {
 721            self.peer
 722                .forward_send(request.sender_id, connection_id, request.payload.clone())
 723        });
 724        Ok(())
 725    }
 726
 727    async fn buffer_reloaded(
 728        self: Arc<Server>,
 729        request: TypedEnvelope<proto::BufferReloaded>,
 730    ) -> Result<()> {
 731        let receiver_ids = self
 732            .state()
 733            .await
 734            .project_connection_ids(request.payload.project_id, request.sender_id)?;
 735        broadcast(request.sender_id, receiver_ids, |connection_id| {
 736            self.peer
 737                .forward_send(request.sender_id, connection_id, request.payload.clone())
 738        });
 739        Ok(())
 740    }
 741
 742    async fn buffer_saved(
 743        self: Arc<Server>,
 744        request: TypedEnvelope<proto::BufferSaved>,
 745    ) -> Result<()> {
 746        let receiver_ids = self
 747            .state()
 748            .await
 749            .project_connection_ids(request.payload.project_id, request.sender_id)?;
 750        broadcast(request.sender_id, receiver_ids, |connection_id| {
 751            self.peer
 752                .forward_send(request.sender_id, connection_id, request.payload.clone())
 753        });
 754        Ok(())
 755    }
 756
 757    async fn follow(
 758        self: Arc<Self>,
 759        request: TypedEnvelope<proto::Follow>,
 760    ) -> Result<proto::FollowResponse> {
 761        let leader_id = ConnectionId(request.payload.leader_id);
 762        let follower_id = request.sender_id;
 763        if !self
 764            .state()
 765            .await
 766            .project_connection_ids(request.payload.project_id, follower_id)?
 767            .contains(&leader_id)
 768        {
 769            Err(anyhow!("no such peer"))?;
 770        }
 771        let mut response = self
 772            .peer
 773            .forward_request(request.sender_id, leader_id, request.payload)
 774            .await?;
 775        response
 776            .views
 777            .retain(|view| view.leader_id != Some(follower_id.0));
 778        Ok(response)
 779    }
 780
 781    async fn unfollow(self: Arc<Self>, request: TypedEnvelope<proto::Unfollow>) -> Result<()> {
 782        let leader_id = ConnectionId(request.payload.leader_id);
 783        if !self
 784            .state()
 785            .await
 786            .project_connection_ids(request.payload.project_id, request.sender_id)?
 787            .contains(&leader_id)
 788        {
 789            Err(anyhow!("no such peer"))?;
 790        }
 791        self.peer
 792            .forward_send(request.sender_id, leader_id, request.payload)?;
 793        Ok(())
 794    }
 795
 796    async fn update_followers(
 797        self: Arc<Self>,
 798        request: TypedEnvelope<proto::UpdateFollowers>,
 799    ) -> Result<()> {
 800        let connection_ids = self
 801            .state()
 802            .await
 803            .project_connection_ids(request.payload.project_id, request.sender_id)?;
 804        let leader_id = request
 805            .payload
 806            .variant
 807            .as_ref()
 808            .and_then(|variant| match variant {
 809                proto::update_followers::Variant::CreateView(payload) => payload.leader_id,
 810                proto::update_followers::Variant::UpdateView(payload) => payload.leader_id,
 811                proto::update_followers::Variant::UpdateActiveView(payload) => payload.leader_id,
 812            });
 813        for follower_id in &request.payload.follower_ids {
 814            let follower_id = ConnectionId(*follower_id);
 815            if connection_ids.contains(&follower_id) && Some(follower_id.0) != leader_id {
 816                self.peer
 817                    .forward_send(request.sender_id, follower_id, request.payload.clone())?;
 818            }
 819        }
 820        Ok(())
 821    }
 822
 823    async fn get_channels(
 824        self: Arc<Server>,
 825        request: TypedEnvelope<proto::GetChannels>,
 826    ) -> Result<proto::GetChannelsResponse> {
 827        let user_id = self
 828            .state()
 829            .await
 830            .user_id_for_connection(request.sender_id)?;
 831        let channels = self.app_state.db.get_accessible_channels(user_id).await?;
 832        Ok(proto::GetChannelsResponse {
 833            channels: channels
 834                .into_iter()
 835                .map(|chan| proto::Channel {
 836                    id: chan.id.to_proto(),
 837                    name: chan.name,
 838                })
 839                .collect(),
 840        })
 841    }
 842
 843    async fn get_users(
 844        self: Arc<Server>,
 845        request: TypedEnvelope<proto::GetUsers>,
 846    ) -> Result<proto::UsersResponse> {
 847        let user_ids = request
 848            .payload
 849            .user_ids
 850            .into_iter()
 851            .map(UserId::from_proto)
 852            .collect();
 853        let users = self
 854            .app_state
 855            .db
 856            .get_users_by_ids(user_ids)
 857            .await?
 858            .into_iter()
 859            .map(|user| proto::User {
 860                id: user.id.to_proto(),
 861                avatar_url: format!("https://github.com/{}.png?size=128", user.github_login),
 862                github_login: user.github_login,
 863            })
 864            .collect();
 865        Ok(proto::UsersResponse { users })
 866    }
 867
 868    async fn fuzzy_search_users(
 869        self: Arc<Server>,
 870        request: TypedEnvelope<proto::FuzzySearchUsers>,
 871    ) -> Result<proto::UsersResponse> {
 872        let query = request.payload.query;
 873        let db = &self.app_state.db;
 874        let users = match query.len() {
 875            0 => vec![],
 876            1 | 2 => db
 877                .get_user_by_github_login(&query)
 878                .await?
 879                .into_iter()
 880                .collect(),
 881            _ => db.fuzzy_search_users(&query, 10).await?,
 882        };
 883        let users = users
 884            .into_iter()
 885            .map(|user| proto::User {
 886                id: user.id.to_proto(),
 887                avatar_url: format!("https://github.com/{}.png?size=128", user.github_login),
 888                github_login: user.github_login,
 889            })
 890            .collect();
 891        Ok(proto::UsersResponse { users })
 892    }
 893
 894    #[instrument(skip(self, state, user_ids))]
 895    fn update_contacts_for_users<'a>(
 896        self: &Arc<Self>,
 897        state: &Store,
 898        user_ids: impl IntoIterator<Item = &'a UserId>,
 899    ) {
 900        for user_id in user_ids {
 901            let contacts = state.contacts_for_user(*user_id);
 902            for connection_id in state.connection_ids_for_user(*user_id) {
 903                self.peer
 904                    .send(
 905                        connection_id,
 906                        proto::UpdateContacts {
 907                            contacts: contacts.clone(),
 908                            pending_requests_from_user_ids: Default::default(),
 909                            pending_requests_to_user_ids: Default::default(),
 910                        },
 911                    )
 912                    .trace_err();
 913            }
 914        }
 915    }
 916
 917    async fn join_channel(
 918        self: Arc<Self>,
 919        request: TypedEnvelope<proto::JoinChannel>,
 920    ) -> Result<proto::JoinChannelResponse> {
 921        let user_id = self
 922            .state()
 923            .await
 924            .user_id_for_connection(request.sender_id)?;
 925        let channel_id = ChannelId::from_proto(request.payload.channel_id);
 926        if !self
 927            .app_state
 928            .db
 929            .can_user_access_channel(user_id, channel_id)
 930            .await?
 931        {
 932            Err(anyhow!("access denied"))?;
 933        }
 934
 935        self.state_mut()
 936            .await
 937            .join_channel(request.sender_id, channel_id);
 938        let messages = self
 939            .app_state
 940            .db
 941            .get_channel_messages(channel_id, MESSAGE_COUNT_PER_PAGE, None)
 942            .await?
 943            .into_iter()
 944            .map(|msg| proto::ChannelMessage {
 945                id: msg.id.to_proto(),
 946                body: msg.body,
 947                timestamp: msg.sent_at.unix_timestamp() as u64,
 948                sender_id: msg.sender_id.to_proto(),
 949                nonce: Some(msg.nonce.as_u128().into()),
 950            })
 951            .collect::<Vec<_>>();
 952        Ok(proto::JoinChannelResponse {
 953            done: messages.len() < MESSAGE_COUNT_PER_PAGE,
 954            messages,
 955        })
 956    }
 957
 958    async fn leave_channel(
 959        self: Arc<Self>,
 960        request: TypedEnvelope<proto::LeaveChannel>,
 961    ) -> Result<()> {
 962        let user_id = self
 963            .state()
 964            .await
 965            .user_id_for_connection(request.sender_id)?;
 966        let channel_id = ChannelId::from_proto(request.payload.channel_id);
 967        if !self
 968            .app_state
 969            .db
 970            .can_user_access_channel(user_id, channel_id)
 971            .await?
 972        {
 973            Err(anyhow!("access denied"))?;
 974        }
 975
 976        self.state_mut()
 977            .await
 978            .leave_channel(request.sender_id, channel_id);
 979
 980        Ok(())
 981    }
 982
 983    async fn send_channel_message(
 984        self: Arc<Self>,
 985        request: TypedEnvelope<proto::SendChannelMessage>,
 986    ) -> Result<proto::SendChannelMessageResponse> {
 987        let channel_id = ChannelId::from_proto(request.payload.channel_id);
 988        let user_id;
 989        let connection_ids;
 990        {
 991            let state = self.state().await;
 992            user_id = state.user_id_for_connection(request.sender_id)?;
 993            connection_ids = state.channel_connection_ids(channel_id)?;
 994        }
 995
 996        // Validate the message body.
 997        let body = request.payload.body.trim().to_string();
 998        if body.len() > MAX_MESSAGE_LEN {
 999            return Err(anyhow!("message is too long"))?;
1000        }
1001        if body.is_empty() {
1002            return Err(anyhow!("message can't be blank"))?;
1003        }
1004
1005        let timestamp = OffsetDateTime::now_utc();
1006        let nonce = request
1007            .payload
1008            .nonce
1009            .ok_or_else(|| anyhow!("nonce can't be blank"))?;
1010
1011        let message_id = self
1012            .app_state
1013            .db
1014            .create_channel_message(channel_id, user_id, &body, timestamp, nonce.clone().into())
1015            .await?
1016            .to_proto();
1017        let message = proto::ChannelMessage {
1018            sender_id: user_id.to_proto(),
1019            id: message_id,
1020            body,
1021            timestamp: timestamp.unix_timestamp() as u64,
1022            nonce: Some(nonce),
1023        };
1024        broadcast(request.sender_id, connection_ids, |conn_id| {
1025            self.peer.send(
1026                conn_id,
1027                proto::ChannelMessageSent {
1028                    channel_id: channel_id.to_proto(),
1029                    message: Some(message.clone()),
1030                },
1031            )
1032        });
1033        Ok(proto::SendChannelMessageResponse {
1034            message: Some(message),
1035        })
1036    }
1037
1038    async fn get_channel_messages(
1039        self: Arc<Self>,
1040        request: TypedEnvelope<proto::GetChannelMessages>,
1041    ) -> Result<proto::GetChannelMessagesResponse> {
1042        let user_id = self
1043            .state()
1044            .await
1045            .user_id_for_connection(request.sender_id)?;
1046        let channel_id = ChannelId::from_proto(request.payload.channel_id);
1047        if !self
1048            .app_state
1049            .db
1050            .can_user_access_channel(user_id, channel_id)
1051            .await?
1052        {
1053            Err(anyhow!("access denied"))?;
1054        }
1055
1056        let messages = self
1057            .app_state
1058            .db
1059            .get_channel_messages(
1060                channel_id,
1061                MESSAGE_COUNT_PER_PAGE,
1062                Some(MessageId::from_proto(request.payload.before_message_id)),
1063            )
1064            .await?
1065            .into_iter()
1066            .map(|msg| proto::ChannelMessage {
1067                id: msg.id.to_proto(),
1068                body: msg.body,
1069                timestamp: msg.sent_at.unix_timestamp() as u64,
1070                sender_id: msg.sender_id.to_proto(),
1071                nonce: Some(msg.nonce.as_u128().into()),
1072            })
1073            .collect::<Vec<_>>();
1074
1075        Ok(proto::GetChannelMessagesResponse {
1076            done: messages.len() < MESSAGE_COUNT_PER_PAGE,
1077            messages,
1078        })
1079    }
1080
1081    async fn state<'a>(self: &'a Arc<Self>) -> StoreReadGuard<'a> {
1082        #[cfg(test)]
1083        tokio::task::yield_now().await;
1084        let guard = self.store.read().await;
1085        #[cfg(test)]
1086        tokio::task::yield_now().await;
1087        StoreReadGuard {
1088            guard,
1089            _not_send: PhantomData,
1090        }
1091    }
1092
1093    async fn state_mut<'a>(self: &'a Arc<Self>) -> StoreWriteGuard<'a> {
1094        #[cfg(test)]
1095        tokio::task::yield_now().await;
1096        let guard = self.store.write().await;
1097        #[cfg(test)]
1098        tokio::task::yield_now().await;
1099        StoreWriteGuard {
1100            guard,
1101            _not_send: PhantomData,
1102        }
1103    }
1104}
1105
1106impl<'a> Deref for StoreReadGuard<'a> {
1107    type Target = Store;
1108
1109    fn deref(&self) -> &Self::Target {
1110        &*self.guard
1111    }
1112}
1113
1114impl<'a> Deref for StoreWriteGuard<'a> {
1115    type Target = Store;
1116
1117    fn deref(&self) -> &Self::Target {
1118        &*self.guard
1119    }
1120}
1121
1122impl<'a> DerefMut for StoreWriteGuard<'a> {
1123    fn deref_mut(&mut self) -> &mut Self::Target {
1124        &mut *self.guard
1125    }
1126}
1127
1128impl<'a> Drop for StoreWriteGuard<'a> {
1129    fn drop(&mut self) {
1130        #[cfg(test)]
1131        self.check_invariants();
1132
1133        let metrics = self.metrics();
1134        tracing::info!(
1135            connections = metrics.connections,
1136            registered_projects = metrics.registered_projects,
1137            shared_projects = metrics.shared_projects,
1138            "metrics"
1139        );
1140    }
1141}
1142
1143impl Executor for RealExecutor {
1144    type Sleep = Sleep;
1145
1146    fn spawn_detached<F: 'static + Send + Future<Output = ()>>(&self, future: F) {
1147        tokio::task::spawn(future);
1148    }
1149
1150    fn sleep(&self, duration: Duration) -> Self::Sleep {
1151        tokio::time::sleep(duration)
1152    }
1153}
1154
1155#[instrument(skip(f))]
1156fn broadcast<F>(sender_id: ConnectionId, receiver_ids: Vec<ConnectionId>, mut f: F)
1157where
1158    F: FnMut(ConnectionId) -> anyhow::Result<()>,
1159{
1160    for receiver_id in receiver_ids {
1161        if receiver_id != sender_id {
1162            f(receiver_id).trace_err();
1163        }
1164    }
1165}
1166
lazy_static! {
    // Name of the HTTP header carrying the client's protocol version. It is
    // built lazily and handed out by `ProtocolVersion::name`.
    static ref ZED_PROTOCOL_VERSION: HeaderName = HeaderName::from_static("x-zed-protocol-version");
}
1170
1171pub struct ProtocolVersion(u32);
1172
1173impl Header for ProtocolVersion {
1174    fn name() -> &'static HeaderName {
1175        &ZED_PROTOCOL_VERSION
1176    }
1177
1178    fn decode<'i, I>(values: &mut I) -> Result<Self, axum::headers::Error>
1179    where
1180        Self: Sized,
1181        I: Iterator<Item = &'i axum::http::HeaderValue>,
1182    {
1183        let version = values
1184            .next()
1185            .ok_or_else(|| axum::headers::Error::invalid())?
1186            .to_str()
1187            .map_err(|_| axum::headers::Error::invalid())?
1188            .parse()
1189            .map_err(|_| axum::headers::Error::invalid())?;
1190        Ok(Self(version))
1191    }
1192
1193    fn encode<E: Extend<axum::http::HeaderValue>>(&self, values: &mut E) {
1194        values.extend([self.0.to_string().parse().unwrap()]);
1195    }
1196}
1197
1198pub fn routes(app_state: Arc<AppState>) -> Router<Body> {
1199    let server = Server::new(app_state.clone(), None);
1200    Router::new()
1201        .route("/rpc", get(handle_websocket_request))
1202        .layer(
1203            ServiceBuilder::new()
1204                .layer(Extension(app_state))
1205                .layer(middleware::from_fn(auth::validate_header))
1206                .layer(Extension(server)),
1207        )
1208}
1209
1210pub async fn handle_websocket_request(
1211    TypedHeader(ProtocolVersion(protocol_version)): TypedHeader<ProtocolVersion>,
1212    ConnectInfo(socket_address): ConnectInfo<SocketAddr>,
1213    Extension(server): Extension<Arc<Server>>,
1214    Extension(user_id): Extension<UserId>,
1215    ws: WebSocketUpgrade,
1216) -> Response {
1217    if protocol_version != rpc::PROTOCOL_VERSION {
1218        return (
1219            StatusCode::UPGRADE_REQUIRED,
1220            "client must be upgraded".to_string(),
1221        )
1222            .into_response();
1223    }
1224    let socket_address = socket_address.to_string();
1225    ws.on_upgrade(move |socket| {
1226        let socket = socket
1227            .map_ok(to_tungstenite_message)
1228            .err_into()
1229            .with(|message| async move { Ok(to_axum_message(message)) });
1230        let connection = Connection::new(Box::pin(socket));
1231        server.handle_connection(connection, socket_address, user_id, None, RealExecutor)
1232    })
1233}
1234
1235fn to_axum_message(message: TungsteniteMessage) -> AxumMessage {
1236    match message {
1237        TungsteniteMessage::Text(payload) => AxumMessage::Text(payload),
1238        TungsteniteMessage::Binary(payload) => AxumMessage::Binary(payload),
1239        TungsteniteMessage::Ping(payload) => AxumMessage::Ping(payload),
1240        TungsteniteMessage::Pong(payload) => AxumMessage::Pong(payload),
1241        TungsteniteMessage::Close(frame) => AxumMessage::Close(frame.map(|frame| AxumCloseFrame {
1242            code: frame.code.into(),
1243            reason: frame.reason,
1244        })),
1245    }
1246}
1247
1248fn to_tungstenite_message(message: AxumMessage) -> TungsteniteMessage {
1249    match message {
1250        AxumMessage::Text(payload) => TungsteniteMessage::Text(payload),
1251        AxumMessage::Binary(payload) => TungsteniteMessage::Binary(payload),
1252        AxumMessage::Ping(payload) => TungsteniteMessage::Ping(payload),
1253        AxumMessage::Pong(payload) => TungsteniteMessage::Pong(payload),
1254        AxumMessage::Close(frame) => {
1255            TungsteniteMessage::Close(frame.map(|frame| TungsteniteCloseFrame {
1256                code: frame.code.into(),
1257                reason: frame.reason,
1258            }))
1259        }
1260    }
1261}
1262
/// Extension trait for turning a fallible value into an `Option` while
/// reporting the failure instead of propagating it.
pub trait ResultExt {
    /// The success type carried by the implementing value.
    type Ok;

    /// Returns `Some(value)` on success. On failure the implementation in
    /// this file logs the error and returns `None`.
    fn trace_err(self) -> Option<Self::Ok>;
}
1268
1269impl<T, E> ResultExt for Result<T, E>
1270where
1271    E: std::fmt::Debug,
1272{
1273    type Ok = T;
1274
1275    fn trace_err(self) -> Option<T> {
1276        match self {
1277            Ok(value) => Some(value),
1278            Err(error) => {
1279                tracing::error!("{:?}", error);
1280                None
1281            }
1282        }
1283    }
1284}
1285
1286#[cfg(test)]
1287mod tests {
1288    use super::*;
1289    use crate::{
1290        db::{tests::TestDb, UserId},
1291        AppState,
1292    };
1293    use ::rpc::Peer;
1294    use client::{
1295        self, test::FakeHttpClient, Channel, ChannelDetails, ChannelList, Client, Credentials,
1296        EstablishConnectionError, UserStore, RECEIVE_TIMEOUT,
1297    };
1298    use collections::BTreeMap;
1299    use editor::{
1300        self, ConfirmCodeAction, ConfirmCompletion, ConfirmRename, Editor, Input, Redo, Rename,
1301        ToOffset, ToggleCodeActions, Undo,
1302    };
1303    use gpui::{
1304        executor::{self, Deterministic},
1305        geometry::vector::vec2f,
1306        ModelHandle, TestAppContext, ViewHandle,
1307    };
1308    use language::{
1309        range_to_lsp, tree_sitter_rust, Diagnostic, DiagnosticEntry, FakeLspAdapter, Language,
1310        LanguageConfig, LanguageRegistry, OffsetRangeExt, Point, Rope,
1311    };
1312    use lsp::{self, FakeLanguageServer};
1313    use parking_lot::Mutex;
1314    use project::{
1315        fs::{FakeFs, Fs as _},
1316        search::SearchQuery,
1317        worktree::WorktreeHandle,
1318        DiagnosticSummary, Project, ProjectPath, WorktreeId,
1319    };
1320    use rand::prelude::*;
1321    use rpc::PeerId;
1322    use serde_json::json;
1323    use settings::Settings;
1324    use sqlx::types::time::OffsetDateTime;
1325    use std::{
1326        env,
1327        ops::Deref,
1328        path::{Path, PathBuf},
1329        rc::Rc,
1330        sync::{
1331            atomic::{AtomicBool, Ordering::SeqCst},
1332            Arc,
1333        },
1334        time::Duration,
1335    };
1336    use theme::ThemeRegistry;
1337    use workspace::{Item, SplitDirection, ToggleFollow, Workspace, WorkspaceParams};
1338
1339    #[cfg(test)]
1340    #[ctor::ctor]
1341    fn init_logger() {
1342        if std::env::var("RUST_LOG").is_ok() {
1343            env_logger::init();
1344        }
1345    }
1346
    // End-to-end test of sharing a project between two clients: client A
    // shares a local project, client B joins it remotely, both see each other
    // as collaborators, an edit made by B reaches A's buffer, and dropping
    // B's project removes B from A's collaborators.
    #[gpui::test(iterations = 10)]
    async fn test_share_project(cx_a: &mut TestAppContext, cx_b: &mut TestAppContext) {
        let (window_b, _) = cx_b.add_window(|_| EmptyView);
        let lang_registry = Arc::new(LanguageRegistry::test());
        let fs = FakeFs::new(cx_a.background());
        cx_a.foreground().forbid_parking();

        // Connect to a server as 2 clients.
        let mut server = TestServer::start(cx_a.foreground(), cx_a.background()).await;
        let client_a = server.create_client(cx_a, "user_a").await;
        let client_b = server.create_client(cx_b, "user_b").await;

        // Share a project as client A
        fs.insert_tree(
            "/a",
            json!({
                ".zed.toml": r#"collaborators = ["user_b"]"#,
                "a.txt": "a-contents",
                "b.txt": "b-contents",
            }),
        )
        .await;
        let project_a = cx_a.update(|cx| {
            Project::local(
                client_a.clone(),
                client_a.user_store.clone(),
                lang_registry.clone(),
                fs.clone(),
                cx,
            )
        });
        let (worktree_a, _) = project_a
            .update(cx_a, |p, cx| {
                p.find_or_create_local_worktree("/a", true, cx)
            })
            .await
            .unwrap();
        let worktree_id = worktree_a.read_with(cx_a, |tree, _| tree.id());
        // Wait for the initial worktree scan before sharing.
        worktree_a
            .read_with(cx_a, |tree, _| tree.as_local().unwrap().scan_complete())
            .await;
        let project_id = project_a.update(cx_a, |p, _| p.next_remote_id()).await;
        project_a.update(cx_a, |p, cx| p.share(cx)).await.unwrap();

        // Join that project as client B
        let project_b = Project::remote(
            project_id,
            client_b.clone(),
            client_b.user_store.clone(),
            lang_registry.clone(),
            fs.clone(),
            &mut cx_b.to_async(),
        )
        .await
        .unwrap();

        // Client B must already see client A as a collaborator; remember B's
        // replica id so it can be checked from A's side.
        let replica_id_b = project_b.read_with(cx_b, |project, _| {
            assert_eq!(
                project
                    .collaborators()
                    .get(&client_a.peer_id)
                    .unwrap()
                    .user
                    .github_login,
                "user_a"
            );
            project.replica_id()
        });
        // Wait until client A sees client B with the matching replica id and login.
        project_a
            .condition(&cx_a, |tree, _| {
                tree.collaborators()
                    .get(&client_b.peer_id)
                    .map_or(false, |collaborator| {
                        collaborator.replica_id == replica_id_b
                            && collaborator.user.github_login == "user_b"
                    })
            })
            .await;

        // Open the same file as client B and client A.
        let buffer_b = project_b
            .update(cx_b, |p, cx| p.open_buffer((worktree_id, "b.txt"), cx))
            .await
            .unwrap();
        buffer_b.read_with(cx_b, |buf, _| assert_eq!(buf.text(), "b-contents"));
        // The host is expected to have the buffer open once the guest opened it.
        project_a.read_with(cx_a, |project, cx| {
            assert!(project.has_open_buffer((worktree_id, "b.txt"), cx))
        });
        let buffer_a = project_a
            .update(cx_a, |p, cx| p.open_buffer((worktree_id, "b.txt"), cx))
            .await
            .unwrap();

        let editor_b = cx_b.add_view(window_b, |cx| Editor::for_buffer(buffer_b, None, cx));

        // TODO
        // // Create a selection set as client B and see that selection set as client A.
        // buffer_a
        //     .condition(&cx_a, |buffer, _| buffer.selection_sets().count() == 1)
        //     .await;

        // Edit the buffer as client B and see that edit as client A.
        editor_b.update(cx_b, |editor, cx| {
            editor.handle_input(&Input("ok, ".into()), cx)
        });
        buffer_a
            .condition(&cx_a, |buffer, _| buffer.text() == "ok, b-contents")
            .await;

        // TODO
        // // Remove the selection set as client B, see those selections disappear as client A.
        cx_b.update(move |_| drop(editor_b));
        // buffer_a
        //     .condition(&cx_a, |buffer, _| buffer.selection_sets().count() == 0)
        //     .await;

        // Dropping the client B's project removes client B from client A's collaborators.
        cx_b.update(move |_| drop(project_b));
        project_a
            .condition(&cx_a, |project, _| project.collaborators().is_empty())
            .await;
    }
1469
    // Verifies that unsharing a project makes the guest's copy read-only and
    // marks the host's worktree as unshared, and that the project can then be
    // shared again and joined by a guest under the same project id.
    #[gpui::test(iterations = 10)]
    async fn test_unshare_project(cx_a: &mut TestAppContext, cx_b: &mut TestAppContext) {
        let lang_registry = Arc::new(LanguageRegistry::test());
        let fs = FakeFs::new(cx_a.background());
        cx_a.foreground().forbid_parking();

        // Connect to a server as 2 clients.
        let mut server = TestServer::start(cx_a.foreground(), cx_a.background()).await;
        let client_a = server.create_client(cx_a, "user_a").await;
        let client_b = server.create_client(cx_b, "user_b").await;

        // Share a project as client A
        fs.insert_tree(
            "/a",
            json!({
                ".zed.toml": r#"collaborators = ["user_b"]"#,
                "a.txt": "a-contents",
                "b.txt": "b-contents",
            }),
        )
        .await;
        let project_a = cx_a.update(|cx| {
            Project::local(
                client_a.clone(),
                client_a.user_store.clone(),
                lang_registry.clone(),
                fs.clone(),
                cx,
            )
        });
        let (worktree_a, _) = project_a
            .update(cx_a, |p, cx| {
                p.find_or_create_local_worktree("/a", true, cx)
            })
            .await
            .unwrap();
        // Wait for the initial worktree scan before sharing.
        worktree_a
            .read_with(cx_a, |tree, _| tree.as_local().unwrap().scan_complete())
            .await;
        let project_id = project_a.update(cx_a, |p, _| p.next_remote_id()).await;
        let worktree_id = worktree_a.read_with(cx_a, |tree, _| tree.id());
        project_a.update(cx_a, |p, cx| p.share(cx)).await.unwrap();
        assert!(worktree_a.read_with(cx_a, |tree, _| tree.as_local().unwrap().is_shared()));

        // Join that project as client B
        let project_b = Project::remote(
            project_id,
            client_b.clone(),
            client_b.user_store.clone(),
            lang_registry.clone(),
            fs.clone(),
            &mut cx_b.to_async(),
        )
        .await
        .unwrap();
        project_b
            .update(cx_b, |p, cx| p.open_buffer((worktree_id, "a.txt"), cx))
            .await
            .unwrap();

        // Unshare the project as client A
        project_a.update(cx_a, |project, cx| project.unshare(cx));
        // The guest's project becomes read-only once the host unshares.
        project_b
            .condition(cx_b, |project, _| project.is_read_only())
            .await;
        assert!(worktree_a.read_with(cx_a, |tree, _| !tree.as_local().unwrap().is_shared()));
        cx_b.update(|_| {
            drop(project_b);
        });

        // Share the project again and ensure guests can still join.
        project_a
            .update(cx_a, |project, cx| project.share(cx))
            .await
            .unwrap();
        assert!(worktree_a.read_with(cx_a, |tree, _| tree.as_local().unwrap().is_shared()));

        // Rejoin with the same project id and open a buffer to prove the
        // re-shared project is usable.
        let project_b2 = Project::remote(
            project_id,
            client_b.clone(),
            client_b.user_store.clone(),
            lang_registry.clone(),
            fs.clone(),
            &mut cx_b.to_async(),
        )
        .await
        .unwrap();
        project_b2
            .update(cx_b, |p, cx| p.open_buffer((worktree_id, "a.txt"), cx))
            .await
            .unwrap();
    }
1562
    // Verifies the behavior when the host's connection drops: the host loses
    // its collaborators and shared state, the guest's project becomes
    // read-only, and after obtaining a new remote id the host can share again
    // and a guest can join.
    #[gpui::test(iterations = 10)]
    async fn test_host_disconnect(cx_a: &mut TestAppContext, cx_b: &mut TestAppContext) {
        let lang_registry = Arc::new(LanguageRegistry::test());
        let fs = FakeFs::new(cx_a.background());
        cx_a.foreground().forbid_parking();

        // Connect to a server as 2 clients.
        let mut server = TestServer::start(cx_a.foreground(), cx_a.background()).await;
        let client_a = server.create_client(cx_a, "user_a").await;
        let client_b = server.create_client(cx_b, "user_b").await;

        // Share a project as client A
        fs.insert_tree(
            "/a",
            json!({
                ".zed.toml": r#"collaborators = ["user_b"]"#,
                "a.txt": "a-contents",
                "b.txt": "b-contents",
            }),
        )
        .await;
        let project_a = cx_a.update(|cx| {
            Project::local(
                client_a.clone(),
                client_a.user_store.clone(),
                lang_registry.clone(),
                fs.clone(),
                cx,
            )
        });
        let (worktree_a, _) = project_a
            .update(cx_a, |p, cx| {
                p.find_or_create_local_worktree("/a", true, cx)
            })
            .await
            .unwrap();
        // Wait for the initial worktree scan before sharing.
        worktree_a
            .read_with(cx_a, |tree, _| tree.as_local().unwrap().scan_complete())
            .await;
        let project_id = project_a.update(cx_a, |p, _| p.next_remote_id()).await;
        let worktree_id = worktree_a.read_with(cx_a, |tree, _| tree.id());
        project_a.update(cx_a, |p, cx| p.share(cx)).await.unwrap();
        assert!(worktree_a.read_with(cx_a, |tree, _| tree.as_local().unwrap().is_shared()));

        // Join that project as client B
        let project_b = Project::remote(
            project_id,
            client_b.clone(),
            client_b.user_store.clone(),
            lang_registry.clone(),
            fs.clone(),
            &mut cx_b.to_async(),
        )
        .await
        .unwrap();
        project_b
            .update(cx_b, |p, cx| p.open_buffer((worktree_id, "a.txt"), cx))
            .await
            .unwrap();

        // Drop client A's connection. Collaborators should disappear and the project should not be shown as shared.
        server.disconnect_client(client_a.current_user_id(cx_a));
        // Advance the fake clock by the receive timeout (presumably so the
        // dropped connection is noticed; confirm against the rpc crate).
        cx_a.foreground().advance_clock(rpc::RECEIVE_TIMEOUT);
        project_a
            .condition(cx_a, |project, _| project.collaborators().is_empty())
            .await;
        project_a.read_with(cx_a, |project, _| assert!(!project.is_shared()));
        project_b
            .condition(cx_b, |project, _| project.is_read_only())
            .await;
        assert!(worktree_a.read_with(cx_a, |tree, _| !tree.as_local().unwrap().is_shared()));
        cx_b.update(|_| {
            drop(project_b);
        });

        // Await reconnection; the project id fetched here is the one used below.
        let project_id = project_a.update(cx_a, |p, _| p.next_remote_id()).await;

        // Share the project again and ensure guests can still join.
        project_a
            .update(cx_a, |project, cx| project.share(cx))
            .await
            .unwrap();
        assert!(worktree_a.read_with(cx_a, |tree, _| tree.as_local().unwrap().is_shared()));

        let project_b2 = Project::remote(
            project_id,
            client_b.clone(),
            client_b.user_store.clone(),
            lang_registry.clone(),
            fs.clone(),
            &mut cx_b.to_async(),
        )
        .await
        .unwrap();
        project_b2
            .update(cx_b, |p, cx| p.open_buffer((worktree_id, "a.txt"), cx))
            .await
            .unwrap();
    }
1663
1664    #[gpui::test(iterations = 10)]
1665    async fn test_propagate_saves_and_fs_changes(
1666        cx_a: &mut TestAppContext,
1667        cx_b: &mut TestAppContext,
1668        cx_c: &mut TestAppContext,
1669    ) {
1670        let lang_registry = Arc::new(LanguageRegistry::test());
1671        let fs = FakeFs::new(cx_a.background());
1672        cx_a.foreground().forbid_parking();
1673
1674        // Connect to a server as 3 clients.
1675        let mut server = TestServer::start(cx_a.foreground(), cx_a.background()).await;
1676        let client_a = server.create_client(cx_a, "user_a").await;
1677        let client_b = server.create_client(cx_b, "user_b").await;
1678        let client_c = server.create_client(cx_c, "user_c").await;
1679
1680        // Share a worktree as client A.
1681        fs.insert_tree(
1682            "/a",
1683            json!({
1684                ".zed.toml": r#"collaborators = ["user_b", "user_c"]"#,
1685                "file1": "",
1686                "file2": ""
1687            }),
1688        )
1689        .await;
1690        let project_a = cx_a.update(|cx| {
1691            Project::local(
1692                client_a.clone(),
1693                client_a.user_store.clone(),
1694                lang_registry.clone(),
1695                fs.clone(),
1696                cx,
1697            )
1698        });
1699        let (worktree_a, _) = project_a
1700            .update(cx_a, |p, cx| {
1701                p.find_or_create_local_worktree("/a", true, cx)
1702            })
1703            .await
1704            .unwrap();
1705        worktree_a
1706            .read_with(cx_a, |tree, _| tree.as_local().unwrap().scan_complete())
1707            .await;
1708        let project_id = project_a.update(cx_a, |p, _| p.next_remote_id()).await;
1709        let worktree_id = worktree_a.read_with(cx_a, |tree, _| tree.id());
1710        project_a.update(cx_a, |p, cx| p.share(cx)).await.unwrap();
1711
1712        // Join that worktree as clients B and C.
1713        let project_b = Project::remote(
1714            project_id,
1715            client_b.clone(),
1716            client_b.user_store.clone(),
1717            lang_registry.clone(),
1718            fs.clone(),
1719            &mut cx_b.to_async(),
1720        )
1721        .await
1722        .unwrap();
1723        let project_c = Project::remote(
1724            project_id,
1725            client_c.clone(),
1726            client_c.user_store.clone(),
1727            lang_registry.clone(),
1728            fs.clone(),
1729            &mut cx_c.to_async(),
1730        )
1731        .await
1732        .unwrap();
1733        let worktree_b = project_b.read_with(cx_b, |p, cx| p.worktrees(cx).next().unwrap());
1734        let worktree_c = project_c.read_with(cx_c, |p, cx| p.worktrees(cx).next().unwrap());
1735
1736        // Open and edit a buffer as both guests B and C.
1737        let buffer_b = project_b
1738            .update(cx_b, |p, cx| p.open_buffer((worktree_id, "file1"), cx))
1739            .await
1740            .unwrap();
1741        let buffer_c = project_c
1742            .update(cx_c, |p, cx| p.open_buffer((worktree_id, "file1"), cx))
1743            .await
1744            .unwrap();
1745        buffer_b.update(cx_b, |buf, cx| buf.edit([(0..0, "i-am-b, ")], cx));
1746        buffer_c.update(cx_c, |buf, cx| buf.edit([(0..0, "i-am-c, ")], cx));
1747
1748        // Open and edit that buffer as the host.
1749        let buffer_a = project_a
1750            .update(cx_a, |p, cx| p.open_buffer((worktree_id, "file1"), cx))
1751            .await
1752            .unwrap();
1753
1754        buffer_a
1755            .condition(cx_a, |buf, _| buf.text() == "i-am-c, i-am-b, ")
1756            .await;
1757        buffer_a.update(cx_a, |buf, cx| {
1758            buf.edit([(buf.len()..buf.len(), "i-am-a")], cx)
1759        });
1760
1761        // Wait for edits to propagate
1762        buffer_a
1763            .condition(cx_a, |buf, _| buf.text() == "i-am-c, i-am-b, i-am-a")
1764            .await;
1765        buffer_b
1766            .condition(cx_b, |buf, _| buf.text() == "i-am-c, i-am-b, i-am-a")
1767            .await;
1768        buffer_c
1769            .condition(cx_c, |buf, _| buf.text() == "i-am-c, i-am-b, i-am-a")
1770            .await;
1771
1772        // Edit the buffer as the host and concurrently save as guest B.
1773        let save_b = buffer_b.update(cx_b, |buf, cx| buf.save(cx));
1774        buffer_a.update(cx_a, |buf, cx| buf.edit([(0..0, "hi-a, ")], cx));
1775        save_b.await.unwrap();
1776        assert_eq!(
1777            fs.load("/a/file1".as_ref()).await.unwrap(),
1778            "hi-a, i-am-c, i-am-b, i-am-a"
1779        );
1780        buffer_a.read_with(cx_a, |buf, _| assert!(!buf.is_dirty()));
1781        buffer_b.read_with(cx_b, |buf, _| assert!(!buf.is_dirty()));
1782        buffer_c.condition(cx_c, |buf, _| !buf.is_dirty()).await;
1783
1784        worktree_a.flush_fs_events(cx_a).await;
1785
1786        // Make changes on host's file system, see those changes on guest worktrees.
1787        fs.rename(
1788            "/a/file1".as_ref(),
1789            "/a/file1-renamed".as_ref(),
1790            Default::default(),
1791        )
1792        .await
1793        .unwrap();
1794
1795        fs.rename("/a/file2".as_ref(), "/a/file3".as_ref(), Default::default())
1796            .await
1797            .unwrap();
1798        fs.insert_file(Path::new("/a/file4"), "4".into()).await;
1799
1800        worktree_a
1801            .condition(&cx_a, |tree, _| {
1802                tree.paths()
1803                    .map(|p| p.to_string_lossy())
1804                    .collect::<Vec<_>>()
1805                    == [".zed.toml", "file1-renamed", "file3", "file4"]
1806            })
1807            .await;
1808        worktree_b
1809            .condition(&cx_b, |tree, _| {
1810                tree.paths()
1811                    .map(|p| p.to_string_lossy())
1812                    .collect::<Vec<_>>()
1813                    == [".zed.toml", "file1-renamed", "file3", "file4"]
1814            })
1815            .await;
1816        worktree_c
1817            .condition(&cx_c, |tree, _| {
1818                tree.paths()
1819                    .map(|p| p.to_string_lossy())
1820                    .collect::<Vec<_>>()
1821                    == [".zed.toml", "file1-renamed", "file3", "file4"]
1822            })
1823            .await;
1824
1825        // Ensure buffer files are updated as well.
1826        buffer_a
1827            .condition(&cx_a, |buf, _| {
1828                buf.file().unwrap().path().to_str() == Some("file1-renamed")
1829            })
1830            .await;
1831        buffer_b
1832            .condition(&cx_b, |buf, _| {
1833                buf.file().unwrap().path().to_str() == Some("file1-renamed")
1834            })
1835            .await;
1836        buffer_c
1837            .condition(&cx_c, |buf, _| {
1838                buf.file().unwrap().path().to_str() == Some("file1-renamed")
1839            })
1840            .await;
1841    }
1842
1843    #[gpui::test(iterations = 10)]
1844    async fn test_fs_operations(
1845        executor: Arc<Deterministic>,
1846        cx_a: &mut TestAppContext,
1847        cx_b: &mut TestAppContext,
1848    ) {
1849        executor.forbid_parking();
1850        let fs = FakeFs::new(cx_a.background());
1851
1852        // Connect to a server as 2 clients.
1853        let mut server = TestServer::start(cx_a.foreground(), cx_a.background()).await;
1854        let mut client_a = server.create_client(cx_a, "user_a").await;
1855        let mut client_b = server.create_client(cx_b, "user_b").await;
1856
1857        // Share a project as client A
1858        fs.insert_tree(
1859            "/dir",
1860            json!({
1861                ".zed.toml": r#"collaborators = ["user_b"]"#,
1862                "a.txt": "a-contents",
1863                "b.txt": "b-contents",
1864            }),
1865        )
1866        .await;
1867
1868        let (project_a, worktree_id) = client_a.build_local_project(fs, "/dir", cx_a).await;
1869        let project_id = project_a.read_with(cx_a, |project, _| project.remote_id().unwrap());
1870        project_a
1871            .update(cx_a, |project, cx| project.share(cx))
1872            .await
1873            .unwrap();
1874
1875        let project_b = client_b.build_remote_project(project_id, cx_b).await;
1876
1877        let worktree_a =
1878            project_a.read_with(cx_a, |project, cx| project.worktrees(cx).next().unwrap());
1879        let worktree_b =
1880            project_b.read_with(cx_b, |project, cx| project.worktrees(cx).next().unwrap());
1881
1882        let entry = project_b
1883            .update(cx_b, |project, cx| {
1884                project
1885                    .create_entry((worktree_id, "c.txt"), false, cx)
1886                    .unwrap()
1887            })
1888            .await
1889            .unwrap();
1890        worktree_a.read_with(cx_a, |worktree, _| {
1891            assert_eq!(
1892                worktree
1893                    .paths()
1894                    .map(|p| p.to_string_lossy())
1895                    .collect::<Vec<_>>(),
1896                [".zed.toml", "a.txt", "b.txt", "c.txt"]
1897            );
1898        });
1899        worktree_b.read_with(cx_b, |worktree, _| {
1900            assert_eq!(
1901                worktree
1902                    .paths()
1903                    .map(|p| p.to_string_lossy())
1904                    .collect::<Vec<_>>(),
1905                [".zed.toml", "a.txt", "b.txt", "c.txt"]
1906            );
1907        });
1908
1909        project_b
1910            .update(cx_b, |project, cx| {
1911                project.rename_entry(entry.id, Path::new("d.txt"), cx)
1912            })
1913            .unwrap()
1914            .await
1915            .unwrap();
1916        worktree_a.read_with(cx_a, |worktree, _| {
1917            assert_eq!(
1918                worktree
1919                    .paths()
1920                    .map(|p| p.to_string_lossy())
1921                    .collect::<Vec<_>>(),
1922                [".zed.toml", "a.txt", "b.txt", "d.txt"]
1923            );
1924        });
1925        worktree_b.read_with(cx_b, |worktree, _| {
1926            assert_eq!(
1927                worktree
1928                    .paths()
1929                    .map(|p| p.to_string_lossy())
1930                    .collect::<Vec<_>>(),
1931                [".zed.toml", "a.txt", "b.txt", "d.txt"]
1932            );
1933        });
1934
1935        let dir_entry = project_b
1936            .update(cx_b, |project, cx| {
1937                project
1938                    .create_entry((worktree_id, "DIR"), true, cx)
1939                    .unwrap()
1940            })
1941            .await
1942            .unwrap();
1943        worktree_a.read_with(cx_a, |worktree, _| {
1944            assert_eq!(
1945                worktree
1946                    .paths()
1947                    .map(|p| p.to_string_lossy())
1948                    .collect::<Vec<_>>(),
1949                [".zed.toml", "DIR", "a.txt", "b.txt", "d.txt"]
1950            );
1951        });
1952        worktree_b.read_with(cx_b, |worktree, _| {
1953            assert_eq!(
1954                worktree
1955                    .paths()
1956                    .map(|p| p.to_string_lossy())
1957                    .collect::<Vec<_>>(),
1958                [".zed.toml", "DIR", "a.txt", "b.txt", "d.txt"]
1959            );
1960        });
1961
1962        project_b
1963            .update(cx_b, |project, cx| {
1964                project.delete_entry(dir_entry.id, cx).unwrap()
1965            })
1966            .await
1967            .unwrap();
1968        worktree_a.read_with(cx_a, |worktree, _| {
1969            assert_eq!(
1970                worktree
1971                    .paths()
1972                    .map(|p| p.to_string_lossy())
1973                    .collect::<Vec<_>>(),
1974                [".zed.toml", "a.txt", "b.txt", "d.txt"]
1975            );
1976        });
1977        worktree_b.read_with(cx_b, |worktree, _| {
1978            assert_eq!(
1979                worktree
1980                    .paths()
1981                    .map(|p| p.to_string_lossy())
1982                    .collect::<Vec<_>>(),
1983                [".zed.toml", "a.txt", "b.txt", "d.txt"]
1984            );
1985        });
1986
1987        project_b
1988            .update(cx_b, |project, cx| {
1989                project.delete_entry(entry.id, cx).unwrap()
1990            })
1991            .await
1992            .unwrap();
1993        worktree_a.read_with(cx_a, |worktree, _| {
1994            assert_eq!(
1995                worktree
1996                    .paths()
1997                    .map(|p| p.to_string_lossy())
1998                    .collect::<Vec<_>>(),
1999                [".zed.toml", "a.txt", "b.txt"]
2000            );
2001        });
2002        worktree_b.read_with(cx_b, |worktree, _| {
2003            assert_eq!(
2004                worktree
2005                    .paths()
2006                    .map(|p| p.to_string_lossy())
2007                    .collect::<Vec<_>>(),
2008                [".zed.toml", "a.txt", "b.txt"]
2009            );
2010        });
2011    }
2012
2013    #[gpui::test(iterations = 10)]
2014    async fn test_buffer_conflict_after_save(cx_a: &mut TestAppContext, cx_b: &mut TestAppContext) {
2015        cx_a.foreground().forbid_parking();
2016        let lang_registry = Arc::new(LanguageRegistry::test());
2017        let fs = FakeFs::new(cx_a.background());
2018
2019        // Connect to a server as 2 clients.
2020        let mut server = TestServer::start(cx_a.foreground(), cx_a.background()).await;
2021        let client_a = server.create_client(cx_a, "user_a").await;
2022        let client_b = server.create_client(cx_b, "user_b").await;
2023
2024        // Share a project as client A
2025        fs.insert_tree(
2026            "/dir",
2027            json!({
2028                ".zed.toml": r#"collaborators = ["user_b", "user_c"]"#,
2029                "a.txt": "a-contents",
2030            }),
2031        )
2032        .await;
2033
2034        let project_a = cx_a.update(|cx| {
2035            Project::local(
2036                client_a.clone(),
2037                client_a.user_store.clone(),
2038                lang_registry.clone(),
2039                fs.clone(),
2040                cx,
2041            )
2042        });
2043        let (worktree_a, _) = project_a
2044            .update(cx_a, |p, cx| {
2045                p.find_or_create_local_worktree("/dir", true, cx)
2046            })
2047            .await
2048            .unwrap();
2049        worktree_a
2050            .read_with(cx_a, |tree, _| tree.as_local().unwrap().scan_complete())
2051            .await;
2052        let project_id = project_a.update(cx_a, |p, _| p.next_remote_id()).await;
2053        let worktree_id = worktree_a.read_with(cx_a, |tree, _| tree.id());
2054        project_a.update(cx_a, |p, cx| p.share(cx)).await.unwrap();
2055
2056        // Join that project as client B
2057        let project_b = Project::remote(
2058            project_id,
2059            client_b.clone(),
2060            client_b.user_store.clone(),
2061            lang_registry.clone(),
2062            fs.clone(),
2063            &mut cx_b.to_async(),
2064        )
2065        .await
2066        .unwrap();
2067
2068        // Open a buffer as client B
2069        let buffer_b = project_b
2070            .update(cx_b, |p, cx| p.open_buffer((worktree_id, "a.txt"), cx))
2071            .await
2072            .unwrap();
2073
2074        buffer_b.update(cx_b, |buf, cx| buf.edit([(0..0, "world ")], cx));
2075        buffer_b.read_with(cx_b, |buf, _| {
2076            assert!(buf.is_dirty());
2077            assert!(!buf.has_conflict());
2078        });
2079
2080        buffer_b.update(cx_b, |buf, cx| buf.save(cx)).await.unwrap();
2081        buffer_b
2082            .condition(&cx_b, |buffer_b, _| !buffer_b.is_dirty())
2083            .await;
2084        buffer_b.read_with(cx_b, |buf, _| {
2085            assert!(!buf.has_conflict());
2086        });
2087
2088        buffer_b.update(cx_b, |buf, cx| buf.edit([(0..0, "hello ")], cx));
2089        buffer_b.read_with(cx_b, |buf, _| {
2090            assert!(buf.is_dirty());
2091            assert!(!buf.has_conflict());
2092        });
2093    }
2094
2095    #[gpui::test(iterations = 10)]
2096    async fn test_buffer_reloading(cx_a: &mut TestAppContext, cx_b: &mut TestAppContext) {
2097        cx_a.foreground().forbid_parking();
2098        let lang_registry = Arc::new(LanguageRegistry::test());
2099        let fs = FakeFs::new(cx_a.background());
2100
2101        // Connect to a server as 2 clients.
2102        let mut server = TestServer::start(cx_a.foreground(), cx_a.background()).await;
2103        let client_a = server.create_client(cx_a, "user_a").await;
2104        let client_b = server.create_client(cx_b, "user_b").await;
2105
2106        // Share a project as client A
2107        fs.insert_tree(
2108            "/dir",
2109            json!({
2110                ".zed.toml": r#"collaborators = ["user_b", "user_c"]"#,
2111                "a.txt": "a-contents",
2112            }),
2113        )
2114        .await;
2115
2116        let project_a = cx_a.update(|cx| {
2117            Project::local(
2118                client_a.clone(),
2119                client_a.user_store.clone(),
2120                lang_registry.clone(),
2121                fs.clone(),
2122                cx,
2123            )
2124        });
2125        let (worktree_a, _) = project_a
2126            .update(cx_a, |p, cx| {
2127                p.find_or_create_local_worktree("/dir", true, cx)
2128            })
2129            .await
2130            .unwrap();
2131        worktree_a
2132            .read_with(cx_a, |tree, _| tree.as_local().unwrap().scan_complete())
2133            .await;
2134        let project_id = project_a.update(cx_a, |p, _| p.next_remote_id()).await;
2135        let worktree_id = worktree_a.read_with(cx_a, |tree, _| tree.id());
2136        project_a.update(cx_a, |p, cx| p.share(cx)).await.unwrap();
2137
2138        // Join that project as client B
2139        let project_b = Project::remote(
2140            project_id,
2141            client_b.clone(),
2142            client_b.user_store.clone(),
2143            lang_registry.clone(),
2144            fs.clone(),
2145            &mut cx_b.to_async(),
2146        )
2147        .await
2148        .unwrap();
2149        let _worktree_b = project_b.update(cx_b, |p, cx| p.worktrees(cx).next().unwrap());
2150
2151        // Open a buffer as client B
2152        let buffer_b = project_b
2153            .update(cx_b, |p, cx| p.open_buffer((worktree_id, "a.txt"), cx))
2154            .await
2155            .unwrap();
2156        buffer_b.read_with(cx_b, |buf, _| {
2157            assert!(!buf.is_dirty());
2158            assert!(!buf.has_conflict());
2159        });
2160
2161        fs.save(Path::new("/dir/a.txt"), &"new contents".into())
2162            .await
2163            .unwrap();
2164        buffer_b
2165            .condition(&cx_b, |buf, _| {
2166                buf.text() == "new contents" && !buf.is_dirty()
2167            })
2168            .await;
2169        buffer_b.read_with(cx_b, |buf, _| {
2170            assert!(!buf.has_conflict());
2171        });
2172    }
2173
2174    #[gpui::test(iterations = 10)]
2175    async fn test_editing_while_guest_opens_buffer(
2176        cx_a: &mut TestAppContext,
2177        cx_b: &mut TestAppContext,
2178    ) {
2179        cx_a.foreground().forbid_parking();
2180        let lang_registry = Arc::new(LanguageRegistry::test());
2181        let fs = FakeFs::new(cx_a.background());
2182
2183        // Connect to a server as 2 clients.
2184        let mut server = TestServer::start(cx_a.foreground(), cx_a.background()).await;
2185        let client_a = server.create_client(cx_a, "user_a").await;
2186        let client_b = server.create_client(cx_b, "user_b").await;
2187
2188        // Share a project as client A
2189        fs.insert_tree(
2190            "/dir",
2191            json!({
2192                ".zed.toml": r#"collaborators = ["user_b"]"#,
2193                "a.txt": "a-contents",
2194            }),
2195        )
2196        .await;
2197        let project_a = cx_a.update(|cx| {
2198            Project::local(
2199                client_a.clone(),
2200                client_a.user_store.clone(),
2201                lang_registry.clone(),
2202                fs.clone(),
2203                cx,
2204            )
2205        });
2206        let (worktree_a, _) = project_a
2207            .update(cx_a, |p, cx| {
2208                p.find_or_create_local_worktree("/dir", true, cx)
2209            })
2210            .await
2211            .unwrap();
2212        worktree_a
2213            .read_with(cx_a, |tree, _| tree.as_local().unwrap().scan_complete())
2214            .await;
2215        let project_id = project_a.update(cx_a, |p, _| p.next_remote_id()).await;
2216        let worktree_id = worktree_a.read_with(cx_a, |tree, _| tree.id());
2217        project_a.update(cx_a, |p, cx| p.share(cx)).await.unwrap();
2218
2219        // Join that project as client B
2220        let project_b = Project::remote(
2221            project_id,
2222            client_b.clone(),
2223            client_b.user_store.clone(),
2224            lang_registry.clone(),
2225            fs.clone(),
2226            &mut cx_b.to_async(),
2227        )
2228        .await
2229        .unwrap();
2230
2231        // Open a buffer as client A
2232        let buffer_a = project_a
2233            .update(cx_a, |p, cx| p.open_buffer((worktree_id, "a.txt"), cx))
2234            .await
2235            .unwrap();
2236
2237        // Start opening the same buffer as client B
2238        let buffer_b = cx_b
2239            .background()
2240            .spawn(project_b.update(cx_b, |p, cx| p.open_buffer((worktree_id, "a.txt"), cx)));
2241
2242        // Edit the buffer as client A while client B is still opening it.
2243        cx_b.background().simulate_random_delay().await;
2244        buffer_a.update(cx_a, |buf, cx| buf.edit([(0..0, "X")], cx));
2245        cx_b.background().simulate_random_delay().await;
2246        buffer_a.update(cx_a, |buf, cx| buf.edit([(1..1, "Y")], cx));
2247
2248        let text = buffer_a.read_with(cx_a, |buf, _| buf.text());
2249        let buffer_b = buffer_b.await.unwrap();
2250        buffer_b.condition(&cx_b, |buf, _| buf.text() == text).await;
2251    }
2252
2253    #[gpui::test(iterations = 10)]
2254    async fn test_leaving_worktree_while_opening_buffer(
2255        cx_a: &mut TestAppContext,
2256        cx_b: &mut TestAppContext,
2257    ) {
2258        cx_a.foreground().forbid_parking();
2259        let lang_registry = Arc::new(LanguageRegistry::test());
2260        let fs = FakeFs::new(cx_a.background());
2261
2262        // Connect to a server as 2 clients.
2263        let mut server = TestServer::start(cx_a.foreground(), cx_a.background()).await;
2264        let client_a = server.create_client(cx_a, "user_a").await;
2265        let client_b = server.create_client(cx_b, "user_b").await;
2266
2267        // Share a project as client A
2268        fs.insert_tree(
2269            "/dir",
2270            json!({
2271                ".zed.toml": r#"collaborators = ["user_b"]"#,
2272                "a.txt": "a-contents",
2273            }),
2274        )
2275        .await;
2276        let project_a = cx_a.update(|cx| {
2277            Project::local(
2278                client_a.clone(),
2279                client_a.user_store.clone(),
2280                lang_registry.clone(),
2281                fs.clone(),
2282                cx,
2283            )
2284        });
2285        let (worktree_a, _) = project_a
2286            .update(cx_a, |p, cx| {
2287                p.find_or_create_local_worktree("/dir", true, cx)
2288            })
2289            .await
2290            .unwrap();
2291        worktree_a
2292            .read_with(cx_a, |tree, _| tree.as_local().unwrap().scan_complete())
2293            .await;
2294        let project_id = project_a.update(cx_a, |p, _| p.next_remote_id()).await;
2295        let worktree_id = worktree_a.read_with(cx_a, |tree, _| tree.id());
2296        project_a.update(cx_a, |p, cx| p.share(cx)).await.unwrap();
2297
2298        // Join that project as client B
2299        let project_b = Project::remote(
2300            project_id,
2301            client_b.clone(),
2302            client_b.user_store.clone(),
2303            lang_registry.clone(),
2304            fs.clone(),
2305            &mut cx_b.to_async(),
2306        )
2307        .await
2308        .unwrap();
2309
2310        // See that a guest has joined as client A.
2311        project_a
2312            .condition(&cx_a, |p, _| p.collaborators().len() == 1)
2313            .await;
2314
2315        // Begin opening a buffer as client B, but leave the project before the open completes.
2316        let buffer_b = cx_b
2317            .background()
2318            .spawn(project_b.update(cx_b, |p, cx| p.open_buffer((worktree_id, "a.txt"), cx)));
2319        cx_b.update(|_| drop(project_b));
2320        drop(buffer_b);
2321
2322        // See that the guest has left.
2323        project_a
2324            .condition(&cx_a, |p, _| p.collaborators().len() == 0)
2325            .await;
2326    }
2327
2328    #[gpui::test(iterations = 10)]
2329    async fn test_leaving_project(cx_a: &mut TestAppContext, cx_b: &mut TestAppContext) {
2330        cx_a.foreground().forbid_parking();
2331        let lang_registry = Arc::new(LanguageRegistry::test());
2332        let fs = FakeFs::new(cx_a.background());
2333
2334        // Connect to a server as 2 clients.
2335        let mut server = TestServer::start(cx_a.foreground(), cx_a.background()).await;
2336        let client_a = server.create_client(cx_a, "user_a").await;
2337        let client_b = server.create_client(cx_b, "user_b").await;
2338
2339        // Share a project as client A
2340        fs.insert_tree(
2341            "/a",
2342            json!({
2343                ".zed.toml": r#"collaborators = ["user_b"]"#,
2344                "a.txt": "a-contents",
2345                "b.txt": "b-contents",
2346            }),
2347        )
2348        .await;
2349        let project_a = cx_a.update(|cx| {
2350            Project::local(
2351                client_a.clone(),
2352                client_a.user_store.clone(),
2353                lang_registry.clone(),
2354                fs.clone(),
2355                cx,
2356            )
2357        });
2358        let (worktree_a, _) = project_a
2359            .update(cx_a, |p, cx| {
2360                p.find_or_create_local_worktree("/a", true, cx)
2361            })
2362            .await
2363            .unwrap();
2364        worktree_a
2365            .read_with(cx_a, |tree, _| tree.as_local().unwrap().scan_complete())
2366            .await;
2367        let project_id = project_a
2368            .update(cx_a, |project, _| project.next_remote_id())
2369            .await;
2370        project_a
2371            .update(cx_a, |project, cx| project.share(cx))
2372            .await
2373            .unwrap();
2374
2375        // Join that project as client B
2376        let _project_b = Project::remote(
2377            project_id,
2378            client_b.clone(),
2379            client_b.user_store.clone(),
2380            lang_registry.clone(),
2381            fs.clone(),
2382            &mut cx_b.to_async(),
2383        )
2384        .await
2385        .unwrap();
2386
2387        // Client A sees that a guest has joined.
2388        project_a
2389            .condition(cx_a, |p, _| p.collaborators().len() == 1)
2390            .await;
2391
2392        // Drop client B's connection and ensure client A observes client B leaving the project.
2393        client_b.disconnect(&cx_b.to_async()).unwrap();
2394        project_a
2395            .condition(cx_a, |p, _| p.collaborators().len() == 0)
2396            .await;
2397
2398        // Rejoin the project as client B
2399        let _project_b = Project::remote(
2400            project_id,
2401            client_b.clone(),
2402            client_b.user_store.clone(),
2403            lang_registry.clone(),
2404            fs.clone(),
2405            &mut cx_b.to_async(),
2406        )
2407        .await
2408        .unwrap();
2409
2410        // Client A sees that a guest has re-joined.
2411        project_a
2412            .condition(cx_a, |p, _| p.collaborators().len() == 1)
2413            .await;
2414
2415        // Simulate connection loss for client B and ensure client A observes client B leaving the project.
2416        client_b.wait_for_current_user(cx_b).await;
2417        server.disconnect_client(client_b.current_user_id(cx_b));
2418        cx_a.foreground().advance_clock(rpc::RECEIVE_TIMEOUT);
2419        project_a
2420            .condition(cx_a, |p, _| p.collaborators().len() == 0)
2421            .await;
2422    }
2423
2424    #[gpui::test(iterations = 10)]
2425    async fn test_collaborating_with_diagnostics(
2426        cx_a: &mut TestAppContext,
2427        cx_b: &mut TestAppContext,
2428    ) {
2429        cx_a.foreground().forbid_parking();
2430        let lang_registry = Arc::new(LanguageRegistry::test());
2431        let fs = FakeFs::new(cx_a.background());
2432
2433        // Set up a fake language server.
2434        let mut language = Language::new(
2435            LanguageConfig {
2436                name: "Rust".into(),
2437                path_suffixes: vec!["rs".to_string()],
2438                ..Default::default()
2439            },
2440            Some(tree_sitter_rust::language()),
2441        );
2442        let mut fake_language_servers = language.set_fake_lsp_adapter(Default::default());
2443        lang_registry.add(Arc::new(language));
2444
2445        // Connect to a server as 2 clients.
2446        let mut server = TestServer::start(cx_a.foreground(), cx_a.background()).await;
2447        let client_a = server.create_client(cx_a, "user_a").await;
2448        let client_b = server.create_client(cx_b, "user_b").await;
2449
2450        // Share a project as client A
2451        fs.insert_tree(
2452            "/a",
2453            json!({
2454                ".zed.toml": r#"collaborators = ["user_b"]"#,
2455                "a.rs": "let one = two",
2456                "other.rs": "",
2457            }),
2458        )
2459        .await;
2460        let project_a = cx_a.update(|cx| {
2461            Project::local(
2462                client_a.clone(),
2463                client_a.user_store.clone(),
2464                lang_registry.clone(),
2465                fs.clone(),
2466                cx,
2467            )
2468        });
2469        let (worktree_a, _) = project_a
2470            .update(cx_a, |p, cx| {
2471                p.find_or_create_local_worktree("/a", true, cx)
2472            })
2473            .await
2474            .unwrap();
2475        worktree_a
2476            .read_with(cx_a, |tree, _| tree.as_local().unwrap().scan_complete())
2477            .await;
2478        let project_id = project_a.update(cx_a, |p, _| p.next_remote_id()).await;
2479        let worktree_id = worktree_a.read_with(cx_a, |tree, _| tree.id());
2480        project_a.update(cx_a, |p, cx| p.share(cx)).await.unwrap();
2481
2482        // Cause the language server to start.
2483        let _ = cx_a
2484            .background()
2485            .spawn(project_a.update(cx_a, |project, cx| {
2486                project.open_buffer(
2487                    ProjectPath {
2488                        worktree_id,
2489                        path: Path::new("other.rs").into(),
2490                    },
2491                    cx,
2492                )
2493            }))
2494            .await
2495            .unwrap();
2496
2497        // Simulate a language server reporting errors for a file.
2498        let mut fake_language_server = fake_language_servers.next().await.unwrap();
2499        fake_language_server
2500            .receive_notification::<lsp::notification::DidOpenTextDocument>()
2501            .await;
2502        fake_language_server.notify::<lsp::notification::PublishDiagnostics>(
2503            lsp::PublishDiagnosticsParams {
2504                uri: lsp::Url::from_file_path("/a/a.rs").unwrap(),
2505                version: None,
2506                diagnostics: vec![lsp::Diagnostic {
2507                    severity: Some(lsp::DiagnosticSeverity::ERROR),
2508                    range: lsp::Range::new(lsp::Position::new(0, 4), lsp::Position::new(0, 7)),
2509                    message: "message 1".to_string(),
2510                    ..Default::default()
2511                }],
2512            },
2513        );
2514
2515        // Wait for server to see the diagnostics update.
2516        server
2517            .condition(|store| {
2518                let worktree = store
2519                    .project(project_id)
2520                    .unwrap()
2521                    .share
2522                    .as_ref()
2523                    .unwrap()
2524                    .worktrees
2525                    .get(&worktree_id.to_proto())
2526                    .unwrap();
2527
2528                !worktree.diagnostic_summaries.is_empty()
2529            })
2530            .await;
2531
2532        // Join the worktree as client B.
2533        let project_b = Project::remote(
2534            project_id,
2535            client_b.clone(),
2536            client_b.user_store.clone(),
2537            lang_registry.clone(),
2538            fs.clone(),
2539            &mut cx_b.to_async(),
2540        )
2541        .await
2542        .unwrap();
2543
2544        project_b.read_with(cx_b, |project, cx| {
2545            assert_eq!(
2546                project.diagnostic_summaries(cx).collect::<Vec<_>>(),
2547                &[(
2548                    ProjectPath {
2549                        worktree_id,
2550                        path: Arc::from(Path::new("a.rs")),
2551                    },
2552                    DiagnosticSummary {
2553                        error_count: 1,
2554                        warning_count: 0,
2555                        ..Default::default()
2556                    },
2557                )]
2558            )
2559        });
2560
2561        // Simulate a language server reporting more errors for a file.
2562        fake_language_server.notify::<lsp::notification::PublishDiagnostics>(
2563            lsp::PublishDiagnosticsParams {
2564                uri: lsp::Url::from_file_path("/a/a.rs").unwrap(),
2565                version: None,
2566                diagnostics: vec![
2567                    lsp::Diagnostic {
2568                        severity: Some(lsp::DiagnosticSeverity::ERROR),
2569                        range: lsp::Range::new(lsp::Position::new(0, 4), lsp::Position::new(0, 7)),
2570                        message: "message 1".to_string(),
2571                        ..Default::default()
2572                    },
2573                    lsp::Diagnostic {
2574                        severity: Some(lsp::DiagnosticSeverity::WARNING),
2575                        range: lsp::Range::new(
2576                            lsp::Position::new(0, 10),
2577                            lsp::Position::new(0, 13),
2578                        ),
2579                        message: "message 2".to_string(),
2580                        ..Default::default()
2581                    },
2582                ],
2583            },
2584        );
2585
2586        // Client b gets the updated summaries
2587        project_b
2588            .condition(&cx_b, |project, cx| {
2589                project.diagnostic_summaries(cx).collect::<Vec<_>>()
2590                    == &[(
2591                        ProjectPath {
2592                            worktree_id,
2593                            path: Arc::from(Path::new("a.rs")),
2594                        },
2595                        DiagnosticSummary {
2596                            error_count: 1,
2597                            warning_count: 1,
2598                            ..Default::default()
2599                        },
2600                    )]
2601            })
2602            .await;
2603
2604        // Open the file with the errors on client B. They should be present.
2605        let buffer_b = cx_b
2606            .background()
2607            .spawn(project_b.update(cx_b, |p, cx| p.open_buffer((worktree_id, "a.rs"), cx)))
2608            .await
2609            .unwrap();
2610
2611        buffer_b.read_with(cx_b, |buffer, _| {
2612            assert_eq!(
2613                buffer
2614                    .snapshot()
2615                    .diagnostics_in_range::<_, Point>(0..buffer.len(), false)
2616                    .map(|entry| entry)
2617                    .collect::<Vec<_>>(),
2618                &[
2619                    DiagnosticEntry {
2620                        range: Point::new(0, 4)..Point::new(0, 7),
2621                        diagnostic: Diagnostic {
2622                            group_id: 0,
2623                            message: "message 1".to_string(),
2624                            severity: lsp::DiagnosticSeverity::ERROR,
2625                            is_primary: true,
2626                            ..Default::default()
2627                        }
2628                    },
2629                    DiagnosticEntry {
2630                        range: Point::new(0, 10)..Point::new(0, 13),
2631                        diagnostic: Diagnostic {
2632                            group_id: 1,
2633                            severity: lsp::DiagnosticSeverity::WARNING,
2634                            message: "message 2".to_string(),
2635                            is_primary: true,
2636                            ..Default::default()
2637                        }
2638                    }
2639                ]
2640            );
2641        });
2642
2643        // Simulate a language server reporting no errors for a file.
2644        fake_language_server.notify::<lsp::notification::PublishDiagnostics>(
2645            lsp::PublishDiagnosticsParams {
2646                uri: lsp::Url::from_file_path("/a/a.rs").unwrap(),
2647                version: None,
2648                diagnostics: vec![],
2649            },
2650        );
2651        project_a
2652            .condition(cx_a, |project, cx| {
2653                project.diagnostic_summaries(cx).collect::<Vec<_>>() == &[]
2654            })
2655            .await;
2656        project_b
2657            .condition(cx_b, |project, cx| {
2658                project.diagnostic_summaries(cx).collect::<Vec<_>>() == &[]
2659            })
2660            .await;
2661    }
2662
2663    #[gpui::test(iterations = 10)]
2664    async fn test_collaborating_with_completion(
2665        cx_a: &mut TestAppContext,
2666        cx_b: &mut TestAppContext,
2667    ) {
2668        cx_a.foreground().forbid_parking();
2669        let lang_registry = Arc::new(LanguageRegistry::test());
2670        let fs = FakeFs::new(cx_a.background());
2671
2672        // Set up a fake language server.
2673        let mut language = Language::new(
2674            LanguageConfig {
2675                name: "Rust".into(),
2676                path_suffixes: vec!["rs".to_string()],
2677                ..Default::default()
2678            },
2679            Some(tree_sitter_rust::language()),
2680        );
2681        let mut fake_language_servers = language.set_fake_lsp_adapter(FakeLspAdapter {
2682            capabilities: lsp::ServerCapabilities {
2683                completion_provider: Some(lsp::CompletionOptions {
2684                    trigger_characters: Some(vec![".".to_string()]),
2685                    ..Default::default()
2686                }),
2687                ..Default::default()
2688            },
2689            ..Default::default()
2690        });
2691        lang_registry.add(Arc::new(language));
2692
2693        // Connect to a server as 2 clients.
2694        let mut server = TestServer::start(cx_a.foreground(), cx_a.background()).await;
2695        let client_a = server.create_client(cx_a, "user_a").await;
2696        let client_b = server.create_client(cx_b, "user_b").await;
2697
2698        // Share a project as client A
2699        fs.insert_tree(
2700            "/a",
2701            json!({
2702                ".zed.toml": r#"collaborators = ["user_b"]"#,
2703                "main.rs": "fn main() { a }",
2704                "other.rs": "",
2705            }),
2706        )
2707        .await;
2708        let project_a = cx_a.update(|cx| {
2709            Project::local(
2710                client_a.clone(),
2711                client_a.user_store.clone(),
2712                lang_registry.clone(),
2713                fs.clone(),
2714                cx,
2715            )
2716        });
2717        let (worktree_a, _) = project_a
2718            .update(cx_a, |p, cx| {
2719                p.find_or_create_local_worktree("/a", true, cx)
2720            })
2721            .await
2722            .unwrap();
2723        worktree_a
2724            .read_with(cx_a, |tree, _| tree.as_local().unwrap().scan_complete())
2725            .await;
2726        let project_id = project_a.update(cx_a, |p, _| p.next_remote_id()).await;
2727        let worktree_id = worktree_a.read_with(cx_a, |tree, _| tree.id());
2728        project_a.update(cx_a, |p, cx| p.share(cx)).await.unwrap();
2729
2730        // Join the worktree as client B.
2731        let project_b = Project::remote(
2732            project_id,
2733            client_b.clone(),
2734            client_b.user_store.clone(),
2735            lang_registry.clone(),
2736            fs.clone(),
2737            &mut cx_b.to_async(),
2738        )
2739        .await
2740        .unwrap();
2741
2742        // Open a file in an editor as the guest.
2743        let buffer_b = project_b
2744            .update(cx_b, |p, cx| p.open_buffer((worktree_id, "main.rs"), cx))
2745            .await
2746            .unwrap();
2747        let (window_b, _) = cx_b.add_window(|_| EmptyView);
2748        let editor_b = cx_b.add_view(window_b, |cx| {
2749            Editor::for_buffer(buffer_b.clone(), Some(project_b.clone()), cx)
2750        });
2751
2752        let fake_language_server = fake_language_servers.next().await.unwrap();
2753        buffer_b
2754            .condition(&cx_b, |buffer, _| !buffer.completion_triggers().is_empty())
2755            .await;
2756
2757        // Type a completion trigger character as the guest.
2758        editor_b.update(cx_b, |editor, cx| {
2759            editor.select_ranges([13..13], None, cx);
2760            editor.handle_input(&Input(".".into()), cx);
2761            cx.focus(&editor_b);
2762        });
2763
2764        // Receive a completion request as the host's language server.
2765        // Return some completions from the host's language server.
2766        cx_a.foreground().start_waiting();
2767        fake_language_server
2768            .handle_request::<lsp::request::Completion, _, _>(|params, _| async move {
2769                assert_eq!(
2770                    params.text_document_position.text_document.uri,
2771                    lsp::Url::from_file_path("/a/main.rs").unwrap(),
2772                );
2773                assert_eq!(
2774                    params.text_document_position.position,
2775                    lsp::Position::new(0, 14),
2776                );
2777
2778                Ok(Some(lsp::CompletionResponse::Array(vec![
2779                    lsp::CompletionItem {
2780                        label: "first_method(…)".into(),
2781                        detail: Some("fn(&mut self, B) -> C".into()),
2782                        text_edit: Some(lsp::CompletionTextEdit::Edit(lsp::TextEdit {
2783                            new_text: "first_method($1)".to_string(),
2784                            range: lsp::Range::new(
2785                                lsp::Position::new(0, 14),
2786                                lsp::Position::new(0, 14),
2787                            ),
2788                        })),
2789                        insert_text_format: Some(lsp::InsertTextFormat::SNIPPET),
2790                        ..Default::default()
2791                    },
2792                    lsp::CompletionItem {
2793                        label: "second_method(…)".into(),
2794                        detail: Some("fn(&mut self, C) -> D<E>".into()),
2795                        text_edit: Some(lsp::CompletionTextEdit::Edit(lsp::TextEdit {
2796                            new_text: "second_method()".to_string(),
2797                            range: lsp::Range::new(
2798                                lsp::Position::new(0, 14),
2799                                lsp::Position::new(0, 14),
2800                            ),
2801                        })),
2802                        insert_text_format: Some(lsp::InsertTextFormat::SNIPPET),
2803                        ..Default::default()
2804                    },
2805                ])))
2806            })
2807            .next()
2808            .await
2809            .unwrap();
2810        cx_a.foreground().finish_waiting();
2811
2812        // Open the buffer on the host.
2813        let buffer_a = project_a
2814            .update(cx_a, |p, cx| p.open_buffer((worktree_id, "main.rs"), cx))
2815            .await
2816            .unwrap();
2817        buffer_a
2818            .condition(&cx_a, |buffer, _| buffer.text() == "fn main() { a. }")
2819            .await;
2820
2821        // Confirm a completion on the guest.
2822        editor_b
2823            .condition(&cx_b, |editor, _| editor.context_menu_visible())
2824            .await;
2825        editor_b.update(cx_b, |editor, cx| {
2826            editor.confirm_completion(&ConfirmCompletion { item_ix: Some(0) }, cx);
2827            assert_eq!(editor.text(cx), "fn main() { a.first_method() }");
2828        });
2829
2830        // Return a resolved completion from the host's language server.
2831        // The resolved completion has an additional text edit.
2832        fake_language_server.handle_request::<lsp::request::ResolveCompletionItem, _, _>(
2833            |params, _| async move {
2834                assert_eq!(params.label, "first_method(…)");
2835                Ok(lsp::CompletionItem {
2836                    label: "first_method(…)".into(),
2837                    detail: Some("fn(&mut self, B) -> C".into()),
2838                    text_edit: Some(lsp::CompletionTextEdit::Edit(lsp::TextEdit {
2839                        new_text: "first_method($1)".to_string(),
2840                        range: lsp::Range::new(
2841                            lsp::Position::new(0, 14),
2842                            lsp::Position::new(0, 14),
2843                        ),
2844                    })),
2845                    additional_text_edits: Some(vec![lsp::TextEdit {
2846                        new_text: "use d::SomeTrait;\n".to_string(),
2847                        range: lsp::Range::new(lsp::Position::new(0, 0), lsp::Position::new(0, 0)),
2848                    }]),
2849                    insert_text_format: Some(lsp::InsertTextFormat::SNIPPET),
2850                    ..Default::default()
2851                })
2852            },
2853        );
2854
2855        // The additional edit is applied.
2856        buffer_a
2857            .condition(&cx_a, |buffer, _| {
2858                buffer.text() == "use d::SomeTrait;\nfn main() { a.first_method() }"
2859            })
2860            .await;
2861        buffer_b
2862            .condition(&cx_b, |buffer, _| {
2863                buffer.text() == "use d::SomeTrait;\nfn main() { a.first_method() }"
2864            })
2865            .await;
2866    }
2867
2868    #[gpui::test(iterations = 10)]
2869    async fn test_reloading_buffer_manually(cx_a: &mut TestAppContext, cx_b: &mut TestAppContext) {
2870        cx_a.foreground().forbid_parking();
2871        let lang_registry = Arc::new(LanguageRegistry::test());
2872        let fs = FakeFs::new(cx_a.background());
2873
2874        // Connect to a server as 2 clients.
2875        let mut server = TestServer::start(cx_a.foreground(), cx_a.background()).await;
2876        let client_a = server.create_client(cx_a, "user_a").await;
2877        let client_b = server.create_client(cx_b, "user_b").await;
2878
2879        // Share a project as client A
2880        fs.insert_tree(
2881            "/a",
2882            json!({
2883                ".zed.toml": r#"collaborators = ["user_b"]"#,
2884                "a.rs": "let one = 1;",
2885            }),
2886        )
2887        .await;
2888        let project_a = cx_a.update(|cx| {
2889            Project::local(
2890                client_a.clone(),
2891                client_a.user_store.clone(),
2892                lang_registry.clone(),
2893                fs.clone(),
2894                cx,
2895            )
2896        });
2897        let (worktree_a, _) = project_a
2898            .update(cx_a, |p, cx| {
2899                p.find_or_create_local_worktree("/a", true, cx)
2900            })
2901            .await
2902            .unwrap();
2903        worktree_a
2904            .read_with(cx_a, |tree, _| tree.as_local().unwrap().scan_complete())
2905            .await;
2906        let project_id = project_a.update(cx_a, |p, _| p.next_remote_id()).await;
2907        let worktree_id = worktree_a.read_with(cx_a, |tree, _| tree.id());
2908        project_a.update(cx_a, |p, cx| p.share(cx)).await.unwrap();
2909        let buffer_a = project_a
2910            .update(cx_a, |p, cx| p.open_buffer((worktree_id, "a.rs"), cx))
2911            .await
2912            .unwrap();
2913
2914        // Join the worktree as client B.
2915        let project_b = Project::remote(
2916            project_id,
2917            client_b.clone(),
2918            client_b.user_store.clone(),
2919            lang_registry.clone(),
2920            fs.clone(),
2921            &mut cx_b.to_async(),
2922        )
2923        .await
2924        .unwrap();
2925
2926        let buffer_b = cx_b
2927            .background()
2928            .spawn(project_b.update(cx_b, |p, cx| p.open_buffer((worktree_id, "a.rs"), cx)))
2929            .await
2930            .unwrap();
2931        buffer_b.update(cx_b, |buffer, cx| {
2932            buffer.edit([(4..7, "six")], cx);
2933            buffer.edit([(10..11, "6")], cx);
2934            assert_eq!(buffer.text(), "let six = 6;");
2935            assert!(buffer.is_dirty());
2936            assert!(!buffer.has_conflict());
2937        });
2938        buffer_a
2939            .condition(cx_a, |buffer, _| buffer.text() == "let six = 6;")
2940            .await;
2941
2942        fs.save(Path::new("/a/a.rs"), &Rope::from("let seven = 7;"))
2943            .await
2944            .unwrap();
2945        buffer_a
2946            .condition(cx_a, |buffer, _| buffer.has_conflict())
2947            .await;
2948        buffer_b
2949            .condition(cx_b, |buffer, _| buffer.has_conflict())
2950            .await;
2951
2952        project_b
2953            .update(cx_b, |project, cx| {
2954                project.reload_buffers(HashSet::from_iter([buffer_b.clone()]), true, cx)
2955            })
2956            .await
2957            .unwrap();
2958        buffer_a.read_with(cx_a, |buffer, _| {
2959            assert_eq!(buffer.text(), "let seven = 7;");
2960            assert!(!buffer.is_dirty());
2961            assert!(!buffer.has_conflict());
2962        });
2963        buffer_b.read_with(cx_b, |buffer, _| {
2964            assert_eq!(buffer.text(), "let seven = 7;");
2965            assert!(!buffer.is_dirty());
2966            assert!(!buffer.has_conflict());
2967        });
2968
2969        buffer_a.update(cx_a, |buffer, cx| {
2970            // Undoing on the host is a no-op when the reload was initiated by the guest.
2971            buffer.undo(cx);
2972            assert_eq!(buffer.text(), "let seven = 7;");
2973            assert!(!buffer.is_dirty());
2974            assert!(!buffer.has_conflict());
2975        });
2976        buffer_b.update(cx_b, |buffer, cx| {
2977            // Undoing on the guest rolls back the buffer to before it was reloaded but the conflict gets cleared.
2978            buffer.undo(cx);
2979            assert_eq!(buffer.text(), "let six = 6;");
2980            assert!(buffer.is_dirty());
2981            assert!(!buffer.has_conflict());
2982        });
2983    }
2984
2985    #[gpui::test(iterations = 10)]
2986    async fn test_formatting_buffer(cx_a: &mut TestAppContext, cx_b: &mut TestAppContext) {
2987        cx_a.foreground().forbid_parking();
2988        let lang_registry = Arc::new(LanguageRegistry::test());
2989        let fs = FakeFs::new(cx_a.background());
2990
2991        // Set up a fake language server.
2992        let mut language = Language::new(
2993            LanguageConfig {
2994                name: "Rust".into(),
2995                path_suffixes: vec!["rs".to_string()],
2996                ..Default::default()
2997            },
2998            Some(tree_sitter_rust::language()),
2999        );
3000        let mut fake_language_servers = language.set_fake_lsp_adapter(Default::default());
3001        lang_registry.add(Arc::new(language));
3002
3003        // Connect to a server as 2 clients.
3004        let mut server = TestServer::start(cx_a.foreground(), cx_a.background()).await;
3005        let client_a = server.create_client(cx_a, "user_a").await;
3006        let client_b = server.create_client(cx_b, "user_b").await;
3007
3008        // Share a project as client A
3009        fs.insert_tree(
3010            "/a",
3011            json!({
3012                ".zed.toml": r#"collaborators = ["user_b"]"#,
3013                "a.rs": "let one = two",
3014            }),
3015        )
3016        .await;
3017        let project_a = cx_a.update(|cx| {
3018            Project::local(
3019                client_a.clone(),
3020                client_a.user_store.clone(),
3021                lang_registry.clone(),
3022                fs.clone(),
3023                cx,
3024            )
3025        });
3026        let (worktree_a, _) = project_a
3027            .update(cx_a, |p, cx| {
3028                p.find_or_create_local_worktree("/a", true, cx)
3029            })
3030            .await
3031            .unwrap();
3032        worktree_a
3033            .read_with(cx_a, |tree, _| tree.as_local().unwrap().scan_complete())
3034            .await;
3035        let project_id = project_a.update(cx_a, |p, _| p.next_remote_id()).await;
3036        let worktree_id = worktree_a.read_with(cx_a, |tree, _| tree.id());
3037        project_a.update(cx_a, |p, cx| p.share(cx)).await.unwrap();
3038
3039        // Join the worktree as client B.
3040        let project_b = Project::remote(
3041            project_id,
3042            client_b.clone(),
3043            client_b.user_store.clone(),
3044            lang_registry.clone(),
3045            fs.clone(),
3046            &mut cx_b.to_async(),
3047        )
3048        .await
3049        .unwrap();
3050
3051        let buffer_b = cx_b
3052            .background()
3053            .spawn(project_b.update(cx_b, |p, cx| p.open_buffer((worktree_id, "a.rs"), cx)))
3054            .await
3055            .unwrap();
3056
3057        let fake_language_server = fake_language_servers.next().await.unwrap();
3058        fake_language_server.handle_request::<lsp::request::Formatting, _, _>(|_, _| async move {
3059            Ok(Some(vec![
3060                lsp::TextEdit {
3061                    range: lsp::Range::new(lsp::Position::new(0, 4), lsp::Position::new(0, 4)),
3062                    new_text: "h".to_string(),
3063                },
3064                lsp::TextEdit {
3065                    range: lsp::Range::new(lsp::Position::new(0, 7), lsp::Position::new(0, 7)),
3066                    new_text: "y".to_string(),
3067                },
3068            ]))
3069        });
3070
3071        project_b
3072            .update(cx_b, |project, cx| {
3073                project.format(HashSet::from_iter([buffer_b.clone()]), true, cx)
3074            })
3075            .await
3076            .unwrap();
3077        assert_eq!(
3078            buffer_b.read_with(cx_b, |buffer, _| buffer.text()),
3079            "let honey = two"
3080        );
3081    }
3082
3083    #[gpui::test(iterations = 10)]
3084    async fn test_definition(cx_a: &mut TestAppContext, cx_b: &mut TestAppContext) {
3085        cx_a.foreground().forbid_parking();
3086        let lang_registry = Arc::new(LanguageRegistry::test());
3087        let fs = FakeFs::new(cx_a.background());
3088        fs.insert_tree(
3089            "/root-1",
3090            json!({
3091                ".zed.toml": r#"collaborators = ["user_b"]"#,
3092                "a.rs": "const ONE: usize = b::TWO + b::THREE;",
3093            }),
3094        )
3095        .await;
3096        fs.insert_tree(
3097            "/root-2",
3098            json!({
3099                "b.rs": "const TWO: usize = 2;\nconst THREE: usize = 3;",
3100            }),
3101        )
3102        .await;
3103
3104        // Set up a fake language server.
3105        let mut language = Language::new(
3106            LanguageConfig {
3107                name: "Rust".into(),
3108                path_suffixes: vec!["rs".to_string()],
3109                ..Default::default()
3110            },
3111            Some(tree_sitter_rust::language()),
3112        );
3113        let mut fake_language_servers = language.set_fake_lsp_adapter(Default::default());
3114        lang_registry.add(Arc::new(language));
3115
3116        // Connect to a server as 2 clients.
3117        let mut server = TestServer::start(cx_a.foreground(), cx_a.background()).await;
3118        let client_a = server.create_client(cx_a, "user_a").await;
3119        let client_b = server.create_client(cx_b, "user_b").await;
3120
3121        // Share a project as client A
3122        let project_a = cx_a.update(|cx| {
3123            Project::local(
3124                client_a.clone(),
3125                client_a.user_store.clone(),
3126                lang_registry.clone(),
3127                fs.clone(),
3128                cx,
3129            )
3130        });
3131        let (worktree_a, _) = project_a
3132            .update(cx_a, |p, cx| {
3133                p.find_or_create_local_worktree("/root-1", true, cx)
3134            })
3135            .await
3136            .unwrap();
3137        worktree_a
3138            .read_with(cx_a, |tree, _| tree.as_local().unwrap().scan_complete())
3139            .await;
3140        let project_id = project_a.update(cx_a, |p, _| p.next_remote_id()).await;
3141        let worktree_id = worktree_a.read_with(cx_a, |tree, _| tree.id());
3142        project_a.update(cx_a, |p, cx| p.share(cx)).await.unwrap();
3143
3144        // Join the worktree as client B.
3145        let project_b = Project::remote(
3146            project_id,
3147            client_b.clone(),
3148            client_b.user_store.clone(),
3149            lang_registry.clone(),
3150            fs.clone(),
3151            &mut cx_b.to_async(),
3152        )
3153        .await
3154        .unwrap();
3155
3156        // Open the file on client B.
3157        let buffer_b = cx_b
3158            .background()
3159            .spawn(project_b.update(cx_b, |p, cx| p.open_buffer((worktree_id, "a.rs"), cx)))
3160            .await
3161            .unwrap();
3162
3163        // Request the definition of a symbol as the guest.
3164        let fake_language_server = fake_language_servers.next().await.unwrap();
3165        fake_language_server.handle_request::<lsp::request::GotoDefinition, _, _>(
3166            |_, _| async move {
3167                Ok(Some(lsp::GotoDefinitionResponse::Scalar(
3168                    lsp::Location::new(
3169                        lsp::Url::from_file_path("/root-2/b.rs").unwrap(),
3170                        lsp::Range::new(lsp::Position::new(0, 6), lsp::Position::new(0, 9)),
3171                    ),
3172                )))
3173            },
3174        );
3175
3176        let definitions_1 = project_b
3177            .update(cx_b, |p, cx| p.definition(&buffer_b, 23, cx))
3178            .await
3179            .unwrap();
3180        cx_b.read(|cx| {
3181            assert_eq!(definitions_1.len(), 1);
3182            assert_eq!(project_b.read(cx).worktrees(cx).count(), 2);
3183            let target_buffer = definitions_1[0].buffer.read(cx);
3184            assert_eq!(
3185                target_buffer.text(),
3186                "const TWO: usize = 2;\nconst THREE: usize = 3;"
3187            );
3188            assert_eq!(
3189                definitions_1[0].range.to_point(target_buffer),
3190                Point::new(0, 6)..Point::new(0, 9)
3191            );
3192        });
3193
3194        // Try getting more definitions for the same buffer, ensuring the buffer gets reused from
3195        // the previous call to `definition`.
3196        fake_language_server.handle_request::<lsp::request::GotoDefinition, _, _>(
3197            |_, _| async move {
3198                Ok(Some(lsp::GotoDefinitionResponse::Scalar(
3199                    lsp::Location::new(
3200                        lsp::Url::from_file_path("/root-2/b.rs").unwrap(),
3201                        lsp::Range::new(lsp::Position::new(1, 6), lsp::Position::new(1, 11)),
3202                    ),
3203                )))
3204            },
3205        );
3206
3207        let definitions_2 = project_b
3208            .update(cx_b, |p, cx| p.definition(&buffer_b, 33, cx))
3209            .await
3210            .unwrap();
3211        cx_b.read(|cx| {
3212            assert_eq!(definitions_2.len(), 1);
3213            assert_eq!(project_b.read(cx).worktrees(cx).count(), 2);
3214            let target_buffer = definitions_2[0].buffer.read(cx);
3215            assert_eq!(
3216                target_buffer.text(),
3217                "const TWO: usize = 2;\nconst THREE: usize = 3;"
3218            );
3219            assert_eq!(
3220                definitions_2[0].range.to_point(target_buffer),
3221                Point::new(1, 6)..Point::new(1, 11)
3222            );
3223        });
3224        assert_eq!(definitions_1[0].buffer, definitions_2[0].buffer);
3225    }
3226
3227    #[gpui::test(iterations = 10)]
        // Checks that a guest (client B) can request references to a symbol through a
        // project shared by client A. The fake language server answers with two
        // locations in "/root-1/two.rs" and one in "/root-2/three.rs"; the test asserts
        // that the guest receives three references, that the two `two.rs` references
        // share a single buffer, and that every range maps to the expected offsets.
3228    async fn test_references(cx_a: &mut TestAppContext, cx_b: &mut TestAppContext) {
3229        cx_a.foreground().forbid_parking();
3230        let lang_registry = Arc::new(LanguageRegistry::test());
3231        let fs = FakeFs::new(cx_a.background());
3232        fs.insert_tree(
3233            "/root-1",
3234            json!({
3235                ".zed.toml": r#"collaborators = ["user_b"]"#,
3236                "one.rs": "const ONE: usize = 1;",
3237                "two.rs": "const TWO: usize = one::ONE + one::ONE;",
3238            }),
3239        )
3240        .await;
3241        fs.insert_tree(
3242            "/root-2",
3243            json!({
3244                "three.rs": "const THREE: usize = two::TWO + one::ONE;",
3245            }),
3246        )
3247        .await;
3248
3249        // Set up a fake language server.
3250        let mut language = Language::new(
3251            LanguageConfig {
3252                name: "Rust".into(),
3253                path_suffixes: vec!["rs".to_string()],
3254                ..Default::default()
3255            },
3256            Some(tree_sitter_rust::language()),
3257        );
3258        let mut fake_language_servers = language.set_fake_lsp_adapter(Default::default());
3259        lang_registry.add(Arc::new(language));
3260
3261        // Connect to a server as 2 clients.
3262        let mut server = TestServer::start(cx_a.foreground(), cx_a.background()).await;
3263        let client_a = server.create_client(cx_a, "user_a").await;
3264        let client_b = server.create_client(cx_b, "user_b").await;
3265
3266        // Share a project as client A
3267        let project_a = cx_a.update(|cx| {
3268            Project::local(
3269                client_a.clone(),
3270                client_a.user_store.clone(),
3271                lang_registry.clone(),
3272                fs.clone(),
3273                cx,
3274            )
3275        });
3276        let (worktree_a, _) = project_a
3277            .update(cx_a, |p, cx| {
3278                p.find_or_create_local_worktree("/root-1", true, cx)
3279            })
3280            .await
3281            .unwrap();
3282        worktree_a
3283            .read_with(cx_a, |tree, _| tree.as_local().unwrap().scan_complete())
3284            .await;
3285        let project_id = project_a.update(cx_a, |p, _| p.next_remote_id()).await;
3286        let worktree_id = worktree_a.read_with(cx_a, |tree, _| tree.id());
3287        project_a.update(cx_a, |p, cx| p.share(cx)).await.unwrap();
3288
3289        // Join the worktree as client B.
3290        let project_b = Project::remote(
3291            project_id,
3292            client_b.clone(),
3293            client_b.user_store.clone(),
3294            lang_registry.clone(),
3295            fs.clone(),
3296            &mut cx_b.to_async(),
3297        )
3298        .await
3299        .unwrap();
3300
3301        // Open the file on client B.
3302        let buffer_b = cx_b
3303            .background()
3304            .spawn(project_b.update(cx_b, |p, cx| p.open_buffer((worktree_id, "one.rs"), cx)))
3305            .await
3306            .unwrap();
3307
3308        // Request references to a symbol as the guest.
3309        let fake_language_server = fake_language_servers.next().await.unwrap();
3310        fake_language_server.handle_request::<lsp::request::References, _, _>(
3311            |params, _| async move {
3312                assert_eq!(
3313                    params.text_document_position.text_document.uri.as_str(),
3314                    "file:///root-1/one.rs"
3315                );
3316                Ok(Some(vec![
3317                    lsp::Location {
3318                        uri: lsp::Url::from_file_path("/root-1/two.rs").unwrap(),
3319                        range: lsp::Range::new(
3320                            lsp::Position::new(0, 24),
3321                            lsp::Position::new(0, 27),
3322                        ),
3323                    },
3324                    lsp::Location {
3325                        uri: lsp::Url::from_file_path("/root-1/two.rs").unwrap(),
3326                        range: lsp::Range::new(
3327                            lsp::Position::new(0, 35),
3328                            lsp::Position::new(0, 38),
3329                        ),
3330                    },
3331                    lsp::Location {
3332                        uri: lsp::Url::from_file_path("/root-2/three.rs").unwrap(),
3333                        range: lsp::Range::new(
3334                            lsp::Position::new(0, 37),
3335                            lsp::Position::new(0, 40),
3336                        ),
3337                    },
3338                ]))
3339            },
3340        );
3341
            // Offset 7 falls inside `ONE` in "const ONE: usize = 1;" (bytes 6..9).
3342        let references = project_b
3343            .update(cx_b, |p, cx| p.references(&buffer_b, 7, cx))
3344            .await
3345            .unwrap();
3346        cx_b.read(|cx| {
3347            assert_eq!(references.len(), 3);
                // The guest's project now reports two worktrees. Presumably the second
                // one was added to hold "/root-2/three.rs", which lies outside the
                // shared "/root-1" worktree — TODO confirm.
3348            assert_eq!(project_b.read(cx).worktrees(cx).count(), 2);
3349
3350            let two_buffer = references[0].buffer.read(cx);
3351            let three_buffer = references[2].buffer.read(cx);
3352            assert_eq!(
3353                two_buffer.file().unwrap().path().as_ref(),
3354                Path::new("two.rs")
3355            );
                // Both references into two.rs must resolve to the same buffer handle.
3356            assert_eq!(references[1].buffer, references[0].buffer);
3357            assert_eq!(
3358                three_buffer.file().unwrap().full_path(cx),
3359                Path::new("three.rs")
3360            );
3361
                // Every range returned by the fake server is on line 0 of an ASCII,
                // single-line file, so the expected offsets equal the LSP columns.
3362            assert_eq!(references[0].range.to_offset(&two_buffer), 24..27);
3363            assert_eq!(references[1].range.to_offset(&two_buffer), 35..38);
3364            assert_eq!(references[2].range.to_offset(&three_buffer), 37..40);
3365        });
3366    }
3367
3368    #[gpui::test(iterations = 10)]
        // Checks that a guest (client B) can run a text search for "world" over a
        // project shared by client A that contains two worktrees ("/root-1" and
        // "/root-2"), and that the matches returned cover files from both worktrees
        // with the expected byte ranges.
3369    async fn test_project_search(cx_a: &mut TestAppContext, cx_b: &mut TestAppContext) {
3370        cx_a.foreground().forbid_parking();
3371        let lang_registry = Arc::new(LanguageRegistry::test());
3372        let fs = FakeFs::new(cx_a.background());
3373        fs.insert_tree(
3374            "/root-1",
3375            json!({
3376                ".zed.toml": r#"collaborators = ["user_b"]"#,
3377                "a": "hello world",
3378                "b": "goodnight moon",
3379                "c": "a world of goo",
3380                "d": "world champion of clown world",
3381            }),
3382        )
3383        .await;
3384        fs.insert_tree(
3385            "/root-2",
3386            json!({
3387                "e": "disney world is fun",
3388            }),
3389        )
3390        .await;
3391
3392        // Connect to a server as 2 clients.
3393        let mut server = TestServer::start(cx_a.foreground(), cx_a.background()).await;
3394        let client_a = server.create_client(cx_a, "user_a").await;
3395        let client_b = server.create_client(cx_b, "user_b").await;
3396
3397        // Share a project as client A
3398        let project_a = cx_a.update(|cx| {
3399            Project::local(
3400                client_a.clone(),
3401                client_a.user_store.clone(),
3402                lang_registry.clone(),
3403                fs.clone(),
3404                cx,
3405            )
3406        });
3407        let project_id = project_a.update(cx_a, |p, _| p.next_remote_id()).await;
3408
            // Add both roots as worktrees and wait for each scan to finish before
            // sharing the project.
3409        let (worktree_1, _) = project_a
3410            .update(cx_a, |p, cx| {
3411                p.find_or_create_local_worktree("/root-1", true, cx)
3412            })
3413            .await
3414            .unwrap();
3415        worktree_1
3416            .read_with(cx_a, |tree, _| tree.as_local().unwrap().scan_complete())
3417            .await;
3418        let (worktree_2, _) = project_a
3419            .update(cx_a, |p, cx| {
3420                p.find_or_create_local_worktree("/root-2", true, cx)
3421            })
3422            .await
3423            .unwrap();
3424        worktree_2
3425            .read_with(cx_a, |tree, _| tree.as_local().unwrap().scan_complete())
3426            .await;
3427
3428        project_a.update(cx_a, |p, cx| p.share(cx)).await.unwrap();
3429
3430        // Join the worktree as client B.
3431        let project_b = Project::remote(
3432            project_id,
3433            client_b.clone(),
3434            client_b.user_store.clone(),
3435            lang_registry.clone(),
3436            fs.clone(),
3437            &mut cx_b.to_async(),
3438        )
3439        .await
3440        .unwrap();
3441
            // Run the search from the guest's side.
            // NOTE(review): the two `false` arguments are presumably the whole-word and
            // case-sensitive flags — confirm against `SearchQuery::text`.
3442        let results = project_b
3443            .update(cx_b, |project, cx| {
3444                project.search(SearchQuery::text("world", false, false), cx)
3445            })
3446            .await
3447            .unwrap();
3448
            // Flatten each (buffer, ranges) result into (full path, offset ranges) so
            // it can be compared against plain values.
3449        let mut ranges_by_path = results
3450            .into_iter()
3451            .map(|(buffer, ranges)| {
3452                buffer.read_with(cx_b, |buffer, cx| {
3453                    let path = buffer.file().unwrap().full_path(cx);
3454                    let offset_ranges = ranges
3455                        .into_iter()
3456                        .map(|range| range.to_offset(buffer))
3457                        .collect::<Vec<_>>();
3458                    (path, offset_ranges)
3459                })
3460            })
3461            .collect::<Vec<_>>();
            // Sort by path so the comparison below does not depend on result order.
3462        ranges_by_path.sort_by_key(|(path, _)| path.clone());
3463
            // File "b" ("goodnight moon") contains no match, so it must be absent.
3464        assert_eq!(
3465            ranges_by_path,
3466            &[
3467                (PathBuf::from("root-1/a"), vec![6..11]),
3468                (PathBuf::from("root-1/c"), vec![2..7]),
3469                (PathBuf::from("root-1/d"), vec![0..5, 24..29]),
3470                (PathBuf::from("root-2/e"), vec![7..12]),
3471            ]
3472        );
3473    }
3474
3475    #[gpui::test(iterations = 10)]
        // Checks that a guest (client B) can request document highlights for a buffer
        // in a project shared by client A. The fake language server verifies the URI
        // and position it is asked about and returns one WRITE and two READ
        // highlights; the test asserts the guest sees the same kinds and ranges.
3476    async fn test_document_highlights(cx_a: &mut TestAppContext, cx_b: &mut TestAppContext) {
3477        cx_a.foreground().forbid_parking();
3478        let lang_registry = Arc::new(LanguageRegistry::test());
3479        let fs = FakeFs::new(cx_a.background());
3480        fs.insert_tree(
3481            "/root-1",
3482            json!({
3483                ".zed.toml": r#"collaborators = ["user_b"]"#,
3484                "main.rs": "fn double(number: i32) -> i32 { number + number }",
3485            }),
3486        )
3487        .await;
3488
3489        // Set up a fake language server.
3490        let mut language = Language::new(
3491            LanguageConfig {
3492                name: "Rust".into(),
3493                path_suffixes: vec!["rs".to_string()],
3494                ..Default::default()
3495            },
3496            Some(tree_sitter_rust::language()),
3497        );
3498        let mut fake_language_servers = language.set_fake_lsp_adapter(Default::default());
3499        lang_registry.add(Arc::new(language));
3500
3501        // Connect to a server as 2 clients.
3502        let mut server = TestServer::start(cx_a.foreground(), cx_a.background()).await;
3503        let client_a = server.create_client(cx_a, "user_a").await;
3504        let client_b = server.create_client(cx_b, "user_b").await;
3505
3506        // Share a project as client A
3507        let project_a = cx_a.update(|cx| {
3508            Project::local(
3509                client_a.clone(),
3510                client_a.user_store.clone(),
3511                lang_registry.clone(),
3512                fs.clone(),
3513                cx,
3514            )
3515        });
3516        let (worktree_a, _) = project_a
3517            .update(cx_a, |p, cx| {
3518                p.find_or_create_local_worktree("/root-1", true, cx)
3519            })
3520            .await
3521            .unwrap();
3522        worktree_a
3523            .read_with(cx_a, |tree, _| tree.as_local().unwrap().scan_complete())
3524            .await;
3525        let project_id = project_a.update(cx_a, |p, _| p.next_remote_id()).await;
3526        let worktree_id = worktree_a.read_with(cx_a, |tree, _| tree.id());
3527        project_a.update(cx_a, |p, cx| p.share(cx)).await.unwrap();
3528
3529        // Join the worktree as client B.
3530        let project_b = Project::remote(
3531            project_id,
3532            client_b.clone(),
3533            client_b.user_store.clone(),
3534            lang_registry.clone(),
3535            fs.clone(),
3536            &mut cx_b.to_async(),
3537        )
3538        .await
3539        .unwrap();
3540
3541        // Open the file on client B.
3542        let buffer_b = cx_b
3543            .background()
3544            .spawn(project_b.update(cx_b, |p, cx| p.open_buffer((worktree_id, "main.rs"), cx)))
3545            .await
3546            .unwrap();
3547
3548        // Request document highlights as the guest.
3549        let fake_language_server = fake_language_servers.next().await.unwrap();
3550        fake_language_server.handle_request::<lsp::request::DocumentHighlightRequest, _, _>(
3551            |params, _| async move {
3552                assert_eq!(
3553                    params
3554                        .text_document_position_params
3555                        .text_document
3556                        .uri
3557                        .as_str(),
3558                    "file:///root-1/main.rs"
3559                );
                    // The guest's buffer offset 34 must arrive as line 0, column 34.
3560                assert_eq!(
3561                    params.text_document_position_params.position,
3562                    lsp::Position::new(0, 34)
3563                );
                    // One highlight per occurrence of `number`: the parameter
                    // (columns 10..16) and the two uses in the body (32..38, 41..47).
3564                Ok(Some(vec![
3565                    lsp::DocumentHighlight {
3566                        kind: Some(lsp::DocumentHighlightKind::WRITE),
3567                        range: lsp::Range::new(
3568                            lsp::Position::new(0, 10),
3569                            lsp::Position::new(0, 16),
3570                        ),
3571                    },
3572                    lsp::DocumentHighlight {
3573                        kind: Some(lsp::DocumentHighlightKind::READ),
3574                        range: lsp::Range::new(
3575                            lsp::Position::new(0, 32),
3576                            lsp::Position::new(0, 38),
3577                        ),
3578                    },
3579                    lsp::DocumentHighlight {
3580                        kind: Some(lsp::DocumentHighlightKind::READ),
3581                        range: lsp::Range::new(
3582                            lsp::Position::new(0, 41),
3583                            lsp::Position::new(0, 47),
3584                        ),
3585                    },
3586                ]))
3587            },
3588        );
3589
            // Offset 34 lies inside the first `number` in the function body (32..38).
3590        let highlights = project_b
3591            .update(cx_b, |p, cx| p.document_highlights(&buffer_b, 34, cx))
3592            .await
3593            .unwrap();
3594        buffer_b.read_with(cx_b, |buffer, _| {
3595            let snapshot = buffer.snapshot();
3596
                // Reduce each highlight to (kind, offset range) for a plain comparison.
3597            let highlights = highlights
3598                .into_iter()
3599                .map(|highlight| (highlight.kind, highlight.range.to_offset(&snapshot)))
3600                .collect::<Vec<_>>();
3601            assert_eq!(
3602                highlights,
3603                &[
3604                    (lsp::DocumentHighlightKind::WRITE, 10..16),
3605                    (lsp::DocumentHighlightKind::READ, 32..38),
3606                    (lsp::DocumentHighlightKind::READ, 41..47)
3607                ]
3608            )
3609        });
3610    }
3611
3612    #[gpui::test(iterations = 10)]
        // Checks that a guest (client B) can query project symbols in a project shared
        // by client A, can open the buffer for a returned symbol even though its file
        // ("/code/crate-2/two.rs") is outside the shared "/code/crate-1" worktree, and
        // that a symbol whose path has been altered by the guest is rejected with an
        // "invalid symbol signature" error.
3613    async fn test_project_symbols(cx_a: &mut TestAppContext, cx_b: &mut TestAppContext) {
3614        cx_a.foreground().forbid_parking();
3615        let lang_registry = Arc::new(LanguageRegistry::test());
3616        let fs = FakeFs::new(cx_a.background());
3617        fs.insert_tree(
3618            "/code",
3619            json!({
3620                "crate-1": {
3621                    ".zed.toml": r#"collaborators = ["user_b"]"#,
3622                    "one.rs": "const ONE: usize = 1;",
3623                },
3624                "crate-2": {
3625                    "two.rs": "const TWO: usize = 2; const THREE: usize = 3;",
3626                },
3627                "private": {
3628                    "passwords.txt": "the-password",
3629                }
3630            }),
3631        )
3632        .await;
3633
3634        // Set up a fake language server.
3635        let mut language = Language::new(
3636            LanguageConfig {
3637                name: "Rust".into(),
3638                path_suffixes: vec!["rs".to_string()],
3639                ..Default::default()
3640            },
3641            Some(tree_sitter_rust::language()),
3642        );
3643        let mut fake_language_servers = language.set_fake_lsp_adapter(Default::default());
3644        lang_registry.add(Arc::new(language));
3645
3646        // Connect to a server as 2 clients.
3647        let mut server = TestServer::start(cx_a.foreground(), cx_a.background()).await;
3648        let client_a = server.create_client(cx_a, "user_a").await;
3649        let client_b = server.create_client(cx_b, "user_b").await;
3650
3651        // Share a project as client A
3652        let project_a = cx_a.update(|cx| {
3653            Project::local(
3654                client_a.clone(),
3655                client_a.user_store.clone(),
3656                lang_registry.clone(),
3657                fs.clone(),
3658                cx,
3659            )
3660        });
            // Only "/code/crate-1" is added as a worktree; "crate-2" and "private" are
            // not part of the shared project.
3661        let (worktree_a, _) = project_a
3662            .update(cx_a, |p, cx| {
3663                p.find_or_create_local_worktree("/code/crate-1", true, cx)
3664            })
3665            .await
3666            .unwrap();
3667        worktree_a
3668            .read_with(cx_a, |tree, _| tree.as_local().unwrap().scan_complete())
3669            .await;
3670        let project_id = project_a.update(cx_a, |p, _| p.next_remote_id()).await;
3671        let worktree_id = worktree_a.read_with(cx_a, |tree, _| tree.id());
3672        project_a.update(cx_a, |p, cx| p.share(cx)).await.unwrap();
3673
3674        // Join the worktree as client B.
3675        let project_b = Project::remote(
3676            project_id,
3677            client_b.clone(),
3678            client_b.user_store.clone(),
3679            lang_registry.clone(),
3680            fs.clone(),
3681            &mut cx_b.to_async(),
3682        )
3683        .await
3684        .unwrap();
3685
3686        // Cause the language server to start.
3687        let _buffer = cx_b
3688            .background()
3689            .spawn(project_b.update(cx_b, |p, cx| p.open_buffer((worktree_id, "one.rs"), cx)))
3690            .await
3691            .unwrap();
3692
3693        let fake_language_server = fake_language_servers.next().await.unwrap();
3694        fake_language_server.handle_request::<lsp::request::WorkspaceSymbol, _, _>(
3695            |_, _| async move {
                    // NOTE(review): presumably needed because the `deprecated` field of
                    // `lsp::SymbolInformation` is itself marked deprecated — confirm.
3696                #[allow(deprecated)]
3697                Ok(Some(vec![lsp::SymbolInformation {
3698                    name: "TWO".into(),
3699                    location: lsp::Location {
3700                        uri: lsp::Url::from_file_path("/code/crate-2/two.rs").unwrap(),
3701                        range: lsp::Range::new(lsp::Position::new(0, 6), lsp::Position::new(0, 9)),
3702                    },
3703                    kind: lsp::SymbolKind::CONSTANT,
3704                    tags: None,
3705                    container_name: None,
3706                    deprecated: None,
3707                }]))
3708            },
3709        );
3710
3711        // Request project symbols matching "two" as the guest.
3712        let symbols = project_b
3713            .update(cx_b, |p, cx| p.symbols("two", cx))
3714            .await
3715            .unwrap();
3716        assert_eq!(symbols.len(), 1);
3717        assert_eq!(symbols[0].name, "TWO");
3718
3719        // Open one of the returned symbols.
3720        let buffer_b_2 = project_b
3721            .update(cx_b, |project, cx| {
3722                project.open_buffer_for_symbol(&symbols[0], cx)
3723            })
3724            .await
3725            .unwrap();
            // The symbol's file is outside the shared "/code/crate-1" worktree, so the
            // buffer's path is expressed relative to that worktree root.
3726        buffer_b_2.read_with(cx_b, |buffer, _| {
3727            assert_eq!(
3728                buffer.file().unwrap().path().as_ref(),
3729                Path::new("../crate-2/two.rs")
3730            );
3731        });
3732
3733        // Attempt to craft a symbol and violate host's privacy by opening an arbitrary file.
3734        let mut fake_symbol = symbols[0].clone();
3735        fake_symbol.path = Path::new("/code/secrets").into();
3736        let error = project_b
3737            .update(cx_b, |project, cx| {
3738                project.open_buffer_for_symbol(&fake_symbol, cx)
3739            })
3740            .await
3741            .unwrap_err();
3742        assert!(error.to_string().contains("invalid symbol signature"));
3743    }
3744
3745    #[gpui::test(iterations = 10)]
        // Checks that, on the guest (client B), a go-to-definition request whose target
        // is "b.rs" and an explicit `open_buffer` of "b.rs" end up with the same buffer
        // handle when both are created before either is awaited. The order in which the
        // two requests are created is chosen at random via `rng`.
3746    async fn test_open_buffer_while_getting_definition_pointing_to_it(
3747        cx_a: &mut TestAppContext,
3748        cx_b: &mut TestAppContext,
3749        mut rng: StdRng,
3750    ) {
3751        cx_a.foreground().forbid_parking();
3752        let lang_registry = Arc::new(LanguageRegistry::test());
3753        let fs = FakeFs::new(cx_a.background());
3754        fs.insert_tree(
3755            "/root",
3756            json!({
3757                ".zed.toml": r#"collaborators = ["user_b"]"#,
3758                "a.rs": "const ONE: usize = b::TWO;",
3759                "b.rs": "const TWO: usize = 2",
3760            }),
3761        )
3762        .await;
3763
3764        // Set up a fake language server.
3765        let mut language = Language::new(
3766            LanguageConfig {
3767                name: "Rust".into(),
3768                path_suffixes: vec!["rs".to_string()],
3769                ..Default::default()
3770            },
3771            Some(tree_sitter_rust::language()),
3772        );
3773        let mut fake_language_servers = language.set_fake_lsp_adapter(Default::default());
3774        lang_registry.add(Arc::new(language));
3775
3776        // Connect to a server as 2 clients.
3777        let mut server = TestServer::start(cx_a.foreground(), cx_a.background()).await;
3778        let client_a = server.create_client(cx_a, "user_a").await;
3779        let client_b = server.create_client(cx_b, "user_b").await;
3780
3781        // Share a project as client A
3782        let project_a = cx_a.update(|cx| {
3783            Project::local(
3784                client_a.clone(),
3785                client_a.user_store.clone(),
3786                lang_registry.clone(),
3787                fs.clone(),
3788                cx,
3789            )
3790        });
3791
3792        let (worktree_a, _) = project_a
3793            .update(cx_a, |p, cx| {
3794                p.find_or_create_local_worktree("/root", true, cx)
3795            })
3796            .await
3797            .unwrap();
3798        worktree_a
3799            .read_with(cx_a, |tree, _| tree.as_local().unwrap().scan_complete())
3800            .await;
3801        let project_id = project_a.update(cx_a, |p, _| p.next_remote_id()).await;
3802        let worktree_id = worktree_a.read_with(cx_a, |tree, _| tree.id());
3803        project_a.update(cx_a, |p, cx| p.share(cx)).await.unwrap();
3804
3805        // Join the worktree as client B.
3806        let project_b = Project::remote(
3807            project_id,
3808            client_b.clone(),
3809            client_b.user_store.clone(),
3810            lang_registry.clone(),
3811            fs.clone(),
3812            &mut cx_b.to_async(),
3813        )
3814        .await
3815        .unwrap();
3816
            // Open the file that contains the reference to `b::TWO`.
3817        let buffer_b1 = cx_b
3818            .background()
3819            .spawn(project_b.update(cx_b, |p, cx| p.open_buffer((worktree_id, "a.rs"), cx)))
3820            .await
3821            .unwrap();
3822
            // Every definition request is answered with `TWO` in "/root/b.rs".
3823        let fake_language_server = fake_language_servers.next().await.unwrap();
3824        fake_language_server.handle_request::<lsp::request::GotoDefinition, _, _>(
3825            |_, _| async move {
3826                Ok(Some(lsp::GotoDefinitionResponse::Scalar(
3827                    lsp::Location::new(
3828                        lsp::Url::from_file_path("/root/b.rs").unwrap(),
3829                        lsp::Range::new(lsp::Position::new(0, 6), lsp::Position::new(0, 9)),
3830                    ),
3831                )))
3832            },
3833        );
3834
            // Create both requests before awaiting either of them, in a randomly chosen
            // order. Offset 23 falls inside `TWO` in "const ONE: usize = b::TWO;".
3835        let definitions;
3836        let buffer_b2;
3837        if rng.gen() {
3838            definitions = project_b.update(cx_b, |p, cx| p.definition(&buffer_b1, 23, cx));
3839            buffer_b2 = project_b.update(cx_b, |p, cx| p.open_buffer((worktree_id, "b.rs"), cx));
3840        } else {
3841            buffer_b2 = project_b.update(cx_b, |p, cx| p.open_buffer((worktree_id, "b.rs"), cx));
3842            definitions = project_b.update(cx_b, |p, cx| p.definition(&buffer_b1, 23, cx));
3843        }
3844
            // Whichever request was created first, both must yield the same buffer.
3845        let buffer_b2 = buffer_b2.await.unwrap();
3846        let definitions = definitions.await.unwrap();
3847        assert_eq!(definitions.len(), 1);
3848        assert_eq!(definitions[0].buffer, buffer_b2);
3849    }
3850
3851    #[gpui::test(iterations = 10)]
3852    async fn test_collaborating_with_code_actions(
3853        cx_a: &mut TestAppContext,
3854        cx_b: &mut TestAppContext,
3855    ) {
3856        cx_a.foreground().forbid_parking();
3857        let lang_registry = Arc::new(LanguageRegistry::test());
3858        let fs = FakeFs::new(cx_a.background());
3859        cx_b.update(|cx| editor::init(cx));
3860
3861        // Set up a fake language server.
3862        let mut language = Language::new(
3863            LanguageConfig {
3864                name: "Rust".into(),
3865                path_suffixes: vec!["rs".to_string()],
3866                ..Default::default()
3867            },
3868            Some(tree_sitter_rust::language()),
3869        );
3870        let mut fake_language_servers = language.set_fake_lsp_adapter(Default::default());
3871        lang_registry.add(Arc::new(language));
3872
3873        // Connect to a server as 2 clients.
3874        let mut server = TestServer::start(cx_a.foreground(), cx_a.background()).await;
3875        let client_a = server.create_client(cx_a, "user_a").await;
3876        let client_b = server.create_client(cx_b, "user_b").await;
3877
3878        // Share a project as client A
3879        fs.insert_tree(
3880            "/a",
3881            json!({
3882                ".zed.toml": r#"collaborators = ["user_b"]"#,
3883                "main.rs": "mod other;\nfn main() { let foo = other::foo(); }",
3884                "other.rs": "pub fn foo() -> usize { 4 }",
3885            }),
3886        )
3887        .await;
3888        let project_a = cx_a.update(|cx| {
3889            Project::local(
3890                client_a.clone(),
3891                client_a.user_store.clone(),
3892                lang_registry.clone(),
3893                fs.clone(),
3894                cx,
3895            )
3896        });
3897        let (worktree_a, _) = project_a
3898            .update(cx_a, |p, cx| {
3899                p.find_or_create_local_worktree("/a", true, cx)
3900            })
3901            .await
3902            .unwrap();
3903        worktree_a
3904            .read_with(cx_a, |tree, _| tree.as_local().unwrap().scan_complete())
3905            .await;
3906        let project_id = project_a.update(cx_a, |p, _| p.next_remote_id()).await;
3907        let worktree_id = worktree_a.read_with(cx_a, |tree, _| tree.id());
3908        project_a.update(cx_a, |p, cx| p.share(cx)).await.unwrap();
3909
3910        // Join the worktree as client B.
3911        let project_b = Project::remote(
3912            project_id,
3913            client_b.clone(),
3914            client_b.user_store.clone(),
3915            lang_registry.clone(),
3916            fs.clone(),
3917            &mut cx_b.to_async(),
3918        )
3919        .await
3920        .unwrap();
3921        let mut params = cx_b.update(WorkspaceParams::test);
3922        params.languages = lang_registry.clone();
3923        params.client = client_b.client.clone();
3924        params.user_store = client_b.user_store.clone();
3925        params.project = project_b;
3926
3927        let (_window_b, workspace_b) = cx_b.add_window(|cx| Workspace::new(&params, cx));
3928        let editor_b = workspace_b
3929            .update(cx_b, |workspace, cx| {
3930                workspace.open_path((worktree_id, "main.rs"), true, cx)
3931            })
3932            .await
3933            .unwrap()
3934            .downcast::<Editor>()
3935            .unwrap();
3936
3937        let mut fake_language_server = fake_language_servers.next().await.unwrap();
3938        fake_language_server
3939            .handle_request::<lsp::request::CodeActionRequest, _, _>(|params, _| async move {
3940                assert_eq!(
3941                    params.text_document.uri,
3942                    lsp::Url::from_file_path("/a/main.rs").unwrap(),
3943                );
3944                assert_eq!(params.range.start, lsp::Position::new(0, 0));
3945                assert_eq!(params.range.end, lsp::Position::new(0, 0));
3946                Ok(None)
3947            })
3948            .next()
3949            .await;
3950
3951        // Move cursor to a location that contains code actions.
3952        editor_b.update(cx_b, |editor, cx| {
3953            editor.select_ranges([Point::new(1, 31)..Point::new(1, 31)], None, cx);
3954            cx.focus(&editor_b);
3955        });
3956
3957        fake_language_server
3958            .handle_request::<lsp::request::CodeActionRequest, _, _>(|params, _| async move {
3959                assert_eq!(
3960                    params.text_document.uri,
3961                    lsp::Url::from_file_path("/a/main.rs").unwrap(),
3962                );
3963                assert_eq!(params.range.start, lsp::Position::new(1, 31));
3964                assert_eq!(params.range.end, lsp::Position::new(1, 31));
3965
3966                Ok(Some(vec![lsp::CodeActionOrCommand::CodeAction(
3967                    lsp::CodeAction {
3968                        title: "Inline into all callers".to_string(),
3969                        edit: Some(lsp::WorkspaceEdit {
3970                            changes: Some(
3971                                [
3972                                    (
3973                                        lsp::Url::from_file_path("/a/main.rs").unwrap(),
3974                                        vec![lsp::TextEdit::new(
3975                                            lsp::Range::new(
3976                                                lsp::Position::new(1, 22),
3977                                                lsp::Position::new(1, 34),
3978                                            ),
3979                                            "4".to_string(),
3980                                        )],
3981                                    ),
3982                                    (
3983                                        lsp::Url::from_file_path("/a/other.rs").unwrap(),
3984                                        vec![lsp::TextEdit::new(
3985                                            lsp::Range::new(
3986                                                lsp::Position::new(0, 0),
3987                                                lsp::Position::new(0, 27),
3988                                            ),
3989                                            "".to_string(),
3990                                        )],
3991                                    ),
3992                                ]
3993                                .into_iter()
3994                                .collect(),
3995                            ),
3996                            ..Default::default()
3997                        }),
3998                        data: Some(json!({
3999                            "codeActionParams": {
4000                                "range": {
4001                                    "start": {"line": 1, "column": 31},
4002                                    "end": {"line": 1, "column": 31},
4003                                }
4004                            }
4005                        })),
4006                        ..Default::default()
4007                    },
4008                )]))
4009            })
4010            .next()
4011            .await;
4012
4013        // Toggle code actions and wait for them to display.
4014        editor_b.update(cx_b, |editor, cx| {
4015            editor.toggle_code_actions(
4016                &ToggleCodeActions {
4017                    deployed_from_indicator: false,
4018                },
4019                cx,
4020            );
4021        });
4022        editor_b
4023            .condition(&cx_b, |editor, _| editor.context_menu_visible())
4024            .await;
4025
4026        fake_language_server.remove_request_handler::<lsp::request::CodeActionRequest>();
4027
4028        // Confirming the code action will trigger a resolve request.
4029        let confirm_action = workspace_b
4030            .update(cx_b, |workspace, cx| {
4031                Editor::confirm_code_action(workspace, &ConfirmCodeAction { item_ix: Some(0) }, cx)
4032            })
4033            .unwrap();
4034        fake_language_server.handle_request::<lsp::request::CodeActionResolveRequest, _, _>(
4035            |_, _| async move {
4036                Ok(lsp::CodeAction {
4037                    title: "Inline into all callers".to_string(),
4038                    edit: Some(lsp::WorkspaceEdit {
4039                        changes: Some(
4040                            [
4041                                (
4042                                    lsp::Url::from_file_path("/a/main.rs").unwrap(),
4043                                    vec![lsp::TextEdit::new(
4044                                        lsp::Range::new(
4045                                            lsp::Position::new(1, 22),
4046                                            lsp::Position::new(1, 34),
4047                                        ),
4048                                        "4".to_string(),
4049                                    )],
4050                                ),
4051                                (
4052                                    lsp::Url::from_file_path("/a/other.rs").unwrap(),
4053                                    vec![lsp::TextEdit::new(
4054                                        lsp::Range::new(
4055                                            lsp::Position::new(0, 0),
4056                                            lsp::Position::new(0, 27),
4057                                        ),
4058                                        "".to_string(),
4059                                    )],
4060                                ),
4061                            ]
4062                            .into_iter()
4063                            .collect(),
4064                        ),
4065                        ..Default::default()
4066                    }),
4067                    ..Default::default()
4068                })
4069            },
4070        );
4071
4072        // After the action is confirmed, an editor containing both modified files is opened.
4073        confirm_action.await.unwrap();
4074        let code_action_editor = workspace_b.read_with(cx_b, |workspace, cx| {
4075            workspace
4076                .active_item(cx)
4077                .unwrap()
4078                .downcast::<Editor>()
4079                .unwrap()
4080        });
4081        code_action_editor.update(cx_b, |editor, cx| {
4082            assert_eq!(editor.text(cx), "mod other;\nfn main() { let foo = 4; }\n");
4083            editor.undo(&Undo, cx);
4084            assert_eq!(
4085                editor.text(cx),
4086                "mod other;\nfn main() { let foo = other::foo(); }\npub fn foo() -> usize { 4 }"
4087            );
4088            editor.redo(&Redo, cx);
4089            assert_eq!(editor.text(cx), "mod other;\nfn main() { let foo = 4; }\n");
4090        });
4091    }
4092
4093    #[gpui::test(iterations = 10)]
        // Exercises a rename issued by a guest. Client A shares a local project
        // containing `one.rs` and `two.rs`; client B joins it remotely, opens `one.rs`,
        // and renames `ONE` to `THREE` through a fake language server. The test then
        // checks the combined multi-file result and the undo/redo behavior of both the
        // editor that shows the rename result and the editor the rename started from.
4094    async fn test_collaborating_with_renames(cx_a: &mut TestAppContext, cx_b: &mut TestAppContext) {
4095        cx_a.foreground().forbid_parking();
4096        let lang_registry = Arc::new(LanguageRegistry::test());
4097        let fs = FakeFs::new(cx_a.background());
4098        cx_b.update(|cx| editor::init(cx));
4099
4100        // Set up a fake language server.
            // It advertises rename support with `prepare_provider` enabled, which is why a
            // `PrepareRenameRequest` handler is installed further down.
4101        let mut language = Language::new(
4102            LanguageConfig {
4103                name: "Rust".into(),
4104                path_suffixes: vec!["rs".to_string()],
4105                ..Default::default()
4106            },
4107            Some(tree_sitter_rust::language()),
4108        );
4109        let mut fake_language_servers = language.set_fake_lsp_adapter(FakeLspAdapter {
4110            capabilities: lsp::ServerCapabilities {
4111                rename_provider: Some(lsp::OneOf::Right(lsp::RenameOptions {
4112                    prepare_provider: Some(true),
4113                    work_done_progress_options: Default::default(),
4114                })),
4115                ..Default::default()
4116            },
4117            ..Default::default()
4118        });
4119        lang_registry.add(Arc::new(language));
4120
4121        // Connect to a server as 2 clients.
4122        let mut server = TestServer::start(cx_a.foreground(), cx_a.background()).await;
4123        let client_a = server.create_client(cx_a, "user_a").await;
4124        let client_b = server.create_client(cx_b, "user_b").await;
4125
4126        // Share a project as client A
            // `two.rs` refers to `one::ONE` twice, so the rename has to touch both files.
4127        fs.insert_tree(
4128            "/dir",
4129            json!({
4130                ".zed.toml": r#"collaborators = ["user_b"]"#,
4131                "one.rs": "const ONE: usize = 1;",
4132                "two.rs": "const TWO: usize = one::ONE + one::ONE;"
4133            }),
4134        )
4135        .await;
4136        let project_a = cx_a.update(|cx| {
4137            Project::local(
4138                client_a.clone(),
4139                client_a.user_store.clone(),
4140                lang_registry.clone(),
4141                fs.clone(),
4142                cx,
4143            )
4144        });
4145        let (worktree_a, _) = project_a
4146            .update(cx_a, |p, cx| {
4147                p.find_or_create_local_worktree("/dir", true, cx)
4148            })
4149            .await
4150            .unwrap();
4151        worktree_a
4152            .read_with(cx_a, |tree, _| tree.as_local().unwrap().scan_complete())
4153            .await;
4154        let project_id = project_a.update(cx_a, |p, _| p.next_remote_id()).await;
4155        let worktree_id = worktree_a.read_with(cx_a, |tree, _| tree.id());
4156        project_a.update(cx_a, |p, cx| p.share(cx)).await.unwrap();
4157
4158        // Join the project as client B.
4159        let project_b = Project::remote(
4160            project_id,
4161            client_b.clone(),
4162            client_b.user_store.clone(),
4163            lang_registry.clone(),
4164            fs.clone(),
4165            &mut cx_b.to_async(),
4166        )
4167        .await
4168        .unwrap();
4169        let mut params = cx_b.update(WorkspaceParams::test);
4170        params.languages = lang_registry.clone();
4171        params.client = client_b.client.clone();
4172        params.user_store = client_b.user_store.clone();
4173        params.project = project_b;
4174
            // Open `one.rs` in a workspace window on client B's side.
4175        let (_window_b, workspace_b) = cx_b.add_window(|cx| Workspace::new(&params, cx));
4176        let editor_b = workspace_b
4177            .update(cx_b, |workspace, cx| {
4178                workspace.open_path((worktree_id, "one.rs"), true, cx)
4179            })
4180            .await
4181            .unwrap()
4182            .downcast::<Editor>()
4183            .unwrap();
4184        let fake_language_server = fake_language_servers.next().await.unwrap();
4185
4186        // Move cursor to a location that can be renamed.
            // Offset 7 falls inside `ONE` in "const ONE: usize = 1;" (`ONE` spans 6..9).
4187        let prepare_rename = editor_b.update(cx_b, |editor, cx| {
4188            editor.select_ranges([7..7], None, cx);
4189            editor.rename(&Rename, cx).unwrap()
4190        });
4191
            // Answer the prepare-rename request with the range covering `ONE`, after
            // checking that the request targets `one.rs` at the cursor position.
4192        fake_language_server
4193            .handle_request::<lsp::request::PrepareRenameRequest, _, _>(|params, _| async move {
4194                assert_eq!(params.text_document.uri.as_str(), "file:///dir/one.rs");
4195                assert_eq!(params.position, lsp::Position::new(0, 7));
4196                Ok(Some(lsp::PrepareRenameResponse::Range(lsp::Range::new(
4197                    lsp::Position::new(0, 6),
4198                    lsp::Position::new(0, 9),
4199                ))))
4200            })
4201            .next()
4202            .await
4203            .unwrap();
4204        prepare_rename.await.unwrap();
4205        editor_b.update(cx_b, |editor, cx| {
4206            let rename = editor.pending_rename().unwrap();
4207            let buffer = editor.buffer().read(cx).snapshot(cx);
4208            assert_eq!(
4209                rename.range.start.to_offset(&buffer)..rename.range.end.to_offset(&buffer),
4210                6..9
4211            );
                // Replace the first three characters of the rename editor's buffer with the
                // new name (presumably that buffer holds the old name `ONE` - confirm).
4212            rename.editor.update(cx, |rename_editor, cx| {
4213                rename_editor.buffer().update(cx, |rename_buffer, cx| {
4214                    rename_buffer.edit([(0..3, "THREE")], cx);
4215                });
4216            });
4217        });
4218
            // Confirm the rename. The fake server checks the request (start of the renamed
            // range, new name `THREE`) and replies with edits for both `one.rs` and `two.rs`.
4219        let confirm_rename = workspace_b.update(cx_b, |workspace, cx| {
4220            Editor::confirm_rename(workspace, &ConfirmRename, cx).unwrap()
4221        });
4222        fake_language_server
4223            .handle_request::<lsp::request::Rename, _, _>(|params, _| async move {
4224                assert_eq!(
4225                    params.text_document_position.text_document.uri.as_str(),
4226                    "file:///dir/one.rs"
4227                );
4228                assert_eq!(
4229                    params.text_document_position.position,
4230                    lsp::Position::new(0, 6)
4231                );
4232                assert_eq!(params.new_name, "THREE");
4233                Ok(Some(lsp::WorkspaceEdit {
4234                    changes: Some(
4235                        [
4236                            (
4237                                lsp::Url::from_file_path("/dir/one.rs").unwrap(),
4238                                vec![lsp::TextEdit::new(
4239                                    lsp::Range::new(
4240                                        lsp::Position::new(0, 6),
4241                                        lsp::Position::new(0, 9),
4242                                    ),
4243                                    "THREE".to_string(),
4244                                )],
4245                            ),
4246                            (
4247                                lsp::Url::from_file_path("/dir/two.rs").unwrap(),
4248                                vec![
4249                                    lsp::TextEdit::new(
4250                                        lsp::Range::new(
4251                                            lsp::Position::new(0, 24),
4252                                            lsp::Position::new(0, 27),
4253                                        ),
4254                                        "THREE".to_string(),
4255                                    ),
4256                                    lsp::TextEdit::new(
4257                                        lsp::Range::new(
4258                                            lsp::Position::new(0, 35),
4259                                            lsp::Position::new(0, 38),
4260                                        ),
4261                                        "THREE".to_string(),
4262                                    ),
4263                                ],
4264                            ),
4265                        ]
4266                        .into_iter()
4267                        .collect(),
4268                    ),
4269                    ..Default::default()
4270                }))
4271            })
4272            .next()
4273            .await
4274            .unwrap();
4275        confirm_rename.await.unwrap();
4276
            // The workspace's active item is now an editor whose text contains both renamed
            // files; a single undo/redo in it reverts/reapplies the edits to both of them.
4277        let rename_editor = workspace_b.read_with(cx_b, |workspace, cx| {
4278            workspace
4279                .active_item(cx)
4280                .unwrap()
4281                .downcast::<Editor>()
4282                .unwrap()
4283        });
4284        rename_editor.update(cx_b, |editor, cx| {
4285            assert_eq!(
4286                editor.text(cx),
4287                "const THREE: usize = 1;\nconst TWO: usize = one::THREE + one::THREE;"
4288            );
4289            editor.undo(&Undo, cx);
4290            assert_eq!(
4291                editor.text(cx),
4292                "const ONE: usize = 1;\nconst TWO: usize = one::ONE + one::ONE;"
4293            );
4294            editor.redo(&Redo, cx);
4295            assert_eq!(
4296                editor.text(cx),
4297                "const THREE: usize = 1;\nconst TWO: usize = one::THREE + one::THREE;"
4298            );
4299        });
4300
4301        // Ensure temporary rename edits cannot be undone/redone.
            // In the original editor one undo restores `ONE`, a second undo changes nothing,
            // and one redo brings back `THREE`.
4302        editor_b.update(cx_b, |editor, cx| {
4303            editor.undo(&Undo, cx);
4304            assert_eq!(editor.text(cx), "const ONE: usize = 1;");
4305            editor.undo(&Undo, cx);
4306            assert_eq!(editor.text(cx), "const ONE: usize = 1;");
4307            editor.redo(&Redo, cx);
4308            assert_eq!(editor.text(cx), "const THREE: usize = 1;");
4309        })
4310    }
4311
4312    #[gpui::test(iterations = 10)]
        // Basic chat flow between two clients that are members of the same org channel:
        // both see the channel in their list, both load a message stored before they
        // opened the channel, messages sent by A arrive at B, and the server stops
        // tracking the channel once both clients have dropped their channel handles.
4313    async fn test_basic_chat(cx_a: &mut TestAppContext, cx_b: &mut TestAppContext) {
4314        cx_a.foreground().forbid_parking();
4315
4316        // Connect to a server as 2 clients.
4317        let mut server = TestServer::start(cx_a.foreground(), cx_a.background()).await;
4318        let client_a = server.create_client(cx_a, "user_a").await;
4319        let client_b = server.create_client(cx_b, "user_b").await;
4320
4321        // Create an org that includes these 2 users.
4322        let db = &server.app_state.db;
4323        let org_id = db.create_org("Test Org", "test-org").await.unwrap();
4324        db.add_org_member(org_id, client_a.current_user_id(&cx_a), false)
4325            .await
4326            .unwrap();
4327        db.add_org_member(org_id, client_b.current_user_id(&cx_b), false)
4328            .await
4329            .unwrap();
4330
4331        // Create a channel that includes all the users.
4332        let channel_id = db.create_org_channel(org_id, "test-channel").await.unwrap();
4333        db.add_channel_member(channel_id, client_a.current_user_id(&cx_a), false)
4334            .await
4335            .unwrap();
4336        db.add_channel_member(channel_id, client_b.current_user_id(&cx_b), false)
4337            .await
4338            .unwrap();
            // Seed the channel with one message from user_b, written directly to the
            // database before either client opens the channel.
4339        db.create_channel_message(
4340            channel_id,
4341            client_b.current_user_id(&cx_b),
4342            "hello A, it's B.",
4343            OffsetDateTime::now_utc(),
4344            1,
4345        )
4346        .await
4347        .unwrap();
4348
            // Client A: wait for the channel list, open the channel (initially with no
            // messages), then wait until the seeded message has been loaded.
4349        let channels_a = cx_a
4350            .add_model(|cx| ChannelList::new(client_a.user_store.clone(), client_a.clone(), cx));
4351        channels_a
4352            .condition(cx_a, |list, _| list.available_channels().is_some())
4353            .await;
4354        channels_a.read_with(cx_a, |list, _| {
4355            assert_eq!(
4356                list.available_channels().unwrap(),
4357                &[ChannelDetails {
4358                    id: channel_id.to_proto(),
4359                    name: "test-channel".to_string()
4360                }]
4361            )
4362        });
4363        let channel_a = channels_a.update(cx_a, |this, cx| {
4364            this.get_channel(channel_id.to_proto(), cx).unwrap()
4365        });
4366        channel_a.read_with(cx_a, |channel, _| assert!(channel.messages().is_empty()));
4367        channel_a
4368            .condition(&cx_a, |channel, _| {
4369                channel_messages(channel)
4370                    == [("user_b".to_string(), "hello A, it's B.".to_string(), false)]
4371            })
4372            .await;
4373
            // Client B: same sequence as for client A.
4374        let channels_b = cx_b
4375            .add_model(|cx| ChannelList::new(client_b.user_store.clone(), client_b.clone(), cx));
4376        channels_b
4377            .condition(cx_b, |list, _| list.available_channels().is_some())
4378            .await;
4379        channels_b.read_with(cx_b, |list, _| {
4380            assert_eq!(
4381                list.available_channels().unwrap(),
4382                &[ChannelDetails {
4383                    id: channel_id.to_proto(),
4384                    name: "test-channel".to_string()
4385                }]
4386            )
4387        });
4388
4389        let channel_b = channels_b.update(cx_b, |this, cx| {
4390            this.get_channel(channel_id.to_proto(), cx).unwrap()
4391        });
4392        channel_b.read_with(cx_b, |channel, _| assert!(channel.messages().is_empty()));
4393        channel_b
4394            .condition(&cx_b, |channel, _| {
4395                channel_messages(channel)
4396                    == [("user_b".to_string(), "hello A, it's B.".to_string(), false)]
4397            })
4398            .await;
4399
            // Send two messages from A: the first send task is detached, the second is
            // awaited. Right after sending, both already appear in A's local list with the
            // third tuple field set to `true` (presumably an "is pending" flag - confirm
            // against `channel_messages`).
4400        channel_a
4401            .update(cx_a, |channel, cx| {
4402                channel
4403                    .send_message("oh, hi B.".to_string(), cx)
4404                    .unwrap()
4405                    .detach();
4406                let task = channel.send_message("sup".to_string(), cx).unwrap();
4407                assert_eq!(
4408                    channel_messages(channel),
4409                    &[
4410                        ("user_b".to_string(), "hello A, it's B.".to_string(), false),
4411                        ("user_a".to_string(), "oh, hi B.".to_string(), true),
4412                        ("user_a".to_string(), "sup".to_string(), true)
4413                    ]
4414                );
4415                task
4416            })
4417            .await
4418            .unwrap();
4419
            // B eventually sees both of A's messages, in order, with the flag `false`.
4420        channel_b
4421            .condition(&cx_b, |channel, _| {
4422                channel_messages(channel)
4423                    == [
4424                        ("user_b".to_string(), "hello A, it's B.".to_string(), false),
4425                        ("user_a".to_string(), "oh, hi B.".to_string(), false),
4426                        ("user_a".to_string(), "sup".to_string(), false),
4427                    ]
4428            })
4429            .await;
4430
            // While both clients hold the channel, the server tracks two connections for it.
4431        assert_eq!(
4432            server
4433                .state()
4434                .await
4435                .channel(channel_id)
4436                .unwrap()
4437                .connection_ids
4438                .len(),
4439            2
4440        );
            // Dropping B's handle leaves one connection; dropping A's as well makes the
            // server forget the channel entirely.
4441        cx_b.update(|_| drop(channel_b));
4442        server
4443            .condition(|state| state.channel(channel_id).unwrap().connection_ids.len() == 1)
4444            .await;
4445
4446        cx_a.update(|_| drop(channel_a));
4447        server
4448            .condition(|state| state.channel(channel_id).is_none())
4449            .await;
4450    }
4451
4452    #[gpui::test(iterations = 10)]
        // Checks validation of chat message bodies for a single client: over-long
        // messages are rejected, blank messages are rejected, and surrounding whitespace
        // is stripped from the body that ends up stored in the database.
4453    async fn test_chat_message_validation(cx_a: &mut TestAppContext) {
4454        cx_a.foreground().forbid_parking();
4455
4456        let mut server = TestServer::start(cx_a.foreground(), cx_a.background()).await;
4457        let client_a = server.create_client(cx_a, "user_a").await;
4458
            // Create an org and a channel, and make user_a a member of both.
4459        let db = &server.app_state.db;
4460        let org_id = db.create_org("Test Org", "test-org").await.unwrap();
4461        let channel_id = db.create_org_channel(org_id, "test-channel").await.unwrap();
4462        db.add_org_member(org_id, client_a.current_user_id(&cx_a), false)
4463            .await
4464            .unwrap();
4465        db.add_channel_member(channel_id, client_a.current_user_id(&cx_a), false)
4466            .await
4467            .unwrap();
4468
4469        let channels_a = cx_a
4470            .add_model(|cx| ChannelList::new(client_a.user_store.clone(), client_a.clone(), cx));
4471        channels_a
4472            .condition(cx_a, |list, _| list.available_channels().is_some())
4473            .await;
4474        let channel_a = channels_a.update(cx_a, |this, cx| {
4475            this.get_channel(channel_id.to_proto(), cx).unwrap()
4476        });
4477
4478        // Messages aren't allowed to be too long.
            // Here `send_message` itself succeeds and returns a task; the error only
            // surfaces when that task is awaited. The body is 14 bytes repeated 1024 times.
4479        channel_a
4480            .update(cx_a, |channel, cx| {
4481                let long_body = "this is long.\n".repeat(1024);
4482                channel.send_message(long_body, cx).unwrap()
4483            })
4484            .await
4485            .unwrap_err();
4486
4487        // Messages aren't allowed to be blank.
            // Unlike the over-long case, this is rejected synchronously: `send_message`
            // returns an error directly and nothing is awaited.
4488        channel_a.update(cx_a, |channel, cx| {
4489            channel.send_message(String::new(), cx).unwrap_err()
4490        });
4491
4492        // Leading and trailing whitespace are trimmed.
4493        channel_a
4494            .update(cx_a, |channel, cx| {
4495                channel
4496                    .send_message("\n surrounded by whitespace  \n".to_string(), cx)
4497                    .unwrap()
4498            })
4499            .await
4500            .unwrap();
            // Only the trimmed message was stored; the two rejected sends left no rows.
4501        assert_eq!(
4502            db.get_channel_messages(channel_id, 10, None)
4503                .await
4504                .unwrap()
4505                .iter()
4506                .map(|m| &m.body)
4507                .collect::<Vec<_>>(),
4508            &["surrounded by whitespace"]
4509        );
4510    }
4511
4512    #[gpui::test(iterations = 10)]
        // Chat behavior across a disconnect: client B is cut off from the server, keeps
        // access to its cached channel list and messages, tries to send while offline,
        // and after reconnecting ends up with A's messages as well as its own offline
        // message. Finally both clients exchange messages normally again.
4513    async fn test_chat_reconnection(cx_a: &mut TestAppContext, cx_b: &mut TestAppContext) {
4514        cx_a.foreground().forbid_parking();
4515
4516        // Connect to a server as 2 clients.
4517        let mut server = TestServer::start(cx_a.foreground(), cx_a.background()).await;
4518        let client_a = server.create_client(cx_a, "user_a").await;
4519        let client_b = server.create_client(cx_b, "user_b").await;
            // Status stream for client B, used below to detect the reconnection error.
4520        let mut status_b = client_b.status();
4521
4522        // Create an org that includes these 2 users.
4523        let db = &server.app_state.db;
4524        let org_id = db.create_org("Test Org", "test-org").await.unwrap();
4525        db.add_org_member(org_id, client_a.current_user_id(&cx_a), false)
4526            .await
4527            .unwrap();
4528        db.add_org_member(org_id, client_b.current_user_id(&cx_b), false)
4529            .await
4530            .unwrap();
4531
4532        // Create a channel that includes all the users.
4533        let channel_id = db.create_org_channel(org_id, "test-channel").await.unwrap();
4534        db.add_channel_member(channel_id, client_a.current_user_id(&cx_a), false)
4535            .await
4536            .unwrap();
4537        db.add_channel_member(channel_id, client_b.current_user_id(&cx_b), false)
4538            .await
4539            .unwrap();
            // Seed the channel with one message from user_b, written directly to the database.
4540        db.create_channel_message(
4541            channel_id,
4542            client_b.current_user_id(&cx_b),
4543            "hello A, it's B.",
4544            OffsetDateTime::now_utc(),
4545            2,
4546        )
4547        .await
4548        .unwrap();
4549
            // Client A: wait for the channel list, open the channel, and wait for the
            // seeded message to load.
4550        let channels_a = cx_a
4551            .add_model(|cx| ChannelList::new(client_a.user_store.clone(), client_a.clone(), cx));
4552        channels_a
4553            .condition(cx_a, |list, _| list.available_channels().is_some())
4554            .await;
4555
4556        channels_a.read_with(cx_a, |list, _| {
4557            assert_eq!(
4558                list.available_channels().unwrap(),
4559                &[ChannelDetails {
4560                    id: channel_id.to_proto(),
4561                    name: "test-channel".to_string()
4562                }]
4563            )
4564        });
4565        let channel_a = channels_a.update(cx_a, |this, cx| {
4566            this.get_channel(channel_id.to_proto(), cx).unwrap()
4567        });
4568        channel_a.read_with(cx_a, |channel, _| assert!(channel.messages().is_empty()));
4569        channel_a
4570            .condition(&cx_a, |channel, _| {
4571                channel_messages(channel)
4572                    == [("user_b".to_string(), "hello A, it's B.".to_string(), false)]
4573            })
4574            .await;
4575
            // Client B: same sequence as for client A.
4576        let channels_b = cx_b
4577            .add_model(|cx| ChannelList::new(client_b.user_store.clone(), client_b.clone(), cx));
4578        channels_b
4579            .condition(cx_b, |list, _| list.available_channels().is_some())
4580            .await;
4581        channels_b.read_with(cx_b, |list, _| {
4582            assert_eq!(
4583                list.available_channels().unwrap(),
4584                &[ChannelDetails {
4585                    id: channel_id.to_proto(),
4586                    name: "test-channel".to_string()
4587                }]
4588            )
4589        });
4590
4591        let channel_b = channels_b.update(cx_b, |this, cx| {
4592            this.get_channel(channel_id.to_proto(), cx).unwrap()
4593        });
4594        channel_b.read_with(cx_b, |channel, _| assert!(channel.messages().is_empty()));
4595        channel_b
4596            .condition(&cx_b, |channel, _| {
4597                channel_messages(channel)
4598                    == [("user_b".to_string(), "hello A, it's B.".to_string(), false)]
4599            })
4600            .await;
4601
4602        // Disconnect client B, ensuring we can still access its cached channel data.
            // New connections are forbidden first so that B cannot reconnect right away;
            // the loop then consumes status updates until a `ReconnectionError` is seen.
4603        server.forbid_connections();
4604        server.disconnect_client(client_b.current_user_id(&cx_b));
4605        cx_b.foreground().advance_clock(rpc::RECEIVE_TIMEOUT);
4606        while !matches!(
4607            status_b.next().await,
4608            Some(client::Status::ReconnectionError { .. })
4609        ) {}
4610
4611        channels_b.read_with(cx_b, |channels, _| {
4612            assert_eq!(
4613                channels.available_channels().unwrap(),
4614                [ChannelDetails {
4615                    id: channel_id.to_proto(),
4616                    name: "test-channel".to_string()
4617                }]
4618            )
4619        });
4620        channel_b.read_with(cx_b, |channel, _| {
4621            assert_eq!(
4622                channel_messages(channel),
4623                [("user_b".to_string(), "hello A, it's B.".to_string(), false)]
4624            )
4625        });
4626
4627        // Send a message from client B while it is disconnected.
            // The message shows up in B's local list immediately (third field `true`), but
            // the send task itself resolves to an error.
4628        channel_b
4629            .update(cx_b, |channel, cx| {
4630                let task = channel
4631                    .send_message("can you see this?".to_string(), cx)
4632                    .unwrap();
4633                assert_eq!(
4634                    channel_messages(channel),
4635                    &[
4636                        ("user_b".to_string(), "hello A, it's B.".to_string(), false),
4637                        ("user_b".to_string(), "can you see this?".to_string(), true)
4638                    ]
4639                );
4640                task
4641            })
4642            .await
4643            .unwrap_err();
4644
4645        // Send a message from client A while B is disconnected.
4646        channel_a
4647            .update(cx_a, |channel, cx| {
4648                channel
4649                    .send_message("oh, hi B.".to_string(), cx)
4650                    .unwrap()
4651                    .detach();
4652                let task = channel.send_message("sup".to_string(), cx).unwrap();
4653                assert_eq!(
4654                    channel_messages(channel),
4655                    &[
4656                        ("user_b".to_string(), "hello A, it's B.".to_string(), false),
4657                        ("user_a".to_string(), "oh, hi B.".to_string(), true),
4658                        ("user_a".to_string(), "sup".to_string(), true)
4659                    ]
4660                );
4661                task
4662            })
4663            .await
4664            .unwrap();
4665
4666        // Give client B a chance to reconnect.
4667        server.allow_connections();
4668        cx_b.foreground().advance_clock(Duration::from_secs(10));
4669
4670        // Verify that B sees the new messages upon reconnection, as well as the message client B
4671        // sent while offline.
            // In the expected order, B's offline message comes after the two messages A
            // sent during the outage.
4672        channel_b
4673            .condition(&cx_b, |channel, _| {
4674                channel_messages(channel)
4675                    == [
4676                        ("user_b".to_string(), "hello A, it's B.".to_string(), false),
4677                        ("user_a".to_string(), "oh, hi B.".to_string(), false),
4678                        ("user_a".to_string(), "sup".to_string(), false),
4679                        ("user_b".to_string(), "can you see this?".to_string(), false),
4680                    ]
4681            })
4682            .await;
4683
4684        // Ensure client A and B can communicate normally after reconnection.
4685        channel_a
4686            .update(cx_a, |channel, cx| {
4687                channel.send_message("you online?".to_string(), cx).unwrap()
4688            })
4689            .await
4690            .unwrap();
4691        channel_b
4692            .condition(&cx_b, |channel, _| {
4693                channel_messages(channel)
4694                    == [
4695                        ("user_b".to_string(), "hello A, it's B.".to_string(), false),
4696                        ("user_a".to_string(), "oh, hi B.".to_string(), false),
4697                        ("user_a".to_string(), "sup".to_string(), false),
4698                        ("user_b".to_string(), "can you see this?".to_string(), false),
4699                        ("user_a".to_string(), "you online?".to_string(), false),
4700                    ]
4701            })
4702            .await;
4703
            // And in the other direction: B replies, and A sees the full history.
4704        channel_b
4705            .update(cx_b, |channel, cx| {
4706                channel.send_message("yep".to_string(), cx).unwrap()
4707            })
4708            .await
4709            .unwrap();
4710        channel_a
4711            .condition(&cx_a, |channel, _| {
4712                channel_messages(channel)
4713                    == [
4714                        ("user_b".to_string(), "hello A, it's B.".to_string(), false),
4715                        ("user_a".to_string(), "oh, hi B.".to_string(), false),
4716                        ("user_a".to_string(), "sup".to_string(), false),
4717                        ("user_b".to_string(), "can you see this?".to_string(), false),
4718                        ("user_a".to_string(), "you online?".to_string(), false),
4719                        ("user_b".to_string(), "yep".to_string(), false),
4720                    ]
4721            })
4722            .await;
4723    }
4724
4725    #[gpui::test(iterations = 10)]
        // Exercises contact-list updates across three clients: each client's
        // `UserStore` is polled (via `condition`) until `contacts(..)` matches the
        // expected snapshot after client A adds a worktree, shares the project,
        // gains client B as a guest, and finally drops the project.
4726    async fn test_contacts(
4727        cx_a: &mut TestAppContext,
4728        cx_b: &mut TestAppContext,
4729        cx_c: &mut TestAppContext,
4730    ) {
4731        cx_a.foreground().forbid_parking();
4732        let lang_registry = Arc::new(LanguageRegistry::test());
4733        let fs = FakeFs::new(cx_a.background());
4734
4735        // Connect to a server as 3 clients.
4736        let mut server = TestServer::start(cx_a.foreground(), cx_a.background()).await;
4737        let client_a = server.create_client(cx_a, "user_a").await;
4738        let client_b = server.create_client(cx_b, "user_b").await;
4739        let client_c = server.create_client(cx_c, "user_c").await;
4740
4741        // Share a worktree as client A.
4742        fs.insert_tree(
4743            "/a",
4744            json!({
4745                ".zed.toml": r#"collaborators = ["user_b", "user_c"]"#,
4746            }),
4747        )
4748        .await;
4749
4750        let project_a = cx_a.update(|cx| {
4751            Project::local(
4752                client_a.clone(),
4753                client_a.user_store.clone(),
4754                lang_registry.clone(),
4755                fs.clone(),
4756                cx,
4757            )
4758        });
4759        let (worktree_a, _) = project_a
4760            .update(cx_a, |p, cx| {
4761                p.find_or_create_local_worktree("/a", true, cx)
4762            })
4763            .await
4764            .unwrap();
4765        worktree_a
4766            .read_with(cx_a, |tree, _| tree.as_local().unwrap().scan_complete())
4767            .await;
4768
            // Once the worktree scan has completed, every client should list user_a with
            // project "a" reported as not shared (`false`) and with no guests.
4769        client_a
4770            .user_store
4771            .condition(&cx_a, |user_store, _| {
4772                contacts(user_store) == vec![("user_a", vec![("a", false, vec![])])]
4773            })
4774            .await;
4775        client_b
4776            .user_store
4777            .condition(&cx_b, |user_store, _| {
4778                contacts(user_store) == vec![("user_a", vec![("a", false, vec![])])]
4779            })
4780            .await;
4781        client_c
4782            .user_store
4783            .condition(&cx_c, |user_store, _| {
4784                contacts(user_store) == vec![("user_a", vec![("a", false, vec![])])]
4785            })
4786            .await;
4787
            // Obtain the project's remote id before sharing; it is what client B
            // passes to `Project::remote` below.
4788        let project_id = project_a
4789            .update(cx_a, |project, _| project.next_remote_id())
4790            .await;
4791        project_a
4792            .update(cx_a, |project, cx| project.share(cx))
4793            .await
4794            .unwrap();
            // After sharing, the `is_shared` flag is expected to be `true` for all three clients.
4795        client_a
4796            .user_store
4797            .condition(&cx_a, |user_store, _| {
4798                contacts(user_store) == vec![("user_a", vec![("a", true, vec![])])]
4799            })
4800            .await;
4801        client_b
4802            .user_store
4803            .condition(&cx_b, |user_store, _| {
4804                contacts(user_store) == vec![("user_a", vec![("a", true, vec![])])]
4805            })
4806            .await;
4807        client_c
4808            .user_store
4809            .condition(&cx_c, |user_store, _| {
4810                contacts(user_store) == vec![("user_a", vec![("a", true, vec![])])]
4811            })
4812            .await;
4813
            // Client B joins the shared project. The handle is bound to `_project_b`
            // (rather than `_`) so it stays alive until the end of the test.
4814        let _project_b = Project::remote(
4815            project_id,
4816            client_b.clone(),
4817            client_b.user_store.clone(),
4818            lang_registry.clone(),
4819            fs.clone(),
4820            &mut cx_b.to_async(),
4821        )
4822        .await
4823        .unwrap();
4824
            // user_b is now expected to appear as a guest of project "a" for every client.
4825        client_a
4826            .user_store
4827            .condition(&cx_a, |user_store, _| {
4828                contacts(user_store) == vec![("user_a", vec![("a", true, vec!["user_b"])])]
4829            })
4830            .await;
4831        client_b
4832            .user_store
4833            .condition(&cx_b, |user_store, _| {
4834                contacts(user_store) == vec![("user_a", vec![("a", true, vec!["user_b"])])]
4835            })
4836            .await;
4837        client_c
4838            .user_store
4839            .condition(&cx_c, |user_store, _| {
4840                contacts(user_store) == vec![("user_a", vec![("a", true, vec!["user_b"])])]
4841            })
4842            .await;
4843
            // Wait until the host's project itself lists client B as a collaborator.
4844        project_a
4845            .condition(&cx_a, |project, _| {
4846                project.collaborators().contains_key(&client_b.peer_id)
4847            })
4848            .await;
4849
            // Drop the host's project handle inside an app update; afterwards every
            // client's contact list is expected to become empty.
4850        cx_a.update(move |_| drop(project_a));
4851        client_a
4852            .user_store
4853            .condition(&cx_a, |user_store, _| contacts(user_store) == vec![])
4854            .await;
4855        client_b
4856            .user_store
4857            .condition(&cx_b, |user_store, _| contacts(user_store) == vec![])
4858            .await;
4859        client_c
4860            .user_store
4861            .condition(&cx_c, |user_store, _| contacts(user_store) == vec![])
4862            .await;
4863
            // Flattens the user store's contacts into `(github_login, projects)` tuples,
            // where each project is `(first worktree root name, is_shared, guest github
            // logins)`. Indexing `worktree_root_names[0]` panics if a project has no
            // root names.
4864        fn contacts(user_store: &UserStore) -> Vec<(&str, Vec<(&str, bool, Vec<&str>)>)> {
4865            user_store
4866                .contacts()
4867                .iter()
4868                .map(|contact| {
4869                    let worktrees = contact
4870                        .projects
4871                        .iter()
4872                        .map(|p| {
4873                            (
4874                                p.worktree_root_names[0].as_str(),
4875                                p.is_shared,
4876                                p.guests.iter().map(|p| p.github_login.as_str()).collect(),
4877                            )
4878                        })
4879                        .collect();
4880                    (contact.user.github_login.as_str(), worktrees)
4881                })
4882                .collect()
4883        }
4884    }
4885
4886    #[gpui::test(iterations = 10)]
        // End-to-end test of workspace following between two clients: B follows A and
        // mirrors A's active item, navigation, selections and text; B then unfollows;
        // finally A follows B and the follow ends when B disconnects.
4887    async fn test_following(cx_a: &mut TestAppContext, cx_b: &mut TestAppContext) {
4888        cx_a.foreground().forbid_parking();
4889        let fs = FakeFs::new(cx_a.background());
4890
4891        // 2 clients connect to a server.
4892        let mut server = TestServer::start(cx_a.foreground(), cx_a.background()).await;
4893        let mut client_a = server.create_client(cx_a, "user_a").await;
4894        let mut client_b = server.create_client(cx_b, "user_b").await;
4895        cx_a.update(editor::init);
4896        cx_b.update(editor::init);
4897
4898        // Client A shares a project.
4899        fs.insert_tree(
4900            "/a",
4901            json!({
4902                ".zed.toml": r#"collaborators = ["user_b"]"#,
4903                "1.txt": "one",
4904                "2.txt": "two",
4905                "3.txt": "three",
4906            }),
4907        )
4908        .await;
4909        let (project_a, worktree_id) = client_a.build_local_project(fs.clone(), "/a", cx_a).await;
4910        project_a
4911            .update(cx_a, |project, cx| project.share(cx))
4912            .await
4913            .unwrap();
4914
4915        // Client B joins the project.
4916        let project_b = client_b
4917            .build_remote_project(
4918                project_a
4919                    .read_with(cx_a, |project, _| project.remote_id())
4920                    .unwrap(),
4921                cx_b,
4922            )
4923            .await;
4924
4925        // Client A opens some editors.
4926        let workspace_a = client_a.build_workspace(&project_a, cx_a);
4927        let pane_a = workspace_a.read_with(cx_a, |workspace, _| workspace.active_pane().clone());
4928        let editor_a1 = workspace_a
4929            .update(cx_a, |workspace, cx| {
4930                workspace.open_path((worktree_id, "1.txt"), true, cx)
4931            })
4932            .await
4933            .unwrap()
4934            .downcast::<Editor>()
4935            .unwrap();
4936        let editor_a2 = workspace_a
4937            .update(cx_a, |workspace, cx| {
4938                workspace.open_path((worktree_id, "2.txt"), true, cx)
4939            })
4940            .await
4941            .unwrap()
4942            .downcast::<Editor>()
4943            .unwrap();
4944
4945        // Client B opens an editor.
4946        let workspace_b = client_b.build_workspace(&project_b, cx_b);
4947        let editor_b1 = workspace_b
4948            .update(cx_b, |workspace, cx| {
4949                workspace.open_path((worktree_id, "1.txt"), true, cx)
4950            })
4951            .await
4952            .unwrap()
4953            .downcast::<Editor>()
4954            .unwrap();
4955
            // Each side takes the peer id of the first collaborator its project reports.
            // NOTE(review): presumably that is the other client, since only two clients
            // joined this project; confirm `collaborators()` excludes the local peer.
4956        let client_a_id = project_b.read_with(cx_b, |project, _| {
4957            project.collaborators().values().next().unwrap().peer_id
4958        });
4959        let client_b_id = project_a.read_with(cx_a, |project, _| {
4960            project.collaborators().values().next().unwrap().peer_id
4961        });
4962
4963        // When client B starts following client A, all visible view states are replicated to client B.
4964        editor_a1.update(cx_a, |editor, cx| editor.select_ranges([0..1], None, cx));
4965        editor_a2.update(cx_a, |editor, cx| editor.select_ranges([2..3], None, cx));
4966        workspace_b
4967            .update(cx_b, |workspace, cx| {
4968                workspace
4969                    .toggle_follow(&ToggleFollow(client_a_id), cx)
4970                    .unwrap()
4971            })
4972            .await
4973            .unwrap();
            // `editor_b2` is whatever item is active in B's workspace after following. The
            // assertions below expect it to be a focused editor on 2.txt carrying the
            // selection A made in 2.txt, while B's existing 1.txt editor carries the
            // selection A made in 1.txt.
4974        let editor_b2 = workspace_b.read_with(cx_b, |workspace, cx| {
4975            workspace
4976                .active_item(cx)
4977                .unwrap()
4978                .downcast::<Editor>()
4979                .unwrap()
4980        });
4981        assert!(cx_b.read(|cx| editor_b2.is_focused(cx)));
4982        assert_eq!(
4983            editor_b2.read_with(cx_b, |editor, cx| editor.project_path(cx)),
4984            Some((worktree_id, "2.txt").into())
4985        );
4986        assert_eq!(
4987            editor_b2.read_with(cx_b, |editor, cx| editor.selected_ranges(cx)),
4988            vec![2..3]
4989        );
4990        assert_eq!(
4991            editor_b1.read_with(cx_b, |editor, cx| editor.selected_ranges(cx)),
4992            vec![0..1]
4993        );
4994
4995        // When client A activates a different editor, client B does so as well.
4996        workspace_a.update(cx_a, |workspace, cx| {
4997            workspace.activate_item(&editor_a1, cx)
4998        });
4999        workspace_b
5000            .condition(cx_b, |workspace, cx| {
5001                workspace.active_item(cx).unwrap().id() == editor_b1.id()
5002            })
5003            .await;
5004
5005        // When client A navigates back and forth, client B does so as well.
            // Going back is expected to land B on its 2.txt editor; going forward is
            // expected to return it to its 1.txt editor.
5006        workspace_a
5007            .update(cx_a, |workspace, cx| {
5008                workspace::Pane::go_back(workspace, None, cx)
5009            })
5010            .await;
5011        workspace_b
5012            .condition(cx_b, |workspace, cx| {
5013                workspace.active_item(cx).unwrap().id() == editor_b2.id()
5014            })
5015            .await;
5016
5017        workspace_a
5018            .update(cx_a, |workspace, cx| {
5019                workspace::Pane::go_forward(workspace, None, cx)
5020            })
5021            .await;
5022        workspace_b
5023            .condition(cx_b, |workspace, cx| {
5024                workspace.active_item(cx).unwrap().id() == editor_b1.id()
5025            })
5026            .await;
5027
5028        // Changes to client A's editor are reflected on client B.
5029        editor_a1.update(cx_a, |editor, cx| {
5030            editor.select_ranges([1..1, 2..2], None, cx);
5031        });
5032        editor_b1
5033            .condition(cx_b, |editor, cx| {
5034                editor.selected_ranges(cx) == vec![1..1, 2..2]
5035            })
5036            .await;
5037
5038        editor_a1.update(cx_a, |editor, cx| editor.set_text("TWO", cx));
5039        editor_b1
5040            .condition(cx_b, |editor, cx| editor.text(cx) == "TWO")
5041            .await;
5042
            // Both a selection and a scroll position are set here, but only the
            // selection is awaited on client B below; the scroll position is not asserted.
5043        editor_a1.update(cx_a, |editor, cx| {
5044            editor.select_ranges([3..3], None, cx);
5045            editor.set_scroll_position(vec2f(0., 100.), cx);
5046        });
5047        editor_b1
5048            .condition(cx_b, |editor, cx| editor.selected_ranges(cx) == vec![3..3])
5049            .await;
5050
5051        // After unfollowing, client B stops receiving updates from client A.
5052        workspace_b.update(cx_b, |workspace, cx| {
5053            workspace.unfollow(&workspace.active_pane().clone(), cx)
5054        });
5055        workspace_a.update(cx_a, |workspace, cx| {
5056            workspace.activate_item(&editor_a2, cx)
5057        });
            // Run the executor until it parks, then check that B's active item is
            // still its 1.txt editor even though A switched to 2.txt.
5058        cx_a.foreground().run_until_parked();
5059        assert_eq!(
5060            workspace_b.read_with(cx_b, |workspace, cx| workspace
5061                .active_item(cx)
5062                .unwrap()
5063                .id()),
5064            editor_b1.id()
5065        );
5066
5067        // Client A starts following client B.
5068        workspace_a
5069            .update(cx_a, |workspace, cx| {
5070                workspace
5071                    .toggle_follow(&ToggleFollow(client_b_id), cx)
5072                    .unwrap()
5073            })
5074            .await
5075            .unwrap();
            // A's pane should now report B as its leader, and A's active item should be
            // its 1.txt editor (A had activated 2.txt just before following, while B's
            // active item is its 1.txt editor).
5076        assert_eq!(
5077            workspace_a.read_with(cx_a, |workspace, _| workspace.leader_for_pane(&pane_a)),
5078            Some(client_b_id)
5079        );
5080        assert_eq!(
5081            workspace_a.read_with(cx_a, |workspace, cx| workspace
5082                .active_item(cx)
5083                .unwrap()
5084                .id()),
5085            editor_a1.id()
5086        );
5087
5088        // Following interrupts when client B disconnects.
5089        client_b.disconnect(&cx_b.to_async()).unwrap();
5090        cx_a.foreground().run_until_parked();
5091        assert_eq!(
5092            workspace_a.read_with(cx_a, |workspace, _| workspace.leader_for_pane(&pane_a)),
5093            None
5094        );
5095    }
5096
5097    #[gpui::test(iterations = 10)]
        // Two clients each split their workspace and start following the other, then
        // open different files in their original panes. Verifies that leader updates
        // do not change the follower's active pane and that mutual following settles
        // (each side ends up showing the other's file in its second pane).
5098    async fn test_peers_following_each_other(cx_a: &mut TestAppContext, cx_b: &mut TestAppContext) {
5099        cx_a.foreground().forbid_parking();
5100        let fs = FakeFs::new(cx_a.background());
5101
5102        // 2 clients connect to a server.
5103        let mut server = TestServer::start(cx_a.foreground(), cx_a.background()).await;
5104        let mut client_a = server.create_client(cx_a, "user_a").await;
5105        let mut client_b = server.create_client(cx_b, "user_b").await;
5106        cx_a.update(editor::init);
5107        cx_b.update(editor::init);
5108
5109        // Client A shares a project.
5110        fs.insert_tree(
5111            "/a",
5112            json!({
5113                ".zed.toml": r#"collaborators = ["user_b"]"#,
5114                "1.txt": "one",
5115                "2.txt": "two",
5116                "3.txt": "three",
5117                "4.txt": "four",
5118            }),
5119        )
5120        .await;
5121        let (project_a, worktree_id) = client_a.build_local_project(fs.clone(), "/a", cx_a).await;
5122        project_a
5123            .update(cx_a, |project, cx| project.share(cx))
5124            .await
5125            .unwrap();
5126
5127        // Client B joins the project.
5128        let project_b = client_b
5129            .build_remote_project(
5130                project_a
5131                    .read_with(cx_a, |project, _| project.remote_id())
5132                    .unwrap(),
5133                cx_b,
5134            )
5135            .await;
5136
5137        // Client A opens some editors.
5138        let workspace_a = client_a.build_workspace(&project_a, cx_a);
5139        let pane_a1 = workspace_a.read_with(cx_a, |workspace, _| workspace.active_pane().clone());
5140        let _editor_a1 = workspace_a
5141            .update(cx_a, |workspace, cx| {
5142                workspace.open_path((worktree_id, "1.txt"), true, cx)
5143            })
5144            .await
5145            .unwrap()
5146            .downcast::<Editor>()
5147            .unwrap();
5148
5149        // Client B opens an editor.
5150        let workspace_b = client_b.build_workspace(&project_b, cx_b);
5151        let pane_b1 = workspace_b.read_with(cx_b, |workspace, _| workspace.active_pane().clone());
5152        let _editor_b1 = workspace_b
5153            .update(cx_b, |workspace, cx| {
5154                workspace.open_path((worktree_id, "2.txt"), true, cx)
5155            })
5156            .await
5157            .unwrap()
5158            .downcast::<Editor>()
5159            .unwrap();
5160
5161        // Clients A and B follow each other in split panes
5162        workspace_a
5163            .update(cx_a, |workspace, cx| {
                    // The split is expected to make a pane other than the original one
                    // active (asserted below); `toggle_follow` is invoked while that pane
                    // is active, using the first collaborator key as the leader id.
5164                workspace.split_pane(workspace.active_pane().clone(), SplitDirection::Right, cx);
5165                assert_ne!(*workspace.active_pane(), pane_a1);
5166                let leader_id = *project_a.read(cx).collaborators().keys().next().unwrap();
5167                workspace
5168                    .toggle_follow(&workspace::ToggleFollow(leader_id), cx)
5169                    .unwrap()
5170            })
5171            .await
5172            .unwrap();
5173        workspace_b
5174            .update(cx_b, |workspace, cx| {
                    // Same sequence on client B: split, check the active pane changed,
                    // then follow the first collaborator.
5175                workspace.split_pane(workspace.active_pane().clone(), SplitDirection::Right, cx);
5176                assert_ne!(*workspace.active_pane(), pane_b1);
5177                let leader_id = *project_b.read(cx).collaborators().keys().next().unwrap();
5178                workspace
5179                    .toggle_follow(&workspace::ToggleFollow(leader_id), cx)
5180                    .unwrap()
5181            })
5182            .await
5183            .unwrap();
5184
            // Each client switches back to its original pane and opens a file there
            // (3.txt on A, 4.txt on B).
5185        workspace_a
5186            .update(cx_a, |workspace, cx| {
5187                workspace.activate_next_pane(cx);
5188                assert_eq!(*workspace.active_pane(), pane_a1);
5189                workspace.open_path((worktree_id, "3.txt"), true, cx)
5190            })
5191            .await
5192            .unwrap();
5193        workspace_b
5194            .update(cx_b, |workspace, cx| {
5195                workspace.activate_next_pane(cx);
5196                assert_eq!(*workspace.active_pane(), pane_b1);
5197                workspace.open_path((worktree_id, "4.txt"), true, cx)
5198            })
5199            .await
5200            .unwrap();
5201        cx_a.foreground().run_until_parked();
5202
5203        // Ensure leader updates don't change the active pane of followers
5204        workspace_a.read_with(cx_a, |workspace, _| {
5205            assert_eq!(*workspace.active_pane(), pane_a1);
5206        });
5207        workspace_b.read_with(cx_b, |workspace, _| {
5208            assert_eq!(*workspace.active_pane(), pane_b1);
5209        });
5210
5211        // Ensure peers following each other doesn't cause an infinite loop.
5212        assert_eq!(
5213            workspace_a.read_with(cx_a, |workspace, cx| workspace
5214                .active_item(cx)
5215                .unwrap()
5216                .project_path(cx)),
5217            Some((worktree_id, "3.txt").into())
5218        );
            // Switching A to its other pane is expected to reveal B's file (4.txt);
            // switching B to its other pane is expected to reveal A's file (3.txt).
5219        workspace_a.update(cx_a, |workspace, cx| {
5220            assert_eq!(
5221                workspace.active_item(cx).unwrap().project_path(cx),
5222                Some((worktree_id, "3.txt").into())
5223            );
5224            workspace.activate_next_pane(cx);
5225            assert_eq!(
5226                workspace.active_item(cx).unwrap().project_path(cx),
5227                Some((worktree_id, "4.txt").into())
5228            );
5229        });
5230        workspace_b.update(cx_b, |workspace, cx| {
5231            assert_eq!(
5232                workspace.active_item(cx).unwrap().project_path(cx),
5233                Some((worktree_id, "4.txt").into())
5234            );
5235            workspace.activate_next_pane(cx);
5236            assert_eq!(
5237                workspace.active_item(cx).unwrap().project_path(cx),
5238                Some((worktree_id, "3.txt").into())
5239            );
5240        });
5241    }
5242
5243    #[gpui::test(iterations = 10)]
        // Verifies which local actions on client B cancel its follow of client A, as
        // observed through `leader_for_pane(&pane_b)`: moving the cursor, editing,
        // scrolling, and opening a different item end the follow; splitting the pane
        // and activating another pane do not.
5244    async fn test_auto_unfollowing(cx_a: &mut TestAppContext, cx_b: &mut TestAppContext) {
5245        cx_a.foreground().forbid_parking();
5246        let fs = FakeFs::new(cx_a.background());
5247
5248        // 2 clients connect to a server.
5249        let mut server = TestServer::start(cx_a.foreground(), cx_a.background()).await;
5250        let mut client_a = server.create_client(cx_a, "user_a").await;
5251        let mut client_b = server.create_client(cx_b, "user_b").await;
5252        cx_a.update(editor::init);
5253        cx_b.update(editor::init);
5254
5255        // Client A shares a project.
5256        fs.insert_tree(
5257            "/a",
5258            json!({
5259                ".zed.toml": r#"collaborators = ["user_b"]"#,
5260                "1.txt": "one",
5261                "2.txt": "two",
5262                "3.txt": "three",
5263            }),
5264        )
5265        .await;
5266        let (project_a, worktree_id) = client_a.build_local_project(fs.clone(), "/a", cx_a).await;
5267        project_a
5268            .update(cx_a, |project, cx| project.share(cx))
5269            .await
5270            .unwrap();
5271
5272        // Client B joins the project.
5273        let project_b = client_b
5274            .build_remote_project(
5275                project_a
5276                    .read_with(cx_a, |project, _| project.remote_id())
5277                    .unwrap(),
5278                cx_b,
5279            )
5280            .await;
5281
5282        // Client A opens some editors.
5283        let workspace_a = client_a.build_workspace(&project_a, cx_a);
5284        let _editor_a1 = workspace_a
5285            .update(cx_a, |workspace, cx| {
5286                workspace.open_path((worktree_id, "1.txt"), true, cx)
5287            })
5288            .await
5289            .unwrap()
5290            .downcast::<Editor>()
5291            .unwrap();
5292
5293        // Client B starts following client A.
5294        let workspace_b = client_b.build_workspace(&project_b, cx_b);
5295        let pane_b = workspace_b.read_with(cx_b, |workspace, _| workspace.active_pane().clone());
            // The leader is the first collaborator reported by B's project.
            // NOTE(review): presumably client A, the only other participant; confirm.
5296        let leader_id = project_b.read_with(cx_b, |project, _| {
5297            project.collaborators().values().next().unwrap().peer_id
5298        });
5299        workspace_b
5300            .update(cx_b, |workspace, cx| {
5301                workspace
5302                    .toggle_follow(&ToggleFollow(leader_id), cx)
5303                    .unwrap()
5304            })
5305            .await
5306            .unwrap();
5307        assert_eq!(
5308            workspace_b.read_with(cx_b, |workspace, _| workspace.leader_for_pane(&pane_b)),
5309            Some(leader_id)
5310        );
            // B opened nothing itself, so the active item here is the one present after
            // following; the local actions below are performed on this editor.
5311        let editor_b2 = workspace_b.read_with(cx_b, |workspace, cx| {
5312            workspace
5313                .active_item(cx)
5314                .unwrap()
5315                .downcast::<Editor>()
5316                .unwrap()
5317        });
5318
5319        // When client B moves, it automatically stops following client A.
5320        editor_b2.update(cx_b, |editor, cx| editor.move_right(&editor::MoveRight, cx));
5321        assert_eq!(
5322            workspace_b.read_with(cx_b, |workspace, _| workspace.leader_for_pane(&pane_b)),
5323            None
5324        );
5325
            // Re-establish the follow before the next scenario.
5326        workspace_b
5327            .update(cx_b, |workspace, cx| {
5328                workspace
5329                    .toggle_follow(&ToggleFollow(leader_id), cx)
5330                    .unwrap()
5331            })
5332            .await
5333            .unwrap();
5334        assert_eq!(
5335            workspace_b.read_with(cx_b, |workspace, _| workspace.leader_for_pane(&pane_b)),
5336            Some(leader_id)
5337        );
5338
5339        // When client B edits, it automatically stops following client A.
5340        editor_b2.update(cx_b, |editor, cx| editor.insert("X", cx));
5341        assert_eq!(
5342            workspace_b.read_with(cx_b, |workspace, _| workspace.leader_for_pane(&pane_b)),
5343            None
5344        );
5345
            // Re-establish the follow before the next scenario.
5346        workspace_b
5347            .update(cx_b, |workspace, cx| {
5348                workspace
5349                    .toggle_follow(&ToggleFollow(leader_id), cx)
5350                    .unwrap()
5351            })
5352            .await
5353            .unwrap();
5354        assert_eq!(
5355            workspace_b.read_with(cx_b, |workspace, _| workspace.leader_for_pane(&pane_b)),
5356            Some(leader_id)
5357        );
5358
5359        // When client B scrolls, it automatically stops following client A.
5360        editor_b2.update(cx_b, |editor, cx| {
5361            editor.set_scroll_position(vec2f(0., 3.), cx)
5362        });
5363        assert_eq!(
5364            workspace_b.read_with(cx_b, |workspace, _| workspace.leader_for_pane(&pane_b)),
5365            None
5366        );
5367
            // Re-establish the follow before the pane-related scenarios.
5368        workspace_b
5369            .update(cx_b, |workspace, cx| {
5370                workspace
5371                    .toggle_follow(&ToggleFollow(leader_id), cx)
5372                    .unwrap()
5373            })
5374            .await
5375            .unwrap();
5376        assert_eq!(
5377            workspace_b.read_with(cx_b, |workspace, _| workspace.leader_for_pane(&pane_b)),
5378            Some(leader_id)
5379        );
5380
5381        // When client B activates a different pane, it continues following client A in the original pane.
5382        workspace_b.update(cx_b, |workspace, cx| {
5383            workspace.split_pane(pane_b.clone(), SplitDirection::Right, cx)
5384        });
5385        assert_eq!(
5386            workspace_b.read_with(cx_b, |workspace, _| workspace.leader_for_pane(&pane_b)),
5387            Some(leader_id)
5388        );
5389
5390        workspace_b.update(cx_b, |workspace, cx| workspace.activate_next_pane(cx));
5391        assert_eq!(
5392            workspace_b.read_with(cx_b, |workspace, _| workspace.leader_for_pane(&pane_b)),
5393            Some(leader_id)
5394        );
5395
5396        // When client B activates a different item in the original pane, it automatically stops following client A.
5397        workspace_b
5398            .update(cx_b, |workspace, cx| {
5399                workspace.open_path((worktree_id, "2.txt"), true, cx)
5400            })
5401            .await
5402            .unwrap();
5403        assert_eq!(
5404            workspace_b.read_with(cx_b, |workspace, _| workspace.leader_for_pane(&pane_b)),
5405            None
5406        );
5407    }
5408
5409    #[gpui::test(iterations = 100)]
5410    async fn test_random_collaboration(
5411        cx: &mut TestAppContext,
5412        deterministic: Arc<Deterministic>,
5413        rng: StdRng,
5414    ) {
5415        cx.foreground().forbid_parking();
5416        let max_peers = env::var("MAX_PEERS")
5417            .map(|i| i.parse().expect("invalid `MAX_PEERS` variable"))
5418            .unwrap_or(5);
5419        assert!(max_peers <= 5);
5420
5421        let max_operations = env::var("OPERATIONS")
5422            .map(|i| i.parse().expect("invalid `OPERATIONS` variable"))
5423            .unwrap_or(10);
5424
5425        let rng = Arc::new(Mutex::new(rng));
5426
5427        let guest_lang_registry = Arc::new(LanguageRegistry::test());
5428        let host_language_registry = Arc::new(LanguageRegistry::test());
5429
5430        let fs = FakeFs::new(cx.background());
5431        fs.insert_tree(
5432            "/_collab",
5433            json!({
5434                ".zed.toml": r#"collaborators = ["guest-1", "guest-2", "guest-3", "guest-4"]"#
5435            }),
5436        )
5437        .await;
5438
5439        let mut server = TestServer::start(cx.foreground(), cx.background()).await;
5440        let mut clients = Vec::new();
5441        let mut user_ids = Vec::new();
5442        let mut op_start_signals = Vec::new();
5443        let files = Arc::new(Mutex::new(Vec::new()));
5444
5445        let mut next_entity_id = 100000;
5446        let mut host_cx = TestAppContext::new(
5447            cx.foreground_platform(),
5448            cx.platform(),
5449            deterministic.build_foreground(next_entity_id),
5450            deterministic.build_background(),
5451            cx.font_cache(),
5452            cx.leak_detector(),
5453            next_entity_id,
5454        );
5455        let host = server.create_client(&mut host_cx, "host").await;
5456        let host_project = host_cx.update(|cx| {
5457            Project::local(
5458                host.client.clone(),
5459                host.user_store.clone(),
5460                host_language_registry.clone(),
5461                fs.clone(),
5462                cx,
5463            )
5464        });
5465        let host_project_id = host_project
5466            .update(&mut host_cx, |p, _| p.next_remote_id())
5467            .await;
5468
5469        let (collab_worktree, _) = host_project
5470            .update(&mut host_cx, |project, cx| {
5471                project.find_or_create_local_worktree("/_collab", true, cx)
5472            })
5473            .await
5474            .unwrap();
5475        collab_worktree
5476            .read_with(&host_cx, |tree, _| tree.as_local().unwrap().scan_complete())
5477            .await;
5478        host_project
5479            .update(&mut host_cx, |project, cx| project.share(cx))
5480            .await
5481            .unwrap();
5482
5483        // Set up fake language servers.
5484        let mut language = Language::new(
5485            LanguageConfig {
5486                name: "Rust".into(),
5487                path_suffixes: vec!["rs".to_string()],
5488                ..Default::default()
5489            },
5490            None,
5491        );
5492        let _fake_servers = language.set_fake_lsp_adapter(FakeLspAdapter {
5493            name: "the-fake-language-server",
5494            capabilities: lsp::LanguageServer::full_capabilities(),
5495            initializer: Some(Box::new({
5496                let rng = rng.clone();
5497                let files = files.clone();
5498                let project = host_project.downgrade();
5499                move |fake_server: &mut FakeLanguageServer| {
5500                    fake_server.handle_request::<lsp::request::Completion, _, _>(
5501                        |_, _| async move {
5502                            Ok(Some(lsp::CompletionResponse::Array(vec![
5503                                lsp::CompletionItem {
5504                                    text_edit: Some(lsp::CompletionTextEdit::Edit(lsp::TextEdit {
5505                                        range: lsp::Range::new(
5506                                            lsp::Position::new(0, 0),
5507                                            lsp::Position::new(0, 0),
5508                                        ),
5509                                        new_text: "the-new-text".to_string(),
5510                                    })),
5511                                    ..Default::default()
5512                                },
5513                            ])))
5514                        },
5515                    );
5516
5517                    fake_server.handle_request::<lsp::request::CodeActionRequest, _, _>(
5518                        |_, _| async move {
5519                            Ok(Some(vec![lsp::CodeActionOrCommand::CodeAction(
5520                                lsp::CodeAction {
5521                                    title: "the-code-action".to_string(),
5522                                    ..Default::default()
5523                                },
5524                            )]))
5525                        },
5526                    );
5527
5528                    fake_server.handle_request::<lsp::request::PrepareRenameRequest, _, _>(
5529                        |params, _| async move {
5530                            Ok(Some(lsp::PrepareRenameResponse::Range(lsp::Range::new(
5531                                params.position,
5532                                params.position,
5533                            ))))
5534                        },
5535                    );
5536
5537                    fake_server.handle_request::<lsp::request::GotoDefinition, _, _>({
5538                        let files = files.clone();
5539                        let rng = rng.clone();
5540                        move |_, _| {
5541                            let files = files.clone();
5542                            let rng = rng.clone();
5543                            async move {
5544                                let files = files.lock();
5545                                let mut rng = rng.lock();
5546                                let count = rng.gen_range::<usize, _>(1..3);
5547                                let files = (0..count)
5548                                    .map(|_| files.choose(&mut *rng).unwrap())
5549                                    .collect::<Vec<_>>();
5550                                log::info!("LSP: Returning definitions in files {:?}", &files);
5551                                Ok(Some(lsp::GotoDefinitionResponse::Array(
5552                                    files
5553                                        .into_iter()
5554                                        .map(|file| lsp::Location {
5555                                            uri: lsp::Url::from_file_path(file).unwrap(),
5556                                            range: Default::default(),
5557                                        })
5558                                        .collect(),
5559                                )))
5560                            }
5561                        }
5562                    });
5563
5564                    fake_server.handle_request::<lsp::request::DocumentHighlightRequest, _, _>({
5565                        let rng = rng.clone();
5566                        let project = project.clone();
5567                        move |params, mut cx| {
5568                            let highlights = if let Some(project) = project.upgrade(&cx) {
5569                                project.update(&mut cx, |project, cx| {
5570                                    let path = params
5571                                        .text_document_position_params
5572                                        .text_document
5573                                        .uri
5574                                        .to_file_path()
5575                                        .unwrap();
5576                                    let (worktree, relative_path) =
5577                                        project.find_local_worktree(&path, cx)?;
5578                                    let project_path =
5579                                        ProjectPath::from((worktree.read(cx).id(), relative_path));
5580                                    let buffer =
5581                                        project.get_open_buffer(&project_path, cx)?.read(cx);
5582
5583                                    let mut highlights = Vec::new();
5584                                    let highlight_count = rng.lock().gen_range(1..=5);
5585                                    let mut prev_end = 0;
5586                                    for _ in 0..highlight_count {
5587                                        let range =
5588                                            buffer.random_byte_range(prev_end, &mut *rng.lock());
5589
5590                                        highlights.push(lsp::DocumentHighlight {
5591                                            range: range_to_lsp(range.to_point_utf16(buffer)),
5592                                            kind: Some(lsp::DocumentHighlightKind::READ),
5593                                        });
5594                                        prev_end = range.end;
5595                                    }
5596                                    Some(highlights)
5597                                })
5598                            } else {
5599                                None
5600                            };
5601                            async move { Ok(highlights) }
5602                        }
5603                    });
5604                }
5605            })),
5606            ..Default::default()
5607        });
5608        host_language_registry.add(Arc::new(language));
5609
5610        let op_start_signal = futures::channel::mpsc::unbounded();
5611        user_ids.push(host.current_user_id(&host_cx));
5612        op_start_signals.push(op_start_signal.0);
5613        clients.push(host_cx.foreground().spawn(host.simulate_host(
5614            host_project,
5615            files,
5616            op_start_signal.1,
5617            rng.clone(),
5618            host_cx,
5619        )));
5620
5621        let disconnect_host_at = if rng.lock().gen_bool(0.2) {
5622            rng.lock().gen_range(0..max_operations)
5623        } else {
5624            max_operations
5625        };
5626        let mut available_guests = vec![
5627            "guest-1".to_string(),
5628            "guest-2".to_string(),
5629            "guest-3".to_string(),
5630            "guest-4".to_string(),
5631        ];
5632        let mut operations = 0;
5633        while operations < max_operations {
5634            if operations == disconnect_host_at {
5635                server.disconnect_client(user_ids[0]);
5636                cx.foreground().advance_clock(RECEIVE_TIMEOUT);
5637                drop(op_start_signals);
5638                let mut clients = futures::future::join_all(clients).await;
5639                cx.foreground().run_until_parked();
5640
5641                let (host, mut host_cx, host_err) = clients.remove(0);
5642                if let Some(host_err) = host_err {
5643                    log::error!("host error - {}", host_err);
5644                }
5645                host.project
5646                    .as_ref()
5647                    .unwrap()
5648                    .read_with(&host_cx, |project, _| assert!(!project.is_shared()));
5649                for (guest, mut guest_cx, guest_err) in clients {
5650                    if let Some(guest_err) = guest_err {
5651                        log::error!("{} error - {}", guest.username, guest_err);
5652                    }
5653                    let contacts = server
5654                        .store
5655                        .read()
5656                        .await
5657                        .contacts_for_user(guest.current_user_id(&guest_cx));
5658                    assert!(!contacts
5659                        .iter()
5660                        .flat_map(|contact| &contact.projects)
5661                        .any(|project| project.id == host_project_id));
5662                    guest
5663                        .project
5664                        .as_ref()
5665                        .unwrap()
5666                        .read_with(&guest_cx, |project, _| assert!(project.is_read_only()));
5667                    guest_cx.update(|_| drop(guest));
5668                }
5669                host_cx.update(|_| drop(host));
5670
5671                return;
5672            }
5673
5674            let distribution = rng.lock().gen_range(0..100);
5675            match distribution {
5676                0..=19 if !available_guests.is_empty() => {
5677                    let guest_ix = rng.lock().gen_range(0..available_guests.len());
5678                    let guest_username = available_guests.remove(guest_ix);
5679                    log::info!("Adding new connection for {}", guest_username);
5680                    next_entity_id += 100000;
5681                    let mut guest_cx = TestAppContext::new(
5682                        cx.foreground_platform(),
5683                        cx.platform(),
5684                        deterministic.build_foreground(next_entity_id),
5685                        deterministic.build_background(),
5686                        cx.font_cache(),
5687                        cx.leak_detector(),
5688                        next_entity_id,
5689                    );
5690                    let guest = server.create_client(&mut guest_cx, &guest_username).await;
5691                    let guest_project = Project::remote(
5692                        host_project_id,
5693                        guest.client.clone(),
5694                        guest.user_store.clone(),
5695                        guest_lang_registry.clone(),
5696                        FakeFs::new(cx.background()),
5697                        &mut guest_cx.to_async(),
5698                    )
5699                    .await
5700                    .unwrap();
5701                    let op_start_signal = futures::channel::mpsc::unbounded();
5702                    user_ids.push(guest.current_user_id(&guest_cx));
5703                    op_start_signals.push(op_start_signal.0);
5704                    clients.push(guest_cx.foreground().spawn(guest.simulate_guest(
5705                        guest_username.clone(),
5706                        guest_project,
5707                        op_start_signal.1,
5708                        rng.clone(),
5709                        guest_cx,
5710                    )));
5711
5712                    log::info!("Added connection for {}", guest_username);
5713                    operations += 1;
5714                }
5715                20..=29 if clients.len() > 1 => {
5716                    log::info!("Removing guest");
5717                    let guest_ix = rng.lock().gen_range(1..clients.len());
5718                    let removed_guest_id = user_ids.remove(guest_ix);
5719                    let guest = clients.remove(guest_ix);
5720                    op_start_signals.remove(guest_ix);
5721                    server.disconnect_client(removed_guest_id);
5722                    cx.foreground().advance_clock(RECEIVE_TIMEOUT);
5723                    let (guest, mut guest_cx, guest_err) = guest.await;
5724                    if let Some(guest_err) = guest_err {
5725                        log::error!("{} error - {}", guest.username, guest_err);
5726                    }
5727                    guest
5728                        .project
5729                        .as_ref()
5730                        .unwrap()
5731                        .read_with(&guest_cx, |project, _| assert!(project.is_read_only()));
5732                    for user_id in &user_ids {
5733                        for contact in server.store.read().await.contacts_for_user(*user_id) {
5734                            assert_ne!(
5735                                contact.user_id, removed_guest_id.0 as u64,
5736                                "removed guest is still a contact of another peer"
5737                            );
5738                            for project in contact.projects {
5739                                for project_guest_id in project.guests {
5740                                    assert_ne!(
5741                                        project_guest_id, removed_guest_id.0 as u64,
5742                                        "removed guest appears as still participating on a project"
5743                                    );
5744                                }
5745                            }
5746                        }
5747                    }
5748
5749                    log::info!("{} removed", guest.username);
5750                    available_guests.push(guest.username.clone());
5751                    guest_cx.update(|_| drop(guest));
5752
5753                    operations += 1;
5754                }
5755                _ => {
5756                    while operations < max_operations && rng.lock().gen_bool(0.7) {
5757                        op_start_signals
5758                            .choose(&mut *rng.lock())
5759                            .unwrap()
5760                            .unbounded_send(())
5761                            .unwrap();
5762                        operations += 1;
5763                    }
5764
5765                    if rng.lock().gen_bool(0.8) {
5766                        cx.foreground().run_until_parked();
5767                    }
5768                }
5769            }
5770        }
5771
5772        drop(op_start_signals);
5773        let mut clients = futures::future::join_all(clients).await;
5774        cx.foreground().run_until_parked();
5775
5776        let (host_client, mut host_cx, host_err) = clients.remove(0);
5777        if let Some(host_err) = host_err {
5778            panic!("host error - {}", host_err);
5779        }
5780        let host_project = host_client.project.as_ref().unwrap();
5781        let host_worktree_snapshots = host_project.read_with(&host_cx, |project, cx| {
5782            project
5783                .worktrees(cx)
5784                .map(|worktree| {
5785                    let snapshot = worktree.read(cx).snapshot();
5786                    (snapshot.id(), snapshot)
5787                })
5788                .collect::<BTreeMap<_, _>>()
5789        });
5790
5791        host_client
5792            .project
5793            .as_ref()
5794            .unwrap()
5795            .read_with(&host_cx, |project, cx| project.check_invariants(cx));
5796
5797        for (guest_client, mut guest_cx, guest_err) in clients.into_iter() {
5798            if let Some(guest_err) = guest_err {
5799                panic!("{} error - {}", guest_client.username, guest_err);
5800            }
5801            let worktree_snapshots =
5802                guest_client
5803                    .project
5804                    .as_ref()
5805                    .unwrap()
5806                    .read_with(&guest_cx, |project, cx| {
5807                        project
5808                            .worktrees(cx)
5809                            .map(|worktree| {
5810                                let worktree = worktree.read(cx);
5811                                (worktree.id(), worktree.snapshot())
5812                            })
5813                            .collect::<BTreeMap<_, _>>()
5814                    });
5815
5816            assert_eq!(
5817                worktree_snapshots.keys().collect::<Vec<_>>(),
5818                host_worktree_snapshots.keys().collect::<Vec<_>>(),
5819                "{} has different worktrees than the host",
5820                guest_client.username
5821            );
5822            for (id, host_snapshot) in &host_worktree_snapshots {
5823                let guest_snapshot = &worktree_snapshots[id];
5824                assert_eq!(
5825                    guest_snapshot.root_name(),
5826                    host_snapshot.root_name(),
5827                    "{} has different root name than the host for worktree {}",
5828                    guest_client.username,
5829                    id
5830                );
5831                assert_eq!(
5832                    guest_snapshot.entries(false).collect::<Vec<_>>(),
5833                    host_snapshot.entries(false).collect::<Vec<_>>(),
5834                    "{} has different snapshot than the host for worktree {}",
5835                    guest_client.username,
5836                    id
5837                );
5838                assert_eq!(guest_snapshot.scan_id(), host_snapshot.scan_id());
5839            }
5840
5841            guest_client
5842                .project
5843                .as_ref()
5844                .unwrap()
5845                .read_with(&guest_cx, |project, cx| project.check_invariants(cx));
5846
5847            for guest_buffer in &guest_client.buffers {
5848                let buffer_id = guest_buffer.read_with(&guest_cx, |buffer, _| buffer.remote_id());
5849                let host_buffer = host_project.read_with(&host_cx, |project, cx| {
5850                    project.buffer_for_id(buffer_id, cx).expect(&format!(
5851                        "host does not have buffer for guest:{}, peer:{}, id:{}",
5852                        guest_client.username, guest_client.peer_id, buffer_id
5853                    ))
5854                });
5855                let path = host_buffer
5856                    .read_with(&host_cx, |buffer, cx| buffer.file().unwrap().full_path(cx));
5857
5858                assert_eq!(
5859                    guest_buffer.read_with(&guest_cx, |buffer, _| buffer.deferred_ops_len()),
5860                    0,
5861                    "{}, buffer {}, path {:?} has deferred operations",
5862                    guest_client.username,
5863                    buffer_id,
5864                    path,
5865                );
5866                assert_eq!(
5867                    guest_buffer.read_with(&guest_cx, |buffer, _| buffer.text()),
5868                    host_buffer.read_with(&host_cx, |buffer, _| buffer.text()),
5869                    "{}, buffer {}, path {:?}, differs from the host's buffer",
5870                    guest_client.username,
5871                    buffer_id,
5872                    path
5873                );
5874            }
5875
5876            guest_cx.update(|_| drop(guest_client));
5877        }
5878
5879        host_cx.update(|_| drop(host_client));
5880    }
5881
    /// Test harness around a [`Server`] instance, created by `TestServer::start`.
    ///
    /// It dereferences to the wrapped `Server` and resets `peer` when it is dropped.
    struct TestServer {
        /// Peer created with `Peer::new()` in `start`; reset by this type's `Drop` impl.
        peer: Arc<Peer>,
        /// Application state built from the test database (see `build_app_state`).
        app_state: Arc<AppState>,
        /// The server under test; `create_client` hands new connections to its
        /// `handle_connection`.
        server: Arc<Server>,
        /// Foreground executor consulted by `condition` while it waits for notifications.
        foreground: Rc<executor::Foreground>,
        /// Receiving half of the channel whose sender is passed to `Server::new`.
        notifications: mpsc::UnboundedReceiver<()>,
        /// Per-user flags returned by `Connection::in_memory`; `disconnect_client` removes a
        /// user's flag and sets it to `true`.
        connection_killers: Arc<Mutex<HashMap<UserId, Arc<AtomicBool>>>>,
        /// While `true`, connection attempts by clients made in `create_client` return an error.
        forbid_connections: Arc<AtomicBool>,
        /// Test database that `app_state.db` was cloned from. Presumably held here so that it
        /// outlives the server — TODO confirm.
        _test_db: TestDb,
    }
5892
5893    impl TestServer {
5894        async fn start(
5895            foreground: Rc<executor::Foreground>,
5896            background: Arc<executor::Background>,
5897        ) -> Self {
5898            let test_db = TestDb::fake(background);
5899            let app_state = Self::build_app_state(&test_db).await;
5900            let peer = Peer::new();
5901            let notifications = mpsc::unbounded();
5902            let server = Server::new(app_state.clone(), Some(notifications.0));
5903            Self {
5904                peer,
5905                app_state,
5906                server,
5907                foreground,
5908                notifications: notifications.1,
5909                connection_killers: Default::default(),
5910                forbid_connections: Default::default(),
5911                _test_db: test_db,
5912            }
5913        }
5914
5915        async fn create_client(&mut self, cx: &mut TestAppContext, name: &str) -> TestClient {
5916            cx.update(|cx| {
5917                let settings = Settings::test(cx);
5918                cx.set_global(settings);
5919            });
5920
5921            let http = FakeHttpClient::with_404_response();
5922            let user_id = self.app_state.db.create_user(name, false).await.unwrap();
5923            let client_name = name.to_string();
5924            let mut client = Client::new(http.clone());
5925            let server = self.server.clone();
5926            let connection_killers = self.connection_killers.clone();
5927            let forbid_connections = self.forbid_connections.clone();
5928            let (connection_id_tx, mut connection_id_rx) = mpsc::channel(16);
5929
5930            Arc::get_mut(&mut client)
5931                .unwrap()
5932                .override_authenticate(move |cx| {
5933                    cx.spawn(|_| async move {
5934                        let access_token = "the-token".to_string();
5935                        Ok(Credentials {
5936                            user_id: user_id.0 as u64,
5937                            access_token,
5938                        })
5939                    })
5940                })
5941                .override_establish_connection(move |credentials, cx| {
5942                    assert_eq!(credentials.user_id, user_id.0 as u64);
5943                    assert_eq!(credentials.access_token, "the-token");
5944
5945                    let server = server.clone();
5946                    let connection_killers = connection_killers.clone();
5947                    let forbid_connections = forbid_connections.clone();
5948                    let client_name = client_name.clone();
5949                    let connection_id_tx = connection_id_tx.clone();
5950                    cx.spawn(move |cx| async move {
5951                        if forbid_connections.load(SeqCst) {
5952                            Err(EstablishConnectionError::other(anyhow!(
5953                                "server is forbidding connections"
5954                            )))
5955                        } else {
5956                            let (client_conn, server_conn, killed) =
5957                                Connection::in_memory(cx.background());
5958                            connection_killers.lock().insert(user_id, killed);
5959                            cx.background()
5960                                .spawn(server.handle_connection(
5961                                    server_conn,
5962                                    client_name,
5963                                    user_id,
5964                                    Some(connection_id_tx),
5965                                    cx.background(),
5966                                ))
5967                                .detach();
5968                            Ok(client_conn)
5969                        }
5970                    })
5971                });
5972
5973            client
5974                .authenticate_and_connect(false, &cx.to_async())
5975                .await
5976                .unwrap();
5977
5978            Channel::init(&client);
5979            Project::init(&client);
5980            cx.update(|cx| {
5981                workspace::init(&client, cx);
5982            });
5983
5984            let peer_id = PeerId(connection_id_rx.next().await.unwrap().0);
5985            let user_store = cx.add_model(|cx| UserStore::new(client.clone(), http, cx));
5986
5987            let client = TestClient {
5988                client,
5989                peer_id,
5990                username: name.to_string(),
5991                user_store,
5992                language_registry: Arc::new(LanguageRegistry::test()),
5993                project: Default::default(),
5994                buffers: Default::default(),
5995            };
5996            client.wait_for_current_user(cx).await;
5997            client
5998        }
5999
6000        fn disconnect_client(&self, user_id: UserId) {
6001            self.connection_killers
6002                .lock()
6003                .remove(&user_id)
6004                .unwrap()
6005                .store(true, SeqCst);
6006        }
6007
6008        fn forbid_connections(&self) {
6009            self.forbid_connections.store(true, SeqCst);
6010        }
6011
6012        fn allow_connections(&self) {
6013            self.forbid_connections.store(false, SeqCst);
6014        }
6015
6016        async fn build_app_state(test_db: &TestDb) -> Arc<AppState> {
6017            Arc::new(AppState {
6018                db: test_db.db().clone(),
6019                api_token: Default::default(),
6020            })
6021        }
6022
6023        async fn state<'a>(&'a self) -> RwLockReadGuard<'a, Store> {
6024            self.server.store.read().await
6025        }
6026
6027        async fn condition<F>(&mut self, mut predicate: F)
6028        where
6029            F: FnMut(&Store) -> bool,
6030        {
6031            assert!(
6032                self.foreground.parking_forbidden(),
6033                "you must call forbid_parking to use server conditions so we don't block indefinitely"
6034            );
6035            while !(predicate)(&*self.server.store.read().await) {
6036                self.foreground.start_waiting();
6037                self.notifications.next().await;
6038                self.foreground.finish_waiting();
6039            }
6040        }
6041    }
6042
6043    impl Deref for TestServer {
6044        type Target = Server;
6045
6046        fn deref(&self) -> &Self::Target {
6047            &self.server
6048        }
6049    }
6050
6051    impl Drop for TestServer {
6052        fn drop(&mut self) {
6053            self.peer.reset();
6054        }
6055    }
6056
    /// A client connected to a `TestServer`, together with the state the tests track for it.
    ///
    /// Dereferences to the underlying `Arc<Client>`.
    struct TestClient {
        /// The client itself.
        client: Arc<Client>,
        /// Name the client's user was created with.
        username: String,
        /// Peer id built from the connection id that the server reported for this client.
        pub peer_id: PeerId,
        /// User store created for this client; `current_user_id` reads the current user from it.
        pub user_store: ModelHandle<UserStore>,
        /// Language registry passed to the projects and workspaces this client builds.
        language_registry: Arc<LanguageRegistry>,
        /// Starts as `None`; set by `build_local_project` and `build_remote_project`.
        project: Option<ModelHandle<Project>>,
        /// Buffers tracked for this client; `simulate_host` inserts the buffers it opens and
        /// removes the ones it drops.
        buffers: HashSet<ModelHandle<language::Buffer>>,
    }
6066
6067    impl Deref for TestClient {
6068        type Target = Arc<Client>;
6069
6070        fn deref(&self) -> &Self::Target {
6071            &self.client
6072        }
6073    }
6074
6075    impl TestClient {
6076        pub fn current_user_id(&self, cx: &TestAppContext) -> UserId {
6077            UserId::from_proto(
6078                self.user_store
6079                    .read_with(cx, |user_store, _| user_store.current_user().unwrap().id),
6080            )
6081        }
6082
6083        async fn wait_for_current_user(&self, cx: &TestAppContext) {
6084            let mut authed_user = self
6085                .user_store
6086                .read_with(cx, |user_store, _| user_store.watch_current_user());
6087            while authed_user.next().await.unwrap().is_none() {}
6088        }
6089
6090        async fn build_local_project(
6091            &mut self,
6092            fs: Arc<FakeFs>,
6093            root_path: impl AsRef<Path>,
6094            cx: &mut TestAppContext,
6095        ) -> (ModelHandle<Project>, WorktreeId) {
6096            let project = cx.update(|cx| {
6097                Project::local(
6098                    self.client.clone(),
6099                    self.user_store.clone(),
6100                    self.language_registry.clone(),
6101                    fs,
6102                    cx,
6103                )
6104            });
6105            self.project = Some(project.clone());
6106            let (worktree, _) = project
6107                .update(cx, |p, cx| {
6108                    p.find_or_create_local_worktree(root_path, true, cx)
6109                })
6110                .await
6111                .unwrap();
6112            worktree
6113                .read_with(cx, |tree, _| tree.as_local().unwrap().scan_complete())
6114                .await;
6115            project
6116                .update(cx, |project, _| project.next_remote_id())
6117                .await;
6118            (project, worktree.read_with(cx, |tree, _| tree.id()))
6119        }
6120
6121        async fn build_remote_project(
6122            &mut self,
6123            project_id: u64,
6124            cx: &mut TestAppContext,
6125        ) -> ModelHandle<Project> {
6126            let project = Project::remote(
6127                project_id,
6128                self.client.clone(),
6129                self.user_store.clone(),
6130                self.language_registry.clone(),
6131                FakeFs::new(cx.background()),
6132                &mut cx.to_async(),
6133            )
6134            .await
6135            .unwrap();
6136            self.project = Some(project.clone());
6137            project
6138        }
6139
6140        fn build_workspace(
6141            &self,
6142            project: &ModelHandle<Project>,
6143            cx: &mut TestAppContext,
6144        ) -> ViewHandle<Workspace> {
6145            let (window_id, _) = cx.add_window(|_| EmptyView);
6146            cx.add_view(window_id, |cx| {
6147                let fs = project.read(cx).fs().clone();
6148                Workspace::new(
6149                    &WorkspaceParams {
6150                        fs,
6151                        project: project.clone(),
6152                        user_store: self.user_store.clone(),
6153                        languages: self.language_registry.clone(),
6154                        themes: ThemeRegistry::new((), cx.font_cache().clone()),
6155                        channel_list: cx.add_model(|cx| {
6156                            ChannelList::new(self.user_store.clone(), self.client.clone(), cx)
6157                        }),
6158                        client: self.client.clone(),
6159                    },
6160                    cx,
6161                )
6162            })
6163        }
6164
6165        async fn simulate_host(
6166            mut self,
6167            project: ModelHandle<Project>,
6168            files: Arc<Mutex<Vec<PathBuf>>>,
6169            op_start_signal: futures::channel::mpsc::UnboundedReceiver<()>,
6170            rng: Arc<Mutex<StdRng>>,
6171            mut cx: TestAppContext,
6172        ) -> (Self, TestAppContext, Option<anyhow::Error>) {
6173            async fn simulate_host_internal(
6174                client: &mut TestClient,
6175                project: ModelHandle<Project>,
6176                files: Arc<Mutex<Vec<PathBuf>>>,
6177                mut op_start_signal: futures::channel::mpsc::UnboundedReceiver<()>,
6178                rng: Arc<Mutex<StdRng>>,
6179                cx: &mut TestAppContext,
6180            ) -> anyhow::Result<()> {
6181                let fs = project.read_with(cx, |project, _| project.fs().clone());
6182
6183                while op_start_signal.next().await.is_some() {
6184                    let distribution = rng.lock().gen_range::<usize, _>(0..100);
6185                    match distribution {
6186                        0..=20 if !files.lock().is_empty() => {
6187                            let path = files.lock().choose(&mut *rng.lock()).unwrap().clone();
6188                            let mut path = path.as_path();
6189                            while let Some(parent_path) = path.parent() {
6190                                path = parent_path;
6191                                if rng.lock().gen() {
6192                                    break;
6193                                }
6194                            }
6195
6196                            log::info!("Host: find/create local worktree {:?}", path);
6197                            let find_or_create_worktree = project.update(cx, |project, cx| {
6198                                project.find_or_create_local_worktree(path, true, cx)
6199                            });
6200                            if rng.lock().gen() {
6201                                cx.background().spawn(find_or_create_worktree).detach();
6202                            } else {
6203                                find_or_create_worktree.await?;
6204                            }
6205                        }
6206                        10..=80 if !files.lock().is_empty() => {
6207                            let buffer = if client.buffers.is_empty() || rng.lock().gen() {
6208                                let file = files.lock().choose(&mut *rng.lock()).unwrap().clone();
6209                                let (worktree, path) = project
6210                                    .update(cx, |project, cx| {
6211                                        project.find_or_create_local_worktree(
6212                                            file.clone(),
6213                                            true,
6214                                            cx,
6215                                        )
6216                                    })
6217                                    .await?;
6218                                let project_path =
6219                                    worktree.read_with(cx, |worktree, _| (worktree.id(), path));
6220                                log::info!(
6221                                    "Host: opening path {:?}, worktree {}, relative_path {:?}",
6222                                    file,
6223                                    project_path.0,
6224                                    project_path.1
6225                                );
6226                                let buffer = project
6227                                    .update(cx, |project, cx| project.open_buffer(project_path, cx))
6228                                    .await
6229                                    .unwrap();
6230                                client.buffers.insert(buffer.clone());
6231                                buffer
6232                            } else {
6233                                client
6234                                    .buffers
6235                                    .iter()
6236                                    .choose(&mut *rng.lock())
6237                                    .unwrap()
6238                                    .clone()
6239                            };
6240
6241                            if rng.lock().gen_bool(0.1) {
6242                                cx.update(|cx| {
6243                                    log::info!(
6244                                        "Host: dropping buffer {:?}",
6245                                        buffer.read(cx).file().unwrap().full_path(cx)
6246                                    );
6247                                    client.buffers.remove(&buffer);
6248                                    drop(buffer);
6249                                });
6250                            } else {
6251                                buffer.update(cx, |buffer, cx| {
6252                                    log::info!(
6253                                        "Host: updating buffer {:?} ({})",
6254                                        buffer.file().unwrap().full_path(cx),
6255                                        buffer.remote_id()
6256                                    );
6257
6258                                    if rng.lock().gen_bool(0.7) {
6259                                        buffer.randomly_edit(&mut *rng.lock(), 5, cx);
6260                                    } else {
6261                                        buffer.randomly_undo_redo(&mut *rng.lock(), cx);
6262                                    }
6263                                });
6264                            }
6265                        }
6266                        _ => loop {
6267                            let path_component_count = rng.lock().gen_range::<usize, _>(1..=5);
6268                            let mut path = PathBuf::new();
6269                            path.push("/");
6270                            for _ in 0..path_component_count {
6271                                let letter = rng.lock().gen_range(b'a'..=b'z');
6272                                path.push(std::str::from_utf8(&[letter]).unwrap());
6273                            }
6274                            path.set_extension("rs");
6275                            let parent_path = path.parent().unwrap();
6276
6277                            log::info!("Host: creating file {:?}", path,);
6278
6279                            if fs.create_dir(&parent_path).await.is_ok()
6280                                && fs.create_file(&path, Default::default()).await.is_ok()
6281                            {
6282                                files.lock().push(path);
6283                                break;
6284                            } else {
6285                                log::info!("Host: cannot create file");
6286                            }
6287                        },
6288                    }
6289
6290                    cx.background().simulate_random_delay().await;
6291                }
6292
6293                Ok(())
6294            }
6295
6296            let result = simulate_host_internal(
6297                &mut self,
6298                project.clone(),
6299                files,
6300                op_start_signal,
6301                rng,
6302                &mut cx,
6303            )
6304            .await;
6305            log::info!("Host done");
6306            self.project = Some(project);
6307            (self, cx, result.err())
6308        }
6309
6310        pub async fn simulate_guest(
6311            mut self,
6312            guest_username: String,
6313            project: ModelHandle<Project>,
6314            op_start_signal: futures::channel::mpsc::UnboundedReceiver<()>,
6315            rng: Arc<Mutex<StdRng>>,
6316            mut cx: TestAppContext,
6317        ) -> (Self, TestAppContext, Option<anyhow::Error>) {
6318            async fn simulate_guest_internal(
6319                client: &mut TestClient,
6320                guest_username: &str,
6321                project: ModelHandle<Project>,
6322                mut op_start_signal: futures::channel::mpsc::UnboundedReceiver<()>,
6323                rng: Arc<Mutex<StdRng>>,
6324                cx: &mut TestAppContext,
6325            ) -> anyhow::Result<()> {
6326                while op_start_signal.next().await.is_some() {
6327                    let buffer = if client.buffers.is_empty() || rng.lock().gen() {
6328                        let worktree = if let Some(worktree) =
6329                            project.read_with(cx, |project, cx| {
6330                                project
6331                                    .worktrees(&cx)
6332                                    .filter(|worktree| {
6333                                        let worktree = worktree.read(cx);
6334                                        worktree.is_visible()
6335                                            && worktree.entries(false).any(|e| e.is_file())
6336                                    })
6337                                    .choose(&mut *rng.lock())
6338                            }) {
6339                            worktree
6340                        } else {
6341                            cx.background().simulate_random_delay().await;
6342                            continue;
6343                        };
6344
6345                        let (worktree_root_name, project_path) =
6346                            worktree.read_with(cx, |worktree, _| {
6347                                let entry = worktree
6348                                    .entries(false)
6349                                    .filter(|e| e.is_file())
6350                                    .choose(&mut *rng.lock())
6351                                    .unwrap();
6352                                (
6353                                    worktree.root_name().to_string(),
6354                                    (worktree.id(), entry.path.clone()),
6355                                )
6356                            });
6357                        log::info!(
6358                            "{}: opening path {:?} in worktree {} ({})",
6359                            guest_username,
6360                            project_path.1,
6361                            project_path.0,
6362                            worktree_root_name,
6363                        );
6364                        let buffer = project
6365                            .update(cx, |project, cx| {
6366                                project.open_buffer(project_path.clone(), cx)
6367                            })
6368                            .await?;
6369                        log::info!(
6370                            "{}: opened path {:?} in worktree {} ({}) with buffer id {}",
6371                            guest_username,
6372                            project_path.1,
6373                            project_path.0,
6374                            worktree_root_name,
6375                            buffer.read_with(cx, |buffer, _| buffer.remote_id())
6376                        );
6377                        client.buffers.insert(buffer.clone());
6378                        buffer
6379                    } else {
6380                        client
6381                            .buffers
6382                            .iter()
6383                            .choose(&mut *rng.lock())
6384                            .unwrap()
6385                            .clone()
6386                    };
6387
6388                    let choice = rng.lock().gen_range(0..100);
6389                    match choice {
6390                        0..=9 => {
6391                            cx.update(|cx| {
6392                                log::info!(
6393                                    "{}: dropping buffer {:?}",
6394                                    guest_username,
6395                                    buffer.read(cx).file().unwrap().full_path(cx)
6396                                );
6397                                client.buffers.remove(&buffer);
6398                                drop(buffer);
6399                            });
6400                        }
6401                        10..=19 => {
6402                            let completions = project.update(cx, |project, cx| {
6403                                log::info!(
6404                                    "{}: requesting completions for buffer {} ({:?})",
6405                                    guest_username,
6406                                    buffer.read(cx).remote_id(),
6407                                    buffer.read(cx).file().unwrap().full_path(cx)
6408                                );
6409                                let offset = rng.lock().gen_range(0..=buffer.read(cx).len());
6410                                project.completions(&buffer, offset, cx)
6411                            });
6412                            let completions = cx.background().spawn(async move {
6413                                completions
6414                                    .await
6415                                    .map_err(|err| anyhow!("completions request failed: {:?}", err))
6416                            });
6417                            if rng.lock().gen_bool(0.3) {
6418                                log::info!("{}: detaching completions request", guest_username);
6419                                cx.update(|cx| completions.detach_and_log_err(cx));
6420                            } else {
6421                                completions.await?;
6422                            }
6423                        }
6424                        20..=29 => {
6425                            let code_actions = project.update(cx, |project, cx| {
6426                                log::info!(
6427                                    "{}: requesting code actions for buffer {} ({:?})",
6428                                    guest_username,
6429                                    buffer.read(cx).remote_id(),
6430                                    buffer.read(cx).file().unwrap().full_path(cx)
6431                                );
6432                                let range = buffer.read(cx).random_byte_range(0, &mut *rng.lock());
6433                                project.code_actions(&buffer, range, cx)
6434                            });
6435                            let code_actions = cx.background().spawn(async move {
6436                                code_actions.await.map_err(|err| {
6437                                    anyhow!("code actions request failed: {:?}", err)
6438                                })
6439                            });
6440                            if rng.lock().gen_bool(0.3) {
6441                                log::info!("{}: detaching code actions request", guest_username);
6442                                cx.update(|cx| code_actions.detach_and_log_err(cx));
6443                            } else {
6444                                code_actions.await?;
6445                            }
6446                        }
6447                        30..=39 if buffer.read_with(cx, |buffer, _| buffer.is_dirty()) => {
6448                            let (requested_version, save) = buffer.update(cx, |buffer, cx| {
6449                                log::info!(
6450                                    "{}: saving buffer {} ({:?})",
6451                                    guest_username,
6452                                    buffer.remote_id(),
6453                                    buffer.file().unwrap().full_path(cx)
6454                                );
6455                                (buffer.version(), buffer.save(cx))
6456                            });
6457                            let save = cx.background().spawn(async move {
6458                                let (saved_version, _) = save
6459                                    .await
6460                                    .map_err(|err| anyhow!("save request failed: {:?}", err))?;
6461                                assert!(saved_version.observed_all(&requested_version));
6462                                Ok::<_, anyhow::Error>(())
6463                            });
6464                            if rng.lock().gen_bool(0.3) {
6465                                log::info!("{}: detaching save request", guest_username);
6466                                cx.update(|cx| save.detach_and_log_err(cx));
6467                            } else {
6468                                save.await?;
6469                            }
6470                        }
6471                        40..=44 => {
6472                            let prepare_rename = project.update(cx, |project, cx| {
6473                                log::info!(
6474                                    "{}: preparing rename for buffer {} ({:?})",
6475                                    guest_username,
6476                                    buffer.read(cx).remote_id(),
6477                                    buffer.read(cx).file().unwrap().full_path(cx)
6478                                );
6479                                let offset = rng.lock().gen_range(0..=buffer.read(cx).len());
6480                                project.prepare_rename(buffer, offset, cx)
6481                            });
6482                            let prepare_rename = cx.background().spawn(async move {
6483                                prepare_rename.await.map_err(|err| {
6484                                    anyhow!("prepare rename request failed: {:?}", err)
6485                                })
6486                            });
6487                            if rng.lock().gen_bool(0.3) {
6488                                log::info!("{}: detaching prepare rename request", guest_username);
6489                                cx.update(|cx| prepare_rename.detach_and_log_err(cx));
6490                            } else {
6491                                prepare_rename.await?;
6492                            }
6493                        }
6494                        45..=49 => {
6495                            let definitions = project.update(cx, |project, cx| {
6496                                log::info!(
6497                                    "{}: requesting definitions for buffer {} ({:?})",
6498                                    guest_username,
6499                                    buffer.read(cx).remote_id(),
6500                                    buffer.read(cx).file().unwrap().full_path(cx)
6501                                );
6502                                let offset = rng.lock().gen_range(0..=buffer.read(cx).len());
6503                                project.definition(&buffer, offset, cx)
6504                            });
6505                            let definitions = cx.background().spawn(async move {
6506                                definitions
6507                                    .await
6508                                    .map_err(|err| anyhow!("definitions request failed: {:?}", err))
6509                            });
6510                            if rng.lock().gen_bool(0.3) {
6511                                log::info!("{}: detaching definitions request", guest_username);
6512                                cx.update(|cx| definitions.detach_and_log_err(cx));
6513                            } else {
6514                                client
6515                                    .buffers
6516                                    .extend(definitions.await?.into_iter().map(|loc| loc.buffer));
6517                            }
6518                        }
6519                        50..=54 => {
6520                            let highlights = project.update(cx, |project, cx| {
6521                                log::info!(
6522                                    "{}: requesting highlights for buffer {} ({:?})",
6523                                    guest_username,
6524                                    buffer.read(cx).remote_id(),
6525                                    buffer.read(cx).file().unwrap().full_path(cx)
6526                                );
6527                                let offset = rng.lock().gen_range(0..=buffer.read(cx).len());
6528                                project.document_highlights(&buffer, offset, cx)
6529                            });
6530                            let highlights = cx.background().spawn(async move {
6531                                highlights
6532                                    .await
6533                                    .map_err(|err| anyhow!("highlights request failed: {:?}", err))
6534                            });
6535                            if rng.lock().gen_bool(0.3) {
6536                                log::info!("{}: detaching highlights request", guest_username);
6537                                cx.update(|cx| highlights.detach_and_log_err(cx));
6538                            } else {
6539                                highlights.await?;
6540                            }
6541                        }
6542                        55..=59 => {
6543                            let search = project.update(cx, |project, cx| {
6544                                let query = rng.lock().gen_range('a'..='z');
6545                                log::info!("{}: project-wide search {:?}", guest_username, query);
6546                                project.search(SearchQuery::text(query, false, false), cx)
6547                            });
6548                            let search = cx.background().spawn(async move {
6549                                search
6550                                    .await
6551                                    .map_err(|err| anyhow!("search request failed: {:?}", err))
6552                            });
6553                            if rng.lock().gen_bool(0.3) {
6554                                log::info!("{}: detaching search request", guest_username);
6555                                cx.update(|cx| search.detach_and_log_err(cx));
6556                            } else {
6557                                client.buffers.extend(search.await?.into_keys());
6558                            }
6559                        }
6560                        60..=69 => {
6561                            let worktree = project
6562                                .read_with(cx, |project, cx| {
6563                                    project
6564                                        .worktrees(&cx)
6565                                        .filter(|worktree| {
6566                                            let worktree = worktree.read(cx);
6567                                            worktree.is_visible()
6568                                                && worktree.entries(false).any(|e| e.is_file())
6569                                                && worktree
6570                                                    .root_entry()
6571                                                    .map_or(false, |e| e.is_dir())
6572                                        })
6573                                        .choose(&mut *rng.lock())
6574                                })
6575                                .unwrap();
6576                            let (worktree_id, worktree_root_name) = worktree
6577                                .read_with(cx, |worktree, _| {
6578                                    (worktree.id(), worktree.root_name().to_string())
6579                                });
6580
6581                            let mut new_name = String::new();
6582                            for _ in 0..10 {
6583                                let letter = rng.lock().gen_range('a'..='z');
6584                                new_name.push(letter);
6585                            }
6586                            let mut new_path = PathBuf::new();
6587                            new_path.push(new_name);
6588                            new_path.set_extension("rs");
6589                            log::info!(
6590                                "{}: creating {:?} in worktree {} ({})",
6591                                guest_username,
6592                                new_path,
6593                                worktree_id,
6594                                worktree_root_name,
6595                            );
6596                            project
6597                                .update(cx, |project, cx| {
6598                                    project.create_entry((worktree_id, new_path), false, cx)
6599                                })
6600                                .unwrap()
6601                                .await?;
6602                        }
6603                        _ => {
6604                            buffer.update(cx, |buffer, cx| {
6605                                log::info!(
6606                                    "{}: updating buffer {} ({:?})",
6607                                    guest_username,
6608                                    buffer.remote_id(),
6609                                    buffer.file().unwrap().full_path(cx)
6610                                );
6611                                if rng.lock().gen_bool(0.7) {
6612                                    buffer.randomly_edit(&mut *rng.lock(), 5, cx);
6613                                } else {
6614                                    buffer.randomly_undo_redo(&mut *rng.lock(), cx);
6615                                }
6616                            });
6617                        }
6618                    }
6619                    cx.background().simulate_random_delay().await;
6620                }
6621                Ok(())
6622            }
6623
6624            let result = simulate_guest_internal(
6625                &mut self,
6626                &guest_username,
6627                project.clone(),
6628                op_start_signal,
6629                rng,
6630                &mut cx,
6631            )
6632            .await;
6633            log::info!("{}: done", guest_username);
6634
6635            self.project = Some(project);
6636            (self, cx, result.err())
6637        }
6638    }
6639
6640    impl Drop for TestClient {
6641        fn drop(&mut self) {
6642            self.client.tear_down();
6643        }
6644    }
6645
6646    impl Executor for Arc<gpui::executor::Background> {
6647        type Sleep = gpui::executor::Timer;
6648
6649        fn spawn_detached<F: 'static + Send + Future<Output = ()>>(&self, future: F) {
6650            self.spawn(future).detach();
6651        }
6652
6653        fn sleep(&self, duration: Duration) -> Self::Sleep {
6654            self.as_ref().timer(duration)
6655        }
6656    }
6657
6658    fn channel_messages(channel: &Channel) -> Vec<(String, String, bool)> {
6659        channel
6660            .messages()
6661            .cursor::<()>()
6662            .map(|m| {
6663                (
6664                    m.sender.github_login.clone(),
6665                    m.body.clone(),
6666                    m.is_pending(),
6667                )
6668            })
6669            .collect()
6670    }
6671
6672    struct EmptyView;
6673
6674    impl gpui::Entity for EmptyView {
6675        type Event = ();
6676    }
6677
6678    impl gpui::View for EmptyView {
6679        fn ui_name() -> &'static str {
6680            "empty view"
6681        }
6682
6683        fn render(&mut self, _: &mut gpui::RenderContext<Self>) -> gpui::ElementBox {
6684            gpui::Element::boxed(gpui::elements::Empty)
6685        }
6686    }
6687}