rpc.rs

   1mod store;
   2
   3use super::{
   4    auth::process_auth_header,
   5    db::{ChannelId, MessageId, UserId},
   6    AppState,
   7};
   8use anyhow::anyhow;
   9use async_io::Timer;
  10use async_std::{
  11    sync::{RwLock, RwLockReadGuard, RwLockWriteGuard},
  12    task,
  13};
  14use async_tungstenite::{tungstenite::protocol::Role, WebSocketStream};
  15use collections::{HashMap, HashSet};
  16use futures::{channel::mpsc, future::BoxFuture, FutureExt, SinkExt, StreamExt};
  17use log::{as_debug, as_display};
  18use rpc::{
  19    proto::{self, AnyTypedEnvelope, EntityMessage, EnvelopedMessage, RequestMessage},
  20    Connection, ConnectionId, Peer, TypedEnvelope,
  21};
  22use sha1::{Digest as _, Sha1};
  23use std::{
  24    any::TypeId,
  25    future::Future,
  26    marker::PhantomData,
  27    ops::{Deref, DerefMut},
  28    rc::Rc,
  29    sync::Arc,
  30    time::{Duration, Instant},
  31};
  32use store::{Store, Worktree};
  33use surf::StatusCode;
  34use tide::{
  35    http::headers::{HeaderName, CONNECTION, UPGRADE},
  36    Request, Response,
  37};
  38use time::OffsetDateTime;
  39use util::ResultExt;
  40
/// Type-erased handler for one kind of incoming RPC message.
///
/// Handlers are stored in `Server::handlers`, keyed by the `TypeId` of the message payload.
/// Each handler receives the server and the boxed envelope, and returns a boxed future that
/// resolves to `tide::Result<()>`.
type MessageHandler = Box<
    dyn Send
        + Sync
        + Fn(Arc<Server>, Box<dyn AnyTypedEnvelope>) -> BoxFuture<'static, tide::Result<()>>,
>;
  46
/// RPC server state shared (behind an `Arc`) by every connection handler.
pub struct Server {
    /// Peer used to respond to, send and forward messages on individual connections.
    peer: Arc<Peer>,
    /// Connection/project bookkeeping, guarded by an async read-write lock.
    store: RwLock<Store>,
    /// Application-wide state; handlers reach the database through it.
    app_state: Arc<AppState>,
    /// Registered message handlers, keyed by the `TypeId` of the message payload.
    handlers: HashMap<TypeId, MessageHandler>,
    /// Optional channel that receives a `()` after each incoming message has been handled.
    notifications: Option<mpsc::UnboundedSender<()>>,
}
  54
/// Abstraction over the async runtime facilities the server needs.
///
/// `handle_connection` uses it to spawn background message handlers and to build the timers
/// it hands to the peer when registering a connection.
pub trait Executor: Send + Clone {
    /// Future returned by [`Executor::timer`].
    type Timer: Send + Future;
    /// Runs `future` to completion without giving the caller a handle to it.
    fn spawn_detached<F: 'static + Send + Future<Output = ()>>(&self, future: F);
    /// Returns a timer future for `duration`.
    fn timer(&self, duration: Duration) -> Self::Timer;
}
  60
/// Unit-struct executor.
// NOTE(review): its `Executor` impl is not visible in this part of the file; presumably it is
// the production executor backed by the real async runtime — confirm.
#[derive(Clone)]
pub struct RealExecutor;
  63
// NOTE(review): neither constant is used in the visible part of this file; judging by the
// names they bound channel-message paging and channel-message body length — confirm at the
// use sites.
const MESSAGE_COUNT_PER_PAGE: usize = 100;
const MAX_MESSAGE_LEN: usize = 1024;
  66
/// Wrapper around a read guard on the server's `Store`.
struct StoreReadGuard<'a> {
    /// The underlying async `RwLock` read guard.
    guard: RwLockReadGuard<'a, Store>,
    /// `Rc` is `!Send`, so this marker makes the wrapper `!Send` as well.
    // NOTE(review): presumably this stops the guard from being held across an `.await` inside
    // a future that must be `Send` — confirm.
    _not_send: PhantomData<Rc<()>>,
}
  71
/// Wrapper around a write guard on the server's `Store`.
struct StoreWriteGuard<'a> {
    /// The underlying async `RwLock` write guard.
    guard: RwLockWriteGuard<'a, Store>,
    /// `Rc` is `!Send`, so this marker makes the wrapper `!Send` as well.
    // NOTE(review): presumably this stops the guard from being held across an `.await` inside
    // a future that must be `Send` — confirm.
    _not_send: PhantomData<Rc<()>>,
}
  76
  77impl Server {
  78    pub fn new(
  79        app_state: Arc<AppState>,
  80        peer: Arc<Peer>,
  81        notifications: Option<mpsc::UnboundedSender<()>>,
  82    ) -> Arc<Self> {
  83        let mut server = Self {
  84            peer,
  85            app_state,
  86            store: Default::default(),
  87            handlers: Default::default(),
  88            notifications,
  89        };
  90
  91        server
  92            .add_request_handler(Server::ping)
  93            .add_request_handler(Server::register_project)
  94            .add_message_handler(Server::unregister_project)
  95            .add_request_handler(Server::share_project)
  96            .add_message_handler(Server::unshare_project)
  97            .add_sync_request_handler(Server::join_project)
  98            .add_message_handler(Server::leave_project)
  99            .add_request_handler(Server::register_worktree)
 100            .add_message_handler(Server::unregister_worktree)
 101            .add_request_handler(Server::update_worktree)
 102            .add_message_handler(Server::start_language_server)
 103            .add_message_handler(Server::update_language_server)
 104            .add_message_handler(Server::update_diagnostic_summary)
 105            .add_request_handler(Server::forward_project_request::<proto::GetDefinition>)
 106            .add_request_handler(Server::forward_project_request::<proto::GetReferences>)
 107            .add_request_handler(Server::forward_project_request::<proto::SearchProject>)
 108            .add_request_handler(Server::forward_project_request::<proto::GetDocumentHighlights>)
 109            .add_request_handler(Server::forward_project_request::<proto::GetProjectSymbols>)
 110            .add_request_handler(Server::forward_project_request::<proto::OpenBufferForSymbol>)
 111            .add_request_handler(Server::forward_project_request::<proto::OpenBufferById>)
 112            .add_request_handler(Server::forward_project_request::<proto::OpenBufferByPath>)
 113            .add_request_handler(Server::forward_project_request::<proto::GetCompletions>)
 114            .add_request_handler(
 115                Server::forward_project_request::<proto::ApplyCompletionAdditionalEdits>,
 116            )
 117            .add_request_handler(Server::forward_project_request::<proto::GetCodeActions>)
 118            .add_request_handler(Server::forward_project_request::<proto::ApplyCodeAction>)
 119            .add_request_handler(Server::forward_project_request::<proto::PrepareRename>)
 120            .add_request_handler(Server::forward_project_request::<proto::PerformRename>)
 121            .add_request_handler(Server::forward_project_request::<proto::ReloadBuffers>)
 122            .add_request_handler(Server::forward_project_request::<proto::FormatBuffers>)
 123            .add_request_handler(Server::update_buffer)
 124            .add_message_handler(Server::update_buffer_file)
 125            .add_message_handler(Server::buffer_reloaded)
 126            .add_message_handler(Server::buffer_saved)
 127            .add_request_handler(Server::save_buffer)
 128            .add_request_handler(Server::get_channels)
 129            .add_request_handler(Server::get_users)
 130            .add_request_handler(Server::join_channel)
 131            .add_message_handler(Server::leave_channel)
 132            .add_request_handler(Server::send_channel_message)
 133            .add_request_handler(Server::follow)
 134            .add_message_handler(Server::unfollow)
 135            .add_message_handler(Server::update_followers)
 136            .add_request_handler(Server::get_channel_messages);
 137
 138        Arc::new(server)
 139    }
 140
 141    fn add_message_handler<F, Fut, M>(&mut self, handler: F) -> &mut Self
 142    where
 143        F: 'static + Send + Sync + Fn(Arc<Self>, TypedEnvelope<M>) -> Fut,
 144        Fut: 'static + Send + Future<Output = tide::Result<()>>,
 145        M: EnvelopedMessage,
 146    {
 147        let prev_handler = self.handlers.insert(
 148            TypeId::of::<M>(),
 149            Box::new(move |server, envelope| {
 150                let envelope = envelope.into_any().downcast::<TypedEnvelope<M>>().unwrap();
 151                (handler)(server, *envelope).boxed()
 152            }),
 153        );
 154        if prev_handler.is_some() {
 155            panic!("registered a handler for the same message twice");
 156        }
 157        self
 158    }
 159
 160    fn add_request_handler<F, Fut, M>(&mut self, handler: F) -> &mut Self
 161    where
 162        F: 'static + Send + Sync + Fn(Arc<Self>, TypedEnvelope<M>) -> Fut,
 163        Fut: 'static + Send + Future<Output = tide::Result<M::Response>>,
 164        M: RequestMessage,
 165    {
 166        self.add_message_handler(move |server, envelope| {
 167            let receipt = envelope.receipt();
 168            let response = (handler)(server.clone(), envelope);
 169            async move {
 170                match response.await {
 171                    Ok(response) => {
 172                        server.peer.respond(receipt, response)?;
 173                        Ok(())
 174                    }
 175                    Err(error) => {
 176                        server.peer.respond_with_error(
 177                            receipt,
 178                            proto::Error {
 179                                message: error.to_string(),
 180                            },
 181                        )?;
 182                        Err(error)
 183                    }
 184                }
 185            }
 186        })
 187    }
 188
 189    /// Handle a request while holding a lock to the store. This is useful when we're registering
 190    /// a connection but we want to respond on the connection before anybody else can send on it.
 191    fn add_sync_request_handler<F, M>(&mut self, handler: F) -> &mut Self
 192    where
 193        F: 'static
 194            + Send
 195            + Sync
 196            + Fn(Arc<Self>, &mut Store, TypedEnvelope<M>) -> tide::Result<M::Response>,
 197        M: RequestMessage,
 198    {
 199        let handler = Arc::new(handler);
 200        self.add_message_handler(move |server, envelope| {
 201            let receipt = envelope.receipt();
 202            let handler = handler.clone();
 203            async move {
 204                let mut store = server.store.write().await;
 205                let response = (handler)(server.clone(), &mut *store, envelope);
 206                match response {
 207                    Ok(response) => {
 208                        server.peer.respond(receipt, response)?;
 209                        Ok(())
 210                    }
 211                    Err(error) => {
 212                        server.peer.respond_with_error(
 213                            receipt,
 214                            proto::Error {
 215                                message: error.to_string(),
 216                            },
 217                        )?;
 218                        Err(error)
 219                    }
 220                }
 221            }
 222        })
 223    }
 224
    /// Drives a single client connection until it closes.
    ///
    /// The returned future:
    /// 1. registers `connection` with the peer, supplying timers built from `executor`;
    /// 2. reports the new connection id on `send_connection_id`, if one was given;
    /// 3. records the connection for `user_id` in the store and refreshes that user's contacts;
    /// 4. loops, driving the connection's I/O future and dispatching each incoming message to
    ///    its registered handler, until the I/O future finishes or the incoming stream ends;
    /// 5. signs the connection out.
    ///
    /// `addr` is only used in log output.
    pub fn handle_connection<E: Executor>(
        self: &Arc<Self>,
        connection: Connection,
        addr: String,
        user_id: UserId,
        mut send_connection_id: Option<mpsc::Sender<ConnectionId>>,
        executor: E,
    ) -> impl Future<Output = ()> {
        let mut this = self.clone();
        async move {
            let (connection_id, handle_io, mut incoming_rx) = this
                .peer
                .add_connection(connection, {
                    // A separate clone is moved into the timer factory; `executor` itself is
                    // still needed below to spawn background handlers.
                    let executor = executor.clone();
                    move |duration| {
                        let timer = executor.timer(duration);
                        async move {
                            timer.await;
                        }
                    }
                })
                .await;

            if let Some(send_connection_id) = send_connection_id.as_mut() {
                // A failure to report the id is deliberately ignored.
                let _ = send_connection_id.send(connection_id).await;
            }

            this.state_mut()
                .await
                .add_connection(connection_id, user_id);
            this.update_contacts_for_users(&[user_id]).await;

            let handle_io = handle_io.fuse();
            futures::pin_mut!(handle_io);
            loop {
                let next_message = incoming_rx.next().fuse();
                futures::pin_mut!(next_message);
                // `handle_io` is listed first in the biased select.
                futures::select_biased! {
                    result = handle_io => {
                        if let Err(err) = result {
                            log::error!("error handling rpc connection {:?} - {:?}", addr, err);
                        }
                        break;
                    }
                    message = next_message => {
                        if let Some(message) = message {
                            let start_time = Instant::now();
                            let type_name = message.payload_type_name();
                            log::info!(connection_id = connection_id.0, type_name = type_name; "rpc message received");
                            if let Some(handler) = this.handlers.get(&message.payload_type_id()) {
                                let notifications = this.notifications.clone();
                                let is_background = message.is_background();
                                let handle_message = (handler)(this.clone(), message);
                                // Wrap the handler so its outcome is logged and the optional
                                // notification is sent whether it succeeds or fails.
                                let handle_message = async move {
                                    if let Err(err) = handle_message.await {
                                        log::error!(connection_id = connection_id.0, type = type_name, error = as_display!(err); "rpc message error");
                                    } else {
                                        log::info!(connection_id = connection_id.0, type = type_name, duration = as_debug!(start_time.elapsed()); "rpc message handled");
                                    }
                                    if let Some(mut notifications) = notifications {
                                        let _ = notifications.send(()).await;
                                    }
                                };
                                // Background messages are spawned detached; all others are
                                // awaited here before the next loop iteration.
                                if is_background {
                                    executor.spawn_detached(handle_message);
                                } else {
                                    handle_message.await;
                                }
                            } else {
                                log::warn!("unhandled message: {}", type_name);
                            }
                        } else {
                            // The incoming stream ended.
                            log::info!(address = as_debug!(addr); "rpc connection closed");
                            break;
                        }
                    }
                }
            }

            if let Err(err) = this.sign_out(connection_id).await {
                log::error!("error signing out connection {:?} - {:?}", addr, err);
            }
        }
    }
 309
 310    async fn sign_out(self: &mut Arc<Self>, connection_id: ConnectionId) -> tide::Result<()> {
 311        self.peer.disconnect(connection_id);
 312        let removed_connection = self.state_mut().await.remove_connection(connection_id)?;
 313
 314        for (project_id, project) in removed_connection.hosted_projects {
 315            if let Some(share) = project.share {
 316                broadcast(
 317                    connection_id,
 318                    share.guests.keys().copied().collect(),
 319                    |conn_id| {
 320                        self.peer
 321                            .send(conn_id, proto::UnshareProject { project_id })
 322                    },
 323                );
 324            }
 325        }
 326
 327        for (project_id, peer_ids) in removed_connection.guest_project_ids {
 328            broadcast(connection_id, peer_ids, |conn_id| {
 329                self.peer.send(
 330                    conn_id,
 331                    proto::RemoveProjectCollaborator {
 332                        project_id,
 333                        peer_id: connection_id.0,
 334                    },
 335                )
 336            });
 337        }
 338
 339        self.update_contacts_for_users(removed_connection.contact_ids.iter())
 340            .await;
 341        Ok(())
 342    }
 343
    /// Answers a `Ping` with an empty `Ack`; the request contents are ignored.
    async fn ping(self: Arc<Server>, _: TypedEnvelope<proto::Ping>) -> tide::Result<proto::Ack> {
        Ok(proto::Ack {})
    }
 347
 348    async fn register_project(
 349        mut self: Arc<Server>,
 350        request: TypedEnvelope<proto::RegisterProject>,
 351    ) -> tide::Result<proto::RegisterProjectResponse> {
 352        let project_id = {
 353            let mut state = self.state_mut().await;
 354            let user_id = state.user_id_for_connection(request.sender_id)?;
 355            state.register_project(request.sender_id, user_id)
 356        };
 357        Ok(proto::RegisterProjectResponse { project_id })
 358    }
 359
 360    async fn unregister_project(
 361        mut self: Arc<Server>,
 362        request: TypedEnvelope<proto::UnregisterProject>,
 363    ) -> tide::Result<()> {
 364        let project = self
 365            .state_mut()
 366            .await
 367            .unregister_project(request.payload.project_id, request.sender_id)?;
 368        self.update_contacts_for_users(project.authorized_user_ids().iter())
 369            .await;
 370        Ok(())
 371    }
 372
 373    async fn share_project(
 374        mut self: Arc<Server>,
 375        request: TypedEnvelope<proto::ShareProject>,
 376    ) -> tide::Result<proto::Ack> {
 377        self.state_mut()
 378            .await
 379            .share_project(request.payload.project_id, request.sender_id);
 380        Ok(proto::Ack {})
 381    }
 382
 383    async fn unshare_project(
 384        mut self: Arc<Server>,
 385        request: TypedEnvelope<proto::UnshareProject>,
 386    ) -> tide::Result<()> {
 387        let project_id = request.payload.project_id;
 388        let project = self
 389            .state_mut()
 390            .await
 391            .unshare_project(project_id, request.sender_id)?;
 392
 393        broadcast(request.sender_id, project.connection_ids, |conn_id| {
 394            self.peer
 395                .send(conn_id, proto::UnshareProject { project_id })
 396        });
 397        self.update_contacts_for_users(&project.authorized_user_ids)
 398            .await;
 399        Ok(())
 400    }
 401
 402    fn join_project(
 403        self: Arc<Server>,
 404        state: &mut Store,
 405        request: TypedEnvelope<proto::JoinProject>,
 406    ) -> tide::Result<proto::JoinProjectResponse> {
 407        let project_id = request.payload.project_id;
 408
 409        let user_id = state.user_id_for_connection(request.sender_id)?;
 410        let (response, connection_ids, contact_user_ids) = state
 411            .join_project(request.sender_id, user_id, project_id)
 412            .and_then(|joined| {
 413                let share = joined.project.share()?;
 414                let peer_count = share.guests.len();
 415                let mut collaborators = Vec::with_capacity(peer_count);
 416                collaborators.push(proto::Collaborator {
 417                    peer_id: joined.project.host_connection_id.0,
 418                    replica_id: 0,
 419                    user_id: joined.project.host_user_id.to_proto(),
 420                });
 421                let worktrees = share
 422                    .worktrees
 423                    .iter()
 424                    .filter_map(|(id, shared_worktree)| {
 425                        let worktree = joined.project.worktrees.get(&id)?;
 426                        Some(proto::Worktree {
 427                            id: *id,
 428                            root_name: worktree.root_name.clone(),
 429                            entries: shared_worktree.entries.values().cloned().collect(),
 430                            diagnostic_summaries: shared_worktree
 431                                .diagnostic_summaries
 432                                .values()
 433                                .cloned()
 434                                .collect(),
 435                            visible: worktree.visible,
 436                        })
 437                    })
 438                    .collect();
 439                for (peer_conn_id, (peer_replica_id, peer_user_id)) in &share.guests {
 440                    if *peer_conn_id != request.sender_id {
 441                        collaborators.push(proto::Collaborator {
 442                            peer_id: peer_conn_id.0,
 443                            replica_id: *peer_replica_id as u32,
 444                            user_id: peer_user_id.to_proto(),
 445                        });
 446                    }
 447                }
 448                let response = proto::JoinProjectResponse {
 449                    worktrees,
 450                    replica_id: joined.replica_id as u32,
 451                    collaborators,
 452                    language_servers: joined.project.language_servers.clone(),
 453                };
 454                let connection_ids = joined.project.connection_ids();
 455                let contact_user_ids = joined.project.authorized_user_ids();
 456                Ok((response, connection_ids, contact_user_ids))
 457            })?;
 458
 459        broadcast(request.sender_id, connection_ids, |conn_id| {
 460            self.peer.send(
 461                conn_id,
 462                proto::AddProjectCollaborator {
 463                    project_id,
 464                    collaborator: Some(proto::Collaborator {
 465                        peer_id: request.sender_id.0,
 466                        replica_id: response.replica_id,
 467                        user_id: user_id.to_proto(),
 468                    }),
 469                },
 470            )
 471        });
 472        self.update_contacts_for_users_sync(state, &contact_user_ids);
 473        Ok(response)
 474    }
 475
 476    async fn leave_project(
 477        mut self: Arc<Server>,
 478        request: TypedEnvelope<proto::LeaveProject>,
 479    ) -> tide::Result<()> {
 480        let sender_id = request.sender_id;
 481        let project_id = request.payload.project_id;
 482        let worktree = self
 483            .state_mut()
 484            .await
 485            .leave_project(sender_id, project_id)?;
 486
 487        broadcast(sender_id, worktree.connection_ids, |conn_id| {
 488            self.peer.send(
 489                conn_id,
 490                proto::RemoveProjectCollaborator {
 491                    project_id,
 492                    peer_id: sender_id.0,
 493                },
 494            )
 495        });
 496        self.update_contacts_for_users(&worktree.authorized_user_ids)
 497            .await;
 498
 499        Ok(())
 500    }
 501
 502    async fn register_worktree(
 503        mut self: Arc<Server>,
 504        request: TypedEnvelope<proto::RegisterWorktree>,
 505    ) -> tide::Result<proto::Ack> {
 506        let host_user_id = self
 507            .state()
 508            .await
 509            .user_id_for_connection(request.sender_id)?;
 510
 511        let mut contact_user_ids = HashSet::default();
 512        contact_user_ids.insert(host_user_id);
 513        for github_login in &request.payload.authorized_logins {
 514            let contact_user_id = self.app_state.db.create_user(github_login, false).await?;
 515            contact_user_ids.insert(contact_user_id);
 516        }
 517
 518        let contact_user_ids = contact_user_ids.into_iter().collect::<Vec<_>>();
 519        let guest_connection_ids;
 520        {
 521            let mut state = self.state_mut().await;
 522            guest_connection_ids = state
 523                .read_project(request.payload.project_id, request.sender_id)?
 524                .guest_connection_ids();
 525            state.register_worktree(
 526                request.payload.project_id,
 527                request.payload.worktree_id,
 528                request.sender_id,
 529                Worktree {
 530                    authorized_user_ids: contact_user_ids.clone(),
 531                    root_name: request.payload.root_name.clone(),
 532                    visible: request.payload.visible,
 533                },
 534            )?;
 535        }
 536        broadcast(request.sender_id, guest_connection_ids, |connection_id| {
 537            self.peer
 538                .forward_send(request.sender_id, connection_id, request.payload.clone())
 539        });
 540        self.update_contacts_for_users(&contact_user_ids).await;
 541        Ok(proto::Ack {})
 542    }
 543
 544    async fn unregister_worktree(
 545        mut self: Arc<Server>,
 546        request: TypedEnvelope<proto::UnregisterWorktree>,
 547    ) -> tide::Result<()> {
 548        let project_id = request.payload.project_id;
 549        let worktree_id = request.payload.worktree_id;
 550        let (worktree, guest_connection_ids) = self.state_mut().await.unregister_worktree(
 551            project_id,
 552            worktree_id,
 553            request.sender_id,
 554        )?;
 555        broadcast(request.sender_id, guest_connection_ids, |conn_id| {
 556            self.peer.send(
 557                conn_id,
 558                proto::UnregisterWorktree {
 559                    project_id,
 560                    worktree_id,
 561                },
 562            )
 563        });
 564        self.update_contacts_for_users(&worktree.authorized_user_ids)
 565            .await;
 566        Ok(())
 567    }
 568
 569    async fn update_worktree(
 570        mut self: Arc<Server>,
 571        request: TypedEnvelope<proto::UpdateWorktree>,
 572    ) -> tide::Result<proto::Ack> {
 573        let connection_ids = self.state_mut().await.update_worktree(
 574            request.sender_id,
 575            request.payload.project_id,
 576            request.payload.worktree_id,
 577            &request.payload.removed_entries,
 578            &request.payload.updated_entries,
 579        )?;
 580
 581        broadcast(request.sender_id, connection_ids, |connection_id| {
 582            self.peer
 583                .forward_send(request.sender_id, connection_id, request.payload.clone())
 584        });
 585
 586        Ok(proto::Ack {})
 587    }
 588
 589    async fn update_diagnostic_summary(
 590        mut self: Arc<Server>,
 591        request: TypedEnvelope<proto::UpdateDiagnosticSummary>,
 592    ) -> tide::Result<()> {
 593        let summary = request
 594            .payload
 595            .summary
 596            .clone()
 597            .ok_or_else(|| anyhow!("invalid summary"))?;
 598        let receiver_ids = self.state_mut().await.update_diagnostic_summary(
 599            request.payload.project_id,
 600            request.payload.worktree_id,
 601            request.sender_id,
 602            summary,
 603        )?;
 604
 605        broadcast(request.sender_id, receiver_ids, |connection_id| {
 606            self.peer
 607                .forward_send(request.sender_id, connection_id, request.payload.clone())
 608        });
 609        Ok(())
 610    }
 611
 612    async fn start_language_server(
 613        mut self: Arc<Server>,
 614        request: TypedEnvelope<proto::StartLanguageServer>,
 615    ) -> tide::Result<()> {
 616        let receiver_ids = self.state_mut().await.start_language_server(
 617            request.payload.project_id,
 618            request.sender_id,
 619            request
 620                .payload
 621                .server
 622                .clone()
 623                .ok_or_else(|| anyhow!("invalid language server"))?,
 624        )?;
 625        broadcast(request.sender_id, receiver_ids, |connection_id| {
 626            self.peer
 627                .forward_send(request.sender_id, connection_id, request.payload.clone())
 628        });
 629        Ok(())
 630    }
 631
 632    async fn update_language_server(
 633        self: Arc<Server>,
 634        request: TypedEnvelope<proto::UpdateLanguageServer>,
 635    ) -> tide::Result<()> {
 636        let receiver_ids = self
 637            .state()
 638            .await
 639            .project_connection_ids(request.payload.project_id, request.sender_id)?;
 640        broadcast(request.sender_id, receiver_ids, |connection_id| {
 641            self.peer
 642                .forward_send(request.sender_id, connection_id, request.payload.clone())
 643        });
 644        Ok(())
 645    }
 646
 647    async fn forward_project_request<T>(
 648        self: Arc<Server>,
 649        request: TypedEnvelope<T>,
 650    ) -> tide::Result<T::Response>
 651    where
 652        T: EntityMessage + RequestMessage,
 653    {
 654        let host_connection_id = self
 655            .state()
 656            .await
 657            .read_project(request.payload.remote_entity_id(), request.sender_id)?
 658            .host_connection_id;
 659        Ok(self
 660            .peer
 661            .forward_request(request.sender_id, host_connection_id, request.payload)
 662            .await?)
 663    }
 664
 665    async fn save_buffer(
 666        self: Arc<Server>,
 667        request: TypedEnvelope<proto::SaveBuffer>,
 668    ) -> tide::Result<proto::BufferSaved> {
 669        let host = self
 670            .state()
 671            .await
 672            .read_project(request.payload.project_id, request.sender_id)?
 673            .host_connection_id;
 674        let response = self
 675            .peer
 676            .forward_request(request.sender_id, host, request.payload.clone())
 677            .await?;
 678
 679        let mut guests = self
 680            .state()
 681            .await
 682            .read_project(request.payload.project_id, request.sender_id)?
 683            .connection_ids();
 684        guests.retain(|guest_connection_id| *guest_connection_id != request.sender_id);
 685        broadcast(host, guests, |conn_id| {
 686            self.peer.forward_send(host, conn_id, response.clone())
 687        });
 688
 689        Ok(response)
 690    }
 691
 692    async fn update_buffer(
 693        self: Arc<Server>,
 694        request: TypedEnvelope<proto::UpdateBuffer>,
 695    ) -> tide::Result<proto::Ack> {
 696        let receiver_ids = self
 697            .state()
 698            .await
 699            .project_connection_ids(request.payload.project_id, request.sender_id)?;
 700        broadcast(request.sender_id, receiver_ids, |connection_id| {
 701            self.peer
 702                .forward_send(request.sender_id, connection_id, request.payload.clone())
 703        });
 704        Ok(proto::Ack {})
 705    }
 706
 707    async fn update_buffer_file(
 708        self: Arc<Server>,
 709        request: TypedEnvelope<proto::UpdateBufferFile>,
 710    ) -> tide::Result<()> {
 711        let receiver_ids = self
 712            .state()
 713            .await
 714            .project_connection_ids(request.payload.project_id, request.sender_id)?;
 715        broadcast(request.sender_id, receiver_ids, |connection_id| {
 716            self.peer
 717                .forward_send(request.sender_id, connection_id, request.payload.clone())
 718        });
 719        Ok(())
 720    }
 721
 722    async fn buffer_reloaded(
 723        self: Arc<Server>,
 724        request: TypedEnvelope<proto::BufferReloaded>,
 725    ) -> tide::Result<()> {
 726        let receiver_ids = self
 727            .state()
 728            .await
 729            .project_connection_ids(request.payload.project_id, request.sender_id)?;
 730        broadcast(request.sender_id, receiver_ids, |connection_id| {
 731            self.peer
 732                .forward_send(request.sender_id, connection_id, request.payload.clone())
 733        });
 734        Ok(())
 735    }
 736
 737    async fn buffer_saved(
 738        self: Arc<Server>,
 739        request: TypedEnvelope<proto::BufferSaved>,
 740    ) -> tide::Result<()> {
 741        let receiver_ids = self
 742            .state()
 743            .await
 744            .project_connection_ids(request.payload.project_id, request.sender_id)?;
 745        broadcast(request.sender_id, receiver_ids, |connection_id| {
 746            self.peer
 747                .forward_send(request.sender_id, connection_id, request.payload.clone())
 748        });
 749        Ok(())
 750    }
 751
 752    async fn follow(
 753        self: Arc<Self>,
 754        request: TypedEnvelope<proto::Follow>,
 755    ) -> tide::Result<proto::FollowResponse> {
 756        let leader_id = ConnectionId(request.payload.leader_id);
 757        let follower_id = request.sender_id;
 758        if !self
 759            .state()
 760            .await
 761            .project_connection_ids(request.payload.project_id, follower_id)?
 762            .contains(&leader_id)
 763        {
 764            Err(anyhow!("no such peer"))?;
 765        }
 766        let mut response = self
 767            .peer
 768            .forward_request(request.sender_id, leader_id, request.payload)
 769            .await?;
 770        response
 771            .views
 772            .retain(|view| view.leader_id != Some(follower_id.0));
 773        Ok(response)
 774    }
 775
 776    async fn unfollow(
 777        self: Arc<Self>,
 778        request: TypedEnvelope<proto::Unfollow>,
 779    ) -> tide::Result<()> {
 780        let leader_id = ConnectionId(request.payload.leader_id);
 781        if !self
 782            .state()
 783            .await
 784            .project_connection_ids(request.payload.project_id, request.sender_id)?
 785            .contains(&leader_id)
 786        {
 787            Err(anyhow!("no such peer"))?;
 788        }
 789        self.peer
 790            .forward_send(request.sender_id, leader_id, request.payload)?;
 791        Ok(())
 792    }
 793
 794    async fn update_followers(
 795        self: Arc<Self>,
 796        request: TypedEnvelope<proto::UpdateFollowers>,
 797    ) -> tide::Result<()> {
 798        let connection_ids = self
 799            .state()
 800            .await
 801            .project_connection_ids(request.payload.project_id, request.sender_id)?;
 802        let leader_id = request
 803            .payload
 804            .variant
 805            .as_ref()
 806            .and_then(|variant| match variant {
 807                proto::update_followers::Variant::CreateView(payload) => payload.leader_id,
 808                proto::update_followers::Variant::UpdateView(payload) => payload.leader_id,
 809                proto::update_followers::Variant::UpdateActiveView(payload) => payload.leader_id,
 810            });
 811        for follower_id in &request.payload.follower_ids {
 812            let follower_id = ConnectionId(*follower_id);
 813            if connection_ids.contains(&follower_id) && Some(follower_id.0) != leader_id {
 814                self.peer
 815                    .forward_send(request.sender_id, follower_id, request.payload.clone())?;
 816            }
 817        }
 818        Ok(())
 819    }
 820
 821    async fn get_channels(
 822        self: Arc<Server>,
 823        request: TypedEnvelope<proto::GetChannels>,
 824    ) -> tide::Result<proto::GetChannelsResponse> {
 825        let user_id = self
 826            .state()
 827            .await
 828            .user_id_for_connection(request.sender_id)?;
 829        let channels = self.app_state.db.get_accessible_channels(user_id).await?;
 830        Ok(proto::GetChannelsResponse {
 831            channels: channels
 832                .into_iter()
 833                .map(|chan| proto::Channel {
 834                    id: chan.id.to_proto(),
 835                    name: chan.name,
 836                })
 837                .collect(),
 838        })
 839    }
 840
 841    async fn get_users(
 842        self: Arc<Server>,
 843        request: TypedEnvelope<proto::GetUsers>,
 844    ) -> tide::Result<proto::GetUsersResponse> {
 845        let user_ids = request
 846            .payload
 847            .user_ids
 848            .into_iter()
 849            .map(UserId::from_proto)
 850            .collect();
 851        let users = self
 852            .app_state
 853            .db
 854            .get_users_by_ids(user_ids)
 855            .await?
 856            .into_iter()
 857            .map(|user| proto::User {
 858                id: user.id.to_proto(),
 859                avatar_url: format!("https://github.com/{}.png?size=128", user.github_login),
 860                github_login: user.github_login,
 861            })
 862            .collect();
 863        Ok(proto::GetUsersResponse { users })
 864    }
 865
 866    async fn update_contacts_for_users<'a>(
 867        self: &Arc<Server>,
 868        user_ids: impl IntoIterator<Item = &'a UserId>,
 869    ) {
 870        let state = self.state().await;
 871        for user_id in user_ids {
 872            let contacts = state.contacts_for_user(*user_id);
 873            for connection_id in state.connection_ids_for_user(*user_id) {
 874                self.peer
 875                    .send(
 876                        connection_id,
 877                        proto::UpdateContacts {
 878                            contacts: contacts.clone(),
 879                        },
 880                    )
 881                    .log_err();
 882            }
 883        }
 884    }
 885
 886    fn update_contacts_for_users_sync<'a>(
 887        self: &Arc<Self>,
 888        state: &Store,
 889        user_ids: impl IntoIterator<Item = &'a UserId>,
 890    ) {
 891        for user_id in user_ids {
 892            let contacts = state.contacts_for_user(*user_id);
 893            for connection_id in state.connection_ids_for_user(*user_id) {
 894                self.peer
 895                    .send(
 896                        connection_id,
 897                        proto::UpdateContacts {
 898                            contacts: contacts.clone(),
 899                        },
 900                    )
 901                    .log_err();
 902            }
 903        }
 904    }
 905
 906    async fn join_channel(
 907        mut self: Arc<Self>,
 908        request: TypedEnvelope<proto::JoinChannel>,
 909    ) -> tide::Result<proto::JoinChannelResponse> {
 910        let user_id = self
 911            .state()
 912            .await
 913            .user_id_for_connection(request.sender_id)?;
 914        let channel_id = ChannelId::from_proto(request.payload.channel_id);
 915        if !self
 916            .app_state
 917            .db
 918            .can_user_access_channel(user_id, channel_id)
 919            .await?
 920        {
 921            Err(anyhow!("access denied"))?;
 922        }
 923
 924        self.state_mut()
 925            .await
 926            .join_channel(request.sender_id, channel_id);
 927        let messages = self
 928            .app_state
 929            .db
 930            .get_channel_messages(channel_id, MESSAGE_COUNT_PER_PAGE, None)
 931            .await?
 932            .into_iter()
 933            .map(|msg| proto::ChannelMessage {
 934                id: msg.id.to_proto(),
 935                body: msg.body,
 936                timestamp: msg.sent_at.unix_timestamp() as u64,
 937                sender_id: msg.sender_id.to_proto(),
 938                nonce: Some(msg.nonce.as_u128().into()),
 939            })
 940            .collect::<Vec<_>>();
 941        Ok(proto::JoinChannelResponse {
 942            done: messages.len() < MESSAGE_COUNT_PER_PAGE,
 943            messages,
 944        })
 945    }
 946
 947    async fn leave_channel(
 948        mut self: Arc<Self>,
 949        request: TypedEnvelope<proto::LeaveChannel>,
 950    ) -> tide::Result<()> {
 951        let user_id = self
 952            .state()
 953            .await
 954            .user_id_for_connection(request.sender_id)?;
 955        let channel_id = ChannelId::from_proto(request.payload.channel_id);
 956        if !self
 957            .app_state
 958            .db
 959            .can_user_access_channel(user_id, channel_id)
 960            .await?
 961        {
 962            Err(anyhow!("access denied"))?;
 963        }
 964
 965        self.state_mut()
 966            .await
 967            .leave_channel(request.sender_id, channel_id);
 968
 969        Ok(())
 970    }
 971
 972    async fn send_channel_message(
 973        self: Arc<Self>,
 974        request: TypedEnvelope<proto::SendChannelMessage>,
 975    ) -> tide::Result<proto::SendChannelMessageResponse> {
 976        let channel_id = ChannelId::from_proto(request.payload.channel_id);
 977        let user_id;
 978        let connection_ids;
 979        {
 980            let state = self.state().await;
 981            user_id = state.user_id_for_connection(request.sender_id)?;
 982            connection_ids = state.channel_connection_ids(channel_id)?;
 983        }
 984
 985        // Validate the message body.
 986        let body = request.payload.body.trim().to_string();
 987        if body.len() > MAX_MESSAGE_LEN {
 988            return Err(anyhow!("message is too long"))?;
 989        }
 990        if body.is_empty() {
 991            return Err(anyhow!("message can't be blank"))?;
 992        }
 993
 994        let timestamp = OffsetDateTime::now_utc();
 995        let nonce = request
 996            .payload
 997            .nonce
 998            .ok_or_else(|| anyhow!("nonce can't be blank"))?;
 999
1000        let message_id = self
1001            .app_state
1002            .db
1003            .create_channel_message(channel_id, user_id, &body, timestamp, nonce.clone().into())
1004            .await?
1005            .to_proto();
1006        let message = proto::ChannelMessage {
1007            sender_id: user_id.to_proto(),
1008            id: message_id,
1009            body,
1010            timestamp: timestamp.unix_timestamp() as u64,
1011            nonce: Some(nonce),
1012        };
1013        broadcast(request.sender_id, connection_ids, |conn_id| {
1014            self.peer.send(
1015                conn_id,
1016                proto::ChannelMessageSent {
1017                    channel_id: channel_id.to_proto(),
1018                    message: Some(message.clone()),
1019                },
1020            )
1021        });
1022        Ok(proto::SendChannelMessageResponse {
1023            message: Some(message),
1024        })
1025    }
1026
1027    async fn get_channel_messages(
1028        self: Arc<Self>,
1029        request: TypedEnvelope<proto::GetChannelMessages>,
1030    ) -> tide::Result<proto::GetChannelMessagesResponse> {
1031        let user_id = self
1032            .state()
1033            .await
1034            .user_id_for_connection(request.sender_id)?;
1035        let channel_id = ChannelId::from_proto(request.payload.channel_id);
1036        if !self
1037            .app_state
1038            .db
1039            .can_user_access_channel(user_id, channel_id)
1040            .await?
1041        {
1042            Err(anyhow!("access denied"))?;
1043        }
1044
1045        let messages = self
1046            .app_state
1047            .db
1048            .get_channel_messages(
1049                channel_id,
1050                MESSAGE_COUNT_PER_PAGE,
1051                Some(MessageId::from_proto(request.payload.before_message_id)),
1052            )
1053            .await?
1054            .into_iter()
1055            .map(|msg| proto::ChannelMessage {
1056                id: msg.id.to_proto(),
1057                body: msg.body,
1058                timestamp: msg.sent_at.unix_timestamp() as u64,
1059                sender_id: msg.sender_id.to_proto(),
1060                nonce: Some(msg.nonce.as_u128().into()),
1061            })
1062            .collect::<Vec<_>>();
1063
1064        Ok(proto::GetChannelMessagesResponse {
1065            done: messages.len() < MESSAGE_COUNT_PER_PAGE,
1066            messages,
1067        })
1068    }
1069
1070    async fn state<'a>(self: &'a Arc<Self>) -> StoreReadGuard<'a> {
1071        #[cfg(test)]
1072        async_std::task::yield_now().await;
1073        let guard = self.store.read().await;
1074        #[cfg(test)]
1075        async_std::task::yield_now().await;
1076        StoreReadGuard {
1077            guard,
1078            _not_send: PhantomData,
1079        }
1080    }
1081
1082    async fn state_mut<'a>(self: &'a mut Arc<Self>) -> StoreWriteGuard<'a> {
1083        #[cfg(test)]
1084        async_std::task::yield_now().await;
1085        let guard = self.store.write().await;
1086        #[cfg(test)]
1087        async_std::task::yield_now().await;
1088        StoreWriteGuard {
1089            guard,
1090            _not_send: PhantomData,
1091        }
1092    }
1093}
1094
1095impl<'a> Deref for StoreReadGuard<'a> {
1096    type Target = Store;
1097
1098    fn deref(&self) -> &Self::Target {
1099        &*self.guard
1100    }
1101}
1102
1103impl<'a> Deref for StoreWriteGuard<'a> {
1104    type Target = Store;
1105
1106    fn deref(&self) -> &Self::Target {
1107        &*self.guard
1108    }
1109}
1110
1111impl<'a> DerefMut for StoreWriteGuard<'a> {
1112    fn deref_mut(&mut self) -> &mut Self::Target {
1113        &mut *self.guard
1114    }
1115}
1116
1117impl<'a> Drop for StoreWriteGuard<'a> {
1118    fn drop(&mut self) {
1119        #[cfg(test)]
1120        self.check_invariants();
1121    }
1122}
1123
1124impl Executor for RealExecutor {
1125    type Timer = Timer;
1126
1127    fn spawn_detached<F: 'static + Send + Future<Output = ()>>(&self, future: F) {
1128        task::spawn(future);
1129    }
1130
1131    fn timer(&self, duration: Duration) -> Self::Timer {
1132        Timer::after(duration)
1133    }
1134}
1135
1136fn broadcast<F>(sender_id: ConnectionId, receiver_ids: Vec<ConnectionId>, mut f: F)
1137where
1138    F: FnMut(ConnectionId) -> anyhow::Result<()>,
1139{
1140    for receiver_id in receiver_ids {
1141        if receiver_id != sender_id {
1142            f(receiver_id).log_err();
1143        }
1144    }
1145}
1146
1147pub fn add_routes(app: &mut tide::Server<Arc<AppState>>, rpc: &Arc<Peer>) {
1148    let server = Server::new(app.state().clone(), rpc.clone(), None);
1149    app.at("/rpc").get(move |request: Request<Arc<AppState>>| {
1150        let server = server.clone();
1151        async move {
1152            const WEBSOCKET_GUID: &str = "258EAFA5-E914-47DA-95CA-C5AB0DC85B11";
1153
1154            let connection_upgrade = header_contains_ignore_case(&request, CONNECTION, "upgrade");
1155            let upgrade_to_websocket = header_contains_ignore_case(&request, UPGRADE, "websocket");
1156            let upgrade_requested = connection_upgrade && upgrade_to_websocket;
1157            let client_protocol_version: Option<u32> = request
1158                .header("X-Zed-Protocol-Version")
1159                .and_then(|v| v.as_str().parse().ok());
1160
1161            if !upgrade_requested || client_protocol_version != Some(rpc::PROTOCOL_VERSION) {
1162                return Ok(Response::new(StatusCode::UpgradeRequired));
1163            }
1164
1165            let header = match request.header("Sec-Websocket-Key") {
1166                Some(h) => h.as_str(),
1167                None => return Err(anyhow!("expected sec-websocket-key"))?,
1168            };
1169
1170            let user_id = process_auth_header(&request).await?;
1171
1172            let mut response = Response::new(StatusCode::SwitchingProtocols);
1173            response.insert_header(UPGRADE, "websocket");
1174            response.insert_header(CONNECTION, "Upgrade");
1175            let hash = Sha1::new().chain(header).chain(WEBSOCKET_GUID).finalize();
1176            response.insert_header("Sec-Websocket-Accept", base64::encode(&hash[..]));
1177            response.insert_header("Sec-Websocket-Version", "13");
1178
1179            let http_res: &mut tide::http::Response = response.as_mut();
1180            let upgrade_receiver = http_res.recv_upgrade().await;
1181            let addr = request.remote().unwrap_or("unknown").to_string();
1182            task::spawn(async move {
1183                if let Some(stream) = upgrade_receiver.await {
1184                    server
1185                        .handle_connection(
1186                            Connection::new(
1187                                WebSocketStream::from_raw_socket(stream, Role::Server, None).await,
1188                            ),
1189                            addr,
1190                            user_id,
1191                            None,
1192                            RealExecutor,
1193                        )
1194                        .await;
1195                }
1196            });
1197
1198            Ok(response)
1199        }
1200    });
1201}
1202
1203fn header_contains_ignore_case<T>(
1204    request: &tide::Request<T>,
1205    header_name: HeaderName,
1206    value: &str,
1207) -> bool {
1208    request
1209        .header(header_name)
1210        .map(|h| {
1211            h.as_str()
1212                .split(',')
1213                .any(|s| s.trim().eq_ignore_ascii_case(value.trim()))
1214        })
1215        .unwrap_or(false)
1216}
1217
1218#[cfg(test)]
1219mod tests {
1220    use super::*;
1221    use crate::{
1222        auth,
1223        db::{tests::TestDb, UserId},
1224        github, AppState, Config,
1225    };
1226    use ::rpc::Peer;
1227    use client::{
1228        self, test::FakeHttpClient, Channel, ChannelDetails, ChannelList, Client, Credentials,
1229        EstablishConnectionError, UserStore, RECEIVE_TIMEOUT,
1230    };
1231    use collections::BTreeMap;
1232    use editor::{
1233        self, ConfirmCodeAction, ConfirmCompletion, ConfirmRename, Editor, Input, Redo, Rename,
1234        ToOffset, ToggleCodeActions, Undo,
1235    };
1236    use gpui::{
1237        executor::{self, Deterministic},
1238        geometry::vector::vec2f,
1239        ModelHandle, TestAppContext, ViewHandle,
1240    };
1241    use language::{
1242        range_to_lsp, tree_sitter_rust, Diagnostic, DiagnosticEntry, FakeLspAdapter, Language,
1243        LanguageConfig, LanguageRegistry, OffsetRangeExt, Point, Rope,
1244    };
1245    use lsp::{self, FakeLanguageServer};
1246    use parking_lot::Mutex;
1247    use project::{
1248        fs::{FakeFs, Fs as _},
1249        search::SearchQuery,
1250        worktree::WorktreeHandle,
1251        DiagnosticSummary, Project, ProjectPath, WorktreeId,
1252    };
1253    use rand::prelude::*;
1254    use rpc::PeerId;
1255    use serde_json::json;
1256    use settings::Settings;
1257    use sqlx::types::time::OffsetDateTime;
1258    use std::{
1259        env,
1260        ops::Deref,
1261        path::{Path, PathBuf},
1262        rc::Rc,
1263        sync::{
1264            atomic::{AtomicBool, Ordering::SeqCst},
1265            Arc,
1266        },
1267        time::Duration,
1268    };
1269    use workspace::{Item, SplitDirection, ToggleFollow, Workspace, WorkspaceParams};
1270
1271    #[cfg(test)]
1272    #[ctor::ctor]
1273    fn init_logger() {
1274        if std::env::var("RUST_LOG").is_ok() {
1275            env_logger::init();
1276        }
1277    }
1278
1279    #[gpui::test(iterations = 10)]
1280    async fn test_share_project(cx_a: &mut TestAppContext, cx_b: &mut TestAppContext) {
1281        let (window_b, _) = cx_b.add_window(|_| EmptyView);
1282        let lang_registry = Arc::new(LanguageRegistry::test());
1283        let fs = FakeFs::new(cx_a.background());
1284        cx_a.foreground().forbid_parking();
1285
1286        // Connect to a server as 2 clients.
1287        let mut server = TestServer::start(cx_a.foreground(), cx_a.background()).await;
1288        let client_a = server.create_client(cx_a, "user_a").await;
1289        let client_b = server.create_client(cx_b, "user_b").await;
1290
1291        // Share a project as client A
1292        fs.insert_tree(
1293            "/a",
1294            json!({
1295                ".zed.toml": r#"collaborators = ["user_b"]"#,
1296                "a.txt": "a-contents",
1297                "b.txt": "b-contents",
1298            }),
1299        )
1300        .await;
1301        let project_a = cx_a.update(|cx| {
1302            Project::local(
1303                client_a.clone(),
1304                client_a.user_store.clone(),
1305                lang_registry.clone(),
1306                fs.clone(),
1307                cx,
1308            )
1309        });
1310        let (worktree_a, _) = project_a
1311            .update(cx_a, |p, cx| {
1312                p.find_or_create_local_worktree("/a", true, cx)
1313            })
1314            .await
1315            .unwrap();
1316        let worktree_id = worktree_a.read_with(cx_a, |tree, _| tree.id());
1317        worktree_a
1318            .read_with(cx_a, |tree, _| tree.as_local().unwrap().scan_complete())
1319            .await;
1320        let project_id = project_a.update(cx_a, |p, _| p.next_remote_id()).await;
1321        project_a.update(cx_a, |p, cx| p.share(cx)).await.unwrap();
1322
1323        // Join that project as client B
1324        let project_b = Project::remote(
1325            project_id,
1326            client_b.clone(),
1327            client_b.user_store.clone(),
1328            lang_registry.clone(),
1329            fs.clone(),
1330            &mut cx_b.to_async(),
1331        )
1332        .await
1333        .unwrap();
1334
1335        let replica_id_b = project_b.read_with(cx_b, |project, _| {
1336            assert_eq!(
1337                project
1338                    .collaborators()
1339                    .get(&client_a.peer_id)
1340                    .unwrap()
1341                    .user
1342                    .github_login,
1343                "user_a"
1344            );
1345            project.replica_id()
1346        });
1347        project_a
1348            .condition(&cx_a, |tree, _| {
1349                tree.collaborators()
1350                    .get(&client_b.peer_id)
1351                    .map_or(false, |collaborator| {
1352                        collaborator.replica_id == replica_id_b
1353                            && collaborator.user.github_login == "user_b"
1354                    })
1355            })
1356            .await;
1357
1358        // Open the same file as client B and client A.
1359        let buffer_b = project_b
1360            .update(cx_b, |p, cx| p.open_buffer((worktree_id, "b.txt"), cx))
1361            .await
1362            .unwrap();
1363        buffer_b.read_with(cx_b, |buf, _| assert_eq!(buf.text(), "b-contents"));
1364        project_a.read_with(cx_a, |project, cx| {
1365            assert!(project.has_open_buffer((worktree_id, "b.txt"), cx))
1366        });
1367        let buffer_a = project_a
1368            .update(cx_a, |p, cx| p.open_buffer((worktree_id, "b.txt"), cx))
1369            .await
1370            .unwrap();
1371
1372        let editor_b = cx_b.add_view(window_b, |cx| Editor::for_buffer(buffer_b, None, cx));
1373
1374        // TODO
1375        // // Create a selection set as client B and see that selection set as client A.
1376        // buffer_a
1377        //     .condition(&cx_a, |buffer, _| buffer.selection_sets().count() == 1)
1378        //     .await;
1379
1380        // Edit the buffer as client B and see that edit as client A.
1381        editor_b.update(cx_b, |editor, cx| {
1382            editor.handle_input(&Input("ok, ".into()), cx)
1383        });
1384        buffer_a
1385            .condition(&cx_a, |buffer, _| buffer.text() == "ok, b-contents")
1386            .await;
1387
1388        // TODO
1389        // // Remove the selection set as client B, see those selections disappear as client A.
1390        cx_b.update(move |_| drop(editor_b));
1391        // buffer_a
1392        //     .condition(&cx_a, |buffer, _| buffer.selection_sets().count() == 0)
1393        //     .await;
1394
1395        // Dropping the client B's project removes client B from client A's collaborators.
1396        cx_b.update(move |_| drop(project_b));
1397        project_a
1398            .condition(&cx_a, |project, _| project.collaborators().is_empty())
1399            .await;
1400    }
1401
1402    #[gpui::test(iterations = 10)]
1403    async fn test_unshare_project(cx_a: &mut TestAppContext, cx_b: &mut TestAppContext) {
1404        let lang_registry = Arc::new(LanguageRegistry::test());
1405        let fs = FakeFs::new(cx_a.background());
1406        cx_a.foreground().forbid_parking();
1407
1408        // Connect to a server as 2 clients.
1409        let mut server = TestServer::start(cx_a.foreground(), cx_a.background()).await;
1410        let client_a = server.create_client(cx_a, "user_a").await;
1411        let client_b = server.create_client(cx_b, "user_b").await;
1412
1413        // Share a project as client A
1414        fs.insert_tree(
1415            "/a",
1416            json!({
1417                ".zed.toml": r#"collaborators = ["user_b"]"#,
1418                "a.txt": "a-contents",
1419                "b.txt": "b-contents",
1420            }),
1421        )
1422        .await;
1423        let project_a = cx_a.update(|cx| {
1424            Project::local(
1425                client_a.clone(),
1426                client_a.user_store.clone(),
1427                lang_registry.clone(),
1428                fs.clone(),
1429                cx,
1430            )
1431        });
1432        let (worktree_a, _) = project_a
1433            .update(cx_a, |p, cx| {
1434                p.find_or_create_local_worktree("/a", true, cx)
1435            })
1436            .await
1437            .unwrap();
1438        worktree_a
1439            .read_with(cx_a, |tree, _| tree.as_local().unwrap().scan_complete())
1440            .await;
1441        let project_id = project_a.update(cx_a, |p, _| p.next_remote_id()).await;
1442        let worktree_id = worktree_a.read_with(cx_a, |tree, _| tree.id());
1443        project_a.update(cx_a, |p, cx| p.share(cx)).await.unwrap();
1444        assert!(worktree_a.read_with(cx_a, |tree, _| tree.as_local().unwrap().is_shared()));
1445
1446        // Join that project as client B
1447        let project_b = Project::remote(
1448            project_id,
1449            client_b.clone(),
1450            client_b.user_store.clone(),
1451            lang_registry.clone(),
1452            fs.clone(),
1453            &mut cx_b.to_async(),
1454        )
1455        .await
1456        .unwrap();
1457        project_b
1458            .update(cx_b, |p, cx| p.open_buffer((worktree_id, "a.txt"), cx))
1459            .await
1460            .unwrap();
1461
1462        // Unshare the project as client A
1463        project_a.update(cx_a, |project, cx| project.unshare(cx));
1464        project_b
1465            .condition(cx_b, |project, _| project.is_read_only())
1466            .await;
1467        assert!(worktree_a.read_with(cx_a, |tree, _| !tree.as_local().unwrap().is_shared()));
1468        cx_b.update(|_| {
1469            drop(project_b);
1470        });
1471
1472        // Share the project again and ensure guests can still join.
1473        project_a
1474            .update(cx_a, |project, cx| project.share(cx))
1475            .await
1476            .unwrap();
1477        assert!(worktree_a.read_with(cx_a, |tree, _| tree.as_local().unwrap().is_shared()));
1478
1479        let project_b2 = Project::remote(
1480            project_id,
1481            client_b.clone(),
1482            client_b.user_store.clone(),
1483            lang_registry.clone(),
1484            fs.clone(),
1485            &mut cx_b.to_async(),
1486        )
1487        .await
1488        .unwrap();
1489        project_b2
1490            .update(cx_b, |p, cx| p.open_buffer((worktree_id, "a.txt"), cx))
1491            .await
1492            .unwrap();
1493    }
1494
1495    #[gpui::test(iterations = 10)]
1496    async fn test_host_disconnect(cx_a: &mut TestAppContext, cx_b: &mut TestAppContext) {
1497        let lang_registry = Arc::new(LanguageRegistry::test());
1498        let fs = FakeFs::new(cx_a.background());
1499        cx_a.foreground().forbid_parking();
1500
1501        // Connect to a server as 2 clients.
1502        let mut server = TestServer::start(cx_a.foreground(), cx_a.background()).await;
1503        let client_a = server.create_client(cx_a, "user_a").await;
1504        let client_b = server.create_client(cx_b, "user_b").await;
1505
1506        // Share a project as client A
1507        fs.insert_tree(
1508            "/a",
1509            json!({
1510                ".zed.toml": r#"collaborators = ["user_b"]"#,
1511                "a.txt": "a-contents",
1512                "b.txt": "b-contents",
1513            }),
1514        )
1515        .await;
1516        let project_a = cx_a.update(|cx| {
1517            Project::local(
1518                client_a.clone(),
1519                client_a.user_store.clone(),
1520                lang_registry.clone(),
1521                fs.clone(),
1522                cx,
1523            )
1524        });
1525        let (worktree_a, _) = project_a
1526            .update(cx_a, |p, cx| {
1527                p.find_or_create_local_worktree("/a", true, cx)
1528            })
1529            .await
1530            .unwrap();
1531        worktree_a
1532            .read_with(cx_a, |tree, _| tree.as_local().unwrap().scan_complete())
1533            .await;
1534        let project_id = project_a.update(cx_a, |p, _| p.next_remote_id()).await;
1535        let worktree_id = worktree_a.read_with(cx_a, |tree, _| tree.id());
1536        project_a.update(cx_a, |p, cx| p.share(cx)).await.unwrap();
1537        assert!(worktree_a.read_with(cx_a, |tree, _| tree.as_local().unwrap().is_shared()));
1538
1539        // Join that project as client B
1540        let project_b = Project::remote(
1541            project_id,
1542            client_b.clone(),
1543            client_b.user_store.clone(),
1544            lang_registry.clone(),
1545            fs.clone(),
1546            &mut cx_b.to_async(),
1547        )
1548        .await
1549        .unwrap();
1550        project_b
1551            .update(cx_b, |p, cx| p.open_buffer((worktree_id, "a.txt"), cx))
1552            .await
1553            .unwrap();
1554
1555        // Drop client A's connection. Collaborators should disappear and the project should not be shown as shared.
1556        server.disconnect_client(client_a.current_user_id(cx_a));
1557        cx_a.foreground().advance_clock(rpc::RECEIVE_TIMEOUT);
1558        project_a
1559            .condition(cx_a, |project, _| project.collaborators().is_empty())
1560            .await;
1561        project_a.read_with(cx_a, |project, _| assert!(!project.is_shared()));
1562        project_b
1563            .condition(cx_b, |project, _| project.is_read_only())
1564            .await;
1565        assert!(worktree_a.read_with(cx_a, |tree, _| !tree.as_local().unwrap().is_shared()));
1566        cx_b.update(|_| {
1567            drop(project_b);
1568        });
1569
1570        // Await reconnection
1571        let project_id = project_a.update(cx_a, |p, _| p.next_remote_id()).await;
1572
1573        // Share the project again and ensure guests can still join.
1574        project_a
1575            .update(cx_a, |project, cx| project.share(cx))
1576            .await
1577            .unwrap();
1578        assert!(worktree_a.read_with(cx_a, |tree, _| tree.as_local().unwrap().is_shared()));
1579
1580        let project_b2 = Project::remote(
1581            project_id,
1582            client_b.clone(),
1583            client_b.user_store.clone(),
1584            lang_registry.clone(),
1585            fs.clone(),
1586            &mut cx_b.to_async(),
1587        )
1588        .await
1589        .unwrap();
1590        project_b2
1591            .update(cx_b, |p, cx| p.open_buffer((worktree_id, "a.txt"), cx))
1592            .await
1593            .unwrap();
1594    }
1595
1596    #[gpui::test(iterations = 10)]
1597    async fn test_propagate_saves_and_fs_changes(
1598        cx_a: &mut TestAppContext,
1599        cx_b: &mut TestAppContext,
1600        cx_c: &mut TestAppContext,
1601    ) {
1602        let lang_registry = Arc::new(LanguageRegistry::test());
1603        let fs = FakeFs::new(cx_a.background());
1604        cx_a.foreground().forbid_parking();
1605
1606        // Connect to a server as 3 clients.
1607        let mut server = TestServer::start(cx_a.foreground(), cx_a.background()).await;
1608        let client_a = server.create_client(cx_a, "user_a").await;
1609        let client_b = server.create_client(cx_b, "user_b").await;
1610        let client_c = server.create_client(cx_c, "user_c").await;
1611
1612        // Share a worktree as client A.
1613        fs.insert_tree(
1614            "/a",
1615            json!({
1616                ".zed.toml": r#"collaborators = ["user_b", "user_c"]"#,
1617                "file1": "",
1618                "file2": ""
1619            }),
1620        )
1621        .await;
1622        let project_a = cx_a.update(|cx| {
1623            Project::local(
1624                client_a.clone(),
1625                client_a.user_store.clone(),
1626                lang_registry.clone(),
1627                fs.clone(),
1628                cx,
1629            )
1630        });
1631        let (worktree_a, _) = project_a
1632            .update(cx_a, |p, cx| {
1633                p.find_or_create_local_worktree("/a", true, cx)
1634            })
1635            .await
1636            .unwrap();
1637        worktree_a
1638            .read_with(cx_a, |tree, _| tree.as_local().unwrap().scan_complete())
1639            .await;
1640        let project_id = project_a.update(cx_a, |p, _| p.next_remote_id()).await;
1641        let worktree_id = worktree_a.read_with(cx_a, |tree, _| tree.id());
1642        project_a.update(cx_a, |p, cx| p.share(cx)).await.unwrap();
1643
1644        // Join that worktree as clients B and C.
1645        let project_b = Project::remote(
1646            project_id,
1647            client_b.clone(),
1648            client_b.user_store.clone(),
1649            lang_registry.clone(),
1650            fs.clone(),
1651            &mut cx_b.to_async(),
1652        )
1653        .await
1654        .unwrap();
1655        let project_c = Project::remote(
1656            project_id,
1657            client_c.clone(),
1658            client_c.user_store.clone(),
1659            lang_registry.clone(),
1660            fs.clone(),
1661            &mut cx_c.to_async(),
1662        )
1663        .await
1664        .unwrap();
1665        let worktree_b = project_b.read_with(cx_b, |p, cx| p.worktrees(cx).next().unwrap());
1666        let worktree_c = project_c.read_with(cx_c, |p, cx| p.worktrees(cx).next().unwrap());
1667
1668        // Open and edit a buffer as both guests B and C.
1669        let buffer_b = project_b
1670            .update(cx_b, |p, cx| p.open_buffer((worktree_id, "file1"), cx))
1671            .await
1672            .unwrap();
1673        let buffer_c = project_c
1674            .update(cx_c, |p, cx| p.open_buffer((worktree_id, "file1"), cx))
1675            .await
1676            .unwrap();
1677        buffer_b.update(cx_b, |buf, cx| buf.edit([0..0], "i-am-b, ", cx));
1678        buffer_c.update(cx_c, |buf, cx| buf.edit([0..0], "i-am-c, ", cx));
1679
1680        // Open and edit that buffer as the host.
1681        let buffer_a = project_a
1682            .update(cx_a, |p, cx| p.open_buffer((worktree_id, "file1"), cx))
1683            .await
1684            .unwrap();
1685
1686        buffer_a
1687            .condition(cx_a, |buf, _| buf.text() == "i-am-c, i-am-b, ")
1688            .await;
1689        buffer_a.update(cx_a, |buf, cx| {
1690            buf.edit([buf.len()..buf.len()], "i-am-a", cx)
1691        });
1692
1693        // Wait for edits to propagate
1694        buffer_a
1695            .condition(cx_a, |buf, _| buf.text() == "i-am-c, i-am-b, i-am-a")
1696            .await;
1697        buffer_b
1698            .condition(cx_b, |buf, _| buf.text() == "i-am-c, i-am-b, i-am-a")
1699            .await;
1700        buffer_c
1701            .condition(cx_c, |buf, _| buf.text() == "i-am-c, i-am-b, i-am-a")
1702            .await;
1703
1704        // Edit the buffer as the host and concurrently save as guest B.
1705        let save_b = buffer_b.update(cx_b, |buf, cx| buf.save(cx));
1706        buffer_a.update(cx_a, |buf, cx| buf.edit([0..0], "hi-a, ", cx));
1707        save_b.await.unwrap();
1708        assert_eq!(
1709            fs.load("/a/file1".as_ref()).await.unwrap(),
1710            "hi-a, i-am-c, i-am-b, i-am-a"
1711        );
1712        buffer_a.read_with(cx_a, |buf, _| assert!(!buf.is_dirty()));
1713        buffer_b.read_with(cx_b, |buf, _| assert!(!buf.is_dirty()));
1714        buffer_c.condition(cx_c, |buf, _| !buf.is_dirty()).await;
1715
1716        worktree_a.flush_fs_events(cx_a).await;
1717
1718        // Make changes on host's file system, see those changes on guest worktrees.
1719        fs.rename(
1720            "/a/file1".as_ref(),
1721            "/a/file1-renamed".as_ref(),
1722            Default::default(),
1723        )
1724        .await
1725        .unwrap();
1726
1727        fs.rename("/a/file2".as_ref(), "/a/file3".as_ref(), Default::default())
1728            .await
1729            .unwrap();
1730        fs.insert_file(Path::new("/a/file4"), "4".into()).await;
1731
1732        worktree_a
1733            .condition(&cx_a, |tree, _| {
1734                tree.paths()
1735                    .map(|p| p.to_string_lossy())
1736                    .collect::<Vec<_>>()
1737                    == [".zed.toml", "file1-renamed", "file3", "file4"]
1738            })
1739            .await;
1740        worktree_b
1741            .condition(&cx_b, |tree, _| {
1742                tree.paths()
1743                    .map(|p| p.to_string_lossy())
1744                    .collect::<Vec<_>>()
1745                    == [".zed.toml", "file1-renamed", "file3", "file4"]
1746            })
1747            .await;
1748        worktree_c
1749            .condition(&cx_c, |tree, _| {
1750                tree.paths()
1751                    .map(|p| p.to_string_lossy())
1752                    .collect::<Vec<_>>()
1753                    == [".zed.toml", "file1-renamed", "file3", "file4"]
1754            })
1755            .await;
1756
1757        // Ensure buffer files are updated as well.
1758        buffer_a
1759            .condition(&cx_a, |buf, _| {
1760                buf.file().unwrap().path().to_str() == Some("file1-renamed")
1761            })
1762            .await;
1763        buffer_b
1764            .condition(&cx_b, |buf, _| {
1765                buf.file().unwrap().path().to_str() == Some("file1-renamed")
1766            })
1767            .await;
1768        buffer_c
1769            .condition(&cx_c, |buf, _| {
1770                buf.file().unwrap().path().to_str() == Some("file1-renamed")
1771            })
1772            .await;
1773    }
1774
1775    #[gpui::test(iterations = 10)]
1776    async fn test_buffer_conflict_after_save(cx_a: &mut TestAppContext, cx_b: &mut TestAppContext) {
1777        cx_a.foreground().forbid_parking();
1778        let lang_registry = Arc::new(LanguageRegistry::test());
1779        let fs = FakeFs::new(cx_a.background());
1780
1781        // Connect to a server as 2 clients.
1782        let mut server = TestServer::start(cx_a.foreground(), cx_a.background()).await;
1783        let client_a = server.create_client(cx_a, "user_a").await;
1784        let client_b = server.create_client(cx_b, "user_b").await;
1785
1786        // Share a project as client A
1787        fs.insert_tree(
1788            "/dir",
1789            json!({
1790                ".zed.toml": r#"collaborators = ["user_b", "user_c"]"#,
1791                "a.txt": "a-contents",
1792            }),
1793        )
1794        .await;
1795
1796        let project_a = cx_a.update(|cx| {
1797            Project::local(
1798                client_a.clone(),
1799                client_a.user_store.clone(),
1800                lang_registry.clone(),
1801                fs.clone(),
1802                cx,
1803            )
1804        });
1805        let (worktree_a, _) = project_a
1806            .update(cx_a, |p, cx| {
1807                p.find_or_create_local_worktree("/dir", true, cx)
1808            })
1809            .await
1810            .unwrap();
1811        worktree_a
1812            .read_with(cx_a, |tree, _| tree.as_local().unwrap().scan_complete())
1813            .await;
1814        let project_id = project_a.update(cx_a, |p, _| p.next_remote_id()).await;
1815        let worktree_id = worktree_a.read_with(cx_a, |tree, _| tree.id());
1816        project_a.update(cx_a, |p, cx| p.share(cx)).await.unwrap();
1817
1818        // Join that project as client B
1819        let project_b = Project::remote(
1820            project_id,
1821            client_b.clone(),
1822            client_b.user_store.clone(),
1823            lang_registry.clone(),
1824            fs.clone(),
1825            &mut cx_b.to_async(),
1826        )
1827        .await
1828        .unwrap();
1829
1830        // Open a buffer as client B
1831        let buffer_b = project_b
1832            .update(cx_b, |p, cx| p.open_buffer((worktree_id, "a.txt"), cx))
1833            .await
1834            .unwrap();
1835
1836        buffer_b.update(cx_b, |buf, cx| buf.edit([0..0], "world ", cx));
1837        buffer_b.read_with(cx_b, |buf, _| {
1838            assert!(buf.is_dirty());
1839            assert!(!buf.has_conflict());
1840        });
1841
1842        buffer_b.update(cx_b, |buf, cx| buf.save(cx)).await.unwrap();
1843        buffer_b
1844            .condition(&cx_b, |buffer_b, _| !buffer_b.is_dirty())
1845            .await;
1846        buffer_b.read_with(cx_b, |buf, _| {
1847            assert!(!buf.has_conflict());
1848        });
1849
1850        buffer_b.update(cx_b, |buf, cx| buf.edit([0..0], "hello ", cx));
1851        buffer_b.read_with(cx_b, |buf, _| {
1852            assert!(buf.is_dirty());
1853            assert!(!buf.has_conflict());
1854        });
1855    }
1856
1857    #[gpui::test(iterations = 10)]
1858    async fn test_buffer_reloading(cx_a: &mut TestAppContext, cx_b: &mut TestAppContext) {
1859        cx_a.foreground().forbid_parking();
1860        let lang_registry = Arc::new(LanguageRegistry::test());
1861        let fs = FakeFs::new(cx_a.background());
1862
1863        // Connect to a server as 2 clients.
1864        let mut server = TestServer::start(cx_a.foreground(), cx_a.background()).await;
1865        let client_a = server.create_client(cx_a, "user_a").await;
1866        let client_b = server.create_client(cx_b, "user_b").await;
1867
1868        // Share a project as client A
1869        fs.insert_tree(
1870            "/dir",
1871            json!({
1872                ".zed.toml": r#"collaborators = ["user_b", "user_c"]"#,
1873                "a.txt": "a-contents",
1874            }),
1875        )
1876        .await;
1877
1878        let project_a = cx_a.update(|cx| {
1879            Project::local(
1880                client_a.clone(),
1881                client_a.user_store.clone(),
1882                lang_registry.clone(),
1883                fs.clone(),
1884                cx,
1885            )
1886        });
1887        let (worktree_a, _) = project_a
1888            .update(cx_a, |p, cx| {
1889                p.find_or_create_local_worktree("/dir", true, cx)
1890            })
1891            .await
1892            .unwrap();
1893        worktree_a
1894            .read_with(cx_a, |tree, _| tree.as_local().unwrap().scan_complete())
1895            .await;
1896        let project_id = project_a.update(cx_a, |p, _| p.next_remote_id()).await;
1897        let worktree_id = worktree_a.read_with(cx_a, |tree, _| tree.id());
1898        project_a.update(cx_a, |p, cx| p.share(cx)).await.unwrap();
1899
1900        // Join that project as client B
1901        let project_b = Project::remote(
1902            project_id,
1903            client_b.clone(),
1904            client_b.user_store.clone(),
1905            lang_registry.clone(),
1906            fs.clone(),
1907            &mut cx_b.to_async(),
1908        )
1909        .await
1910        .unwrap();
1911        let _worktree_b = project_b.update(cx_b, |p, cx| p.worktrees(cx).next().unwrap());
1912
1913        // Open a buffer as client B
1914        let buffer_b = project_b
1915            .update(cx_b, |p, cx| p.open_buffer((worktree_id, "a.txt"), cx))
1916            .await
1917            .unwrap();
1918        buffer_b.read_with(cx_b, |buf, _| {
1919            assert!(!buf.is_dirty());
1920            assert!(!buf.has_conflict());
1921        });
1922
1923        fs.save(Path::new("/dir/a.txt"), &"new contents".into())
1924            .await
1925            .unwrap();
1926        buffer_b
1927            .condition(&cx_b, |buf, _| {
1928                buf.text() == "new contents" && !buf.is_dirty()
1929            })
1930            .await;
1931        buffer_b.read_with(cx_b, |buf, _| {
1932            assert!(!buf.has_conflict());
1933        });
1934    }
1935
1936    #[gpui::test(iterations = 10)]
1937    async fn test_editing_while_guest_opens_buffer(
1938        cx_a: &mut TestAppContext,
1939        cx_b: &mut TestAppContext,
1940    ) {
1941        cx_a.foreground().forbid_parking();
1942        let lang_registry = Arc::new(LanguageRegistry::test());
1943        let fs = FakeFs::new(cx_a.background());
1944
1945        // Connect to a server as 2 clients.
1946        let mut server = TestServer::start(cx_a.foreground(), cx_a.background()).await;
1947        let client_a = server.create_client(cx_a, "user_a").await;
1948        let client_b = server.create_client(cx_b, "user_b").await;
1949
1950        // Share a project as client A
1951        fs.insert_tree(
1952            "/dir",
1953            json!({
1954                ".zed.toml": r#"collaborators = ["user_b"]"#,
1955                "a.txt": "a-contents",
1956            }),
1957        )
1958        .await;
1959        let project_a = cx_a.update(|cx| {
1960            Project::local(
1961                client_a.clone(),
1962                client_a.user_store.clone(),
1963                lang_registry.clone(),
1964                fs.clone(),
1965                cx,
1966            )
1967        });
1968        let (worktree_a, _) = project_a
1969            .update(cx_a, |p, cx| {
1970                p.find_or_create_local_worktree("/dir", true, cx)
1971            })
1972            .await
1973            .unwrap();
1974        worktree_a
1975            .read_with(cx_a, |tree, _| tree.as_local().unwrap().scan_complete())
1976            .await;
1977        let project_id = project_a.update(cx_a, |p, _| p.next_remote_id()).await;
1978        let worktree_id = worktree_a.read_with(cx_a, |tree, _| tree.id());
1979        project_a.update(cx_a, |p, cx| p.share(cx)).await.unwrap();
1980
1981        // Join that project as client B
1982        let project_b = Project::remote(
1983            project_id,
1984            client_b.clone(),
1985            client_b.user_store.clone(),
1986            lang_registry.clone(),
1987            fs.clone(),
1988            &mut cx_b.to_async(),
1989        )
1990        .await
1991        .unwrap();
1992
1993        // Open a buffer as client A
1994        let buffer_a = project_a
1995            .update(cx_a, |p, cx| p.open_buffer((worktree_id, "a.txt"), cx))
1996            .await
1997            .unwrap();
1998
1999        // Start opening the same buffer as client B
2000        let buffer_b = cx_b
2001            .background()
2002            .spawn(project_b.update(cx_b, |p, cx| p.open_buffer((worktree_id, "a.txt"), cx)));
2003
2004        // Edit the buffer as client A while client B is still opening it.
2005        cx_b.background().simulate_random_delay().await;
2006        buffer_a.update(cx_a, |buf, cx| buf.edit([0..0], "X", cx));
2007        cx_b.background().simulate_random_delay().await;
2008        buffer_a.update(cx_a, |buf, cx| buf.edit([1..1], "Y", cx));
2009
2010        let text = buffer_a.read_with(cx_a, |buf, _| buf.text());
2011        let buffer_b = buffer_b.await.unwrap();
2012        buffer_b.condition(&cx_b, |buf, _| buf.text() == text).await;
2013    }
2014
2015    #[gpui::test(iterations = 10)]
2016    async fn test_leaving_worktree_while_opening_buffer(
2017        cx_a: &mut TestAppContext,
2018        cx_b: &mut TestAppContext,
2019    ) {
2020        cx_a.foreground().forbid_parking();
2021        let lang_registry = Arc::new(LanguageRegistry::test());
2022        let fs = FakeFs::new(cx_a.background());
2023
2024        // Connect to a server as 2 clients.
2025        let mut server = TestServer::start(cx_a.foreground(), cx_a.background()).await;
2026        let client_a = server.create_client(cx_a, "user_a").await;
2027        let client_b = server.create_client(cx_b, "user_b").await;
2028
2029        // Share a project as client A
2030        fs.insert_tree(
2031            "/dir",
2032            json!({
2033                ".zed.toml": r#"collaborators = ["user_b"]"#,
2034                "a.txt": "a-contents",
2035            }),
2036        )
2037        .await;
2038        let project_a = cx_a.update(|cx| {
2039            Project::local(
2040                client_a.clone(),
2041                client_a.user_store.clone(),
2042                lang_registry.clone(),
2043                fs.clone(),
2044                cx,
2045            )
2046        });
2047        let (worktree_a, _) = project_a
2048            .update(cx_a, |p, cx| {
2049                p.find_or_create_local_worktree("/dir", true, cx)
2050            })
2051            .await
2052            .unwrap();
2053        worktree_a
2054            .read_with(cx_a, |tree, _| tree.as_local().unwrap().scan_complete())
2055            .await;
2056        let project_id = project_a.update(cx_a, |p, _| p.next_remote_id()).await;
2057        let worktree_id = worktree_a.read_with(cx_a, |tree, _| tree.id());
2058        project_a.update(cx_a, |p, cx| p.share(cx)).await.unwrap();
2059
2060        // Join that project as client B
2061        let project_b = Project::remote(
2062            project_id,
2063            client_b.clone(),
2064            client_b.user_store.clone(),
2065            lang_registry.clone(),
2066            fs.clone(),
2067            &mut cx_b.to_async(),
2068        )
2069        .await
2070        .unwrap();
2071
2072        // See that a guest has joined as client A.
2073        project_a
2074            .condition(&cx_a, |p, _| p.collaborators().len() == 1)
2075            .await;
2076
2077        // Begin opening a buffer as client B, but leave the project before the open completes.
2078        let buffer_b = cx_b
2079            .background()
2080            .spawn(project_b.update(cx_b, |p, cx| p.open_buffer((worktree_id, "a.txt"), cx)));
2081        cx_b.update(|_| drop(project_b));
2082        drop(buffer_b);
2083
2084        // See that the guest has left.
2085        project_a
2086            .condition(&cx_a, |p, _| p.collaborators().len() == 0)
2087            .await;
2088    }
2089
2090    #[gpui::test(iterations = 10)]
2091    async fn test_leaving_project(cx_a: &mut TestAppContext, cx_b: &mut TestAppContext) {
2092        cx_a.foreground().forbid_parking();
2093        let lang_registry = Arc::new(LanguageRegistry::test());
2094        let fs = FakeFs::new(cx_a.background());
2095
2096        // Connect to a server as 2 clients.
2097        let mut server = TestServer::start(cx_a.foreground(), cx_a.background()).await;
2098        let client_a = server.create_client(cx_a, "user_a").await;
2099        let client_b = server.create_client(cx_b, "user_b").await;
2100
2101        // Share a project as client A
2102        fs.insert_tree(
2103            "/a",
2104            json!({
2105                ".zed.toml": r#"collaborators = ["user_b"]"#,
2106                "a.txt": "a-contents",
2107                "b.txt": "b-contents",
2108            }),
2109        )
2110        .await;
2111        let project_a = cx_a.update(|cx| {
2112            Project::local(
2113                client_a.clone(),
2114                client_a.user_store.clone(),
2115                lang_registry.clone(),
2116                fs.clone(),
2117                cx,
2118            )
2119        });
2120        let (worktree_a, _) = project_a
2121            .update(cx_a, |p, cx| {
2122                p.find_or_create_local_worktree("/a", true, cx)
2123            })
2124            .await
2125            .unwrap();
2126        worktree_a
2127            .read_with(cx_a, |tree, _| tree.as_local().unwrap().scan_complete())
2128            .await;
2129        let project_id = project_a
2130            .update(cx_a, |project, _| project.next_remote_id())
2131            .await;
2132        project_a
2133            .update(cx_a, |project, cx| project.share(cx))
2134            .await
2135            .unwrap();
2136
2137        // Join that project as client B
2138        let _project_b = Project::remote(
2139            project_id,
2140            client_b.clone(),
2141            client_b.user_store.clone(),
2142            lang_registry.clone(),
2143            fs.clone(),
2144            &mut cx_b.to_async(),
2145        )
2146        .await
2147        .unwrap();
2148
2149        // Client A sees that a guest has joined.
2150        project_a
2151            .condition(cx_a, |p, _| p.collaborators().len() == 1)
2152            .await;
2153
2154        // Drop client B's connection and ensure client A observes client B leaving the project.
2155        client_b.disconnect(&cx_b.to_async()).unwrap();
2156        project_a
2157            .condition(cx_a, |p, _| p.collaborators().len() == 0)
2158            .await;
2159
2160        // Rejoin the project as client B
2161        let _project_b = Project::remote(
2162            project_id,
2163            client_b.clone(),
2164            client_b.user_store.clone(),
2165            lang_registry.clone(),
2166            fs.clone(),
2167            &mut cx_b.to_async(),
2168        )
2169        .await
2170        .unwrap();
2171
2172        // Client A sees that a guest has re-joined.
2173        project_a
2174            .condition(cx_a, |p, _| p.collaborators().len() == 1)
2175            .await;
2176
2177        // Simulate connection loss for client B and ensure client A observes client B leaving the project.
2178        client_b.wait_for_current_user(cx_b).await;
2179        server.disconnect_client(client_b.current_user_id(cx_b));
2180        cx_a.foreground().advance_clock(Duration::from_secs(3));
2181        project_a
2182            .condition(cx_a, |p, _| p.collaborators().len() == 0)
2183            .await;
2184    }
2185
2186    #[gpui::test(iterations = 10)]
2187    async fn test_collaborating_with_diagnostics(
2188        cx_a: &mut TestAppContext,
2189        cx_b: &mut TestAppContext,
2190    ) {
2191        cx_a.foreground().forbid_parking();
2192        let lang_registry = Arc::new(LanguageRegistry::test());
2193        let fs = FakeFs::new(cx_a.background());
2194
2195        // Set up a fake language server.
2196        let mut language = Language::new(
2197            LanguageConfig {
2198                name: "Rust".into(),
2199                path_suffixes: vec!["rs".to_string()],
2200                ..Default::default()
2201            },
2202            Some(tree_sitter_rust::language()),
2203        );
2204        let mut fake_language_servers = language.set_fake_lsp_adapter(Default::default());
2205        lang_registry.add(Arc::new(language));
2206
2207        // Connect to a server as 2 clients.
2208        let mut server = TestServer::start(cx_a.foreground(), cx_a.background()).await;
2209        let client_a = server.create_client(cx_a, "user_a").await;
2210        let client_b = server.create_client(cx_b, "user_b").await;
2211
2212        // Share a project as client A
2213        fs.insert_tree(
2214            "/a",
2215            json!({
2216                ".zed.toml": r#"collaborators = ["user_b"]"#,
2217                "a.rs": "let one = two",
2218                "other.rs": "",
2219            }),
2220        )
2221        .await;
2222        let project_a = cx_a.update(|cx| {
2223            Project::local(
2224                client_a.clone(),
2225                client_a.user_store.clone(),
2226                lang_registry.clone(),
2227                fs.clone(),
2228                cx,
2229            )
2230        });
2231        let (worktree_a, _) = project_a
2232            .update(cx_a, |p, cx| {
2233                p.find_or_create_local_worktree("/a", true, cx)
2234            })
2235            .await
2236            .unwrap();
2237        worktree_a
2238            .read_with(cx_a, |tree, _| tree.as_local().unwrap().scan_complete())
2239            .await;
2240        let project_id = project_a.update(cx_a, |p, _| p.next_remote_id()).await;
2241        let worktree_id = worktree_a.read_with(cx_a, |tree, _| tree.id());
2242        project_a.update(cx_a, |p, cx| p.share(cx)).await.unwrap();
2243
2244        // Cause the language server to start.
2245        let _ = cx_a
2246            .background()
2247            .spawn(project_a.update(cx_a, |project, cx| {
2248                project.open_buffer(
2249                    ProjectPath {
2250                        worktree_id,
2251                        path: Path::new("other.rs").into(),
2252                    },
2253                    cx,
2254                )
2255            }))
2256            .await
2257            .unwrap();
2258
2259        // Simulate a language server reporting errors for a file.
2260        let mut fake_language_server = fake_language_servers.next().await.unwrap();
2261        fake_language_server
2262            .receive_notification::<lsp::notification::DidOpenTextDocument>()
2263            .await;
2264        fake_language_server.notify::<lsp::notification::PublishDiagnostics>(
2265            lsp::PublishDiagnosticsParams {
2266                uri: lsp::Url::from_file_path("/a/a.rs").unwrap(),
2267                version: None,
2268                diagnostics: vec![lsp::Diagnostic {
2269                    severity: Some(lsp::DiagnosticSeverity::ERROR),
2270                    range: lsp::Range::new(lsp::Position::new(0, 4), lsp::Position::new(0, 7)),
2271                    message: "message 1".to_string(),
2272                    ..Default::default()
2273                }],
2274            },
2275        );
2276
2277        // Wait for server to see the diagnostics update.
2278        server
2279            .condition(|store| {
2280                let worktree = store
2281                    .project(project_id)
2282                    .unwrap()
2283                    .share
2284                    .as_ref()
2285                    .unwrap()
2286                    .worktrees
2287                    .get(&worktree_id.to_proto())
2288                    .unwrap();
2289
2290                !worktree.diagnostic_summaries.is_empty()
2291            })
2292            .await;
2293
2294        // Join the worktree as client B.
2295        let project_b = Project::remote(
2296            project_id,
2297            client_b.clone(),
2298            client_b.user_store.clone(),
2299            lang_registry.clone(),
2300            fs.clone(),
2301            &mut cx_b.to_async(),
2302        )
2303        .await
2304        .unwrap();
2305
2306        project_b.read_with(cx_b, |project, cx| {
2307            assert_eq!(
2308                project.diagnostic_summaries(cx).collect::<Vec<_>>(),
2309                &[(
2310                    ProjectPath {
2311                        worktree_id,
2312                        path: Arc::from(Path::new("a.rs")),
2313                    },
2314                    DiagnosticSummary {
2315                        error_count: 1,
2316                        warning_count: 0,
2317                        ..Default::default()
2318                    },
2319                )]
2320            )
2321        });
2322
2323        // Simulate a language server reporting more errors for a file.
2324        fake_language_server.notify::<lsp::notification::PublishDiagnostics>(
2325            lsp::PublishDiagnosticsParams {
2326                uri: lsp::Url::from_file_path("/a/a.rs").unwrap(),
2327                version: None,
2328                diagnostics: vec![
2329                    lsp::Diagnostic {
2330                        severity: Some(lsp::DiagnosticSeverity::ERROR),
2331                        range: lsp::Range::new(lsp::Position::new(0, 4), lsp::Position::new(0, 7)),
2332                        message: "message 1".to_string(),
2333                        ..Default::default()
2334                    },
2335                    lsp::Diagnostic {
2336                        severity: Some(lsp::DiagnosticSeverity::WARNING),
2337                        range: lsp::Range::new(
2338                            lsp::Position::new(0, 10),
2339                            lsp::Position::new(0, 13),
2340                        ),
2341                        message: "message 2".to_string(),
2342                        ..Default::default()
2343                    },
2344                ],
2345            },
2346        );
2347
2348        // Client b gets the updated summaries
2349        project_b
2350            .condition(&cx_b, |project, cx| {
2351                project.diagnostic_summaries(cx).collect::<Vec<_>>()
2352                    == &[(
2353                        ProjectPath {
2354                            worktree_id,
2355                            path: Arc::from(Path::new("a.rs")),
2356                        },
2357                        DiagnosticSummary {
2358                            error_count: 1,
2359                            warning_count: 1,
2360                            ..Default::default()
2361                        },
2362                    )]
2363            })
2364            .await;
2365
2366        // Open the file with the errors on client B. They should be present.
2367        let buffer_b = cx_b
2368            .background()
2369            .spawn(project_b.update(cx_b, |p, cx| p.open_buffer((worktree_id, "a.rs"), cx)))
2370            .await
2371            .unwrap();
2372
2373        buffer_b.read_with(cx_b, |buffer, _| {
2374            assert_eq!(
2375                buffer
2376                    .snapshot()
2377                    .diagnostics_in_range::<_, Point>(0..buffer.len(), false)
2378                    .map(|entry| entry)
2379                    .collect::<Vec<_>>(),
2380                &[
2381                    DiagnosticEntry {
2382                        range: Point::new(0, 4)..Point::new(0, 7),
2383                        diagnostic: Diagnostic {
2384                            group_id: 0,
2385                            message: "message 1".to_string(),
2386                            severity: lsp::DiagnosticSeverity::ERROR,
2387                            is_primary: true,
2388                            ..Default::default()
2389                        }
2390                    },
2391                    DiagnosticEntry {
2392                        range: Point::new(0, 10)..Point::new(0, 13),
2393                        diagnostic: Diagnostic {
2394                            group_id: 1,
2395                            severity: lsp::DiagnosticSeverity::WARNING,
2396                            message: "message 2".to_string(),
2397                            is_primary: true,
2398                            ..Default::default()
2399                        }
2400                    }
2401                ]
2402            );
2403        });
2404    }
2405
2406    #[gpui::test(iterations = 10)]
        // Exercises completions across a collaboration session: the guest (client B) types a
        // trigger character, the completion request is answered by the fake language server
        // acting as the host's (client A's) server, the guest confirms the first completion,
        // and the resolved completion's additional text edit must show up in both buffers.
2407    async fn test_collaborating_with_completion(
2408        cx_a: &mut TestAppContext,
2409        cx_b: &mut TestAppContext,
2410    ) {
2411        cx_a.foreground().forbid_parking();
2412        let lang_registry = Arc::new(LanguageRegistry::test());
2413        let fs = FakeFs::new(cx_a.background());
2414
2415        // Set up a fake language server.
            // It advertises "." as its only completion trigger character; the guest waits further
            // down until its buffer reports a non-empty set of completion triggers.
2416        let mut language = Language::new(
2417            LanguageConfig {
2418                name: "Rust".into(),
2419                path_suffixes: vec!["rs".to_string()],
2420                ..Default::default()
2421            },
2422            Some(tree_sitter_rust::language()),
2423        );
2424        let mut fake_language_servers = language.set_fake_lsp_adapter(FakeLspAdapter {
2425            capabilities: lsp::ServerCapabilities {
2426                completion_provider: Some(lsp::CompletionOptions {
2427                    trigger_characters: Some(vec![".".to_string()]),
2428                    ..Default::default()
2429                }),
2430                ..Default::default()
2431            },
2432            ..Default::default()
2433        });
2434        lang_registry.add(Arc::new(language));
2435
2436        // Connect to a server as 2 clients.
2437        let mut server = TestServer::start(cx_a.foreground(), cx_a.background()).await;
2438        let client_a = server.create_client(cx_a, "user_a").await;
2439        let client_b = server.create_client(cx_b, "user_b").await;
2440
2441        // Share a project as client A
            // The `.zed.toml` file lists "user_b" under `collaborators`.
2442        fs.insert_tree(
2443            "/a",
2444            json!({
2445                ".zed.toml": r#"collaborators = ["user_b"]"#,
2446                "main.rs": "fn main() { a }",
2447                "other.rs": "",
2448            }),
2449        )
2450        .await;
2451        let project_a = cx_a.update(|cx| {
2452            Project::local(
2453                client_a.clone(),
2454                client_a.user_store.clone(),
2455                lang_registry.clone(),
2456                fs.clone(),
2457                cx,
2458            )
2459        });
2460        let (worktree_a, _) = project_a
2461            .update(cx_a, |p, cx| {
2462                p.find_or_create_local_worktree("/a", true, cx)
2463            })
2464            .await
2465            .unwrap();
            // Wait for the host's worktree scan to complete before sharing the project.
2466        worktree_a
2467            .read_with(cx_a, |tree, _| tree.as_local().unwrap().scan_complete())
2468            .await;
2469        let project_id = project_a.update(cx_a, |p, _| p.next_remote_id()).await;
2470        let worktree_id = worktree_a.read_with(cx_a, |tree, _| tree.id());
2471        project_a.update(cx_a, |p, cx| p.share(cx)).await.unwrap();
2472
2473        // Join the worktree as client B.
2474        let project_b = Project::remote(
2475            project_id,
2476            client_b.clone(),
2477            client_b.user_store.clone(),
2478            lang_registry.clone(),
2479            fs.clone(),
2480            &mut cx_b.to_async(),
2481        )
2482        .await
2483        .unwrap();
2484
2485        // Open a file in an editor as the guest.
2486        let buffer_b = project_b
2487            .update(cx_b, |p, cx| p.open_buffer((worktree_id, "main.rs"), cx))
2488            .await
2489            .unwrap();
2490        let (window_b, _) = cx_b.add_window(|_| EmptyView);
2491        let editor_b = cx_b.add_view(window_b, |cx| {
2492            Editor::for_buffer(buffer_b.clone(), Some(project_b.clone()), cx)
2493        });
2494
            // Take the next fake language server instance, then wait until the guest's buffer
            // reports at least one completion trigger.
2495        let fake_language_server = fake_language_servers.next().await.unwrap();
2496        buffer_b
2497            .condition(&cx_b, |buffer, _| !buffer.completion_triggers().is_empty())
2498            .await;
2499
2500        // Type a completion trigger character as the guest.
            // Offset 13 is directly after the `a` in "fn main() { a }", so typing "." yields
            // "fn main() { a. }" (the text the host's buffer is later expected to contain).
2501        editor_b.update(cx_b, |editor, cx| {
2502            editor.select_ranges([13..13], None, cx);
2503            editor.handle_input(&Input(".".into()), cx);
2504            cx.focus(&editor_b);
2505        });
2506
2507        // Receive a completion request as the host's language server.
2508        // Return some completions from the host's language server.
            // NOTE(review): the await on the first handled request is bracketed by
            // start_waiting / finish_waiting on the host's foreground executor; presumably this
            // permits waiting here although parking was forbidden at the top of the test --
            // confirm against gpui's executor.
2509        cx_a.foreground().start_waiting();
2510        fake_language_server
2511            .handle_request::<lsp::request::Completion, _, _>(|params, _| async move {
                    // The request must name the host-side path of the file and the position
                    // just after the typed "." (line 0, column 14).
2512                assert_eq!(
2513                    params.text_document_position.text_document.uri,
2514                    lsp::Url::from_file_path("/a/main.rs").unwrap(),
2515                );
2516                assert_eq!(
2517                    params.text_document_position.position,
2518                    lsp::Position::new(0, 14),
2519                );
2520
                    // Both items are snippet-format insertions at (0, 14); only the first one
                    // contains a `$1` placeholder.
2521                Ok(Some(lsp::CompletionResponse::Array(vec![
2522                    lsp::CompletionItem {
2523                        label: "first_method(…)".into(),
2524                        detail: Some("fn(&mut self, B) -> C".into()),
2525                        text_edit: Some(lsp::CompletionTextEdit::Edit(lsp::TextEdit {
2526                            new_text: "first_method($1)".to_string(),
2527                            range: lsp::Range::new(
2528                                lsp::Position::new(0, 14),
2529                                lsp::Position::new(0, 14),
2530                            ),
2531                        })),
2532                        insert_text_format: Some(lsp::InsertTextFormat::SNIPPET),
2533                        ..Default::default()
2534                    },
2535                    lsp::CompletionItem {
2536                        label: "second_method(…)".into(),
2537                        detail: Some("fn(&mut self, C) -> D<E>".into()),
2538                        text_edit: Some(lsp::CompletionTextEdit::Edit(lsp::TextEdit {
2539                            new_text: "second_method()".to_string(),
2540                            range: lsp::Range::new(
2541                                lsp::Position::new(0, 14),
2542                                lsp::Position::new(0, 14),
2543                            ),
2544                        })),
2545                        insert_text_format: Some(lsp::InsertTextFormat::SNIPPET),
2546                        ..Default::default()
2547                    },
2548                ])))
2549            })
2550            .next()
2551            .await
2552            .unwrap();
2553        cx_a.foreground().finish_waiting();
2554
2555        // Open the buffer on the host.
            // The host's copy must already contain the "." typed by the guest.
2556        let buffer_a = project_a
2557            .update(cx_a, |p, cx| p.open_buffer((worktree_id, "main.rs"), cx))
2558            .await
2559            .unwrap();
2560        buffer_a
2561            .condition(&cx_a, |buffer, _| buffer.text() == "fn main() { a. }")
2562            .await;
2563
2564        // Confirm a completion on the guest.
            // Wait for the context menu to become visible, then confirm entry 0. The asserted
            // text shows the first item's snippet inserted with its `$1` placeholder left empty.
2565        editor_b
2566            .condition(&cx_b, |editor, _| editor.context_menu_visible())
2567            .await;
2568        editor_b.update(cx_b, |editor, cx| {
2569            editor.confirm_completion(&ConfirmCompletion(Some(0)), cx);
2570            assert_eq!(editor.text(cx), "fn main() { a.first_method() }");
2571        });
2572
2573        // Return a resolved completion from the host's language server.
2574        // The resolved completion has an additional text edit.
            // This handler is registered only after the completion was confirmed above; the
            // conditions at the end of the test wait for its additional edit to be applied.
2575        fake_language_server.handle_request::<lsp::request::ResolveCompletionItem, _, _>(
2576            |params, _| async move {
2577                assert_eq!(params.label, "first_method(…)");
2578                Ok(lsp::CompletionItem {
2579                    label: "first_method(…)".into(),
2580                    detail: Some("fn(&mut self, B) -> C".into()),
2581                    text_edit: Some(lsp::CompletionTextEdit::Edit(lsp::TextEdit {
2582                        new_text: "first_method($1)".to_string(),
2583                        range: lsp::Range::new(
2584                            lsp::Position::new(0, 14),
2585                            lsp::Position::new(0, 14),
2586                        ),
2587                    })),
                        // The extra edit inserts a `use` line at the very start of the file.
2588                    additional_text_edits: Some(vec![lsp::TextEdit {
2589                        new_text: "use d::SomeTrait;\n".to_string(),
2590                        range: lsp::Range::new(lsp::Position::new(0, 0), lsp::Position::new(0, 0)),
2591                    }]),
2592                    insert_text_format: Some(lsp::InsertTextFormat::SNIPPET),
2593                    ..Default::default()
2594                })
2595            },
2596        );
2597
2598        // The additional edit is applied.
            // Both the host's and the guest's buffers must converge on the same final text.
2599        buffer_a
2600            .condition(&cx_a, |buffer, _| {
2601                buffer.text() == "use d::SomeTrait;\nfn main() { a.first_method() }"
2602            })
2603            .await;
2604        buffer_b
2605            .condition(&cx_b, |buffer, _| {
2606                buffer.text() == "use d::SomeTrait;\nfn main() { a.first_method() }"
2607            })
2608            .await;
2609    }
2610
2611    #[gpui::test(iterations = 10)]
        // Exercises a guest-initiated reload of a buffer that conflicts with the file on disk:
        // after the reload both replicas must show the on-disk text, and undo is then checked
        // separately on the host and on the guest.
2612    async fn test_reloading_buffer_manually(cx_a: &mut TestAppContext, cx_b: &mut TestAppContext) {
2613        cx_a.foreground().forbid_parking();
2614        let lang_registry = Arc::new(LanguageRegistry::test());
2615        let fs = FakeFs::new(cx_a.background());
2616
2617        // Connect to a server as 2 clients.
2618        let mut server = TestServer::start(cx_a.foreground(), cx_a.background()).await;
2619        let client_a = server.create_client(cx_a, "user_a").await;
2620        let client_b = server.create_client(cx_b, "user_b").await;
2621
2622        // Share a project as client A
            // The `.zed.toml` file lists "user_b" under `collaborators`.
2623        fs.insert_tree(
2624            "/a",
2625            json!({
2626                ".zed.toml": r#"collaborators = ["user_b"]"#,
2627                "a.rs": "let one = 1;",
2628            }),
2629        )
2630        .await;
2631        let project_a = cx_a.update(|cx| {
2632            Project::local(
2633                client_a.clone(),
2634                client_a.user_store.clone(),
2635                lang_registry.clone(),
2636                fs.clone(),
2637                cx,
2638            )
2639        });
2640        let (worktree_a, _) = project_a
2641            .update(cx_a, |p, cx| {
2642                p.find_or_create_local_worktree("/a", true, cx)
2643            })
2644            .await
2645            .unwrap();
            // Wait for the host's worktree scan to complete before sharing the project.
2646        worktree_a
2647            .read_with(cx_a, |tree, _| tree.as_local().unwrap().scan_complete())
2648            .await;
2649        let project_id = project_a.update(cx_a, |p, _| p.next_remote_id()).await;
2650        let worktree_id = worktree_a.read_with(cx_a, |tree, _| tree.id());
2651        project_a.update(cx_a, |p, cx| p.share(cx)).await.unwrap();
            // Open the buffer on the host before the guest joins.
2652        let buffer_a = project_a
2653            .update(cx_a, |p, cx| p.open_buffer((worktree_id, "a.rs"), cx))
2654            .await
2655            .unwrap();
2656
2657        // Join the worktree as client B.
2658        let project_b = Project::remote(
2659            project_id,
2660            client_b.clone(),
2661            client_b.user_store.clone(),
2662            lang_registry.clone(),
2663            fs.clone(),
2664            &mut cx_b.to_async(),
2665        )
2666        .await
2667        .unwrap();
2668
            // Open the same file as the guest; the open-buffer future is awaited via the
            // guest's background executor.
2669        let buffer_b = cx_b
2670            .background()
2671            .spawn(project_b.update(cx_b, |p, cx| p.open_buffer((worktree_id, "a.rs"), cx)))
2672            .await
2673            .unwrap();
            // Edit as the guest: offsets 4..7 ("one") become "six" and 10..11 ("1") becomes "6".
            // The buffer is then dirty, but there is no conflict yet.
2674        buffer_b.update(cx_b, |buffer, cx| {
2675            buffer.edit([4..7], "six", cx);
2676            buffer.edit([10..11], "6", cx);
2677            assert_eq!(buffer.text(), "let six = 6;");
2678            assert!(buffer.is_dirty());
2679            assert!(!buffer.has_conflict());
2680        });
            // The guest's edits must reach the host's buffer.
2681        buffer_a
2682            .condition(cx_a, |buffer, _| buffer.text() == "let six = 6;")
2683            .await;
2684
            // Change the file directly on the fake filesystem while the buffers still hold the
            // edited text; both replicas are expected to report a conflict.
2685        fs.save(Path::new("/a/a.rs"), &Rope::from("let seven = 7;"))
2686            .await
2687            .unwrap();
2688        buffer_a
2689            .condition(cx_a, |buffer, _| buffer.has_conflict())
2690            .await;
2691        buffer_b
2692            .condition(cx_b, |buffer, _| buffer.has_conflict())
2693            .await;
2694
            // Reload the buffer from the guest's project.
            // NOTE(review): the `true` argument is presumably a "push to undo history" flag,
            // which would explain why the guest can undo the reload below -- confirm against
            // Project::reload_buffers.
2695        project_b
2696            .update(cx_b, |project, cx| {
2697                project.reload_buffers(HashSet::from_iter([buffer_b.clone()]), true, cx)
2698            })
2699            .await
2700            .unwrap();
            // After the reload both replicas show the on-disk text and are neither dirty nor
            // in conflict.
2701        buffer_a.read_with(cx_a, |buffer, _| {
2702            assert_eq!(buffer.text(), "let seven = 7;");
2703            assert!(!buffer.is_dirty());
2704            assert!(!buffer.has_conflict());
2705        });
2706        buffer_b.read_with(cx_b, |buffer, _| {
2707            assert_eq!(buffer.text(), "let seven = 7;");
2708            assert!(!buffer.is_dirty());
2709            assert!(!buffer.has_conflict());
2710        });
2711
2712        buffer_a.update(cx_a, |buffer, cx| {
2713            // Undoing on the host is a no-op when the reload was initiated by the guest.
2714            buffer.undo(cx);
2715            assert_eq!(buffer.text(), "let seven = 7;");
2716            assert!(!buffer.is_dirty());
2717            assert!(!buffer.has_conflict());
2718        });
2719        buffer_b.update(cx_b, |buffer, cx| {
2720            // Undoing on the guest rolls back the buffer to before it was reloaded but the conflict gets cleared.
2721            buffer.undo(cx);
2722            assert_eq!(buffer.text(), "let six = 6;");
2723            assert!(buffer.is_dirty());
2724            assert!(!buffer.has_conflict());
2725        });
2726    }
2727
2728    #[gpui::test(iterations = 10)]
        // Exercises formatting requested by the guest (client B): the edits returned by the
        // fake language server's Formatting handler must end up in the guest's buffer.
2729    async fn test_formatting_buffer(cx_a: &mut TestAppContext, cx_b: &mut TestAppContext) {
2730        cx_a.foreground().forbid_parking();
2731        let lang_registry = Arc::new(LanguageRegistry::test());
2732        let fs = FakeFs::new(cx_a.background());
2733
2734        // Set up a fake language server.
            // The default fake adapter is used; no specific capabilities are configured here.
2735        let mut language = Language::new(
2736            LanguageConfig {
2737                name: "Rust".into(),
2738                path_suffixes: vec!["rs".to_string()],
2739                ..Default::default()
2740            },
2741            Some(tree_sitter_rust::language()),
2742        );
2743        let mut fake_language_servers = language.set_fake_lsp_adapter(Default::default());
2744        lang_registry.add(Arc::new(language));
2745
2746        // Connect to a server as 2 clients.
2747        let mut server = TestServer::start(cx_a.foreground(), cx_a.background()).await;
2748        let client_a = server.create_client(cx_a, "user_a").await;
2749        let client_b = server.create_client(cx_b, "user_b").await;
2750
2751        // Share a project as client A
            // The `.zed.toml` file lists "user_b" under `collaborators`.
2752        fs.insert_tree(
2753            "/a",
2754            json!({
2755                ".zed.toml": r#"collaborators = ["user_b"]"#,
2756                "a.rs": "let one = two",
2757            }),
2758        )
2759        .await;
2760        let project_a = cx_a.update(|cx| {
2761            Project::local(
2762                client_a.clone(),
2763                client_a.user_store.clone(),
2764                lang_registry.clone(),
2765                fs.clone(),
2766                cx,
2767            )
2768        });
2769        let (worktree_a, _) = project_a
2770            .update(cx_a, |p, cx| {
2771                p.find_or_create_local_worktree("/a", true, cx)
2772            })
2773            .await
2774            .unwrap();
            // Wait for the host's worktree scan to complete before sharing the project.
2775        worktree_a
2776            .read_with(cx_a, |tree, _| tree.as_local().unwrap().scan_complete())
2777            .await;
2778        let project_id = project_a.update(cx_a, |p, _| p.next_remote_id()).await;
2779        let worktree_id = worktree_a.read_with(cx_a, |tree, _| tree.id());
2780        project_a.update(cx_a, |p, cx| p.share(cx)).await.unwrap();
2781
2782        // Join the worktree as client B.
2783        let project_b = Project::remote(
2784            project_id,
2785            client_b.clone(),
2786            client_b.user_store.clone(),
2787            lang_registry.clone(),
2788            fs.clone(),
2789            &mut cx_b.to_async(),
2790        )
2791        .await
2792        .unwrap();
2793
            // Open the file as the guest; the open-buffer future is awaited via the guest's
            // background executor.
2794        let buffer_b = cx_b
2795            .background()
2796            .spawn(project_b.update(cx_b, |p, cx| p.open_buffer((worktree_id, "a.rs"), cx)))
2797            .await
2798            .unwrap();
2799
            // Answer formatting requests with two insertions on line 0: "h" at column 4 (before
            // `one`) and "y" at column 7 (after `one`), which turns `one` into `honey`.
2800        let fake_language_server = fake_language_servers.next().await.unwrap();
2801        fake_language_server.handle_request::<lsp::request::Formatting, _, _>(|_, _| async move {
2802            Ok(Some(vec![
2803                lsp::TextEdit {
2804                    range: lsp::Range::new(lsp::Position::new(0, 4), lsp::Position::new(0, 4)),
2805                    new_text: "h".to_string(),
2806                },
2807                lsp::TextEdit {
2808                    range: lsp::Range::new(lsp::Position::new(0, 7), lsp::Position::new(0, 7)),
2809                    new_text: "y".to_string(),
2810                },
2811            ]))
2812        });
2813
            // Format the buffer from the guest's project.
            // NOTE(review): the meaning of the `true` argument is not visible here (presumably
            // a "push to undo history" flag) -- confirm against Project::format.
2814        project_b
2815            .update(cx_b, |project, cx| {
2816                project.format(HashSet::from_iter([buffer_b.clone()]), true, cx)
2817            })
2818            .await
2819            .unwrap();
            // Only the guest's buffer is checked; it must contain both insertions.
2820        assert_eq!(
2821            buffer_b.read_with(cx_b, |buffer, _| buffer.text()),
2822            "let honey = two"
2823        );
2824    }
2825
2826    #[gpui::test(iterations = 10)]
        // Exercises go-to-definition requested by the guest (client B). The fake language
        // server points at a file under /root-2, which the host never added as a worktree,
        // and a second request must resolve to the same target buffer as the first.
2827    async fn test_definition(cx_a: &mut TestAppContext, cx_b: &mut TestAppContext) {
2828        cx_a.foreground().forbid_parking();
2829        let lang_registry = Arc::new(LanguageRegistry::test());
2830        let fs = FakeFs::new(cx_a.background());
            // /root-1 becomes the shared worktree below; /root-2 only exists on the filesystem.
2831        fs.insert_tree(
2832            "/root-1",
2833            json!({
2834                ".zed.toml": r#"collaborators = ["user_b"]"#,
2835                "a.rs": "const ONE: usize = b::TWO + b::THREE;",
2836            }),
2837        )
2838        .await;
2839        fs.insert_tree(
2840            "/root-2",
2841            json!({
2842                "b.rs": "const TWO: usize = 2;\nconst THREE: usize = 3;",
2843            }),
2844        )
2845        .await;
2846
2847        // Set up a fake language server.
2848        let mut language = Language::new(
2849            LanguageConfig {
2850                name: "Rust".into(),
2851                path_suffixes: vec!["rs".to_string()],
2852                ..Default::default()
2853            },
2854            Some(tree_sitter_rust::language()),
2855        );
2856        let mut fake_language_servers = language.set_fake_lsp_adapter(Default::default());
2857        lang_registry.add(Arc::new(language));
2858
2859        // Connect to a server as 2 clients.
2860        let mut server = TestServer::start(cx_a.foreground(), cx_a.background()).await;
2861        let client_a = server.create_client(cx_a, "user_a").await;
2862        let client_b = server.create_client(cx_b, "user_b").await;
2863
2864        // Share a project as client A
2865        let project_a = cx_a.update(|cx| {
2866            Project::local(
2867                client_a.clone(),
2868                client_a.user_store.clone(),
2869                lang_registry.clone(),
2870                fs.clone(),
2871                cx,
2872            )
2873        });
2874        let (worktree_a, _) = project_a
2875            .update(cx_a, |p, cx| {
2876                p.find_or_create_local_worktree("/root-1", true, cx)
2877            })
2878            .await
2879            .unwrap();
            // Wait for the host's worktree scan to complete before sharing the project.
2880        worktree_a
2881            .read_with(cx_a, |tree, _| tree.as_local().unwrap().scan_complete())
2882            .await;
2883        let project_id = project_a.update(cx_a, |p, _| p.next_remote_id()).await;
2884        let worktree_id = worktree_a.read_with(cx_a, |tree, _| tree.id());
2885        project_a.update(cx_a, |p, cx| p.share(cx)).await.unwrap();
2886
2887        // Join the worktree as client B.
2888        let project_b = Project::remote(
2889            project_id,
2890            client_b.clone(),
2891            client_b.user_store.clone(),
2892            lang_registry.clone(),
2893            fs.clone(),
2894            &mut cx_b.to_async(),
2895        )
2896        .await
2897        .unwrap();
2898
2899        // Open the file on client B.
2900        let buffer_b = cx_b
2901            .background()
2902            .spawn(project_b.update(cx_b, |p, cx| p.open_buffer((worktree_id, "a.rs"), cx)))
2903            .await
2904            .unwrap();
2905
2906        // Request the definition of a symbol as the guest.
            // The first response points at `TWO` in /root-2/b.rs (line 0, columns 6..9).
2907        let fake_language_server = fake_language_servers.next().await.unwrap();
2908        fake_language_server.handle_request::<lsp::request::GotoDefinition, _, _>(
2909            |_, _| async move {
2910                Ok(Some(lsp::GotoDefinitionResponse::Scalar(
2911                    lsp::Location::new(
2912                        lsp::Url::from_file_path("/root-2/b.rs").unwrap(),
2913                        lsp::Range::new(lsp::Position::new(0, 6), lsp::Position::new(0, 9)),
2914                    ),
2915                )))
2916            },
2917        );
2918
            // Offset 23 lies inside `TWO` in "const ONE: usize = b::TWO + b::THREE;".
2919        let definitions_1 = project_b
2920            .update(cx_b, |p, cx| p.definition(&buffer_b, 23, cx))
2921            .await
2922            .unwrap();
            // The guest's project is expected to list two worktrees afterwards, and the target
            // buffer must hold the contents of b.rs.
2923        cx_b.read(|cx| {
2924            assert_eq!(definitions_1.len(), 1);
2925            assert_eq!(project_b.read(cx).worktrees(cx).count(), 2);
2926            let target_buffer = definitions_1[0].buffer.read(cx);
2927            assert_eq!(
2928                target_buffer.text(),
2929                "const TWO: usize = 2;\nconst THREE: usize = 3;"
2930            );
2931            assert_eq!(
2932                definitions_1[0].range.to_point(target_buffer),
2933                Point::new(0, 6)..Point::new(0, 9)
2934            );
2935        });
2936
2937        // Try getting more definitions for the same buffer, ensuring the buffer gets reused from
2938        // the previous call to `definition`.
            // A new handler is registered for the same request type; it points at `THREE`
            // (line 1, columns 6..11), which is the range asserted for the second request.
2939        fake_language_server.handle_request::<lsp::request::GotoDefinition, _, _>(
2940            |_, _| async move {
2941                Ok(Some(lsp::GotoDefinitionResponse::Scalar(
2942                    lsp::Location::new(
2943                        lsp::Url::from_file_path("/root-2/b.rs").unwrap(),
2944                        lsp::Range::new(lsp::Position::new(1, 6), lsp::Position::new(1, 11)),
2945                    ),
2946                )))
2947            },
2948        );
2949
            // Offset 33 lies inside `THREE` in a.rs.
2950        let definitions_2 = project_b
2951            .update(cx_b, |p, cx| p.definition(&buffer_b, 33, cx))
2952            .await
2953            .unwrap();
            // The worktree count must still be two, i.e. no further worktree was added for the
            // second lookup.
2954        cx_b.read(|cx| {
2955            assert_eq!(definitions_2.len(), 1);
2956            assert_eq!(project_b.read(cx).worktrees(cx).count(), 2);
2957            let target_buffer = definitions_2[0].buffer.read(cx);
2958            assert_eq!(
2959                target_buffer.text(),
2960                "const TWO: usize = 2;\nconst THREE: usize = 3;"
2961            );
2962            assert_eq!(
2963                definitions_2[0].range.to_point(target_buffer),
2964                Point::new(1, 6)..Point::new(1, 11)
2965            );
2966        });
            // Both lookups must hand back the same buffer handle.
2967        assert_eq!(definitions_1[0].buffer, definitions_2[0].buffer);
2968    }
2969
2970    #[gpui::test(iterations = 10)]
        // Exercises find-references requested by the guest (client B). The fake language
        // server returns two locations in the shared worktree (/root-1/two.rs) and one in
        // /root-2/three.rs, which the host never added as a worktree.
2971    async fn test_references(cx_a: &mut TestAppContext, cx_b: &mut TestAppContext) {
2972        cx_a.foreground().forbid_parking();
2973        let lang_registry = Arc::new(LanguageRegistry::test());
2974        let fs = FakeFs::new(cx_a.background());
            // /root-1 becomes the shared worktree below; /root-2 only exists on the filesystem.
2975        fs.insert_tree(
2976            "/root-1",
2977            json!({
2978                ".zed.toml": r#"collaborators = ["user_b"]"#,
2979                "one.rs": "const ONE: usize = 1;",
2980                "two.rs": "const TWO: usize = one::ONE + one::ONE;",
2981            }),
2982        )
2983        .await;
2984        fs.insert_tree(
2985            "/root-2",
2986            json!({
2987                "three.rs": "const THREE: usize = two::TWO + one::ONE;",
2988            }),
2989        )
2990        .await;
2991
2992        // Set up a fake language server.
2993        let mut language = Language::new(
2994            LanguageConfig {
2995                name: "Rust".into(),
2996                path_suffixes: vec!["rs".to_string()],
2997                ..Default::default()
2998            },
2999            Some(tree_sitter_rust::language()),
3000        );
3001        let mut fake_language_servers = language.set_fake_lsp_adapter(Default::default());
3002        lang_registry.add(Arc::new(language));
3003
3004        // Connect to a server as 2 clients.
3005        let mut server = TestServer::start(cx_a.foreground(), cx_a.background()).await;
3006        let client_a = server.create_client(cx_a, "user_a").await;
3007        let client_b = server.create_client(cx_b, "user_b").await;
3008
3009        // Share a project as client A
3010        let project_a = cx_a.update(|cx| {
3011            Project::local(
3012                client_a.clone(),
3013                client_a.user_store.clone(),
3014                lang_registry.clone(),
3015                fs.clone(),
3016                cx,
3017            )
3018        });
3019        let (worktree_a, _) = project_a
3020            .update(cx_a, |p, cx| {
3021                p.find_or_create_local_worktree("/root-1", true, cx)
3022            })
3023            .await
3024            .unwrap();
            // Wait for the host's worktree scan to complete before sharing the project.
3025        worktree_a
3026            .read_with(cx_a, |tree, _| tree.as_local().unwrap().scan_complete())
3027            .await;
3028        let project_id = project_a.update(cx_a, |p, _| p.next_remote_id()).await;
3029        let worktree_id = worktree_a.read_with(cx_a, |tree, _| tree.id());
3030        project_a.update(cx_a, |p, cx| p.share(cx)).await.unwrap();
3031
3032        // Join the worktree as client B.
3033        let project_b = Project::remote(
3034            project_id,
3035            client_b.clone(),
3036            client_b.user_store.clone(),
3037            lang_registry.clone(),
3038            fs.clone(),
3039            &mut cx_b.to_async(),
3040        )
3041        .await
3042        .unwrap();
3043
3044        // Open the file on client B.
3045        let buffer_b = cx_b
3046            .background()
3047            .spawn(project_b.update(cx_b, |p, cx| p.open_buffer((worktree_id, "one.rs"), cx)))
3048            .await
3049            .unwrap();
3050
3051        // Request references to a symbol as the guest.
            // The handler checks that the request names the host-side URI of one.rs, then
            // returns the two `ONE` uses in two.rs and the one in three.rs.
3052        let fake_language_server = fake_language_servers.next().await.unwrap();
3053        fake_language_server.handle_request::<lsp::request::References, _, _>(
3054            |params, _| async move {
3055                assert_eq!(
3056                    params.text_document_position.text_document.uri.as_str(),
3057                    "file:///root-1/one.rs"
3058                );
3059                Ok(Some(vec![
3060                    lsp::Location {
3061                        uri: lsp::Url::from_file_path("/root-1/two.rs").unwrap(),
3062                        range: lsp::Range::new(
3063                            lsp::Position::new(0, 24),
3064                            lsp::Position::new(0, 27),
3065                        ),
3066                    },
3067                    lsp::Location {
3068                        uri: lsp::Url::from_file_path("/root-1/two.rs").unwrap(),
3069                        range: lsp::Range::new(
3070                            lsp::Position::new(0, 35),
3071                            lsp::Position::new(0, 38),
3072                        ),
3073                    },
3074                    lsp::Location {
3075                        uri: lsp::Url::from_file_path("/root-2/three.rs").unwrap(),
3076                        range: lsp::Range::new(
3077                            lsp::Position::new(0, 37),
3078                            lsp::Position::new(0, 40),
3079                        ),
3080                    },
3081                ]))
3082            },
3083        );
3084
            // Offset 7 lies inside `ONE` in "const ONE: usize = 1;".
3085        let references = project_b
3086            .update(cx_b, |p, cx| p.references(&buffer_b, 7, cx))
3087            .await
3088            .unwrap();
3089        cx_b.read(|cx| {
                // Three locations come back, and the guest's project is expected to list two
                // worktrees afterwards.
3090            assert_eq!(references.len(), 3);
3091            assert_eq!(project_b.read(cx).worktrees(cx).count(), 2);
3092
                // The first two references share one buffer (two.rs); the third is in three.rs.
                // NOTE(review): three.rs is compared via full_path against a bare "three.rs",
                // presumably because its worktree was created for that single file -- confirm.
3093            let two_buffer = references[0].buffer.read(cx);
3094            let three_buffer = references[2].buffer.read(cx);
3095            assert_eq!(
3096                two_buffer.file().unwrap().path().as_ref(),
3097                Path::new("two.rs")
3098            );
3099            assert_eq!(references[1].buffer, references[0].buffer);
3100            assert_eq!(
3101                three_buffer.file().unwrap().full_path(cx),
3102                Path::new("three.rs")
3103            );
3104
                // The offset ranges equal the columns returned by the fake server, since each
                // file consists of a single line.
3105            assert_eq!(references[0].range.to_offset(&two_buffer), 24..27);
3106            assert_eq!(references[1].range.to_offset(&two_buffer), 35..38);
3107            assert_eq!(references[2].range.to_offset(&three_buffer), 37..40);
3108        });
3109    }
3110
3111    #[gpui::test(iterations = 10)]
        // Exercises project-wide text search issued by the guest (client B) against a host
        // project with two worktrees; matches from both worktrees must be returned.
3112    async fn test_project_search(cx_a: &mut TestAppContext, cx_b: &mut TestAppContext) {
3113        cx_a.foreground().forbid_parking();
3114        let lang_registry = Arc::new(LanguageRegistry::test());
3115        let fs = FakeFs::new(cx_a.background());
            // Files "a", "c", "d" and "e" contain "world"; file "b" does not.
3116        fs.insert_tree(
3117            "/root-1",
3118            json!({
3119                ".zed.toml": r#"collaborators = ["user_b"]"#,
3120                "a": "hello world",
3121                "b": "goodnight moon",
3122                "c": "a world of goo",
3123                "d": "world champion of clown world",
3124            }),
3125        )
3126        .await;
3127        fs.insert_tree(
3128            "/root-2",
3129            json!({
3130                "e": "disney world is fun",
3131            }),
3132        )
3133        .await;
3134
3135        // Connect to a server as 2 clients.
3136        let mut server = TestServer::start(cx_a.foreground(), cx_a.background()).await;
3137        let client_a = server.create_client(cx_a, "user_a").await;
3138        let client_b = server.create_client(cx_b, "user_b").await;
3139
3140        // Share a project as client A
3141        let project_a = cx_a.update(|cx| {
3142            Project::local(
3143                client_a.clone(),
3144                client_a.user_store.clone(),
3145                lang_registry.clone(),
3146                fs.clone(),
3147                cx,
3148            )
3149        });
            // Here the remote id is awaited before any worktree has been added to the project.
3150        let project_id = project_a.update(cx_a, |p, _| p.next_remote_id()).await;
3151
            // Add both directories as worktrees and wait for each scan to complete before
            // sharing the project.
3152        let (worktree_1, _) = project_a
3153            .update(cx_a, |p, cx| {
3154                p.find_or_create_local_worktree("/root-1", true, cx)
3155            })
3156            .await
3157            .unwrap();
3158        worktree_1
3159            .read_with(cx_a, |tree, _| tree.as_local().unwrap().scan_complete())
3160            .await;
3161        let (worktree_2, _) = project_a
3162            .update(cx_a, |p, cx| {
3163                p.find_or_create_local_worktree("/root-2", true, cx)
3164            })
3165            .await
3166            .unwrap();
3167        worktree_2
3168            .read_with(cx_a, |tree, _| tree.as_local().unwrap().scan_complete())
3169            .await;
3170
3171        project_a.update(cx_a, |p, cx| p.share(cx)).await.unwrap();
3172
3173        // Join the worktree as client B.
3174        let project_b = Project::remote(
3175            project_id,
3176            client_b.clone(),
3177            client_b.user_store.clone(),
3178            lang_registry.clone(),
3179            fs.clone(),
3180            &mut cx_b.to_async(),
3181        )
3182        .await
3183        .unwrap();
3184
            // Search for "world" from the guest's project.
            // NOTE(review): the meaning of the two `false` flags is not visible here
            // (presumably whole-word and case-sensitive) -- confirm against SearchQuery::text.
3185        let results = project_b
3186            .update(cx_b, |project, cx| {
3187                project.search(SearchQuery::text("world", false, false), cx)
3188            })
3189            .await
3190            .unwrap();
3191
            // Convert every (buffer, ranges) result into (full path, offset ranges) and sort by
            // path, so that the comparison below does not depend on the order of the results.
3192        let mut ranges_by_path = results
3193            .into_iter()
3194            .map(|(buffer, ranges)| {
3195                buffer.read_with(cx_b, |buffer, cx| {
3196                    let path = buffer.file().unwrap().full_path(cx);
3197                    let offset_ranges = ranges
3198                        .into_iter()
3199                        .map(|range| range.to_offset(buffer))
3200                        .collect::<Vec<_>>();
3201                    (path, offset_ranges)
3202                })
3203            })
3204            .collect::<Vec<_>>();
3205        ranges_by_path.sort_by_key(|(path, _)| path.clone());
3206
            // One entry per matching file; "d" contains two matches. The paths are prefixed
            // with the worktree's directory name ("root-1" / "root-2").
3207        assert_eq!(
3208            ranges_by_path,
3209            &[
3210                (PathBuf::from("root-1/a"), vec![6..11]),
3211                (PathBuf::from("root-1/c"), vec![2..7]),
3212                (PathBuf::from("root-1/d"), vec![0..5, 24..29]),
3213                (PathBuf::from("root-2/e"), vec![7..12]),
3214            ]
3215        );
3216    }
3217
3218    // Checks that document highlights requested through client B's (the guest's) project are
3219    // answered by the fake language server and resolve to the expected offsets in B's buffer.
3220    // The fake server's handler also asserts that the request it receives targets main.rs
3221    // under /root-1 at position (0, 34); the guest asks for highlights at offset 34.
3222    #[gpui::test(iterations = 10)]
3223    async fn test_document_highlights(cx_a: &mut TestAppContext, cx_b: &mut TestAppContext) {
3224        cx_a.foreground().forbid_parking();
3225        let languages = Arc::new(LanguageRegistry::test());
3226        let fs = FakeFs::new(cx_a.background());
3227
3228        // Populate the fake filesystem with the project that client A will share.
3229        fs.insert_tree(
3230            "/root-1",
3231            json!({
3232                ".zed.toml": r#"collaborators = ["user_b"]"#,
3233                "main.rs": "fn double(number: i32) -> i32 { number + number }",
3234            }),
3235        )
3236        .await;
3237
3238        // Register a Rust language whose LSP adapter is a fake.
3239        let mut rust = Language::new(
3240            LanguageConfig {
3241                name: "Rust".into(),
3242                path_suffixes: vec!["rs".to_string()],
3243                ..Default::default()
3244            },
3245            Some(tree_sitter_rust::language()),
3246        );
3247        let mut fake_servers = rust.set_fake_lsp_adapter(Default::default());
3248        languages.add(Arc::new(rust));
3249
3250        // Start a test server and connect both users to it.
3251        let mut server = TestServer::start(cx_a.foreground(), cx_a.background()).await;
3252        let host = server.create_client(cx_a, "user_a").await;
3253        let guest = server.create_client(cx_b, "user_b").await;
3254
3255        // Client A builds a local project, adds /root-1 as a worktree and waits for its scan.
3256        let host_project = cx_a.update(|cx| {
3257            Project::local(
3258                host.clone(),
3259                host.user_store.clone(),
3260                languages.clone(),
3261                fs.clone(),
3262                cx,
3263            )
3264        });
3265        let (host_worktree, _) = host_project
3266            .update(cx_a, |project, cx| {
3267                project.find_or_create_local_worktree("/root-1", true, cx)
3268            })
3269            .await
3270            .unwrap();
3271        host_worktree
3272            .read_with(cx_a, |tree, _| tree.as_local().unwrap().scan_complete())
3273            .await;
3274
3275        // Record the ids the guest will need, then share the project.
3276        let project_id = host_project.update(cx_a, |p, _| p.next_remote_id()).await;
3277        let worktree_id = host_worktree.read_with(cx_a, |tree, _| tree.id());
3278        host_project.update(cx_a, |p, cx| p.share(cx)).await.unwrap();
3279
3280        // Client B joins the shared project remotely.
3281        let guest_project = Project::remote(
3282            project_id,
3283            guest.clone(),
3284            guest.user_store.clone(),
3285            languages.clone(),
3286            fs.clone(),
3287            &mut cx_b.to_async(),
3288        )
3289        .await
3290        .unwrap();
3291
3292        // Client B opens main.rs from the shared worktree.
3293        let open_main = guest_project.update(cx_b, |project, cx| {
3294            project.open_buffer((worktree_id, "main.rs"), cx)
3295        });
3296        let guest_buffer = cx_b.background().spawn(open_main).await.unwrap();
3297
3298        // Take the next fake language server and install its highlight handler, which
3299        // validates the incoming request before answering.
3300        let fake_server = fake_servers.next().await.unwrap();
3301        fake_server.handle_request::<lsp::request::DocumentHighlightRequest, _, _>(
3302            |params, _| async move {
3303                // The request must target main.rs at line 0, column 34.
3304                let target = params.text_document_position_params;
3305                assert_eq!(target.text_document.uri.as_str(), "file:///root-1/main.rs");
3306                assert_eq!(target.position, lsp::Position::new(0, 34));
3307
3308                // Builds a highlight of the given kind covering the given column range
3309                // on line 0.
3310                let highlight = |kind, start_column, end_column| lsp::DocumentHighlight {
3311                    kind: Some(kind),
3312                    range: lsp::Range::new(
3313                        lsp::Position::new(0, start_column),
3314                        lsp::Position::new(0, end_column),
3315                    ),
3316                };
3317
3318                // Reply with one WRITE and two READ highlights.
3319                Ok(Some(vec![
3320                    highlight(lsp::DocumentHighlightKind::WRITE, 10, 16),
3321                    highlight(lsp::DocumentHighlightKind::READ, 32, 38),
3322                    highlight(lsp::DocumentHighlightKind::READ, 41, 47),
3323                ]))
3324            },
3325        );
3326
3327        // Ask for highlights at offset 34 from the guest's side.
3328        let highlights = guest_project
3329            .update(cx_b, |project, cx| project.document_highlights(&guest_buffer, 34, cx))
3330            .await
3331            .unwrap();
3332
3333        // Convert each returned range to offsets in the guest's buffer and compare.
3334        guest_buffer.read_with(cx_b, |buffer, _| {
3335            let snapshot = buffer.snapshot();
3336
3337            let resolved: Vec<_> = highlights
3338                .into_iter()
3339                .map(|highlight| (highlight.kind, highlight.range.to_offset(&snapshot)))
3340                .collect();
3341
3342            // In main.rs, 10..16 is the `number` parameter and 32..38 / 41..47 are its two
3343            // uses in the function body.
3344            assert_eq!(
3345                resolved,
3346                &[
3347                    (lsp::DocumentHighlightKind::WRITE, 10..16),
3348                    (lsp::DocumentHighlightKind::READ, 32..38),
3349                    (lsp::DocumentHighlightKind::READ, 41..47)
3350                ]
3351            )
3352        });
3353    }
3354
3355    // Checks that a guest (client B) can query project symbols on a shared project, open the
3356    // buffer behind a returned symbol, and is rejected when it forges a symbol's path.
3357    #[gpui::test(iterations = 10)]
3358    async fn test_project_symbols(cx_a: &mut TestAppContext, cx_b: &mut TestAppContext) {
3359        cx_a.foreground().forbid_parking();
3360        let languages = Arc::new(LanguageRegistry::test());
3361        let fs = FakeFs::new(cx_a.background());
3362        fs.insert_tree(
3363            "/code",
3364            json!({
3365                "crate-1": {
3366                    ".zed.toml": r#"collaborators = ["user_b"]"#,
3367                    "one.rs": "const ONE: usize = 1;",
3368                },
3369                "crate-2": {
3370                    "two.rs": "const TWO: usize = 2; const THREE: usize = 3;",
3371                },
3372                "private": {
3373                    "passwords.txt": "the-password",
3374                }
3375            }),
3376        )
3377        .await;
3378
3379        // Register a Rust language whose LSP adapter is a fake.
3380        let mut rust = Language::new(
3381            LanguageConfig {
3382                name: "Rust".into(),
3383                path_suffixes: vec!["rs".to_string()],
3384                ..Default::default()
3385            },
3386            Some(tree_sitter_rust::language()),
3387        );
3388        let mut fake_servers = rust.set_fake_lsp_adapter(Default::default());
3389        languages.add(Arc::new(rust));
3390
3391        // Start a test server and connect both users to it.
3392        let mut server = TestServer::start(cx_a.foreground(), cx_a.background()).await;
3393        let host = server.create_client(cx_a, "user_a").await;
3394        let guest = server.create_client(cx_b, "user_b").await;
3395
3396        // Client A builds a local project containing only /code/crate-1 and shares it.
3397        let host_project = cx_a.update(|cx| {
3398            Project::local(
3399                host.clone(),
3400                host.user_store.clone(),
3401                languages.clone(),
3402                fs.clone(),
3403                cx,
3404            )
3405        });
3406        let add_worktree = host_project.update(cx_a, |project, cx| {
3407            project.find_or_create_local_worktree("/code/crate-1", true, cx)
3408        });
3409        let (host_worktree, _) = add_worktree.await.unwrap();
3410        host_worktree
3411            .read_with(cx_a, |tree, _| tree.as_local().unwrap().scan_complete())
3412            .await;
3413        let project_id = host_project.update(cx_a, |p, _| p.next_remote_id()).await;
3414        let worktree_id = host_worktree.read_with(cx_a, |tree, _| tree.id());
3415        host_project.update(cx_a, |p, cx| p.share(cx)).await.unwrap();
3416
3417        // Client B joins the shared project remotely.
3418        let guest_project = Project::remote(
3419            project_id,
3420            guest.clone(),
3421            guest.user_store.clone(),
3422            languages.clone(),
3423            fs.clone(),
3424            &mut cx_b.to_async(),
3425        )
3426        .await
3427        .unwrap();
3428
3429        // Opening one.rs as the guest causes the language server to start.
3430        let open_one = guest_project.update(cx_b, |project, cx| {
3431            project.open_buffer((worktree_id, "one.rs"), cx)
3432        });
3433        let _buffer = cx_b.background().spawn(open_one).await.unwrap();
3434
3435        // The fake server answers every workspace-symbol request with the constant TWO,
3436        // located at columns 6..9 of line 0 in /code/crate-2/two.rs.
3437        let fake_server = fake_servers.next().await.unwrap();
3438        fake_server.handle_request::<lsp::request::WorkspaceSymbol, _, _>(
3439            |_, _| async move {
3440                let location = lsp::Location {
3441                    uri: lsp::Url::from_file_path("/code/crate-2/two.rs").unwrap(),
3442                    range: lsp::Range::new(lsp::Position::new(0, 6), lsp::Position::new(0, 9)),
3443                };
3444                #[allow(deprecated)]
3445                let symbol = lsp::SymbolInformation {
3446                    name: "TWO".into(),
3447                    location,
3448                    kind: lsp::SymbolKind::CONSTANT,
3449                    tags: None,
3450                    container_name: None,
3451                    deprecated: None,
3452                };
3453                Ok(Some(vec![symbol]))
3454            },
3455        );
3456
3457        // Search the project's symbols for "two" as the guest.
3458        let symbols = guest_project
3459            .update(cx_b, |project, cx| project.symbols("two", cx))
3460            .await
3461            .unwrap();
3462        assert_eq!(symbols.len(), 1);
3463        let symbol = &symbols[0];
3464        assert_eq!(symbol.name, "TWO");
3465
3466        // Open the returned symbol; the buffer's file path should be ../crate-2/two.rs.
3467        let symbol_buffer = guest_project
3468            .update(cx_b, |project, cx| project.open_buffer_for_symbol(symbol, cx))
3469            .await
3470            .unwrap();
3471        symbol_buffer.read_with(cx_b, |buffer, _| {
3472            assert_eq!(
3473                buffer.file().unwrap().path().as_ref(),
3474                Path::new("../crate-2/two.rs")
3475            );
3476        });
3477
3478        // A symbol whose path the guest has forged must be rejected with a signature error.
3479        let mut forged_symbol = symbol.clone();
3480        forged_symbol.path = Path::new("/code/secrets").into();
3481        let error = guest_project
3482            .update(cx_b, |project, cx| project.open_buffer_for_symbol(&forged_symbol, cx))
3483            .await
3484            .unwrap_err();
3485        assert!(error.to_string().contains("invalid symbol signature"));
3486    }
3487
3488    // Checks that a guest gets the same buffer whether it opens b.rs directly or reaches it
3489    // through a go-to-definition result, regardless of which of the two requests starts first.
3490    #[gpui::test(iterations = 10)]
3491    async fn test_open_buffer_while_getting_definition_pointing_to_it(
3492        cx_a: &mut TestAppContext,
3493        cx_b: &mut TestAppContext,
3494        mut rng: StdRng,
3495    ) {
3496        cx_a.foreground().forbid_parking();
3497        let languages = Arc::new(LanguageRegistry::test());
3498        let fs = FakeFs::new(cx_a.background());
3499        fs.insert_tree(
3500            "/root",
3501            json!({
3502                ".zed.toml": r#"collaborators = ["user_b"]"#,
3503                "a.rs": "const ONE: usize = b::TWO;",
3504                "b.rs": "const TWO: usize = 2",
3505            }),
3506        )
3507        .await;
3508
3509        // Register a Rust language whose LSP adapter is a fake.
3510        let mut rust = Language::new(
3511            LanguageConfig {
3512                name: "Rust".into(),
3513                path_suffixes: vec!["rs".to_string()],
3514                ..Default::default()
3515            },
3516            Some(tree_sitter_rust::language()),
3517        );
3518        let mut fake_servers = rust.set_fake_lsp_adapter(Default::default());
3519        languages.add(Arc::new(rust));
3520
3521        // Start a test server and connect both users to it.
3522        let mut server = TestServer::start(cx_a.foreground(), cx_a.background()).await;
3523        let host = server.create_client(cx_a, "user_a").await;
3524        let guest = server.create_client(cx_b, "user_b").await;
3525
3526        // Client A builds a local project over /root and shares it.
3527        let host_project = cx_a.update(|cx| {
3528            Project::local(
3529                host.clone(),
3530                host.user_store.clone(),
3531                languages.clone(),
3532                fs.clone(),
3533                cx,
3534            )
3535        });
3536        let add_worktree = host_project.update(cx_a, |project, cx| {
3537            project.find_or_create_local_worktree("/root", true, cx)
3538        });
3539        let (host_worktree, _) = add_worktree.await.unwrap();
3540        host_worktree
3541            .read_with(cx_a, |tree, _| tree.as_local().unwrap().scan_complete())
3542            .await;
3543        let project_id = host_project.update(cx_a, |p, _| p.next_remote_id()).await;
3544        let tree_id = host_worktree.read_with(cx_a, |tree, _| tree.id());
3545        host_project.update(cx_a, |p, cx| p.share(cx)).await.unwrap();
3546
3547        // Client B joins the shared project remotely.
3548        let guest_project = Project::remote(
3549            project_id,
3550            guest.clone(),
3551            guest.user_store.clone(),
3552            languages.clone(),
3553            fs.clone(),
3554            &mut cx_b.to_async(),
3555        )
3556        .await
3557        .unwrap();
3558
3559        // Client B opens a.rs, whose constant is defined in terms of b::TWO.
3560        let open_a = guest_project.update(cx_b, |project, cx| {
3561            project.open_buffer((tree_id, "a.rs"), cx)
3562        });
3563        let buffer_a = cx_b.background().spawn(open_a).await.unwrap();
3564
3565        // The fake server resolves every definition request to the TWO constant in /root/b.rs.
3566        let fake_server = fake_servers.next().await.unwrap();
3567        fake_server.handle_request::<lsp::request::GotoDefinition, _, _>(
3568            |_, _| async move {
3569                let target = lsp::Location::new(
3570                    lsp::Url::from_file_path("/root/b.rs").unwrap(),
3571                    lsp::Range::new(lsp::Position::new(0, 6), lsp::Position::new(0, 9)),
3572                );
3573                Ok(Some(lsp::GotoDefinitionResponse::Scalar(target)))
3574            },
3575        );
3576
3577        // Start the definition and open-buffer requests in a random order before awaiting either.
3578        let (definitions, open_b) = if rng.gen() {
3579            let definitions = guest_project.update(cx_b, |p, cx| p.definition(&buffer_a, 23, cx));
3580            let open_b = guest_project.update(cx_b, |p, cx| p.open_buffer((tree_id, "b.rs"), cx));
3581            (definitions, open_b)
3582        } else {
3583            let open_b = guest_project.update(cx_b, |p, cx| p.open_buffer((tree_id, "b.rs"), cx));
3584            let definitions = guest_project.update(cx_b, |p, cx| p.definition(&buffer_a, 23, cx));
3585            (definitions, open_b)
3586        };
3587
3588        let buffer_b = open_b.await.unwrap();
3589        let definitions = definitions.await.unwrap();
3590        assert_eq!(definitions.len(), 1);
3591        assert_eq!(definitions[0].buffer, buffer_b);
3592    }
3593
3594    // Exercises code actions end to end from a guest's editor: client B moves its cursor onto
3595    // a call, opens the code-action menu, confirms the single action offered by the fake
3596    // language server, and ends up with an editor showing both edited files. Undo and redo
3597    // are then checked against the text of that editor.
3598    #[gpui::test(iterations = 10)]
3599    async fn test_collaborating_with_code_actions(
3600        cx_a: &mut TestAppContext,
3601        cx_b: &mut TestAppContext,
3602    ) {
3603        // Builds the edit carried by both the code-action response and the resolve response:
3604        // in main.rs it replaces columns 22..34 of line 1 (`other::foo()`) with `4`, and in
3605        // other.rs it replaces columns 0..27 of line 0 (the whole file) with nothing.
3606        fn inline_foo_edit() -> lsp::WorkspaceEdit {
3607            lsp::WorkspaceEdit {
3608                changes: Some(
3609                    [
3610                        (
3611                            lsp::Url::from_file_path("/a/main.rs").unwrap(),
3612                            vec![lsp::TextEdit::new(
3613                                lsp::Range::new(
3614                                    lsp::Position::new(1, 22),
3615                                    lsp::Position::new(1, 34),
3616                                ),
3617                                "4".to_string(),
3618                            )],
3619                        ),
3620                        (
3621                            lsp::Url::from_file_path("/a/other.rs").unwrap(),
3622                            vec![lsp::TextEdit::new(
3623                                lsp::Range::new(
3624                                    lsp::Position::new(0, 0),
3625                                    lsp::Position::new(0, 27),
3626                                ),
3627                                "".to_string(),
3628                            )],
3629                        ),
3630                    ]
3631                    .into_iter()
3632                    .collect(),
3633                ),
3634                ..Default::default()
3635            }
3636        }
3637
3638        cx_a.foreground().forbid_parking();
3639        let languages = Arc::new(LanguageRegistry::test());
3640        let fs = FakeFs::new(cx_a.background());
3641
3642        // Client B drives an editor below, so run `editor::init` in its app context.
3643        cx_b.update(|cx| editor::init(cx));
3644
3645        // Register a Rust language whose LSP adapter is a fake.
3646        let mut rust = Language::new(
3647            LanguageConfig {
3648                name: "Rust".into(),
3649                path_suffixes: vec!["rs".to_string()],
3650                ..Default::default()
3651            },
3652            Some(tree_sitter_rust::language()),
3653        );
3654        let mut fake_servers = rust.set_fake_lsp_adapter(Default::default());
3655        languages.add(Arc::new(rust));
3656
3657        // Start a test server and connect both users to it.
3658        let mut server = TestServer::start(cx_a.foreground(), cx_a.background()).await;
3659        let host = server.create_client(cx_a, "user_a").await;
3660        let guest = server.create_client(cx_b, "user_b").await;
3661
3662        // Populate the fake filesystem with the project that client A will share.
3663        fs.insert_tree(
3664            "/a",
3665            json!({
3666                ".zed.toml": r#"collaborators = ["user_b"]"#,
3667                "main.rs": "mod other;\nfn main() { let foo = other::foo(); }",
3668                "other.rs": "pub fn foo() -> usize { 4 }",
3669            }),
3670        )
3671        .await;
3672
3673        // Client A builds a local project, adds /a as a worktree and waits for its scan.
3674        let host_project = cx_a.update(|cx| {
3675            Project::local(
3676                host.clone(),
3677                host.user_store.clone(),
3678                languages.clone(),
3679                fs.clone(),
3680                cx,
3681            )
3682        });
3683        let (host_worktree, _) = host_project
3684            .update(cx_a, |project, cx| {
3685                project.find_or_create_local_worktree("/a", true, cx)
3686            })
3687            .await
3688            .unwrap();
3689        host_worktree
3690            .read_with(cx_a, |tree, _| tree.as_local().unwrap().scan_complete())
3691            .await;
3692
3693        // Record the ids the guest will need, then share the project.
3694        let project_id = host_project.update(cx_a, |p, _| p.next_remote_id()).await;
3695        let worktree_id = host_worktree.read_with(cx_a, |tree, _| tree.id());
3696        host_project.update(cx_a, |p, cx| p.share(cx)).await.unwrap();
3697
3698        // Client B joins the shared project remotely.
3699        let guest_project = Project::remote(
3700            project_id,
3701            guest.clone(),
3702            guest.user_store.clone(),
3703            languages.clone(),
3704            fs.clone(),
3705            &mut cx_b.to_async(),
3706        )
3707        .await
3708        .unwrap();
3709
3710        // Build client B's workspace parameters around the remote project.
3711        let mut params = cx_b.update(WorkspaceParams::test);
3712        params.languages = languages.clone();
3713        params.client = guest.client.clone();
3714        params.user_store = guest.user_store.clone();
3715        params.project = guest_project;
3716
3717        // Client B opens main.rs in an editor inside a new workspace window.
3718        let (_window_b, workspace_b) = cx_b.add_window(|cx| Workspace::new(&params, cx));
3719        let editor_b = workspace_b
3720            .update(cx_b, |workspace, cx| {
3721                workspace.open_path((worktree_id, "main.rs"), cx)
3722            })
3723            .await
3724            .unwrap()
3725            .downcast::<Editor>()
3726            .unwrap();
3727
3728        // The first code-action request must describe the empty range at (0, 0). Answer it with
3729        // no actions, then await the next item of the stream that `handle_request` returns.
3730        let mut fake_server = fake_servers.next().await.unwrap();
3731        fake_server
3732            .handle_request::<lsp::request::CodeActionRequest, _, _>(|params, _| async move {
3733                assert_eq!(
3734                    params.text_document.uri,
3735                    lsp::Url::from_file_path("/a/main.rs").unwrap(),
3736                );
3737                assert_eq!(params.range.start, lsp::Position::new(0, 0));
3738                assert_eq!(params.range.end, lsp::Position::new(0, 0));
3739                Ok(None)
3740            })
3741            .next()
3742            .await;
3743
3744        // Move the cursor to line 1, column 31 (inside `other::foo()`) and focus the editor.
3745        editor_b.update(cx_b, |editor, cx| {
3746            editor.select_ranges([Point::new(1, 31)..Point::new(1, 31)], None, cx);
3747            cx.focus(&editor_b);
3748        });
3749
3750        // The next request must describe the new cursor position; this time offer one action.
3751        fake_server
3752            .handle_request::<lsp::request::CodeActionRequest, _, _>(|params, _| async move {
3753                assert_eq!(
3754                    params.text_document.uri,
3755                    lsp::Url::from_file_path("/a/main.rs").unwrap(),
3756                );
3757                assert_eq!(params.range.start, lsp::Position::new(1, 31));
3758                assert_eq!(params.range.end, lsp::Position::new(1, 31));
3759
3760                let action = lsp::CodeAction {
3761                    title: "Inline into all callers".to_string(),
3762                    edit: Some(inline_foo_edit()),
3763                    // Extra JSON payload attached to the action.
3764                    data: Some(json!({
3765                        "codeActionParams": {
3766                            "range": {
3767                                "start": {"line": 1, "column": 31},
3768                                "end": {"line": 1, "column": 31},
3769                            }
3770                        }
3771                    })),
3772                    ..Default::default()
3773                };
3774                Ok(Some(vec![lsp::CodeActionOrCommand::CodeAction(action)]))
3775            })
3776            .next()
3777            .await;
3778
3779        // Toggle code actions and wait until the editor's context menu is visible.
3780        editor_b.update(cx_b, |editor, cx| {
3781            editor.toggle_code_actions(&ToggleCodeActions(false), cx);
3782        });
3783        editor_b
3784            .condition(&cx_b, |editor, _| editor.context_menu_visible())
3785            .await;
3786
3787        // Stop handling code-action requests from here on.
3788        fake_server.remove_request_handler::<lsp::request::CodeActionRequest>();
3789
3790        // Confirming the first action in the menu triggers a resolve request; the fake server
3791        // answers it with the same title and edit as before.
3792        let confirm_action = workspace_b
3793            .update(cx_b, |workspace, cx| {
3794                Editor::confirm_code_action(workspace, &ConfirmCodeAction(Some(0)), cx)
3795            })
3796            .unwrap();
3797        fake_server.handle_request::<lsp::request::CodeActionResolveRequest, _, _>(
3798            |_, _| async move {
3799                Ok(lsp::CodeAction {
3800                    title: "Inline into all callers".to_string(),
3801                    edit: Some(inline_foo_edit()),
3802                    ..Default::default()
3803                })
3804            },
3805        );
3806
3807        // After the action is applied, the active item is an editor showing both modified files.
3808        confirm_action.await.unwrap();
3809        let action_editor = workspace_b.read_with(cx_b, |workspace, cx| {
3810            workspace
3811                .active_item(cx)
3812                .unwrap()
3813                .downcast::<Editor>()
3814                .unwrap()
3815        });
3816        action_editor.update(cx_b, |editor, cx| {
3817            // Both edits applied: other.rs is empty and the call in main.rs is now `4`.
3818            assert_eq!(editor.text(cx), "\nmod other;\nfn main() { let foo = 4; }");
3819            // Undo restores the original text of both files.
3820            editor.undo(&Undo, cx);
3821            assert_eq!(
3822                editor.text(cx),
3823                "pub fn foo() -> usize { 4 }\nmod other;\nfn main() { let foo = other::foo(); }"
3824            );
3825            // Redo applies the edits again.
3826            editor.redo(&Redo, cx);
3827            assert_eq!(editor.text(cx), "\nmod other;\nfn main() { let foo = 4; }");
3828        });
3829    }
3830
3831    #[gpui::test(iterations = 10)]
3832    async fn test_collaborating_with_renames(cx_a: &mut TestAppContext, cx_b: &mut TestAppContext) {
3833        cx_a.foreground().forbid_parking();
3834        let lang_registry = Arc::new(LanguageRegistry::test());
3835        let fs = FakeFs::new(cx_a.background());
3836        cx_b.update(|cx| editor::init(cx));
3837
3838        // Set up a fake language server.
3839        let mut language = Language::new(
3840            LanguageConfig {
3841                name: "Rust".into(),
3842                path_suffixes: vec!["rs".to_string()],
3843                ..Default::default()
3844            },
3845            Some(tree_sitter_rust::language()),
3846        );
3847        let mut fake_language_servers = language.set_fake_lsp_adapter(Default::default());
3848        lang_registry.add(Arc::new(language));
3849
3850        // Connect to a server as 2 clients.
3851        let mut server = TestServer::start(cx_a.foreground(), cx_a.background()).await;
3852        let client_a = server.create_client(cx_a, "user_a").await;
3853        let client_b = server.create_client(cx_b, "user_b").await;
3854
3855        // Share a project as client A
3856        fs.insert_tree(
3857            "/dir",
3858            json!({
3859                ".zed.toml": r#"collaborators = ["user_b"]"#,
3860                "one.rs": "const ONE: usize = 1;",
3861                "two.rs": "const TWO: usize = one::ONE + one::ONE;"
3862            }),
3863        )
3864        .await;
3865        let project_a = cx_a.update(|cx| {
3866            Project::local(
3867                client_a.clone(),
3868                client_a.user_store.clone(),
3869                lang_registry.clone(),
3870                fs.clone(),
3871                cx,
3872            )
3873        });
3874        let (worktree_a, _) = project_a
3875            .update(cx_a, |p, cx| {
3876                p.find_or_create_local_worktree("/dir", true, cx)
3877            })
3878            .await
3879            .unwrap();
3880        worktree_a
3881            .read_with(cx_a, |tree, _| tree.as_local().unwrap().scan_complete())
3882            .await;
3883        let project_id = project_a.update(cx_a, |p, _| p.next_remote_id()).await;
3884        let worktree_id = worktree_a.read_with(cx_a, |tree, _| tree.id());
3885        project_a.update(cx_a, |p, cx| p.share(cx)).await.unwrap();
3886
3887        // Join the worktree as client B.
3888        let project_b = Project::remote(
3889            project_id,
3890            client_b.clone(),
3891            client_b.user_store.clone(),
3892            lang_registry.clone(),
3893            fs.clone(),
3894            &mut cx_b.to_async(),
3895        )
3896        .await
3897        .unwrap();
3898        let mut params = cx_b.update(WorkspaceParams::test);
3899        params.languages = lang_registry.clone();
3900        params.client = client_b.client.clone();
3901        params.user_store = client_b.user_store.clone();
3902        params.project = project_b;
3903
3904        let (_window_b, workspace_b) = cx_b.add_window(|cx| Workspace::new(&params, cx));
3905        let editor_b = workspace_b
3906            .update(cx_b, |workspace, cx| {
3907                workspace.open_path((worktree_id, "one.rs"), cx)
3908            })
3909            .await
3910            .unwrap()
3911            .downcast::<Editor>()
3912            .unwrap();
3913        let fake_language_server = fake_language_servers.next().await.unwrap();
3914
3915        // Move cursor to a location that can be renamed.
3916        let prepare_rename = editor_b.update(cx_b, |editor, cx| {
3917            editor.select_ranges([7..7], None, cx);
3918            editor.rename(&Rename, cx).unwrap()
3919        });
3920
3921        fake_language_server
3922            .handle_request::<lsp::request::PrepareRenameRequest, _, _>(|params, _| async move {
3923                assert_eq!(params.text_document.uri.as_str(), "file:///dir/one.rs");
3924                assert_eq!(params.position, lsp::Position::new(0, 7));
3925                Ok(Some(lsp::PrepareRenameResponse::Range(lsp::Range::new(
3926                    lsp::Position::new(0, 6),
3927                    lsp::Position::new(0, 9),
3928                ))))
3929            })
3930            .next()
3931            .await
3932            .unwrap();
3933        prepare_rename.await.unwrap();
3934        editor_b.update(cx_b, |editor, cx| {
3935            let rename = editor.pending_rename().unwrap();
3936            let buffer = editor.buffer().read(cx).snapshot(cx);
3937            assert_eq!(
3938                rename.range.start.to_offset(&buffer)..rename.range.end.to_offset(&buffer),
3939                6..9
3940            );
3941            rename.editor.update(cx, |rename_editor, cx| {
3942                rename_editor.buffer().update(cx, |rename_buffer, cx| {
3943                    rename_buffer.edit([0..3], "THREE", cx);
3944                });
3945            });
3946        });
3947
3948        let confirm_rename = workspace_b.update(cx_b, |workspace, cx| {
3949            Editor::confirm_rename(workspace, &ConfirmRename, cx).unwrap()
3950        });
3951        fake_language_server
3952            .handle_request::<lsp::request::Rename, _, _>(|params, _| async move {
3953                assert_eq!(
3954                    params.text_document_position.text_document.uri.as_str(),
3955                    "file:///dir/one.rs"
3956                );
3957                assert_eq!(
3958                    params.text_document_position.position,
3959                    lsp::Position::new(0, 6)
3960                );
3961                assert_eq!(params.new_name, "THREE");
3962                Ok(Some(lsp::WorkspaceEdit {
3963                    changes: Some(
3964                        [
3965                            (
3966                                lsp::Url::from_file_path("/dir/one.rs").unwrap(),
3967                                vec![lsp::TextEdit::new(
3968                                    lsp::Range::new(
3969                                        lsp::Position::new(0, 6),
3970                                        lsp::Position::new(0, 9),
3971                                    ),
3972                                    "THREE".to_string(),
3973                                )],
3974                            ),
3975                            (
3976                                lsp::Url::from_file_path("/dir/two.rs").unwrap(),
3977                                vec![
3978                                    lsp::TextEdit::new(
3979                                        lsp::Range::new(
3980                                            lsp::Position::new(0, 24),
3981                                            lsp::Position::new(0, 27),
3982                                        ),
3983                                        "THREE".to_string(),
3984                                    ),
3985                                    lsp::TextEdit::new(
3986                                        lsp::Range::new(
3987                                            lsp::Position::new(0, 35),
3988                                            lsp::Position::new(0, 38),
3989                                        ),
3990                                        "THREE".to_string(),
3991                                    ),
3992                                ],
3993                            ),
3994                        ]
3995                        .into_iter()
3996                        .collect(),
3997                    ),
3998                    ..Default::default()
3999                }))
4000            })
4001            .next()
4002            .await
4003            .unwrap();
4004        confirm_rename.await.unwrap();
4005
4006        let rename_editor = workspace_b.read_with(cx_b, |workspace, cx| {
4007            workspace
4008                .active_item(cx)
4009                .unwrap()
4010                .downcast::<Editor>()
4011                .unwrap()
4012        });
4013        rename_editor.update(cx_b, |editor, cx| {
4014            assert_eq!(
4015                editor.text(cx),
4016                "const TWO: usize = one::THREE + one::THREE;\nconst THREE: usize = 1;"
4017            );
4018            editor.undo(&Undo, cx);
4019            assert_eq!(
4020                editor.text(cx),
4021                "const TWO: usize = one::ONE + one::ONE;\nconst ONE: usize = 1;"
4022            );
4023            editor.redo(&Redo, cx);
4024            assert_eq!(
4025                editor.text(cx),
4026                "const TWO: usize = one::THREE + one::THREE;\nconst THREE: usize = 1;"
4027            );
4028        });
4029
4030        // Ensure temporary rename edits cannot be undone/redone.
4031        editor_b.update(cx_b, |editor, cx| {
4032            editor.undo(&Undo, cx);
4033            assert_eq!(editor.text(cx), "const ONE: usize = 1;");
4034            editor.undo(&Undo, cx);
4035            assert_eq!(editor.text(cx), "const ONE: usize = 1;");
4036            editor.redo(&Redo, cx);
4037            assert_eq!(editor.text(cx), "const THREE: usize = 1;");
4038        })
4039    }
4040
4041    #[gpui::test(iterations = 10)]
4042    async fn test_basic_chat(cx_a: &mut TestAppContext, cx_b: &mut TestAppContext) {
4043        cx_a.foreground().forbid_parking();
4044
4045        // Connect to a server as 2 clients.
4046        let mut server = TestServer::start(cx_a.foreground(), cx_a.background()).await;
4047        let client_a = server.create_client(cx_a, "user_a").await;
4048        let client_b = server.create_client(cx_b, "user_b").await;
4049
4050        // Create an org that includes these 2 users.
4051        let db = &server.app_state.db;
4052        let org_id = db.create_org("Test Org", "test-org").await.unwrap();
4053        db.add_org_member(org_id, client_a.current_user_id(&cx_a), false)
4054            .await
4055            .unwrap();
4056        db.add_org_member(org_id, client_b.current_user_id(&cx_b), false)
4057            .await
4058            .unwrap();
4059
4060        // Create a channel that includes all the users.
4061        let channel_id = db.create_org_channel(org_id, "test-channel").await.unwrap();
4062        db.add_channel_member(channel_id, client_a.current_user_id(&cx_a), false)
4063            .await
4064            .unwrap();
4065        db.add_channel_member(channel_id, client_b.current_user_id(&cx_b), false)
4066            .await
4067            .unwrap();
4068        db.create_channel_message(
4069            channel_id,
4070            client_b.current_user_id(&cx_b),
4071            "hello A, it's B.",
4072            OffsetDateTime::now_utc(),
4073            1,
4074        )
4075        .await
4076        .unwrap();
4077
4078        let channels_a = cx_a
4079            .add_model(|cx| ChannelList::new(client_a.user_store.clone(), client_a.clone(), cx));
4080        channels_a
4081            .condition(cx_a, |list, _| list.available_channels().is_some())
4082            .await;
4083        channels_a.read_with(cx_a, |list, _| {
4084            assert_eq!(
4085                list.available_channels().unwrap(),
4086                &[ChannelDetails {
4087                    id: channel_id.to_proto(),
4088                    name: "test-channel".to_string()
4089                }]
4090            )
4091        });
4092        let channel_a = channels_a.update(cx_a, |this, cx| {
4093            this.get_channel(channel_id.to_proto(), cx).unwrap()
4094        });
4095        channel_a.read_with(cx_a, |channel, _| assert!(channel.messages().is_empty()));
4096        channel_a
4097            .condition(&cx_a, |channel, _| {
4098                channel_messages(channel)
4099                    == [("user_b".to_string(), "hello A, it's B.".to_string(), false)]
4100            })
4101            .await;
4102
4103        let channels_b = cx_b
4104            .add_model(|cx| ChannelList::new(client_b.user_store.clone(), client_b.clone(), cx));
4105        channels_b
4106            .condition(cx_b, |list, _| list.available_channels().is_some())
4107            .await;
4108        channels_b.read_with(cx_b, |list, _| {
4109            assert_eq!(
4110                list.available_channels().unwrap(),
4111                &[ChannelDetails {
4112                    id: channel_id.to_proto(),
4113                    name: "test-channel".to_string()
4114                }]
4115            )
4116        });
4117
4118        let channel_b = channels_b.update(cx_b, |this, cx| {
4119            this.get_channel(channel_id.to_proto(), cx).unwrap()
4120        });
4121        channel_b.read_with(cx_b, |channel, _| assert!(channel.messages().is_empty()));
4122        channel_b
4123            .condition(&cx_b, |channel, _| {
4124                channel_messages(channel)
4125                    == [("user_b".to_string(), "hello A, it's B.".to_string(), false)]
4126            })
4127            .await;
4128
4129        channel_a
4130            .update(cx_a, |channel, cx| {
4131                channel
4132                    .send_message("oh, hi B.".to_string(), cx)
4133                    .unwrap()
4134                    .detach();
4135                let task = channel.send_message("sup".to_string(), cx).unwrap();
4136                assert_eq!(
4137                    channel_messages(channel),
4138                    &[
4139                        ("user_b".to_string(), "hello A, it's B.".to_string(), false),
4140                        ("user_a".to_string(), "oh, hi B.".to_string(), true),
4141                        ("user_a".to_string(), "sup".to_string(), true)
4142                    ]
4143                );
4144                task
4145            })
4146            .await
4147            .unwrap();
4148
4149        channel_b
4150            .condition(&cx_b, |channel, _| {
4151                channel_messages(channel)
4152                    == [
4153                        ("user_b".to_string(), "hello A, it's B.".to_string(), false),
4154                        ("user_a".to_string(), "oh, hi B.".to_string(), false),
4155                        ("user_a".to_string(), "sup".to_string(), false),
4156                    ]
4157            })
4158            .await;
4159
4160        assert_eq!(
4161            server
4162                .state()
4163                .await
4164                .channel(channel_id)
4165                .unwrap()
4166                .connection_ids
4167                .len(),
4168            2
4169        );
4170        cx_b.update(|_| drop(channel_b));
4171        server
4172            .condition(|state| state.channel(channel_id).unwrap().connection_ids.len() == 1)
4173            .await;
4174
4175        cx_a.update(|_| drop(channel_a));
4176        server
4177            .condition(|state| state.channel(channel_id).is_none())
4178            .await;
4179    }
4180
4181    #[gpui::test(iterations = 10)]
4182    async fn test_chat_message_validation(cx_a: &mut TestAppContext) {
4183        cx_a.foreground().forbid_parking();
4184
4185        let mut server = TestServer::start(cx_a.foreground(), cx_a.background()).await;
4186        let client_a = server.create_client(cx_a, "user_a").await;
4187
4188        let db = &server.app_state.db;
4189        let org_id = db.create_org("Test Org", "test-org").await.unwrap();
4190        let channel_id = db.create_org_channel(org_id, "test-channel").await.unwrap();
4191        db.add_org_member(org_id, client_a.current_user_id(&cx_a), false)
4192            .await
4193            .unwrap();
4194        db.add_channel_member(channel_id, client_a.current_user_id(&cx_a), false)
4195            .await
4196            .unwrap();
4197
4198        let channels_a = cx_a
4199            .add_model(|cx| ChannelList::new(client_a.user_store.clone(), client_a.clone(), cx));
4200        channels_a
4201            .condition(cx_a, |list, _| list.available_channels().is_some())
4202            .await;
4203        let channel_a = channels_a.update(cx_a, |this, cx| {
4204            this.get_channel(channel_id.to_proto(), cx).unwrap()
4205        });
4206
4207        // Messages aren't allowed to be too long.
4208        channel_a
4209            .update(cx_a, |channel, cx| {
4210                let long_body = "this is long.\n".repeat(1024);
4211                channel.send_message(long_body, cx).unwrap()
4212            })
4213            .await
4214            .unwrap_err();
4215
4216        // Messages aren't allowed to be blank.
4217        channel_a.update(cx_a, |channel, cx| {
4218            channel.send_message(String::new(), cx).unwrap_err()
4219        });
4220
4221        // Leading and trailing whitespace are trimmed.
4222        channel_a
4223            .update(cx_a, |channel, cx| {
4224                channel
4225                    .send_message("\n surrounded by whitespace  \n".to_string(), cx)
4226                    .unwrap()
4227            })
4228            .await
4229            .unwrap();
4230        assert_eq!(
4231            db.get_channel_messages(channel_id, 10, None)
4232                .await
4233                .unwrap()
4234                .iter()
4235                .map(|m| &m.body)
4236                .collect::<Vec<_>>(),
4237            &["surrounded by whitespace"]
4238        );
4239    }
4240
4241    #[gpui::test(iterations = 10)]
4242    async fn test_chat_reconnection(cx_a: &mut TestAppContext, cx_b: &mut TestAppContext) {
4243        cx_a.foreground().forbid_parking();
4244
4245        // Connect to a server as 2 clients.
4246        let mut server = TestServer::start(cx_a.foreground(), cx_a.background()).await;
4247        let client_a = server.create_client(cx_a, "user_a").await;
4248        let client_b = server.create_client(cx_b, "user_b").await;
4249        let mut status_b = client_b.status();
4250
4251        // Create an org that includes these 2 users.
4252        let db = &server.app_state.db;
4253        let org_id = db.create_org("Test Org", "test-org").await.unwrap();
4254        db.add_org_member(org_id, client_a.current_user_id(&cx_a), false)
4255            .await
4256            .unwrap();
4257        db.add_org_member(org_id, client_b.current_user_id(&cx_b), false)
4258            .await
4259            .unwrap();
4260
4261        // Create a channel that includes all the users.
4262        let channel_id = db.create_org_channel(org_id, "test-channel").await.unwrap();
4263        db.add_channel_member(channel_id, client_a.current_user_id(&cx_a), false)
4264            .await
4265            .unwrap();
4266        db.add_channel_member(channel_id, client_b.current_user_id(&cx_b), false)
4267            .await
4268            .unwrap();
4269        db.create_channel_message(
4270            channel_id,
4271            client_b.current_user_id(&cx_b),
4272            "hello A, it's B.",
4273            OffsetDateTime::now_utc(),
4274            2,
4275        )
4276        .await
4277        .unwrap();
4278
4279        let channels_a = cx_a
4280            .add_model(|cx| ChannelList::new(client_a.user_store.clone(), client_a.clone(), cx));
4281        channels_a
4282            .condition(cx_a, |list, _| list.available_channels().is_some())
4283            .await;
4284
4285        channels_a.read_with(cx_a, |list, _| {
4286            assert_eq!(
4287                list.available_channels().unwrap(),
4288                &[ChannelDetails {
4289                    id: channel_id.to_proto(),
4290                    name: "test-channel".to_string()
4291                }]
4292            )
4293        });
4294        let channel_a = channels_a.update(cx_a, |this, cx| {
4295            this.get_channel(channel_id.to_proto(), cx).unwrap()
4296        });
4297        channel_a.read_with(cx_a, |channel, _| assert!(channel.messages().is_empty()));
4298        channel_a
4299            .condition(&cx_a, |channel, _| {
4300                channel_messages(channel)
4301                    == [("user_b".to_string(), "hello A, it's B.".to_string(), false)]
4302            })
4303            .await;
4304
4305        let channels_b = cx_b
4306            .add_model(|cx| ChannelList::new(client_b.user_store.clone(), client_b.clone(), cx));
4307        channels_b
4308            .condition(cx_b, |list, _| list.available_channels().is_some())
4309            .await;
4310        channels_b.read_with(cx_b, |list, _| {
4311            assert_eq!(
4312                list.available_channels().unwrap(),
4313                &[ChannelDetails {
4314                    id: channel_id.to_proto(),
4315                    name: "test-channel".to_string()
4316                }]
4317            )
4318        });
4319
4320        let channel_b = channels_b.update(cx_b, |this, cx| {
4321            this.get_channel(channel_id.to_proto(), cx).unwrap()
4322        });
4323        channel_b.read_with(cx_b, |channel, _| assert!(channel.messages().is_empty()));
4324        channel_b
4325            .condition(&cx_b, |channel, _| {
4326                channel_messages(channel)
4327                    == [("user_b".to_string(), "hello A, it's B.".to_string(), false)]
4328            })
4329            .await;
4330
4331        // Disconnect client B, ensuring we can still access its cached channel data.
4332        server.forbid_connections();
4333        server.disconnect_client(client_b.current_user_id(&cx_b));
4334        cx_b.foreground().advance_clock(Duration::from_secs(3));
4335        while !matches!(
4336            status_b.next().await,
4337            Some(client::Status::ReconnectionError { .. })
4338        ) {}
4339
4340        channels_b.read_with(cx_b, |channels, _| {
4341            assert_eq!(
4342                channels.available_channels().unwrap(),
4343                [ChannelDetails {
4344                    id: channel_id.to_proto(),
4345                    name: "test-channel".to_string()
4346                }]
4347            )
4348        });
4349        channel_b.read_with(cx_b, |channel, _| {
4350            assert_eq!(
4351                channel_messages(channel),
4352                [("user_b".to_string(), "hello A, it's B.".to_string(), false)]
4353            )
4354        });
4355
4356        // Send a message from client B while it is disconnected.
4357        channel_b
4358            .update(cx_b, |channel, cx| {
4359                let task = channel
4360                    .send_message("can you see this?".to_string(), cx)
4361                    .unwrap();
4362                assert_eq!(
4363                    channel_messages(channel),
4364                    &[
4365                        ("user_b".to_string(), "hello A, it's B.".to_string(), false),
4366                        ("user_b".to_string(), "can you see this?".to_string(), true)
4367                    ]
4368                );
4369                task
4370            })
4371            .await
4372            .unwrap_err();
4373
4374        // Send a message from client A while B is disconnected.
4375        channel_a
4376            .update(cx_a, |channel, cx| {
4377                channel
4378                    .send_message("oh, hi B.".to_string(), cx)
4379                    .unwrap()
4380                    .detach();
4381                let task = channel.send_message("sup".to_string(), cx).unwrap();
4382                assert_eq!(
4383                    channel_messages(channel),
4384                    &[
4385                        ("user_b".to_string(), "hello A, it's B.".to_string(), false),
4386                        ("user_a".to_string(), "oh, hi B.".to_string(), true),
4387                        ("user_a".to_string(), "sup".to_string(), true)
4388                    ]
4389                );
4390                task
4391            })
4392            .await
4393            .unwrap();
4394
4395        // Give client B a chance to reconnect.
4396        server.allow_connections();
4397        cx_b.foreground().advance_clock(Duration::from_secs(10));
4398
4399        // Verify that B sees the new messages upon reconnection, as well as the message client B
4400        // sent while offline.
4401        channel_b
4402            .condition(&cx_b, |channel, _| {
4403                channel_messages(channel)
4404                    == [
4405                        ("user_b".to_string(), "hello A, it's B.".to_string(), false),
4406                        ("user_a".to_string(), "oh, hi B.".to_string(), false),
4407                        ("user_a".to_string(), "sup".to_string(), false),
4408                        ("user_b".to_string(), "can you see this?".to_string(), false),
4409                    ]
4410            })
4411            .await;
4412
4413        // Ensure client A and B can communicate normally after reconnection.
4414        channel_a
4415            .update(cx_a, |channel, cx| {
4416                channel.send_message("you online?".to_string(), cx).unwrap()
4417            })
4418            .await
4419            .unwrap();
4420        channel_b
4421            .condition(&cx_b, |channel, _| {
4422                channel_messages(channel)
4423                    == [
4424                        ("user_b".to_string(), "hello A, it's B.".to_string(), false),
4425                        ("user_a".to_string(), "oh, hi B.".to_string(), false),
4426                        ("user_a".to_string(), "sup".to_string(), false),
4427                        ("user_b".to_string(), "can you see this?".to_string(), false),
4428                        ("user_a".to_string(), "you online?".to_string(), false),
4429                    ]
4430            })
4431            .await;
4432
4433        channel_b
4434            .update(cx_b, |channel, cx| {
4435                channel.send_message("yep".to_string(), cx).unwrap()
4436            })
4437            .await
4438            .unwrap();
4439        channel_a
4440            .condition(&cx_a, |channel, _| {
4441                channel_messages(channel)
4442                    == [
4443                        ("user_b".to_string(), "hello A, it's B.".to_string(), false),
4444                        ("user_a".to_string(), "oh, hi B.".to_string(), false),
4445                        ("user_a".to_string(), "sup".to_string(), false),
4446                        ("user_b".to_string(), "can you see this?".to_string(), false),
4447                        ("user_a".to_string(), "you online?".to_string(), false),
4448                        ("user_b".to_string(), "yep".to_string(), false),
4449                    ]
4450            })
4451            .await;
4452    }
4453
4454    #[gpui::test(iterations = 10)]
4455    async fn test_contacts(
4456        cx_a: &mut TestAppContext,
4457        cx_b: &mut TestAppContext,
4458        cx_c: &mut TestAppContext,
4459    ) {
4460        cx_a.foreground().forbid_parking();
4461        let lang_registry = Arc::new(LanguageRegistry::test());
4462        let fs = FakeFs::new(cx_a.background());
4463
4464        // Connect to a server as 3 clients.
4465        let mut server = TestServer::start(cx_a.foreground(), cx_a.background()).await;
4466        let client_a = server.create_client(cx_a, "user_a").await;
4467        let client_b = server.create_client(cx_b, "user_b").await;
4468        let client_c = server.create_client(cx_c, "user_c").await;
4469
4470        // Share a worktree as client A.
4471        fs.insert_tree(
4472            "/a",
4473            json!({
4474                ".zed.toml": r#"collaborators = ["user_b", "user_c"]"#,
4475            }),
4476        )
4477        .await;
4478
4479        let project_a = cx_a.update(|cx| {
4480            Project::local(
4481                client_a.clone(),
4482                client_a.user_store.clone(),
4483                lang_registry.clone(),
4484                fs.clone(),
4485                cx,
4486            )
4487        });
4488        let (worktree_a, _) = project_a
4489            .update(cx_a, |p, cx| {
4490                p.find_or_create_local_worktree("/a", true, cx)
4491            })
4492            .await
4493            .unwrap();
4494        worktree_a
4495            .read_with(cx_a, |tree, _| tree.as_local().unwrap().scan_complete())
4496            .await;
4497
4498        client_a
4499            .user_store
4500            .condition(&cx_a, |user_store, _| {
4501                contacts(user_store) == vec![("user_a", vec![("a", vec![])])]
4502            })
4503            .await;
4504        client_b
4505            .user_store
4506            .condition(&cx_b, |user_store, _| {
4507                contacts(user_store) == vec![("user_a", vec![("a", vec![])])]
4508            })
4509            .await;
4510        client_c
4511            .user_store
4512            .condition(&cx_c, |user_store, _| {
4513                contacts(user_store) == vec![("user_a", vec![("a", vec![])])]
4514            })
4515            .await;
4516
4517        let project_id = project_a
4518            .update(cx_a, |project, _| project.next_remote_id())
4519            .await;
4520        project_a
4521            .update(cx_a, |project, cx| project.share(cx))
4522            .await
4523            .unwrap();
4524
4525        let _project_b = Project::remote(
4526            project_id,
4527            client_b.clone(),
4528            client_b.user_store.clone(),
4529            lang_registry.clone(),
4530            fs.clone(),
4531            &mut cx_b.to_async(),
4532        )
4533        .await
4534        .unwrap();
4535
4536        client_a
4537            .user_store
4538            .condition(&cx_a, |user_store, _| {
4539                contacts(user_store) == vec![("user_a", vec![("a", vec!["user_b"])])]
4540            })
4541            .await;
4542        client_b
4543            .user_store
4544            .condition(&cx_b, |user_store, _| {
4545                contacts(user_store) == vec![("user_a", vec![("a", vec!["user_b"])])]
4546            })
4547            .await;
4548        client_c
4549            .user_store
4550            .condition(&cx_c, |user_store, _| {
4551                contacts(user_store) == vec![("user_a", vec![("a", vec!["user_b"])])]
4552            })
4553            .await;
4554
4555        project_a
4556            .condition(&cx_a, |project, _| {
4557                project.collaborators().contains_key(&client_b.peer_id)
4558            })
4559            .await;
4560
4561        cx_a.update(move |_| drop(project_a));
4562        client_a
4563            .user_store
4564            .condition(&cx_a, |user_store, _| contacts(user_store) == vec![])
4565            .await;
4566        client_b
4567            .user_store
4568            .condition(&cx_b, |user_store, _| contacts(user_store) == vec![])
4569            .await;
4570        client_c
4571            .user_store
4572            .condition(&cx_c, |user_store, _| contacts(user_store) == vec![])
4573            .await;
4574
4575        fn contacts(user_store: &UserStore) -> Vec<(&str, Vec<(&str, Vec<&str>)>)> {
4576            user_store
4577                .contacts()
4578                .iter()
4579                .map(|contact| {
4580                    let worktrees = contact
4581                        .projects
4582                        .iter()
4583                        .map(|p| {
4584                            (
4585                                p.worktree_root_names[0].as_str(),
4586                                p.guests.iter().map(|p| p.github_login.as_str()).collect(),
4587                            )
4588                        })
4589                        .collect();
4590                    (contact.user.github_login.as_str(), worktrees)
4591                })
4592                .collect()
4593        }
4594    }
4595
4596    #[gpui::test(iterations = 10)]
4597    async fn test_following(cx_a: &mut TestAppContext, cx_b: &mut TestAppContext) {
4598        cx_a.foreground().forbid_parking();
4599        let fs = FakeFs::new(cx_a.background());
4600
4601        // 2 clients connect to a server.
4602        let mut server = TestServer::start(cx_a.foreground(), cx_a.background()).await;
4603        let mut client_a = server.create_client(cx_a, "user_a").await;
4604        let mut client_b = server.create_client(cx_b, "user_b").await;
4605        cx_a.update(editor::init);
4606        cx_b.update(editor::init);
4607
4608        // Client A shares a project.
4609        fs.insert_tree(
4610            "/a",
4611            json!({
4612                ".zed.toml": r#"collaborators = ["user_b"]"#,
4613                "1.txt": "one",
4614                "2.txt": "two",
4615                "3.txt": "three",
4616            }),
4617        )
4618        .await;
4619        let (project_a, worktree_id) = client_a.build_local_project(fs.clone(), "/a", cx_a).await;
4620        project_a
4621            .update(cx_a, |project, cx| project.share(cx))
4622            .await
4623            .unwrap();
4624
4625        // Client B joins the project.
4626        let project_b = client_b
4627            .build_remote_project(
4628                project_a
4629                    .read_with(cx_a, |project, _| project.remote_id())
4630                    .unwrap(),
4631                cx_b,
4632            )
4633            .await;
4634
4635        // Client A opens some editors.
4636        let workspace_a = client_a.build_workspace(&project_a, cx_a);
4637        let pane_a = workspace_a.read_with(cx_a, |workspace, _| workspace.active_pane().clone());
4638        let editor_a1 = workspace_a
4639            .update(cx_a, |workspace, cx| {
4640                workspace.open_path((worktree_id, "1.txt"), cx)
4641            })
4642            .await
4643            .unwrap()
4644            .downcast::<Editor>()
4645            .unwrap();
4646        let editor_a2 = workspace_a
4647            .update(cx_a, |workspace, cx| {
4648                workspace.open_path((worktree_id, "2.txt"), cx)
4649            })
4650            .await
4651            .unwrap()
4652            .downcast::<Editor>()
4653            .unwrap();
4654
4655        // Client B opens an editor.
4656        let workspace_b = client_b.build_workspace(&project_b, cx_b);
4657        let editor_b1 = workspace_b
4658            .update(cx_b, |workspace, cx| {
4659                workspace.open_path((worktree_id, "1.txt"), cx)
4660            })
4661            .await
4662            .unwrap()
4663            .downcast::<Editor>()
4664            .unwrap();
4665
4666        let client_a_id = project_b.read_with(cx_b, |project, _| {
4667            project.collaborators().values().next().unwrap().peer_id
4668        });
4669        let client_b_id = project_a.read_with(cx_a, |project, _| {
4670            project.collaborators().values().next().unwrap().peer_id
4671        });
4672
4673        // When client B starts following client A, all visible view states are replicated to client B.
4674        editor_a1.update(cx_a, |editor, cx| editor.select_ranges([0..1], None, cx));
4675        editor_a2.update(cx_a, |editor, cx| editor.select_ranges([2..3], None, cx));
4676        workspace_b
4677            .update(cx_b, |workspace, cx| {
4678                workspace
4679                    .toggle_follow(&ToggleFollow(client_a_id), cx)
4680                    .unwrap()
4681            })
4682            .await
4683            .unwrap();
4684        let editor_b2 = workspace_b.read_with(cx_b, |workspace, cx| {
4685            workspace
4686                .active_item(cx)
4687                .unwrap()
4688                .downcast::<Editor>()
4689                .unwrap()
4690        });
4691        assert!(cx_b.read(|cx| editor_b2.is_focused(cx)));
4692        assert_eq!(
4693            editor_b2.read_with(cx_b, |editor, cx| editor.project_path(cx)),
4694            Some((worktree_id, "2.txt").into())
4695        );
4696        assert_eq!(
4697            editor_b2.read_with(cx_b, |editor, cx| editor.selected_ranges(cx)),
4698            vec![2..3]
4699        );
4700        assert_eq!(
4701            editor_b1.read_with(cx_b, |editor, cx| editor.selected_ranges(cx)),
4702            vec![0..1]
4703        );
4704
4705        // When client A activates a different editor, client B does so as well.
4706        workspace_a.update(cx_a, |workspace, cx| {
4707            workspace.activate_item(&editor_a1, cx)
4708        });
4709        workspace_b
4710            .condition(cx_b, |workspace, cx| {
4711                workspace.active_item(cx).unwrap().id() == editor_b1.id()
4712            })
4713            .await;
4714
4715        // When client A navigates back and forth, client B does so as well.
4716        workspace_a
4717            .update(cx_a, |workspace, cx| {
4718                workspace::Pane::go_back(workspace, None, cx)
4719            })
4720            .await;
4721        workspace_b
4722            .condition(cx_b, |workspace, cx| {
4723                workspace.active_item(cx).unwrap().id() == editor_b2.id()
4724            })
4725            .await;
4726
4727        workspace_a
4728            .update(cx_a, |workspace, cx| {
4729                workspace::Pane::go_forward(workspace, None, cx)
4730            })
4731            .await;
4732        workspace_b
4733            .condition(cx_b, |workspace, cx| {
4734                workspace.active_item(cx).unwrap().id() == editor_b1.id()
4735            })
4736            .await;
4737
4738        // Changes to client A's editor are reflected on client B.
4739        editor_a1.update(cx_a, |editor, cx| {
4740            editor.select_ranges([1..1, 2..2], None, cx);
4741        });
4742        editor_b1
4743            .condition(cx_b, |editor, cx| {
4744                editor.selected_ranges(cx) == vec![1..1, 2..2]
4745            })
4746            .await;
4747
4748        editor_a1.update(cx_a, |editor, cx| editor.set_text("TWO", cx));
4749        editor_b1
4750            .condition(cx_b, |editor, cx| editor.text(cx) == "TWO")
4751            .await;
4752
4753        editor_a1.update(cx_a, |editor, cx| {
4754            editor.select_ranges([3..3], None, cx);
4755            editor.set_scroll_position(vec2f(0., 100.), cx);
4756        });
4757        editor_b1
4758            .condition(cx_b, |editor, cx| editor.selected_ranges(cx) == vec![3..3])
4759            .await;
4760
4761        // After unfollowing, client B stops receiving updates from client A.
4762        workspace_b.update(cx_b, |workspace, cx| {
4763            workspace.unfollow(&workspace.active_pane().clone(), cx)
4764        });
4765        workspace_a.update(cx_a, |workspace, cx| {
4766            workspace.activate_item(&editor_a2, cx)
4767        });
4768        cx_a.foreground().run_until_parked();
4769        assert_eq!(
4770            workspace_b.read_with(cx_b, |workspace, cx| workspace
4771                .active_item(cx)
4772                .unwrap()
4773                .id()),
4774            editor_b1.id()
4775        );
4776
4777        // Client A starts following client B.
4778        workspace_a
4779            .update(cx_a, |workspace, cx| {
4780                workspace
4781                    .toggle_follow(&ToggleFollow(client_b_id), cx)
4782                    .unwrap()
4783            })
4784            .await
4785            .unwrap();
4786        assert_eq!(
4787            workspace_a.read_with(cx_a, |workspace, _| workspace.leader_for_pane(&pane_a)),
4788            Some(client_b_id)
4789        );
4790        assert_eq!(
4791            workspace_a.read_with(cx_a, |workspace, cx| workspace
4792                .active_item(cx)
4793                .unwrap()
4794                .id()),
4795            editor_a1.id()
4796        );
4797
4798        // Following interrupts when client B disconnects.
4799        client_b.disconnect(&cx_b.to_async()).unwrap();
4800        cx_a.foreground().run_until_parked();
4801        assert_eq!(
4802            workspace_a.read_with(cx_a, |workspace, _| workspace.leader_for_pane(&pane_a)),
4803            None
4804        );
4805    }
4806
4807    #[gpui::test(iterations = 10)]
4808    async fn test_peers_following_each_other(cx_a: &mut TestAppContext, cx_b: &mut TestAppContext) {
4809        cx_a.foreground().forbid_parking();
4810        let fs = FakeFs::new(cx_a.background());
4811
4812        // 2 clients connect to a server.
4813        let mut server = TestServer::start(cx_a.foreground(), cx_a.background()).await;
4814        let mut client_a = server.create_client(cx_a, "user_a").await;
4815        let mut client_b = server.create_client(cx_b, "user_b").await;
4816        cx_a.update(editor::init);
4817        cx_b.update(editor::init);
4818
4819        // Client A shares a project.
4820        fs.insert_tree(
4821            "/a",
4822            json!({
4823                ".zed.toml": r#"collaborators = ["user_b"]"#,
4824                "1.txt": "one",
4825                "2.txt": "two",
4826                "3.txt": "three",
4827                "4.txt": "four",
4828            }),
4829        )
4830        .await;
4831        let (project_a, worktree_id) = client_a.build_local_project(fs.clone(), "/a", cx_a).await;
4832        project_a
4833            .update(cx_a, |project, cx| project.share(cx))
4834            .await
4835            .unwrap();
4836
4837        // Client B joins the project.
4838        let project_b = client_b
4839            .build_remote_project(
4840                project_a
4841                    .read_with(cx_a, |project, _| project.remote_id())
4842                    .unwrap(),
4843                cx_b,
4844            )
4845            .await;
4846
4847        // Client A opens some editors.
4848        let workspace_a = client_a.build_workspace(&project_a, cx_a);
4849        let pane_a1 = workspace_a.read_with(cx_a, |workspace, _| workspace.active_pane().clone());
4850        let _editor_a1 = workspace_a
4851            .update(cx_a, |workspace, cx| {
4852                workspace.open_path((worktree_id, "1.txt"), cx)
4853            })
4854            .await
4855            .unwrap()
4856            .downcast::<Editor>()
4857            .unwrap();
4858
4859        // Client B opens an editor.
4860        let workspace_b = client_b.build_workspace(&project_b, cx_b);
4861        let pane_b1 = workspace_b.read_with(cx_b, |workspace, _| workspace.active_pane().clone());
4862        let _editor_b1 = workspace_b
4863            .update(cx_b, |workspace, cx| {
4864                workspace.open_path((worktree_id, "2.txt"), cx)
4865            })
4866            .await
4867            .unwrap()
4868            .downcast::<Editor>()
4869            .unwrap();
4870
4871        // Clients A and B follow each other in split panes
4872        workspace_a
4873            .update(cx_a, |workspace, cx| {
4874                workspace.split_pane(workspace.active_pane().clone(), SplitDirection::Right, cx);
4875                assert_ne!(*workspace.active_pane(), pane_a1);
4876                let leader_id = *project_a.read(cx).collaborators().keys().next().unwrap();
4877                workspace
4878                    .toggle_follow(&workspace::ToggleFollow(leader_id), cx)
4879                    .unwrap()
4880            })
4881            .await
4882            .unwrap();
4883        workspace_b
4884            .update(cx_b, |workspace, cx| {
4885                workspace.split_pane(workspace.active_pane().clone(), SplitDirection::Right, cx);
4886                assert_ne!(*workspace.active_pane(), pane_b1);
4887                let leader_id = *project_b.read(cx).collaborators().keys().next().unwrap();
4888                workspace
4889                    .toggle_follow(&workspace::ToggleFollow(leader_id), cx)
4890                    .unwrap()
4891            })
4892            .await
4893            .unwrap();
4894
4895        workspace_a
4896            .update(cx_a, |workspace, cx| {
4897                workspace.activate_next_pane(cx);
4898                assert_eq!(*workspace.active_pane(), pane_a1);
4899                workspace.open_path((worktree_id, "3.txt"), cx)
4900            })
4901            .await
4902            .unwrap();
4903        workspace_b
4904            .update(cx_b, |workspace, cx| {
4905                workspace.activate_next_pane(cx);
4906                assert_eq!(*workspace.active_pane(), pane_b1);
4907                workspace.open_path((worktree_id, "4.txt"), cx)
4908            })
4909            .await
4910            .unwrap();
4911        cx_a.foreground().run_until_parked();
4912
4913        // Ensure leader updates don't change the active pane of followers
4914        workspace_a.read_with(cx_a, |workspace, _| {
4915            assert_eq!(*workspace.active_pane(), pane_a1);
4916        });
4917        workspace_b.read_with(cx_b, |workspace, _| {
4918            assert_eq!(*workspace.active_pane(), pane_b1);
4919        });
4920
4921        // Ensure peers following each other doesn't cause an infinite loop.
4922        assert_eq!(
4923            workspace_a.read_with(cx_a, |workspace, cx| workspace
4924                .active_item(cx)
4925                .unwrap()
4926                .project_path(cx)),
4927            Some((worktree_id, "3.txt").into())
4928        );
4929        workspace_a.update(cx_a, |workspace, cx| {
4930            assert_eq!(
4931                workspace.active_item(cx).unwrap().project_path(cx),
4932                Some((worktree_id, "3.txt").into())
4933            );
4934            workspace.activate_next_pane(cx);
4935            assert_eq!(
4936                workspace.active_item(cx).unwrap().project_path(cx),
4937                Some((worktree_id, "4.txt").into())
4938            );
4939        });
4940        workspace_b.update(cx_b, |workspace, cx| {
4941            assert_eq!(
4942                workspace.active_item(cx).unwrap().project_path(cx),
4943                Some((worktree_id, "4.txt").into())
4944            );
4945            workspace.activate_next_pane(cx);
4946            assert_eq!(
4947                workspace.active_item(cx).unwrap().project_path(cx),
4948                Some((worktree_id, "3.txt").into())
4949            );
4950        });
4951    }
4952
4953    #[gpui::test(iterations = 10)]
4954    async fn test_auto_unfollowing(cx_a: &mut TestAppContext, cx_b: &mut TestAppContext) {
4955        cx_a.foreground().forbid_parking();
4956        let fs = FakeFs::new(cx_a.background());
4957
4958        // 2 clients connect to a server.
4959        let mut server = TestServer::start(cx_a.foreground(), cx_a.background()).await;
4960        let mut client_a = server.create_client(cx_a, "user_a").await;
4961        let mut client_b = server.create_client(cx_b, "user_b").await;
4962        cx_a.update(editor::init);
4963        cx_b.update(editor::init);
4964
4965        // Client A shares a project.
4966        fs.insert_tree(
4967            "/a",
4968            json!({
4969                ".zed.toml": r#"collaborators = ["user_b"]"#,
4970                "1.txt": "one",
4971                "2.txt": "two",
4972                "3.txt": "three",
4973            }),
4974        )
4975        .await;
4976        let (project_a, worktree_id) = client_a.build_local_project(fs.clone(), "/a", cx_a).await;
4977        project_a
4978            .update(cx_a, |project, cx| project.share(cx))
4979            .await
4980            .unwrap();
4981
4982        // Client B joins the project.
4983        let project_b = client_b
4984            .build_remote_project(
4985                project_a
4986                    .read_with(cx_a, |project, _| project.remote_id())
4987                    .unwrap(),
4988                cx_b,
4989            )
4990            .await;
4991
4992        // Client A opens some editors.
4993        let workspace_a = client_a.build_workspace(&project_a, cx_a);
4994        let _editor_a1 = workspace_a
4995            .update(cx_a, |workspace, cx| {
4996                workspace.open_path((worktree_id, "1.txt"), cx)
4997            })
4998            .await
4999            .unwrap()
5000            .downcast::<Editor>()
5001            .unwrap();
5002
5003        // Client B starts following client A.
5004        let workspace_b = client_b.build_workspace(&project_b, cx_b);
5005        let pane_b = workspace_b.read_with(cx_b, |workspace, _| workspace.active_pane().clone());
5006        let leader_id = project_b.read_with(cx_b, |project, _| {
5007            project.collaborators().values().next().unwrap().peer_id
5008        });
5009        workspace_b
5010            .update(cx_b, |workspace, cx| {
5011                workspace
5012                    .toggle_follow(&ToggleFollow(leader_id), cx)
5013                    .unwrap()
5014            })
5015            .await
5016            .unwrap();
5017        assert_eq!(
5018            workspace_b.read_with(cx_b, |workspace, _| workspace.leader_for_pane(&pane_b)),
5019            Some(leader_id)
5020        );
5021        let editor_b2 = workspace_b.read_with(cx_b, |workspace, cx| {
5022            workspace
5023                .active_item(cx)
5024                .unwrap()
5025                .downcast::<Editor>()
5026                .unwrap()
5027        });
5028
5029        // When client B moves, it automatically stops following client A.
5030        editor_b2.update(cx_b, |editor, cx| editor.move_right(&editor::MoveRight, cx));
5031        assert_eq!(
5032            workspace_b.read_with(cx_b, |workspace, _| workspace.leader_for_pane(&pane_b)),
5033            None
5034        );
5035
5036        workspace_b
5037            .update(cx_b, |workspace, cx| {
5038                workspace
5039                    .toggle_follow(&ToggleFollow(leader_id), cx)
5040                    .unwrap()
5041            })
5042            .await
5043            .unwrap();
5044        assert_eq!(
5045            workspace_b.read_with(cx_b, |workspace, _| workspace.leader_for_pane(&pane_b)),
5046            Some(leader_id)
5047        );
5048
5049        // When client B edits, it automatically stops following client A.
5050        editor_b2.update(cx_b, |editor, cx| editor.insert("X", cx));
5051        assert_eq!(
5052            workspace_b.read_with(cx_b, |workspace, _| workspace.leader_for_pane(&pane_b)),
5053            None
5054        );
5055
5056        workspace_b
5057            .update(cx_b, |workspace, cx| {
5058                workspace
5059                    .toggle_follow(&ToggleFollow(leader_id), cx)
5060                    .unwrap()
5061            })
5062            .await
5063            .unwrap();
5064        assert_eq!(
5065            workspace_b.read_with(cx_b, |workspace, _| workspace.leader_for_pane(&pane_b)),
5066            Some(leader_id)
5067        );
5068
5069        // When client B scrolls, it automatically stops following client A.
5070        editor_b2.update(cx_b, |editor, cx| {
5071            editor.set_scroll_position(vec2f(0., 3.), cx)
5072        });
5073        assert_eq!(
5074            workspace_b.read_with(cx_b, |workspace, _| workspace.leader_for_pane(&pane_b)),
5075            None
5076        );
5077
5078        workspace_b
5079            .update(cx_b, |workspace, cx| {
5080                workspace
5081                    .toggle_follow(&ToggleFollow(leader_id), cx)
5082                    .unwrap()
5083            })
5084            .await
5085            .unwrap();
5086        assert_eq!(
5087            workspace_b.read_with(cx_b, |workspace, _| workspace.leader_for_pane(&pane_b)),
5088            Some(leader_id)
5089        );
5090
5091        // When client B activates a different pane, it continues following client A in the original pane.
5092        workspace_b.update(cx_b, |workspace, cx| {
5093            workspace.split_pane(pane_b.clone(), SplitDirection::Right, cx)
5094        });
5095        assert_eq!(
5096            workspace_b.read_with(cx_b, |workspace, _| workspace.leader_for_pane(&pane_b)),
5097            Some(leader_id)
5098        );
5099
5100        workspace_b.update(cx_b, |workspace, cx| workspace.activate_next_pane(cx));
5101        assert_eq!(
5102            workspace_b.read_with(cx_b, |workspace, _| workspace.leader_for_pane(&pane_b)),
5103            Some(leader_id)
5104        );
5105
5106        // When client B activates a different item in the original pane, it automatically stops following client A.
5107        workspace_b
5108            .update(cx_b, |workspace, cx| {
5109                workspace.open_path((worktree_id, "2.txt"), cx)
5110            })
5111            .await
5112            .unwrap();
5113        assert_eq!(
5114            workspace_b.read_with(cx_b, |workspace, _| workspace.leader_for_pane(&pane_b)),
5115            None
5116        );
5117    }
5118
5119    #[gpui::test(iterations = 100)]
5120    async fn test_random_collaboration(
5121        cx: &mut TestAppContext,
5122        deterministic: Arc<Deterministic>,
5123        rng: StdRng,
5124    ) {
5125        cx.foreground().forbid_parking();
5126        let max_peers = env::var("MAX_PEERS")
5127            .map(|i| i.parse().expect("invalid `MAX_PEERS` variable"))
5128            .unwrap_or(5);
5129        assert!(max_peers <= 5);
5130
5131        let max_operations = env::var("OPERATIONS")
5132            .map(|i| i.parse().expect("invalid `OPERATIONS` variable"))
5133            .unwrap_or(10);
5134
5135        let rng = Arc::new(Mutex::new(rng));
5136
5137        let guest_lang_registry = Arc::new(LanguageRegistry::test());
5138        let host_language_registry = Arc::new(LanguageRegistry::test());
5139
5140        let fs = FakeFs::new(cx.background());
5141        fs.insert_tree(
5142            "/_collab",
5143            json!({
5144                ".zed.toml": r#"collaborators = ["guest-1", "guest-2", "guest-3", "guest-4"]"#
5145            }),
5146        )
5147        .await;
5148
5149        let mut server = TestServer::start(cx.foreground(), cx.background()).await;
5150        let mut clients = Vec::new();
5151        let mut user_ids = Vec::new();
5152        let mut op_start_signals = Vec::new();
5153        let files = Arc::new(Mutex::new(Vec::new()));
5154
5155        let mut next_entity_id = 100000;
5156        let mut host_cx = TestAppContext::new(
5157            cx.foreground_platform(),
5158            cx.platform(),
5159            deterministic.build_foreground(next_entity_id),
5160            deterministic.build_background(),
5161            cx.font_cache(),
5162            cx.leak_detector(),
5163            next_entity_id,
5164        );
5165        let host = server.create_client(&mut host_cx, "host").await;
5166        let host_project = host_cx.update(|cx| {
5167            Project::local(
5168                host.client.clone(),
5169                host.user_store.clone(),
5170                host_language_registry.clone(),
5171                fs.clone(),
5172                cx,
5173            )
5174        });
5175        let host_project_id = host_project
5176            .update(&mut host_cx, |p, _| p.next_remote_id())
5177            .await;
5178
5179        let (collab_worktree, _) = host_project
5180            .update(&mut host_cx, |project, cx| {
5181                project.find_or_create_local_worktree("/_collab", true, cx)
5182            })
5183            .await
5184            .unwrap();
5185        collab_worktree
5186            .read_with(&host_cx, |tree, _| tree.as_local().unwrap().scan_complete())
5187            .await;
5188        host_project
5189            .update(&mut host_cx, |project, cx| project.share(cx))
5190            .await
5191            .unwrap();
5192
5193        // Set up fake language servers.
5194        let mut language = Language::new(
5195            LanguageConfig {
5196                name: "Rust".into(),
5197                path_suffixes: vec!["rs".to_string()],
5198                ..Default::default()
5199            },
5200            None,
5201        );
5202        let _fake_servers = language.set_fake_lsp_adapter(FakeLspAdapter {
5203            name: "the-fake-language-server",
5204            capabilities: lsp::LanguageServer::full_capabilities(),
5205            initializer: Some(Box::new({
5206                let rng = rng.clone();
5207                let files = files.clone();
5208                let project = host_project.downgrade();
5209                move |fake_server: &mut FakeLanguageServer| {
5210                    fake_server.handle_request::<lsp::request::Completion, _, _>(
5211                        |_, _| async move {
5212                            Ok(Some(lsp::CompletionResponse::Array(vec![
5213                                lsp::CompletionItem {
5214                                    text_edit: Some(lsp::CompletionTextEdit::Edit(lsp::TextEdit {
5215                                        range: lsp::Range::new(
5216                                            lsp::Position::new(0, 0),
5217                                            lsp::Position::new(0, 0),
5218                                        ),
5219                                        new_text: "the-new-text".to_string(),
5220                                    })),
5221                                    ..Default::default()
5222                                },
5223                            ])))
5224                        },
5225                    );
5226
5227                    fake_server.handle_request::<lsp::request::CodeActionRequest, _, _>(
5228                        |_, _| async move {
5229                            Ok(Some(vec![lsp::CodeActionOrCommand::CodeAction(
5230                                lsp::CodeAction {
5231                                    title: "the-code-action".to_string(),
5232                                    ..Default::default()
5233                                },
5234                            )]))
5235                        },
5236                    );
5237
5238                    fake_server.handle_request::<lsp::request::PrepareRenameRequest, _, _>(
5239                        |params, _| async move {
5240                            Ok(Some(lsp::PrepareRenameResponse::Range(lsp::Range::new(
5241                                params.position,
5242                                params.position,
5243                            ))))
5244                        },
5245                    );
5246
5247                    fake_server.handle_request::<lsp::request::GotoDefinition, _, _>({
5248                        let files = files.clone();
5249                        let rng = rng.clone();
5250                        move |_, _| {
5251                            let files = files.clone();
5252                            let rng = rng.clone();
5253                            async move {
5254                                let files = files.lock();
5255                                let mut rng = rng.lock();
5256                                let count = rng.gen_range::<usize, _>(1..3);
5257                                let files = (0..count)
5258                                    .map(|_| files.choose(&mut *rng).unwrap())
5259                                    .collect::<Vec<_>>();
5260                                log::info!("LSP: Returning definitions in files {:?}", &files);
5261                                Ok(Some(lsp::GotoDefinitionResponse::Array(
5262                                    files
5263                                        .into_iter()
5264                                        .map(|file| lsp::Location {
5265                                            uri: lsp::Url::from_file_path(file).unwrap(),
5266                                            range: Default::default(),
5267                                        })
5268                                        .collect(),
5269                                )))
5270                            }
5271                        }
5272                    });
5273
5274                    fake_server.handle_request::<lsp::request::DocumentHighlightRequest, _, _>({
5275                        let rng = rng.clone();
5276                        let project = project.clone();
5277                        move |params, mut cx| {
5278                            let highlights = if let Some(project) = project.upgrade(&cx) {
5279                                project.update(&mut cx, |project, cx| {
5280                                    let path = params
5281                                        .text_document_position_params
5282                                        .text_document
5283                                        .uri
5284                                        .to_file_path()
5285                                        .unwrap();
5286                                    let (worktree, relative_path) =
5287                                        project.find_local_worktree(&path, cx)?;
5288                                    let project_path =
5289                                        ProjectPath::from((worktree.read(cx).id(), relative_path));
5290                                    let buffer =
5291                                        project.get_open_buffer(&project_path, cx)?.read(cx);
5292
5293                                    let mut highlights = Vec::new();
5294                                    let highlight_count = rng.lock().gen_range(1..=5);
5295                                    let mut prev_end = 0;
5296                                    for _ in 0..highlight_count {
5297                                        let range =
5298                                            buffer.random_byte_range(prev_end, &mut *rng.lock());
5299
5300                                        highlights.push(lsp::DocumentHighlight {
5301                                            range: range_to_lsp(range.to_point_utf16(buffer)),
5302                                            kind: Some(lsp::DocumentHighlightKind::READ),
5303                                        });
5304                                        prev_end = range.end;
5305                                    }
5306                                    Some(highlights)
5307                                })
5308                            } else {
5309                                None
5310                            };
5311                            async move { Ok(highlights) }
5312                        }
5313                    });
5314                }
5315            })),
5316            ..Default::default()
5317        });
5318        host_language_registry.add(Arc::new(language));
5319
5320        let op_start_signal = futures::channel::mpsc::unbounded();
5321        user_ids.push(host.current_user_id(&host_cx));
5322        op_start_signals.push(op_start_signal.0);
5323        clients.push(host_cx.foreground().spawn(host.simulate_host(
5324            host_project,
5325            files,
5326            op_start_signal.1,
5327            rng.clone(),
5328            host_cx,
5329        )));
5330
5331        let disconnect_host_at = if rng.lock().gen_bool(0.2) {
5332            rng.lock().gen_range(0..max_operations)
5333        } else {
5334            max_operations
5335        };
5336        let mut available_guests = vec![
5337            "guest-1".to_string(),
5338            "guest-2".to_string(),
5339            "guest-3".to_string(),
5340            "guest-4".to_string(),
5341        ];
5342        let mut operations = 0;
5343        while operations < max_operations {
5344            if operations == disconnect_host_at {
5345                server.disconnect_client(user_ids[0]);
5346                cx.foreground().advance_clock(RECEIVE_TIMEOUT);
5347                drop(op_start_signals);
5348                let mut clients = futures::future::join_all(clients).await;
5349                cx.foreground().run_until_parked();
5350
5351                let (host, mut host_cx, host_err) = clients.remove(0);
5352                if let Some(host_err) = host_err {
5353                    log::error!("host error - {}", host_err);
5354                }
5355                host.project
5356                    .as_ref()
5357                    .unwrap()
5358                    .read_with(&host_cx, |project, _| assert!(!project.is_shared()));
5359                for (guest, mut guest_cx, guest_err) in clients {
5360                    if let Some(guest_err) = guest_err {
5361                        log::error!("{} error - {}", guest.username, guest_err);
5362                    }
5363                    let contacts = server
5364                        .store
5365                        .read()
5366                        .await
5367                        .contacts_for_user(guest.current_user_id(&guest_cx));
5368                    assert!(!contacts
5369                        .iter()
5370                        .flat_map(|contact| &contact.projects)
5371                        .any(|project| project.id == host_project_id));
5372                    guest
5373                        .project
5374                        .as_ref()
5375                        .unwrap()
5376                        .read_with(&guest_cx, |project, _| assert!(project.is_read_only()));
5377                    guest_cx.update(|_| drop(guest));
5378                }
5379                host_cx.update(|_| drop(host));
5380
5381                return;
5382            }
5383
5384            let distribution = rng.lock().gen_range(0..100);
5385            match distribution {
5386                0..=19 if !available_guests.is_empty() => {
5387                    let guest_ix = rng.lock().gen_range(0..available_guests.len());
5388                    let guest_username = available_guests.remove(guest_ix);
5389                    log::info!("Adding new connection for {}", guest_username);
5390                    next_entity_id += 100000;
5391                    let mut guest_cx = TestAppContext::new(
5392                        cx.foreground_platform(),
5393                        cx.platform(),
5394                        deterministic.build_foreground(next_entity_id),
5395                        deterministic.build_background(),
5396                        cx.font_cache(),
5397                        cx.leak_detector(),
5398                        next_entity_id,
5399                    );
5400                    let guest = server.create_client(&mut guest_cx, &guest_username).await;
5401                    let guest_project = Project::remote(
5402                        host_project_id,
5403                        guest.client.clone(),
5404                        guest.user_store.clone(),
5405                        guest_lang_registry.clone(),
5406                        FakeFs::new(cx.background()),
5407                        &mut guest_cx.to_async(),
5408                    )
5409                    .await
5410                    .unwrap();
5411                    let op_start_signal = futures::channel::mpsc::unbounded();
5412                    user_ids.push(guest.current_user_id(&guest_cx));
5413                    op_start_signals.push(op_start_signal.0);
5414                    clients.push(guest_cx.foreground().spawn(guest.simulate_guest(
5415                        guest_username.clone(),
5416                        guest_project,
5417                        op_start_signal.1,
5418                        rng.clone(),
5419                        guest_cx,
5420                    )));
5421
5422                    log::info!("Added connection for {}", guest_username);
5423                    operations += 1;
5424                }
5425                20..=30 if clients.len() > 1 => {
5426                    log::info!("Removing guest");
5427                    let guest_ix = rng.lock().gen_range(1..clients.len());
5428                    let removed_guest_id = user_ids.remove(guest_ix);
5429                    let guest = clients.remove(guest_ix);
5430                    op_start_signals.remove(guest_ix);
5431                    server.disconnect_client(removed_guest_id);
5432                    cx.foreground().advance_clock(RECEIVE_TIMEOUT);
5433                    let (guest, mut guest_cx, guest_err) = guest.await;
5434                    if let Some(guest_err) = guest_err {
5435                        log::error!("{} error - {}", guest.username, guest_err);
5436                    }
5437                    guest
5438                        .project
5439                        .as_ref()
5440                        .unwrap()
5441                        .read_with(&guest_cx, |project, _| assert!(project.is_read_only()));
5442                    for user_id in &user_ids {
5443                        for contact in server.store.read().await.contacts_for_user(*user_id) {
5444                            assert_ne!(
5445                                contact.user_id, removed_guest_id.0 as u64,
5446                                "removed guest is still a contact of another peer"
5447                            );
5448                            for project in contact.projects {
5449                                for project_guest_id in project.guests {
5450                                    assert_ne!(
5451                                        project_guest_id, removed_guest_id.0 as u64,
5452                                        "removed guest appears as still participating on a project"
5453                                    );
5454                                }
5455                            }
5456                        }
5457                    }
5458
5459                    log::info!("{} removed", guest.username);
5460                    available_guests.push(guest.username.clone());
5461                    guest_cx.update(|_| drop(guest));
5462
5463                    operations += 1;
5464                }
5465                _ => {
5466                    while operations < max_operations && rng.lock().gen_bool(0.7) {
5467                        op_start_signals
5468                            .choose(&mut *rng.lock())
5469                            .unwrap()
5470                            .unbounded_send(())
5471                            .unwrap();
5472                        operations += 1;
5473                    }
5474
5475                    if rng.lock().gen_bool(0.8) {
5476                        cx.foreground().run_until_parked();
5477                    }
5478                }
5479            }
5480        }
5481
5482        drop(op_start_signals);
5483        let mut clients = futures::future::join_all(clients).await;
5484        cx.foreground().run_until_parked();
5485
5486        let (host_client, mut host_cx, host_err) = clients.remove(0);
5487        if let Some(host_err) = host_err {
5488            panic!("host error - {}", host_err);
5489        }
5490        let host_project = host_client.project.as_ref().unwrap();
5491        let host_worktree_snapshots = host_project.read_with(&host_cx, |project, cx| {
5492            project
5493                .worktrees(cx)
5494                .map(|worktree| {
5495                    let snapshot = worktree.read(cx).snapshot();
5496                    (snapshot.id(), snapshot)
5497                })
5498                .collect::<BTreeMap<_, _>>()
5499        });
5500
5501        host_client
5502            .project
5503            .as_ref()
5504            .unwrap()
5505            .read_with(&host_cx, |project, cx| project.check_invariants(cx));
5506
5507        for (guest_client, mut guest_cx, guest_err) in clients.into_iter() {
5508            if let Some(guest_err) = guest_err {
5509                panic!("{} error - {}", guest_client.username, guest_err);
5510            }
5511            let worktree_snapshots =
5512                guest_client
5513                    .project
5514                    .as_ref()
5515                    .unwrap()
5516                    .read_with(&guest_cx, |project, cx| {
5517                        project
5518                            .worktrees(cx)
5519                            .map(|worktree| {
5520                                let worktree = worktree.read(cx);
5521                                (worktree.id(), worktree.snapshot())
5522                            })
5523                            .collect::<BTreeMap<_, _>>()
5524                    });
5525
5526            assert_eq!(
5527                worktree_snapshots.keys().collect::<Vec<_>>(),
5528                host_worktree_snapshots.keys().collect::<Vec<_>>(),
5529                "{} has different worktrees than the host",
5530                guest_client.username
5531            );
5532            for (id, host_snapshot) in &host_worktree_snapshots {
5533                let guest_snapshot = &worktree_snapshots[id];
5534                assert_eq!(
5535                    guest_snapshot.root_name(),
5536                    host_snapshot.root_name(),
5537                    "{} has different root name than the host for worktree {}",
5538                    guest_client.username,
5539                    id
5540                );
5541                assert_eq!(
5542                    guest_snapshot.entries(false).collect::<Vec<_>>(),
5543                    host_snapshot.entries(false).collect::<Vec<_>>(),
5544                    "{} has different snapshot than the host for worktree {}",
5545                    guest_client.username,
5546                    id
5547                );
5548            }
5549
5550            guest_client
5551                .project
5552                .as_ref()
5553                .unwrap()
5554                .read_with(&guest_cx, |project, cx| project.check_invariants(cx));
5555
5556            for guest_buffer in &guest_client.buffers {
5557                let buffer_id = guest_buffer.read_with(&guest_cx, |buffer, _| buffer.remote_id());
5558                let host_buffer = host_project.read_with(&host_cx, |project, cx| {
5559                    project.buffer_for_id(buffer_id, cx).expect(&format!(
5560                        "host does not have buffer for guest:{}, peer:{}, id:{}",
5561                        guest_client.username, guest_client.peer_id, buffer_id
5562                    ))
5563                });
5564                let path = host_buffer
5565                    .read_with(&host_cx, |buffer, cx| buffer.file().unwrap().full_path(cx));
5566
5567                assert_eq!(
5568                    guest_buffer.read_with(&guest_cx, |buffer, _| buffer.deferred_ops_len()),
5569                    0,
5570                    "{}, buffer {}, path {:?} has deferred operations",
5571                    guest_client.username,
5572                    buffer_id,
5573                    path,
5574                );
5575                assert_eq!(
5576                    guest_buffer.read_with(&guest_cx, |buffer, _| buffer.text()),
5577                    host_buffer.read_with(&host_cx, |buffer, _| buffer.text()),
5578                    "{}, buffer {}, path {:?}, differs from the host's buffer",
5579                    guest_client.username,
5580                    buffer_id,
5581                    path
5582                );
5583            }
5584
5585            guest_cx.update(|_| drop(guest_client));
5586        }
5587
5588        host_cx.update(|_| drop(host_client));
5589    }
5590
    /// In-process server harness used by the tests in this module.
    ///
    /// Wraps a real `Server` together with the handles the tests use to steer it:
    /// per-user connection kill flags, a flag that rejects new connections, and the
    /// receiving half of the server's notification channel.
    struct TestServer {
        /// Peer handed to `Server::new`; `reset` is called on it when the harness is dropped.
        peer: Arc<Peer>,
        /// Application state produced by `build_app_state` from the test database.
        app_state: Arc<AppState>,
        /// The server under test. `TestServer` dereferences to it.
        server: Arc<Server>,
        /// Foreground executor; `condition` brackets its waits with
        /// `start_waiting` / `finish_waiting` on it.
        foreground: Rc<executor::Foreground>,
        /// Receiving half of the channel whose sender is passed to `Server::new`.
        notifications: mpsc::UnboundedReceiver<()>,
        /// For each connected user, the `killed` flag returned by `Connection::in_memory`.
        /// `disconnect_client` removes the entry and sets the flag to `true`.
        connection_killers: Arc<Mutex<HashMap<UserId, Arc<AtomicBool>>>>,
        /// While `true`, the connection hook installed by `create_client` returns an error.
        forbid_connections: Arc<AtomicBool>,
        /// Test database handle. Never read after construction; presumably held so the
        /// database outlives the server — TODO confirm against `TestDb`.
        _test_db: TestDb,
    }
5601
5602    impl TestServer {
5603        async fn start(
5604            foreground: Rc<executor::Foreground>,
5605            background: Arc<executor::Background>,
5606        ) -> Self {
5607            let test_db = TestDb::fake(background);
5608            let app_state = Self::build_app_state(&test_db).await;
5609            let peer = Peer::new();
5610            let notifications = mpsc::unbounded();
5611            let server = Server::new(app_state.clone(), peer.clone(), Some(notifications.0));
5612            Self {
5613                peer,
5614                app_state,
5615                server,
5616                foreground,
5617                notifications: notifications.1,
5618                connection_killers: Default::default(),
5619                forbid_connections: Default::default(),
5620                _test_db: test_db,
5621            }
5622        }
5623
5624        async fn create_client(&mut self, cx: &mut TestAppContext, name: &str) -> TestClient {
5625            cx.update(|cx| {
5626                let settings = Settings::test(cx);
5627                cx.set_global(settings);
5628            });
5629
5630            let http = FakeHttpClient::with_404_response();
5631            let user_id = self.app_state.db.create_user(name, false).await.unwrap();
5632            let client_name = name.to_string();
5633            let mut client = Client::new(http.clone());
5634            let server = self.server.clone();
5635            let connection_killers = self.connection_killers.clone();
5636            let forbid_connections = self.forbid_connections.clone();
5637            let (connection_id_tx, mut connection_id_rx) = mpsc::channel(16);
5638
5639            Arc::get_mut(&mut client)
5640                .unwrap()
5641                .override_authenticate(move |cx| {
5642                    cx.spawn(|_| async move {
5643                        let access_token = "the-token".to_string();
5644                        Ok(Credentials {
5645                            user_id: user_id.0 as u64,
5646                            access_token,
5647                        })
5648                    })
5649                })
5650                .override_establish_connection(move |credentials, cx| {
5651                    assert_eq!(credentials.user_id, user_id.0 as u64);
5652                    assert_eq!(credentials.access_token, "the-token");
5653
5654                    let server = server.clone();
5655                    let connection_killers = connection_killers.clone();
5656                    let forbid_connections = forbid_connections.clone();
5657                    let client_name = client_name.clone();
5658                    let connection_id_tx = connection_id_tx.clone();
5659                    cx.spawn(move |cx| async move {
5660                        if forbid_connections.load(SeqCst) {
5661                            Err(EstablishConnectionError::other(anyhow!(
5662                                "server is forbidding connections"
5663                            )))
5664                        } else {
5665                            let (client_conn, server_conn, killed) =
5666                                Connection::in_memory(cx.background());
5667                            connection_killers.lock().insert(user_id, killed);
5668                            cx.background()
5669                                .spawn(server.handle_connection(
5670                                    server_conn,
5671                                    client_name,
5672                                    user_id,
5673                                    Some(connection_id_tx),
5674                                    cx.background(),
5675                                ))
5676                                .detach();
5677                            Ok(client_conn)
5678                        }
5679                    })
5680                });
5681
5682            client
5683                .authenticate_and_connect(false, &cx.to_async())
5684                .await
5685                .unwrap();
5686
5687            Channel::init(&client);
5688            Project::init(&client);
5689            cx.update(|cx| {
5690                workspace::init(&client, cx);
5691            });
5692
5693            let peer_id = PeerId(connection_id_rx.next().await.unwrap().0);
5694            let user_store = cx.add_model(|cx| UserStore::new(client.clone(), http, cx));
5695
5696            let client = TestClient {
5697                client,
5698                peer_id,
5699                username: name.to_string(),
5700                user_store,
5701                language_registry: Arc::new(LanguageRegistry::test()),
5702                project: Default::default(),
5703                buffers: Default::default(),
5704            };
5705            client.wait_for_current_user(cx).await;
5706            client
5707        }
5708
5709        fn disconnect_client(&self, user_id: UserId) {
5710            self.connection_killers
5711                .lock()
5712                .remove(&user_id)
5713                .unwrap()
5714                .store(true, SeqCst);
5715        }
5716
        /// Makes the connection hook installed by `create_client` fail with
        /// "server is forbidding connections" until `allow_connections` is called.
        fn forbid_connections(&self) {
            self.forbid_connections.store(true, SeqCst);
        }
5720
        /// Clears the flag set by `forbid_connections`, so new connection attempts are
        /// accepted again.
        fn allow_connections(&self) {
            self.forbid_connections.store(false, SeqCst);
        }
5724
5725        async fn build_app_state(test_db: &TestDb) -> Arc<AppState> {
5726            let mut config = Config::default();
5727            config.session_secret = "a".repeat(32);
5728            config.database_url = test_db.url.clone();
5729            let github_client = github::AppClient::test();
5730            Arc::new(AppState {
5731                db: test_db.db().clone(),
5732                handlebars: Default::default(),
5733                auth_client: auth::build_client("", ""),
5734                repo_client: github::RepoClient::test(&github_client),
5735                github_client,
5736                config,
5737            })
5738        }
5739
        /// Waits for a read lock on the server's `Store` and returns the guard.
        async fn state<'a>(&'a self) -> RwLockReadGuard<'a, Store> {
            self.server.store.read().await
        }
5743
5744        async fn condition<F>(&mut self, mut predicate: F)
5745        where
5746            F: FnMut(&Store) -> bool,
5747        {
5748            async_std::future::timeout(Duration::from_millis(500), async {
5749                while !(predicate)(&*self.server.store.read().await) {
5750                    self.foreground.start_waiting();
5751                    self.notifications.next().await;
5752                    self.foreground.finish_waiting();
5753                }
5754            })
5755            .await
5756            .expect("condition timed out");
5757        }
5758    }
5759
    /// Lets tests use a `TestServer` wherever a `&Server` is expected, e.g. to reach
    /// `Server` methods and fields directly through the harness.
    impl Deref for TestServer {
        type Target = Server;

        fn deref(&self) -> &Self::Target {
            &self.server
        }
    }
5767
    /// Resets the peer when the harness is dropped.
    impl Drop for TestServer {
        fn drop(&mut self) {
            // NOTE(review): presumably tears down all of the peer's connections so that
            // background tasks do not outlive the test — confirm against `Peer::reset`.
            self.peer.reset();
        }
    }
5773
    /// A client connected to a `TestServer`, plus the state the tests track for it.
    struct TestClient {
        /// The underlying client; `TestClient` dereferences to it.
        client: Arc<Client>,
        /// Name the user was created with.
        username: String,
        /// Built from the connection id the server reported while `create_client` connected.
        pub peer_id: PeerId,
        /// User store created for this client in `create_client`.
        pub user_store: ModelHandle<UserStore>,
        /// Language registry passed to the projects and workspaces built for this client.
        language_registry: Arc<LanguageRegistry>,
        /// The most recent project built for, or handed back to, this client, if any.
        project: Option<ModelHandle<Project>>,
        /// Buffers the client currently holds; the simulation methods insert into and
        /// remove from this set.
        buffers: HashSet<ModelHandle<language::Buffer>>,
    }
5783
    /// Exposes the wrapped `Arc<Client>` so client methods can be called directly on a
    /// `TestClient`.
    impl Deref for TestClient {
        type Target = Arc<Client>;

        fn deref(&self) -> &Self::Target {
            &self.client
        }
    }
5791
5792    impl TestClient {
5793        pub fn current_user_id(&self, cx: &TestAppContext) -> UserId {
5794            UserId::from_proto(
5795                self.user_store
5796                    .read_with(cx, |user_store, _| user_store.current_user().unwrap().id),
5797            )
5798        }
5799
5800        async fn wait_for_current_user(&self, cx: &TestAppContext) {
5801            let mut authed_user = self
5802                .user_store
5803                .read_with(cx, |user_store, _| user_store.watch_current_user());
5804            while authed_user.next().await.unwrap().is_none() {}
5805        }
5806
5807        async fn build_local_project(
5808            &mut self,
5809            fs: Arc<FakeFs>,
5810            root_path: impl AsRef<Path>,
5811            cx: &mut TestAppContext,
5812        ) -> (ModelHandle<Project>, WorktreeId) {
5813            let project = cx.update(|cx| {
5814                Project::local(
5815                    self.client.clone(),
5816                    self.user_store.clone(),
5817                    self.language_registry.clone(),
5818                    fs,
5819                    cx,
5820                )
5821            });
5822            self.project = Some(project.clone());
5823            let (worktree, _) = project
5824                .update(cx, |p, cx| {
5825                    p.find_or_create_local_worktree(root_path, true, cx)
5826                })
5827                .await
5828                .unwrap();
5829            worktree
5830                .read_with(cx, |tree, _| tree.as_local().unwrap().scan_complete())
5831                .await;
5832            project
5833                .update(cx, |project, _| project.next_remote_id())
5834                .await;
5835            (project, worktree.read_with(cx, |tree, _| tree.id()))
5836        }
5837
5838        async fn build_remote_project(
5839            &mut self,
5840            project_id: u64,
5841            cx: &mut TestAppContext,
5842        ) -> ModelHandle<Project> {
5843            let project = Project::remote(
5844                project_id,
5845                self.client.clone(),
5846                self.user_store.clone(),
5847                self.language_registry.clone(),
5848                FakeFs::new(cx.background()),
5849                &mut cx.to_async(),
5850            )
5851            .await
5852            .unwrap();
5853            self.project = Some(project.clone());
5854            project
5855        }
5856
5857        fn build_workspace(
5858            &self,
5859            project: &ModelHandle<Project>,
5860            cx: &mut TestAppContext,
5861        ) -> ViewHandle<Workspace> {
5862            let (window_id, _) = cx.add_window(|_| EmptyView);
5863            cx.add_view(window_id, |cx| {
5864                let fs = project.read(cx).fs().clone();
5865                Workspace::new(
5866                    &WorkspaceParams {
5867                        fs,
5868                        project: project.clone(),
5869                        user_store: self.user_store.clone(),
5870                        languages: self.language_registry.clone(),
5871                        channel_list: cx.add_model(|cx| {
5872                            ChannelList::new(self.user_store.clone(), self.client.clone(), cx)
5873                        }),
5874                        client: self.client.clone(),
5875                    },
5876                    cx,
5877                )
5878            })
5879        }
5880
5881        async fn simulate_host(
5882            mut self,
5883            project: ModelHandle<Project>,
5884            files: Arc<Mutex<Vec<PathBuf>>>,
5885            op_start_signal: futures::channel::mpsc::UnboundedReceiver<()>,
5886            rng: Arc<Mutex<StdRng>>,
5887            mut cx: TestAppContext,
5888        ) -> (Self, TestAppContext, Option<anyhow::Error>) {
5889            async fn simulate_host_internal(
5890                client: &mut TestClient,
5891                project: ModelHandle<Project>,
5892                files: Arc<Mutex<Vec<PathBuf>>>,
5893                mut op_start_signal: futures::channel::mpsc::UnboundedReceiver<()>,
5894                rng: Arc<Mutex<StdRng>>,
5895                cx: &mut TestAppContext,
5896            ) -> anyhow::Result<()> {
5897                let fs = project.read_with(cx, |project, _| project.fs().clone());
5898
5899                while op_start_signal.next().await.is_some() {
5900                    let distribution = rng.lock().gen_range::<usize, _>(0..100);
5901                    match distribution {
5902                        0..=20 if !files.lock().is_empty() => {
5903                            let path = files.lock().choose(&mut *rng.lock()).unwrap().clone();
5904                            let mut path = path.as_path();
5905                            while let Some(parent_path) = path.parent() {
5906                                path = parent_path;
5907                                if rng.lock().gen() {
5908                                    break;
5909                                }
5910                            }
5911
5912                            log::info!("Host: find/create local worktree {:?}", path);
5913                            let find_or_create_worktree = project.update(cx, |project, cx| {
5914                                project.find_or_create_local_worktree(path, true, cx)
5915                            });
5916                            if rng.lock().gen() {
5917                                cx.background().spawn(find_or_create_worktree).detach();
5918                            } else {
5919                                find_or_create_worktree.await?;
5920                            }
5921                        }
5922                        10..=80 if !files.lock().is_empty() => {
5923                            let buffer = if client.buffers.is_empty() || rng.lock().gen() {
5924                                let file = files.lock().choose(&mut *rng.lock()).unwrap().clone();
5925                                let (worktree, path) = project
5926                                    .update(cx, |project, cx| {
5927                                        project.find_or_create_local_worktree(
5928                                            file.clone(),
5929                                            true,
5930                                            cx,
5931                                        )
5932                                    })
5933                                    .await?;
5934                                let project_path =
5935                                    worktree.read_with(cx, |worktree, _| (worktree.id(), path));
5936                                log::info!(
5937                                    "Host: opening path {:?}, worktree {}, relative_path {:?}",
5938                                    file,
5939                                    project_path.0,
5940                                    project_path.1
5941                                );
5942                                let buffer = project
5943                                    .update(cx, |project, cx| project.open_buffer(project_path, cx))
5944                                    .await
5945                                    .unwrap();
5946                                client.buffers.insert(buffer.clone());
5947                                buffer
5948                            } else {
5949                                client
5950                                    .buffers
5951                                    .iter()
5952                                    .choose(&mut *rng.lock())
5953                                    .unwrap()
5954                                    .clone()
5955                            };
5956
5957                            if rng.lock().gen_bool(0.1) {
5958                                cx.update(|cx| {
5959                                    log::info!(
5960                                        "Host: dropping buffer {:?}",
5961                                        buffer.read(cx).file().unwrap().full_path(cx)
5962                                    );
5963                                    client.buffers.remove(&buffer);
5964                                    drop(buffer);
5965                                });
5966                            } else {
5967                                buffer.update(cx, |buffer, cx| {
5968                                    log::info!(
5969                                        "Host: updating buffer {:?} ({})",
5970                                        buffer.file().unwrap().full_path(cx),
5971                                        buffer.remote_id()
5972                                    );
5973
5974                                    if rng.lock().gen_bool(0.7) {
5975                                        buffer.randomly_edit(&mut *rng.lock(), 5, cx);
5976                                    } else {
5977                                        buffer.randomly_undo_redo(&mut *rng.lock(), cx);
5978                                    }
5979                                });
5980                            }
5981                        }
5982                        _ => loop {
5983                            let path_component_count = rng.lock().gen_range::<usize, _>(1..=5);
5984                            let mut path = PathBuf::new();
5985                            path.push("/");
5986                            for _ in 0..path_component_count {
5987                                let letter = rng.lock().gen_range(b'a'..=b'z');
5988                                path.push(std::str::from_utf8(&[letter]).unwrap());
5989                            }
5990                            path.set_extension("rs");
5991                            let parent_path = path.parent().unwrap();
5992
5993                            log::info!("Host: creating file {:?}", path,);
5994
5995                            if fs.create_dir(&parent_path).await.is_ok()
5996                                && fs.create_file(&path, Default::default()).await.is_ok()
5997                            {
5998                                files.lock().push(path);
5999                                break;
6000                            } else {
6001                                log::info!("Host: cannot create file");
6002                            }
6003                        },
6004                    }
6005
6006                    cx.background().simulate_random_delay().await;
6007                }
6008
6009                Ok(())
6010            }
6011
6012            let result = simulate_host_internal(
6013                &mut self,
6014                project.clone(),
6015                files,
6016                op_start_signal,
6017                rng,
6018                &mut cx,
6019            )
6020            .await;
6021            log::info!("Host done");
6022            self.project = Some(project);
6023            (self, cx, result.err())
6024        }
6025
6026        pub async fn simulate_guest(
6027            mut self,
6028            guest_username: String,
6029            project: ModelHandle<Project>,
6030            op_start_signal: futures::channel::mpsc::UnboundedReceiver<()>,
6031            rng: Arc<Mutex<StdRng>>,
6032            mut cx: TestAppContext,
6033        ) -> (Self, TestAppContext, Option<anyhow::Error>) {
6034            async fn simulate_guest_internal(
6035                client: &mut TestClient,
6036                guest_username: &str,
6037                project: ModelHandle<Project>,
6038                mut op_start_signal: futures::channel::mpsc::UnboundedReceiver<()>,
6039                rng: Arc<Mutex<StdRng>>,
6040                cx: &mut TestAppContext,
6041            ) -> anyhow::Result<()> {
6042                while op_start_signal.next().await.is_some() {
6043                    let buffer = if client.buffers.is_empty() || rng.lock().gen() {
6044                        let worktree = if let Some(worktree) =
6045                            project.read_with(cx, |project, cx| {
6046                                project
6047                                    .worktrees(&cx)
6048                                    .filter(|worktree| {
6049                                        let worktree = worktree.read(cx);
6050                                        worktree.is_visible()
6051                                            && worktree.entries(false).any(|e| e.is_file())
6052                                    })
6053                                    .choose(&mut *rng.lock())
6054                            }) {
6055                            worktree
6056                        } else {
6057                            cx.background().simulate_random_delay().await;
6058                            continue;
6059                        };
6060
6061                        let (worktree_root_name, project_path) =
6062                            worktree.read_with(cx, |worktree, _| {
6063                                let entry = worktree
6064                                    .entries(false)
6065                                    .filter(|e| e.is_file())
6066                                    .choose(&mut *rng.lock())
6067                                    .unwrap();
6068                                (
6069                                    worktree.root_name().to_string(),
6070                                    (worktree.id(), entry.path.clone()),
6071                                )
6072                            });
6073                        log::info!(
6074                            "{}: opening path {:?} in worktree {} ({})",
6075                            guest_username,
6076                            project_path.1,
6077                            project_path.0,
6078                            worktree_root_name,
6079                        );
6080                        let buffer = project
6081                            .update(cx, |project, cx| {
6082                                project.open_buffer(project_path.clone(), cx)
6083                            })
6084                            .await?;
6085                        log::info!(
6086                            "{}: opened path {:?} in worktree {} ({}) with buffer id {}",
6087                            guest_username,
6088                            project_path.1,
6089                            project_path.0,
6090                            worktree_root_name,
6091                            buffer.read_with(cx, |buffer, _| buffer.remote_id())
6092                        );
6093                        client.buffers.insert(buffer.clone());
6094                        buffer
6095                    } else {
6096                        client
6097                            .buffers
6098                            .iter()
6099                            .choose(&mut *rng.lock())
6100                            .unwrap()
6101                            .clone()
6102                    };
6103
6104                    let choice = rng.lock().gen_range(0..100);
6105                    match choice {
6106                        0..=9 => {
6107                            cx.update(|cx| {
6108                                log::info!(
6109                                    "{}: dropping buffer {:?}",
6110                                    guest_username,
6111                                    buffer.read(cx).file().unwrap().full_path(cx)
6112                                );
6113                                client.buffers.remove(&buffer);
6114                                drop(buffer);
6115                            });
6116                        }
6117                        10..=19 => {
6118                            let completions = project.update(cx, |project, cx| {
6119                                log::info!(
6120                                    "{}: requesting completions for buffer {} ({:?})",
6121                                    guest_username,
6122                                    buffer.read(cx).remote_id(),
6123                                    buffer.read(cx).file().unwrap().full_path(cx)
6124                                );
6125                                let offset = rng.lock().gen_range(0..=buffer.read(cx).len());
6126                                project.completions(&buffer, offset, cx)
6127                            });
6128                            let completions = cx.background().spawn(async move {
6129                                completions
6130                                    .await
6131                                    .map_err(|err| anyhow!("completions request failed: {:?}", err))
6132                            });
6133                            if rng.lock().gen_bool(0.3) {
6134                                log::info!("{}: detaching completions request", guest_username);
6135                                cx.update(|cx| completions.detach_and_log_err(cx));
6136                            } else {
6137                                completions.await?;
6138                            }
6139                        }
6140                        20..=29 => {
6141                            let code_actions = project.update(cx, |project, cx| {
6142                                log::info!(
6143                                    "{}: requesting code actions for buffer {} ({:?})",
6144                                    guest_username,
6145                                    buffer.read(cx).remote_id(),
6146                                    buffer.read(cx).file().unwrap().full_path(cx)
6147                                );
6148                                let range = buffer.read(cx).random_byte_range(0, &mut *rng.lock());
6149                                project.code_actions(&buffer, range, cx)
6150                            });
6151                            let code_actions = cx.background().spawn(async move {
6152                                code_actions.await.map_err(|err| {
6153                                    anyhow!("code actions request failed: {:?}", err)
6154                                })
6155                            });
6156                            if rng.lock().gen_bool(0.3) {
6157                                log::info!("{}: detaching code actions request", guest_username);
6158                                cx.update(|cx| code_actions.detach_and_log_err(cx));
6159                            } else {
6160                                code_actions.await?;
6161                            }
6162                        }
6163                        30..=39 if buffer.read_with(cx, |buffer, _| buffer.is_dirty()) => {
6164                            let (requested_version, save) = buffer.update(cx, |buffer, cx| {
6165                                log::info!(
6166                                    "{}: saving buffer {} ({:?})",
6167                                    guest_username,
6168                                    buffer.remote_id(),
6169                                    buffer.file().unwrap().full_path(cx)
6170                                );
6171                                (buffer.version(), buffer.save(cx))
6172                            });
6173                            let save = cx.background().spawn(async move {
6174                                let (saved_version, _) = save
6175                                    .await
6176                                    .map_err(|err| anyhow!("save request failed: {:?}", err))?;
6177                                assert!(saved_version.observed_all(&requested_version));
6178                                Ok::<_, anyhow::Error>(())
6179                            });
6180                            if rng.lock().gen_bool(0.3) {
6181                                log::info!("{}: detaching save request", guest_username);
6182                                cx.update(|cx| save.detach_and_log_err(cx));
6183                            } else {
6184                                save.await?;
6185                            }
6186                        }
6187                        40..=44 => {
6188                            let prepare_rename = project.update(cx, |project, cx| {
6189                                log::info!(
6190                                    "{}: preparing rename for buffer {} ({:?})",
6191                                    guest_username,
6192                                    buffer.read(cx).remote_id(),
6193                                    buffer.read(cx).file().unwrap().full_path(cx)
6194                                );
6195                                let offset = rng.lock().gen_range(0..=buffer.read(cx).len());
6196                                project.prepare_rename(buffer, offset, cx)
6197                            });
6198                            let prepare_rename = cx.background().spawn(async move {
6199                                prepare_rename.await.map_err(|err| {
6200                                    anyhow!("prepare rename request failed: {:?}", err)
6201                                })
6202                            });
6203                            if rng.lock().gen_bool(0.3) {
6204                                log::info!("{}: detaching prepare rename request", guest_username);
6205                                cx.update(|cx| prepare_rename.detach_and_log_err(cx));
6206                            } else {
6207                                prepare_rename.await?;
6208                            }
6209                        }
6210                        45..=49 => {
6211                            let definitions = project.update(cx, |project, cx| {
6212                                log::info!(
6213                                    "{}: requesting definitions for buffer {} ({:?})",
6214                                    guest_username,
6215                                    buffer.read(cx).remote_id(),
6216                                    buffer.read(cx).file().unwrap().full_path(cx)
6217                                );
6218                                let offset = rng.lock().gen_range(0..=buffer.read(cx).len());
6219                                project.definition(&buffer, offset, cx)
6220                            });
6221                            let definitions = cx.background().spawn(async move {
6222                                definitions
6223                                    .await
6224                                    .map_err(|err| anyhow!("definitions request failed: {:?}", err))
6225                            });
6226                            if rng.lock().gen_bool(0.3) {
6227                                log::info!("{}: detaching definitions request", guest_username);
6228                                cx.update(|cx| definitions.detach_and_log_err(cx));
6229                            } else {
6230                                client
6231                                    .buffers
6232                                    .extend(definitions.await?.into_iter().map(|loc| loc.buffer));
6233                            }
6234                        }
6235                        50..=54 => {
6236                            let highlights = project.update(cx, |project, cx| {
6237                                log::info!(
6238                                    "{}: requesting highlights for buffer {} ({:?})",
6239                                    guest_username,
6240                                    buffer.read(cx).remote_id(),
6241                                    buffer.read(cx).file().unwrap().full_path(cx)
6242                                );
6243                                let offset = rng.lock().gen_range(0..=buffer.read(cx).len());
6244                                project.document_highlights(&buffer, offset, cx)
6245                            });
6246                            let highlights = cx.background().spawn(async move {
6247                                highlights
6248                                    .await
6249                                    .map_err(|err| anyhow!("highlights request failed: {:?}", err))
6250                            });
6251                            if rng.lock().gen_bool(0.3) {
6252                                log::info!("{}: detaching highlights request", guest_username);
6253                                cx.update(|cx| highlights.detach_and_log_err(cx));
6254                            } else {
6255                                highlights.await?;
6256                            }
6257                        }
6258                        55..=59 => {
6259                            let search = project.update(cx, |project, cx| {
6260                                let query = rng.lock().gen_range('a'..='z');
6261                                log::info!("{}: project-wide search {:?}", guest_username, query);
6262                                project.search(SearchQuery::text(query, false, false), cx)
6263                            });
6264                            let search = cx.background().spawn(async move {
6265                                search
6266                                    .await
6267                                    .map_err(|err| anyhow!("search request failed: {:?}", err))
6268                            });
6269                            if rng.lock().gen_bool(0.3) {
6270                                log::info!("{}: detaching search request", guest_username);
6271                                cx.update(|cx| search.detach_and_log_err(cx));
6272                            } else {
6273                                client.buffers.extend(search.await?.into_keys());
6274                            }
6275                        }
6276                        _ => {
6277                            buffer.update(cx, |buffer, cx| {
6278                                log::info!(
6279                                    "{}: updating buffer {} ({:?})",
6280                                    guest_username,
6281                                    buffer.remote_id(),
6282                                    buffer.file().unwrap().full_path(cx)
6283                                );
6284                                if rng.lock().gen_bool(0.7) {
6285                                    buffer.randomly_edit(&mut *rng.lock(), 5, cx);
6286                                } else {
6287                                    buffer.randomly_undo_redo(&mut *rng.lock(), cx);
6288                                }
6289                            });
6290                        }
6291                    }
6292                    cx.background().simulate_random_delay().await;
6293                }
6294                Ok(())
6295            }
6296
6297            let result = simulate_guest_internal(
6298                &mut self,
6299                &guest_username,
6300                project.clone(),
6301                op_start_signal,
6302                rng,
6303                &mut cx,
6304            )
6305            .await;
6306            log::info!("{}: done", guest_username);
6307
6308            self.project = Some(project);
6309            (self, cx, result.err())
6310        }
6311    }
6312
6313    impl Drop for TestClient {
6314        fn drop(&mut self) {
6315            self.client.tear_down();
6316        }
6317    }
6318
6319    impl Executor for Arc<gpui::executor::Background> {
6320        type Timer = gpui::executor::Timer;
6321
6322        fn spawn_detached<F: 'static + Send + Future<Output = ()>>(&self, future: F) {
6323            self.spawn(future).detach();
6324        }
6325
6326        fn timer(&self, duration: Duration) -> Self::Timer {
6327            self.as_ref().timer(duration)
6328        }
6329    }
6330
6331    fn channel_messages(channel: &Channel) -> Vec<(String, String, bool)> {
6332        channel
6333            .messages()
6334            .cursor::<()>()
6335            .map(|m| {
6336                (
6337                    m.sender.github_login.clone(),
6338                    m.body.clone(),
6339                    m.is_pending(),
6340                )
6341            })
6342            .collect()
6343    }
6344
6345    struct EmptyView;
6346
6347    impl gpui::Entity for EmptyView {
6348        type Event = ();
6349    }
6350
6351    impl gpui::View for EmptyView {
6352        fn ui_name() -> &'static str {
6353            "empty view"
6354        }
6355
6356        fn render(&mut self, _: &mut gpui::RenderContext<Self>) -> gpui::ElementBox {
6357            gpui::Element::boxed(gpui::elements::Empty)
6358        }
6359    }
6360}